Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 6e848e7d3 -> cba369929


[GOBBLIN-598] Add DistcpFileSplitter to allow for block level distcp

[GOBBLIN-598] Add DistcpFileSplitter to allow for
block level distcp

Fix task state bug for merging in block distcp

Address review comments, and add javadoc/comments

AllowSplit and ADL modifications

Closes #2461 from cshen98/distcp1


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/cba36992
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/cba36992
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/cba36992

Branch: refs/heads/master
Commit: cba3699299c7a4013fc4c193caebe87c1cac5b76
Parents: 6e848e7
Author: Carl Shen <[email protected]>
Authored: Mon Oct 22 08:39:46 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Mon Oct 22 08:39:46 2018 -0700

----------------------------------------------------------------------
 .../data/management/copy/CopySource.java        |  10 +-
 .../data/management/copy/CopyableFile.java      |  16 ++
 .../management/copy/FileAwareInputStream.java   |  21 +-
 .../copy/converter/DistcpConverter.java         |   2 +-
 .../FileAwareInputStreamExtractor.java          |  31 ++-
 .../copy/publisher/CopyDataPublisher.java       |  14 +-
 .../copy/splitter/DistcpFileSplitter.java       | 243 +++++++++++++++++++
 .../writer/FileAwareInputStreamDataWriter.java  |  66 +++--
 .../writer/TarArchiveInputStreamDataWriter.java |   5 +-
 .../copy/converter/DecryptConverterTest.java    |   8 +-
 .../copy/converter/UnGzipConverterTest.java     |  10 +-
 .../copy/splitter/DistcpFileSplitterTest.java   | 142 +++++++++++
 .../FileAwareInputStreamDataWriterTest.java     |  56 ++++-
 .../TarArchiveInputStreamDataWriterTest.java    |   3 +-
 .../apache/gobblin/util/io/StreamCopier.java    |  41 +++-
 .../gobblin/util/io/StreamCopierTest.java       |  15 +-
 16 files changed, 612 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index 9ae9b45..a2d9dc2 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -55,6 +55,7 @@ import 
org.apache.gobblin.data.management.copy.extractor.EmptyExtractor;
 import 
org.apache.gobblin.data.management.copy.extractor.FileAwareInputStreamExtractor;
 import 
org.apache.gobblin.data.management.copy.prioritization.FileSetComparator;
 import 
org.apache.gobblin.data.management.copy.publisher.CopyEventSubmitterHelper;
+import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
 import 
org.apache.gobblin.data.management.copy.watermark.CopyableFileWatermarkGenerator;
 import 
org.apache.gobblin.data.management.copy.watermark.CopyableFileWatermarkHelper;
 import org.apache.gobblin.data.management.dataset.DatasetUtils;
@@ -217,7 +218,7 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
               try {
                 return 
GobblinConstructorUtils.<FileSetWorkUnitGenerator>invokeLongestConstructor(
                     new 
ClassAliasResolver(FileSetWorkUnitGenerator.class).resolveClass(filesetWuGeneratorAlias),
-                    input.getDataset(), input, state, workUnitsMap, 
watermarkGenerator, minWorkUnitWeight, lineageInfo);
+                    input.getDataset(), input, state, targetFs, workUnitsMap, 
watermarkGenerator, minWorkUnitWeight, lineageInfo);
               } catch (Exception e) {
                 throw new RuntimeException("Cannot create workunits 
generator", e);
               }
@@ -335,6 +336,7 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
     protected final CopyableDatasetBase copyableDataset;
     protected final FileSet<CopyEntity> fileSet;
     protected final State state;
+    protected final FileSystem targetFs;
     protected final SetMultimap<FileSet<CopyEntity>, WorkUnit> workUnitList;
     protected final Optional<CopyableFileWatermarkGenerator> 
watermarkGenerator;
     protected final long minWorkUnitWeight;
@@ -365,8 +367,12 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
           setWorkUnitWeight(workUnit, copyEntity, minWorkUnitWeight);
           setWorkUnitWatermark(workUnit, watermarkGenerator, copyEntity);
           computeAndSetWorkUnitGuid(workUnit);
-          workUnitsForPartition.add(workUnit);
           addLineageInfo(copyEntity, workUnit);
+          if (copyEntity instanceof CopyableFile && 
DistcpFileSplitter.allowSplit(this.state, this.targetFs)) {
+            
workUnitsForPartition.addAll(DistcpFileSplitter.splitFile((CopyableFile) 
copyEntity, workUnit, this.targetFs));
+          } else {
+            workUnitsForPartition.add(workUnit);
+          }
         }
 
         this.workUnitList.putAll(this.fileSet, workUnitsForPartition);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index 9ad918c..6c1093f 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -352,6 +352,22 @@ public class CopyableFile extends CopyEntity implements 
File {
   }
 
   /**
+   * @return desired block size for destination file.
+   */
+  public long getBlockSize(FileSystem targetFs) {
+    return getPreserve().preserve(PreserveAttributes.Option.BLOCK_SIZE) ?
+        getOrigin().getBlockSize() : 
targetFs.getDefaultBlockSize(this.destination);
+  }
+
+  /**
+   * @return desired replication for destination file.
+   */
+  public short getReplication(FileSystem targetFs) {
+    return getPreserve().preserve(PreserveAttributes.Option.REPLICATION) ?
+        getOrigin().getReplication() : 
targetFs.getDefaultReplication(this.destination);
+  }
+
+  /**
    * Generates a replicable guid to uniquely identify the origin of this 
{@link CopyableFile}.
    * @return a guid uniquely identifying the origin file.
    */

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/FileAwareInputStream.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/FileAwareInputStream.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/FileAwareInputStream.java
index b399ec1..8d43c36 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/FileAwareInputStream.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/FileAwareInputStream.java
@@ -19,19 +19,36 @@ package org.apache.gobblin.data.management.copy;
 
 import java.io.InputStream;
 
-import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.NonNull;
 import lombok.Getter;
 
+import com.google.common.base.Optional;
+
+import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
+
+
 /**
  * A wrapper to {@link InputStream} that represents an entity to be copied. 
The enclosed {@link CopyableFile} instance
  * contains file Metadata like permission, destination path etc. required by 
the writers and converters.
+ * The enclosed {@link DistcpFileSplitter.Split} object indicates whether the 
{@link InputStream} to be copied is a
+ * block of the {@link CopyableFile} or not. If it is present, the {@link 
InputStream} should already be at the start
+ * position of the specified split/block.
  */
-@AllArgsConstructor
 @Getter
 public class FileAwareInputStream {
 
   private CopyableFile file;
   private InputStream inputStream;
+  private Optional<DistcpFileSplitter.Split> split = Optional.absent();
+
+  @Builder(toBuilder = true)
+  public FileAwareInputStream(@NonNull CopyableFile file, @NonNull InputStream 
inputStream,
+      Optional<DistcpFileSplitter.Split> split) {
+    this.file = file;
+    this.inputStream = inputStream;
+    this.split = split == null ? Optional.<DistcpFileSplitter.Split>absent() : 
split;
+  }
 
   @Override
   public String toString() {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/converter/DistcpConverter.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/converter/DistcpConverter.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/converter/DistcpConverter.java
index ee6f221..f5f122e 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/converter/DistcpConverter.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/converter/DistcpConverter.java
@@ -85,7 +85,7 @@ public abstract class DistcpConverter extends 
Converter<String, String, FileAwar
     modifyExtensionAtDestination(fileAwareInputStream.getFile());
     try {
       InputStream newInputStream = 
inputStreamTransformation().apply(fileAwareInputStream.getInputStream());
-      return new SingleRecordIterable<>(new 
FileAwareInputStream(fileAwareInputStream.getFile(), newInputStream));
+      return new 
SingleRecordIterable<>(fileAwareInputStream.toBuilder().inputStream(newInputStream).build());
     } catch (RuntimeException re) {
       throw new DataConversionException(re);
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java
index 4b212bf..9863a98 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java
@@ -17,21 +17,24 @@
 
 package org.apache.gobblin.data.management.copy.extractor;
 
+import com.google.common.base.Optional;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.data.management.copy.CopyableFile;
 import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
 import org.apache.gobblin.source.extractor.DataRecordException;
 import org.apache.gobblin.source.extractor.Extractor;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.io.EmptyInputStream;
 import org.apache.gobblin.util.io.MeteredInputStream;
 
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-
 
 /**
  * An implementation of {@link Extractor} that extracts {@link InputStream}s. 
This extractor is suitable for copy jobs
@@ -81,11 +84,21 @@ public class FileAwareInputStreamExtractor implements 
Extractor<String, FileAwar
           this.state == null ? HadoopUtils.newConfiguration() : 
HadoopUtils.getConfFromState(this.state);
       FileSystem fsFromFile = 
this.file.getOrigin().getPath().getFileSystem(conf);
       this.recordRead = true;
+      FileAwareInputStream.FileAwareInputStreamBuilder builder = 
FileAwareInputStream.builder().file(this.file);
       if (this.file.getFileStatus().isDirectory()) {
-        return new FileAwareInputStream(this.file, EmptyInputStream.instance);
+        return builder.inputStream(EmptyInputStream.instance).build();
+      }
+
+      FSDataInputStream dataInputStream = 
fsFromFile.open(this.file.getFileStatus().getPath());
+      if (this.state != null && 
DistcpFileSplitter.isSplitWorkUnit(this.state)) {
+        Optional<DistcpFileSplitter.Split> split = 
DistcpFileSplitter.getSplit(this.state);
+        builder.split(split);
+        if (split.isPresent()) {
+          dataInputStream.seek(split.get().getLowPosition());
+        }
       }
-      return new FileAwareInputStream(this.file,
-          
MeteredInputStream.builder().in(fsFromFile.open(this.file.getFileStatus().getPath())).build());
+      
builder.inputStream(MeteredInputStream.builder().in(dataInputStream).build());
+      return builder.build();
     }
     return null;
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
index ec7b1a0..cfd715a 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
@@ -17,10 +17,6 @@
 
 package org.apache.gobblin.data.management.copy.publisher;
 
-
-import org.apache.gobblin.configuration.SourceState;
-import org.apache.gobblin.metrics.event.lineage.LineageInfo;
-import org.apache.gobblin.metrics.event.sla.SlaEventKeys;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
@@ -29,6 +25,8 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -41,6 +39,7 @@ import com.google.common.collect.Multimap;
 
 import org.apache.gobblin.commit.CommitStep;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.configuration.WorkUnitState.WorkingState;
@@ -53,18 +52,20 @@ import 
org.apache.gobblin.data.management.copy.entities.CommitStepCopyEntity;
 import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
 import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
 import org.apache.gobblin.data.management.copy.recovery.RecoveryHelper;
+import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
 import 
org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriter;
 import 
org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
+import org.apache.gobblin.metrics.event.sla.SlaEventKeys;
 import org.apache.gobblin.publisher.DataPublisher;
 import org.apache.gobblin.publisher.UnpublishedHandling;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.WriterUtils;
 
-import lombok.extern.slf4j.Slf4j;
 
 /**
  * A {@link DataPublisher} to {@link 
org.apache.gobblin.data.management.copy.CopyEntity}s from task output to final 
destination.
@@ -185,6 +186,9 @@ public class CopyDataPublisher extends DataPublisher 
implements UnpublishedHandl
         
.deserialize(datasetWorkUnitStates.iterator().next().getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
     Path datasetWriterOutputPath = new Path(this.writerOutputDir, 
datasetAndPartition.identifier());
 
+    log.info("Merging all split work units.");
+    DistcpFileSplitter.mergeAllSplitWorkUnits(this.fs, datasetWorkUnitStates);
+
     log.info(String.format("[%s] Publishing fileSet from %s for dataset %s", 
datasetAndPartition.identifier(),
         datasetWriterOutputPath, metadata.getDatasetURN()));
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/splitter/DistcpFileSplitter.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/splitter/DistcpFileSplitter.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/splitter/DistcpFileSplitter.java
new file mode 100644
index 0000000..688bb6b
--- /dev/null
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/splitter/DistcpFileSplitter.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.splitter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.math3.util.ArithmeticUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.IdentityConverter;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopySource;
+import 
org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriter;
+import 
org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.guid.Guid;
+
+
+/**
+ * Helper class for splitting files for distcp. The property flag 
gobblin.copy.split.enabled should be used to enable
+ * splitting of files (which is disabled by default). Splitting should only be 
used if the distcp job uses only the
+ * IdentityConverter and should not be used for distcp jobs that require 
decryption/ungzipping.
+ */
+@Slf4j
+public class DistcpFileSplitter {
+
+  public static final String SPLIT_ENABLED = CopyConfiguration.COPY_PREFIX + 
".split.enabled";
+  public static final String MAX_SPLIT_SIZE_KEY = 
CopyConfiguration.COPY_PREFIX + ".file.max.split.size";
+
+  public static final long DEFAULT_MAX_SPLIT_SIZE = Long.MAX_VALUE;
+  public static final Set<String> KNOWN_SCHEMES_SUPPORTING_CONCAT = 
Sets.newHashSet("hdfs", "adl");
+
+  /**
+   * A split for a distcp file. Represents a section of a file; split should 
be aligned to block boundaries.
+   */
+  @Data
+  public static class Split {
+    private final long lowPosition;
+    private final long highPosition;
+    private final int splitNumber;
+    private final int totalSplits;
+    private final String partName;
+
+    public final boolean isLastSplit() {
+      return this.splitNumber == this.totalSplits - 1;
+    }
+  }
+
+  private static final String SPLIT_KEY = CopyConfiguration.COPY_PREFIX + 
".file.splitter.split";
+  private static final Gson GSON = new Gson();
+
+  /**
+   * Split an input {@link CopyableFile} into multiple splits aligned with 
block boundaries.
+   *
+   * @param file {@link CopyableFile} to split.
+   * @param workUnit {@link WorkUnit} generated for this file.
+   * @param targetFs destination {@link FileSystem} where file is to be copied.
+   * @return a list of {@link WorkUnit}, each for a split of this file.
+   * @throws IOException
+   */
+  public static Collection<WorkUnit> splitFile(CopyableFile file, WorkUnit 
workUnit, FileSystem targetFs)
+      throws IOException {
+    long len = file.getFileStatus().getLen();
+    // get lcm of source and target block size so that split aligns with block 
boundaries for both extract and write
+    long blockSize = ArithmeticUtils.lcm(file.getFileStatus().getBlockSize(), 
file.getBlockSize(targetFs));
+    long maxSplitSize = workUnit.getPropAsLong(MAX_SPLIT_SIZE_KEY, 
DEFAULT_MAX_SPLIT_SIZE);
+
+    if (maxSplitSize < blockSize) {
+      log.warn(String.format("Max split size must be at least block size. 
Adjusting to %d.", blockSize));
+      maxSplitSize = blockSize;
+    }
+    if (len < maxSplitSize) {
+      return Lists.newArrayList(workUnit);
+    }
+
+    Collection<WorkUnit> newWorkUnits = Lists.newArrayList();
+
+    long lengthPerSplit = (maxSplitSize / blockSize) * blockSize;
+    int splits = (int) (len / lengthPerSplit + 1);
+
+    for (int i = 0; i < splits; i++) {
+      WorkUnit newWorkUnit = WorkUnit.copyOf(workUnit);
+
+      long lowPos = lengthPerSplit * i;
+      long highPos = Math.min(lengthPerSplit * (i + 1), len);
+
+      Split split = new Split(lowPos, highPos, i, splits,
+          String.format("%s.__PART%d__", file.getDestination().getName(), i));
+      String serializedSplit = GSON.toJson(split);
+
+      newWorkUnit.setProp(SPLIT_KEY, serializedSplit);
+
+      Guid oldGuid = CopySource.getWorkUnitGuid(newWorkUnit).get();
+      Guid newGuid = oldGuid.append(Guid.fromStrings(serializedSplit));
+
+      CopySource.setWorkUnitGuid(workUnit, newGuid);
+      newWorkUnits.add(newWorkUnit);
+    }
+    return newWorkUnits;
+  }
+
+  /**
+   * Finds all split work units in the input collection and merges the file 
parts into the expected output files.
+   * @param fs {@link FileSystem} where file parts exist.
+   * @param workUnits Collection of {@link WorkUnitState}s possibly containing 
split work units.
+   * @return The collection of {@link WorkUnitState}s where split work units 
for each file have been merged.
+   * @throws IOException
+   */
+  public static Collection<WorkUnitState> mergeAllSplitWorkUnits(FileSystem 
fs, Collection<WorkUnitState> workUnits)
+      throws IOException {
+    ListMultimap<CopyableFile, WorkUnitState> splitWorkUnitsMap = 
ArrayListMultimap.create();
+    for (WorkUnitState workUnit : workUnits) {
+      if (isSplitWorkUnit(workUnit)) {
+        CopyableFile copyableFile = (CopyableFile) 
CopySource.deserializeCopyEntity(workUnit);
+        splitWorkUnitsMap.put(copyableFile, workUnit);
+      }
+    }
+
+    for (CopyableFile file : splitWorkUnitsMap.keySet()) {
+      log.info(String.format("Merging split file %s.", file.getDestination()));
+
+      WorkUnitState oldWorkUnit = splitWorkUnitsMap.get(file).get(0);
+      Path outputDir = 
FileAwareInputStreamDataWriter.getOutputDir(oldWorkUnit);
+      CopyEntity.DatasetAndPartition datasetAndPartition =
+          
file.getDatasetAndPartition(CopySource.deserializeCopyableDataset(oldWorkUnit));
+      Path parentPath = FileAwareInputStreamDataWriter.getOutputFilePath(file, 
outputDir, datasetAndPartition)
+          .getParent();
+
+      WorkUnitState newWorkUnit = mergeSplits(fs, file, 
splitWorkUnitsMap.get(file), parentPath);
+
+      for (WorkUnitState wu : splitWorkUnitsMap.get(file)) {
+        // Set to committed so that task states will not fail
+        wu.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
+        workUnits.remove(wu);
+      }
+      workUnits.add(newWorkUnit);
+    }
+    return workUnits;
+  }
+
+  /**
+   * Merges all the splits for a given file.
+   * Should be called on the target/destination file system (after blocks have 
been copied to targetFs).
+   * @param fs {@link FileSystem} where file parts exist.
+   * @param file {@link CopyableFile} to merge.
+   * @param workUnits {@link WorkUnitState}s for all parts of this file.
+   * @param parentPath {@link Path} where the parts of the file are located.
+   * @return a {@link WorkUnit} equivalent to the distcp work unit if the file 
had not been split.
+   * @throws IOException
+   */
+  private static WorkUnitState mergeSplits(FileSystem fs, CopyableFile file, 
Collection<WorkUnitState> workUnits,
+      Path parentPath) throws IOException {
+
+    log.info(String.format("File %s was written in %d parts. Merging.", 
file.getDestination(), workUnits.size()));
+    Path[] parts = new Path[workUnits.size()];
+    for (WorkUnitState workUnit : workUnits) {
+      if (!isSplitWorkUnit(workUnit)) {
+        throw new IOException("Not a split work unit.");
+      }
+      Split split = getSplit(workUnit).get();
+      parts[split.getSplitNumber()] = new Path(parentPath, 
split.getPartName());
+    }
+
+    Path target = new Path(parentPath, file.getDestination().getName());
+
+    fs.rename(parts[0], target);
+    fs.concat(target, Arrays.copyOfRange(parts, 1, parts.length));
+
+    WorkUnitState finalWorkUnit = workUnits.iterator().next();
+    finalWorkUnit.removeProp(SPLIT_KEY);
+    return finalWorkUnit;
+  }
+
+  /**
+   * @return whether the {@link WorkUnit} is a split work unit.
+   */
+  public static boolean isSplitWorkUnit(State workUnit) {
+    return workUnit.contains(SPLIT_KEY);
+  }
+
+  /**
+   * @return the {@link Split} object contained in the {@link WorkUnit}.
+   */
+  public static Optional<Split> getSplit(State workUnit) {
+    return workUnit.contains(SPLIT_KEY) ? 
Optional.of(GSON.fromJson(workUnit.getProp(SPLIT_KEY), Split.class))
+        : Optional.<Split>absent();
+  }
+
+  /**
+   * @param state {@link State} containing properties for a job.
+   * @param targetFs destination {@link FileSystem} where file is to be copied
+   * @return whether to allow for splitting of work units based on the 
filesystem, state, converter/writer config.
+   */
+  public static boolean allowSplit(State state, FileSystem targetFs) {
+    // Don't allow distcp jobs that use decrypt/ungzip converters or 
tararchive/encrypt writers to split work units
+    Collection<String> converterClassNames = Collections.emptyList();
+    if (state.contains(ConfigurationKeys.CONVERTER_CLASSES_KEY)) {
+      converterClassNames = 
state.getPropAsList(ConfigurationKeys.CONVERTER_CLASSES_KEY);
+    }
+
+    return state.getPropAsBoolean(SPLIT_ENABLED, false) &&
+        
KNOWN_SCHEMES_SUPPORTING_CONCAT.contains(targetFs.getUri().getScheme()) &&
+        state.getProp(ConfigurationKeys.WRITER_BUILDER_CLASS, "")
+            .equals(FileAwareInputStreamDataWriterBuilder.class.getName()) &&
+        converterClassNames.stream().noneMatch(s -> 
!s.equals(IdentityConverter.class.getName()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index bb9819e..f156949 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -27,6 +27,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
@@ -43,8 +45,6 @@ import com.google.common.base.Predicate;
 import com.google.common.base.Strings;
 import com.google.common.collect.Iterators;
 
-import lombok.extern.slf4j.Slf4j;
-
 import org.apache.gobblin.broker.EmptyKey;
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.iface.NotConfiguredException;
@@ -62,8 +62,8 @@ import 
org.apache.gobblin.data.management.copy.CopyableDatasetMetadata;
 import org.apache.gobblin.data.management.copy.CopyableFile;
 import org.apache.gobblin.data.management.copy.FileAwareInputStream;
 import org.apache.gobblin.data.management.copy.OwnerAndPermission;
-import org.apache.gobblin.data.management.copy.PreserveAttributes;
 import org.apache.gobblin.data.management.copy.recovery.RecoveryHelper;
+import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
 import org.apache.gobblin.instrumented.writer.InstrumentedDataWriter;
 import org.apache.gobblin.state.ConstructState;
 import org.apache.gobblin.util.FileListUtils;
@@ -183,7 +183,7 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
     }
     this.actualProcessedCopyableFile = Optional.of(copyableFile);
     this.fs.mkdirs(stagingFile.getParent());
-    writeImpl(fileAwareInputStream.getInputStream(), stagingFile, 
copyableFile);
+    writeImpl(fileAwareInputStream.getInputStream(), stagingFile, 
copyableFile, fileAwareInputStream);
     this.filesWritten.incrementAndGet();
   }
 
@@ -200,17 +200,31 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
    * @param inputStream {@link FSDataInputStream} whose contents should be 
written to staging path.
    * @param writeAt {@link Path} at which contents should be written.
    * @param copyableFile {@link 
org.apache.gobblin.data.management.copy.CopyEntity} that generated this copy 
operation.
+   * @param record The actual {@link FileAwareInputStream} passed to the write 
method.
    * @throws IOException
    */
-  protected void writeImpl(InputStream inputStream, Path writeAt, CopyableFile 
copyableFile)
-      throws IOException {
-
-    final short replication =
-        
copyableFile.getPreserve().preserve(PreserveAttributes.Option.REPLICATION) ? 
copyableFile.getOrigin()
-            .getReplication() : this.fs.getDefaultReplication(writeAt);
-    final long blockSize =
-        
copyableFile.getPreserve().preserve(PreserveAttributes.Option.BLOCK_SIZE) ? 
copyableFile.getOrigin()
-            .getBlockSize() : this.fs.getDefaultBlockSize(writeAt);
+  protected void writeImpl(InputStream inputStream, Path writeAt, CopyableFile 
copyableFile,
+      FileAwareInputStream record) throws IOException {
+
+    final short replication = copyableFile.getReplication(this.fs);
+    final long blockSize = copyableFile.getBlockSize(this.fs);
+    final long fileSize = copyableFile.getFileStatus().getLen();
+
+    long expectedBytes = fileSize;
+    Long maxBytes = null;
+    // Whether writer must write EXACTLY maxBytes.
+    boolean mustMatchMaxBytes = false;
+
+    if (record.getSplit().isPresent()) {
+      maxBytes = record.getSplit().get().getHighPosition() - 
record.getSplit().get().getLowPosition();
+      if (record.getSplit().get().isLastSplit()) {
+        expectedBytes = fileSize % blockSize;
+        mustMatchMaxBytes = false;
+      } else {
+        expectedBytes = maxBytes;
+        mustMatchMaxBytes = true;
+      }
+    }
 
     Predicate<FileStatus> fileStatusAttributesFilter = new 
Predicate<FileStatus>() {
       @Override
@@ -243,7 +257,7 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
         ThrottledInputStream throttledInputStream = 
throttler.throttleInputStream().inputStream(inputStream)
             
.sourceURI(copyableFile.getOrigin().getPath().makeQualified(defaultFS.getUri(), 
defaultFS.getWorkingDirectory()).toUri())
             .targetURI(this.fs.makeQualified(writeAt).toUri()).build();
-        StreamCopier copier = new StreamCopier(throttledInputStream, 
os).withBufferSize(this.bufferSize);
+        StreamCopier copier = new StreamCopier(throttledInputStream, os, 
maxBytes).withBufferSize(this.bufferSize);
 
         log.info("File {}: Starting copy", copyableFile.getOrigin().getPath());
 
@@ -251,10 +265,9 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
           copier.withCopySpeedMeter(this.copySpeedMeter);
         }
         long numBytes = copier.copy();
-        long fileSize = copyableFile.getFileStatus().getLen();
-        if (this.checkFileSize && numBytes != fileSize) {
-          throw new IOException(String.format("Number of bytes copied doesn't 
match filesize for file %s.",
-              copyableFile.getOrigin().getPath()));
+        if ((this.checkFileSize || mustMatchMaxBytes) && numBytes != 
expectedBytes) {
+          throw new IOException(String.format("Incomplete write: expected %d, 
wrote %d bytes.",
+              expectedBytes, numBytes));
         }
         this.bytesWritten.addAndGet(numBytes);
         if (isInstrumentationEnabled()) {
@@ -281,6 +294,9 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
   }
 
   protected Path getStagingFilePath(CopyableFile file) {
+    if (DistcpFileSplitter.isSplitWorkUnit(this.state)) {
+      return new Path(this.stagingDir, 
DistcpFileSplitter.getSplit(this.state).get().getPartName());
+    }
     return new Path(this.stagingDir, file.getDestination().getName());
   }
 
@@ -295,6 +311,16 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
         
PathUtils.withoutLeadingSeparator(destinationWithoutSchemeAndAuthority));
   }
 
+  public static Path getSplitOutputFilePath(CopyableFile file, Path outputDir,
+      CopyableFile.DatasetAndPartition datasetAndPartition, State workUnit) {
+    if (DistcpFileSplitter.isSplitWorkUnit(workUnit)) {
+      return new Path(getOutputFilePath(file, outputDir, 
datasetAndPartition).getParent(),
+          DistcpFileSplitter.getSplit(workUnit).get().getPartName());
+    } else {
+      return getOutputFilePath(file, outputDir, datasetAndPartition);
+    }
+  }
+
   public static Path getOutputDir(State state) {
     return new Path(
         
state.getProp(ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_OUTPUT_DIR,
 1, 0)));
@@ -395,8 +421,8 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
 
     CopyableFile copyableFile = this.actualProcessedCopyableFile.get();
     Path stagingFilePath = getStagingFilePath(copyableFile);
-    Path outputFilePath = getOutputFilePath(copyableFile, this.outputDir,
-        copyableFile.getDatasetAndPartition(this.copyableDatasetMetadata));
+    Path outputFilePath = getSplitOutputFilePath(copyableFile, this.outputDir,
+        copyableFile.getDatasetAndPartition(this.copyableDatasetMetadata), 
this.state);
 
     log.info(String.format("Committing data from %s to %s", stagingFilePath, 
outputFilePath));
     try {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriter.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriter.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriter.java
index 5e1164d..4f61d4e 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriter.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriter.java
@@ -22,7 +22,6 @@ import org.apache.gobblin.data.management.copy.CopyableFile;
 import org.apache.gobblin.data.management.copy.FileAwareInputStream;
 import org.apache.gobblin.util.FileUtils;
 import org.apache.gobblin.util.io.StreamCopier;
-import org.apache.gobblin.util.io.StreamUtils;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -36,7 +35,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -62,7 +60,8 @@ public class TarArchiveInputStreamDataWriter extends 
FileAwareInputStreamDataWri
    * @see 
org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriter#write(org.apache.gobblin.data.management.copy.FileAwareInputStream)
    */
   @Override
-  public void writeImpl(InputStream inputStream, Path writeAt, CopyableFile 
copyableFile) throws IOException {
+  public void writeImpl(InputStream inputStream, Path writeAt, CopyableFile 
copyableFile, FileAwareInputStream record)
+      throws IOException {
     this.closer.register(inputStream);
 
     TarArchiveInputStream tarIn = new TarArchiveInputStream(inputStream);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/DecryptConverterTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/DecryptConverterTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/DecryptConverterTest.java
index ed4bf39..4e48e9a 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/DecryptConverterTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/DecryptConverterTest.java
@@ -71,8 +71,8 @@ public class DecryptConverterTest {
 
       String gpgFilePath = url.getFile();
       try (FSDataInputStream gpgFileInput = fs.open(new Path(gpgFilePath))) {
-             FileAwareInputStream fileAwareInputStream =
-                 new 
FileAwareInputStream(CopyableFileUtils.getTestCopyableFile(), gpgFileInput);
+             FileAwareInputStream fileAwareInputStream = 
FileAwareInputStream.builder()
+            
.file(CopyableFileUtils.getTestCopyableFile()).inputStream(gpgFileInput).build();
 
              Iterable<FileAwareInputStream> iterable =
                  converter.convertRecord("outputSchema", fileAwareInputStream, 
workUnitState);
@@ -106,8 +106,8 @@ public class DecryptConverterTest {
 
       String testFilePath = url.getFile();
       try (FSDataInputStream testFileInput = fs.open(new Path(testFilePath))) {
-        FileAwareInputStream fileAwareInputStream =
-            new FileAwareInputStream(CopyableFileUtils.getTestCopyableFile(), 
testFileInput);
+        FileAwareInputStream fileAwareInputStream = 
FileAwareInputStream.builder()
+            
.file(CopyableFileUtils.getTestCopyableFile()).inputStream(testFileInput).build();
         fileAwareInputStream.getFile().setDestination(new 
Path("file:///tmp/decrypt-test.txt.insecure_shift"));
         Iterable<FileAwareInputStream> iterable =
             converter.convertRecord("outputSchema", fileAwareInputStream, 
workUnitState);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/UnGzipConverterTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/UnGzipConverterTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/UnGzipConverterTest.java
index 9f1f8c7..07db779 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/UnGzipConverterTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/UnGzipConverterTest.java
@@ -58,8 +58,8 @@ public class UnGzipConverterTest {
     FileSystem fs = FileSystem.getLocal(new Configuration());
 
     String fullPath = 
getClass().getClassLoader().getResource(filePath).getFile();
-    FileAwareInputStream fileAwareInputStream =
-        new 
FileAwareInputStream(CopyableFileUtils.getTestCopyableFile(filePath), 
fs.open(new Path(fullPath)));
+    FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder()
+        
.file(CopyableFileUtils.getTestCopyableFile(filePath)).inputStream(fs.open(new 
Path(fullPath))).build();
 
     Iterable<FileAwareInputStream> iterable =
         converter.convertRecord("outputSchema", fileAwareInputStream, new 
WorkUnitState());
@@ -80,9 +80,9 @@ public class UnGzipConverterTest {
       String filePath = "unGzipConverterTest/" + fileName;
       String fullPath = 
getClass().getClassLoader().getResource(filePath).getFile();
 
-      FileAwareInputStream fileAwareInputStream =
-          new 
FileAwareInputStream(CopyableFileUtils.getTestCopyableFile(filePath, "/tmp/" + 
fileName, null, null),
-              fs.open(new Path(fullPath)));
+      FileAwareInputStream fileAwareInputStream = 
FileAwareInputStream.builder()
+          .file(CopyableFileUtils.getTestCopyableFile(filePath, "/tmp/" + 
fileName, null, null))
+          .inputStream(fs.open(new Path(fullPath))).build();
 
       Iterable<FileAwareInputStream> iterable = 
converter.convertRecord("outputSchema", fileAwareInputStream, new 
WorkUnitState());
       FileAwareInputStream out = iterable.iterator().next();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/splitter/DistcpFileSplitterTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/splitter/DistcpFileSplitterTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/splitter/DistcpFileSplitterTest.java
new file mode 100644
index 0000000..98959a7
--- /dev/null
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/splitter/DistcpFileSplitterTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.data.management.copy.splitter;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.CopySource;
+import org.apache.gobblin.data.management.copy.CopyableDatasetMetadata;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.CopyableFileUtils;
+import org.apache.gobblin.data.management.copy.TestCopyableDataset;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.ForkOperatorUtils;
+import org.apache.gobblin.util.guid.Guid;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+
+public class DistcpFileSplitterTest {
+
+  // This test ONLY checks whether manipulates the properties of the work 
units correctly
+  // (i.e. "merging" them within the collection that was passed in).
+  // It does NOT check that the merge in the filesystem has been completed 
successfully.
+  // This requires testing on an HDFS setup.
+  @Test
+  public void testMergeSplitWorkUnits() throws Exception {
+    long mockFileLen = 12L;
+    long mockBlockSize = 4L;
+    long mockMaxSplitSize = 4L;
+    long expectedSplitSize = (mockMaxSplitSize / mockBlockSize) * 
mockBlockSize;
+    int expectedSplits = (int) (mockFileLen / expectedSplitSize + 1);
+
+    FileSystem fs = mock(FileSystem.class);
+
+    List<WorkUnitState> splitWorkUnits =
+        createMockSplitWorkUnits(fs, mockFileLen, mockBlockSize, 
mockMaxSplitSize).stream()
+            .map(wu -> new WorkUnitState(wu)).collect(Collectors.toList());
+    Assert.assertEquals(splitWorkUnits.size(), expectedSplits);
+
+    Collection<WorkUnitState> mergedWorkUnits = 
DistcpFileSplitter.mergeAllSplitWorkUnits(fs, splitWorkUnits);
+    Assert.assertEquals(mergedWorkUnits.size(), 1);
+  }
+
+  // This test checks whether a work unit has been successfully set up for a 
split,
+  // but does not check that the split is actually done correctly when input 
streams are used.
+  @Test
+  public void testSplitFile() throws Exception {
+    long mockFileLen = 12L;
+    long mockBlockSize = 4L;
+    long mockMaxSplitSize = 4L;
+    long expectedSplitSize = (mockMaxSplitSize / mockBlockSize) * 
mockBlockSize;
+    int expectedSplits = (int) (mockFileLen / expectedSplitSize + 1);
+
+    FileSystem fs = mock(FileSystem.class);
+
+    Collection<WorkUnit> splitWorkUnits = createMockSplitWorkUnits(fs, 
mockFileLen, mockBlockSize, mockMaxSplitSize);
+    Assert.assertEquals(splitWorkUnits.size(), expectedSplits);
+
+    Set<Integer> splitNums = new HashSet<>();
+    boolean hasLastSplit = false;
+    for (WorkUnit wu : splitWorkUnits) {
+      Optional<DistcpFileSplitter.Split> split = 
DistcpFileSplitter.getSplit(wu);
+      Assert.assertTrue(split.isPresent());
+      Assert.assertEquals(split.get().getTotalSplits(), expectedSplits);
+      int splitNum = split.get().getSplitNumber();
+      Assert.assertFalse(splitNums.contains(splitNum));
+      splitNums.add(splitNum);
+      Assert.assertEquals(split.get().getLowPosition(), expectedSplitSize * 
splitNum);
+      if (split.get().isLastSplit()) {
+        hasLastSplit = true;
+        Assert.assertEquals(split.get().getHighPosition(), mockFileLen);
+      } else {
+        Assert.assertEquals(split.get().getHighPosition(), expectedSplitSize * 
(splitNum + 1));
+      }
+    }
+    Assert.assertTrue(hasLastSplit);
+  }
+
+  private Collection<WorkUnit> createMockSplitWorkUnits(FileSystem fs, long 
fileLen, long blockSize, long maxSplitSize)
+      throws Exception {
+    FileStatus file = mock(FileStatus.class);
+    when(file.getLen()).thenReturn(fileLen);
+    when(file.getBlockSize()).thenReturn(blockSize);
+
+    URI uri = new URI("hdfs", "dummyhost", "/test", "test");
+    Path path = new Path(uri);
+    when(fs.getUri()).thenReturn(uri);
+
+    CopyableDatasetMetadata cdm = new CopyableDatasetMetadata(new 
TestCopyableDataset(path));
+
+    CopyableFile cf = CopyableFileUtils.getTestCopyableFile();
+    CopyableFile spy = spy(cf);
+    doReturn(file).when(spy).getFileStatus();
+    doReturn(blockSize).when(spy).getBlockSize(any(FileSystem.class));
+    doReturn(path).when(spy).getDestination();
+
+    WorkUnit wu = WorkUnit.createEmpty();
+    wu.setProp(DistcpFileSplitter.MAX_SPLIT_SIZE_KEY, maxSplitSize);
+    
wu.setProp(ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_OUTPUT_DIR,
 1, 0),
+        path.toString());
+    CopySource.setWorkUnitGuid(wu, Guid.fromStrings(wu.toString()));
+    CopySource.serializeCopyEntity(wu, cf);
+    CopySource.serializeCopyableDataset(wu, cdm);
+
+    return DistcpFileSplitter.splitFile(spy, wu, fs);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
index 517077e..c2f62b5 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
@@ -26,7 +26,9 @@ import java.util.Properties;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -92,7 +94,8 @@ public class FileAwareInputStreamDataWriterTest {
 
     FileAwareInputStreamDataWriter dataWriter = new 
FileAwareInputStreamDataWriter(state, 1, 0);
 
-    FileAwareInputStream fileAwareInputStream = new FileAwareInputStream(cf, 
StreamUtils.convertStream(IOUtils.toInputStream(streamString)));
+    FileAwareInputStream fileAwareInputStream = 
FileAwareInputStream.builder().file(cf)
+        
.inputStream(StreamUtils.convertStream(IOUtils.toInputStream(streamString))).build();
     dataWriter.write(fileAwareInputStream);
     dataWriter.commit();
     Path writtenFilePath = new Path(new 
Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR),
@@ -101,6 +104,45 @@ public class FileAwareInputStreamDataWriterTest {
   }
 
   @Test
+  public void testBlockWrite() throws Exception {
+    String streamString = "testContents";
+
+    FileStatus status = fs.getFileStatus(testTempPath);
+    OwnerAndPermission ownerAndPermission =
+        new OwnerAndPermission(status.getOwner(), status.getGroup(), new 
FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+    CopyableFile cf = 
CopyableFileUtils.getTestCopyableFile(ownerAndPermission);
+
+    CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new 
TestCopyableDataset(new Path("/source")));
+
+    WorkUnitState state = TestUtils.createTestWorkUnitState();
+    state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(testTempPath, 
"staging").toString());
+    state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(testTempPath, 
"output").toString());
+    state.setProp(ConfigurationKeys.WRITER_FILE_PATH, 
RandomStringUtils.randomAlphabetic(5));
+    state.setProp(DistcpFileSplitter.SPLIT_ENABLED, true);
+    CopySource.serializeCopyEntity(state, cf);
+    CopySource.serializeCopyableDataset(state, metadata);
+
+    FileAwareInputStreamDataWriter dataWriter = new 
FileAwareInputStreamDataWriter(state, 1, 0);
+
+    long splitLen = 4;
+    int splits = (int) (streamString.length() / splitLen + 1);
+    DistcpFileSplitter.Split split = new DistcpFileSplitter.Split(0, splitLen, 
0, splits,
+        String.format("%s.__PART%d__", cf.getDestination().getName(), 0));
+    FSDataInputStream dataInputStream = 
StreamUtils.convertStream(IOUtils.toInputStream(streamString));
+    dataInputStream.seek(split.getLowPosition());
+    FileAwareInputStream fileAwareInputStream = 
FileAwareInputStream.builder().file(cf)
+        .inputStream(dataInputStream)
+        .split(Optional.of(split))
+        .build();
+    dataWriter.write(fileAwareInputStream);
+    dataWriter.commit();
+    Path writtenFilePath = new Path(new 
Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR),
+        cf.getDatasetAndPartition(metadata).identifier()), 
cf.getDestination());
+    Assert.assertEquals(IOUtils.toString(new 
FileInputStream(writtenFilePath.toString())),
+        streamString.substring(0, (int) splitLen));
+  }
+
+  @Test
   public void testWriteWithEncryption() throws Exception {
     byte[] streamString = "testEncryptedContents".getBytes("UTF-8");
     byte[] expectedContents = new byte[streamString.length];
@@ -126,8 +168,8 @@ public class FileAwareInputStreamDataWriterTest {
 
     FileAwareInputStreamDataWriter dataWriter = new 
FileAwareInputStreamDataWriter(state, 1, 0);
 
-    FileAwareInputStream fileAwareInputStream = new FileAwareInputStream(cf, 
StreamUtils.convertStream(
-        new ByteArrayInputStream(streamString)));
+    FileAwareInputStream fileAwareInputStream = 
FileAwareInputStream.builder().file(cf)
+        .inputStream(StreamUtils.convertStream(new 
ByteArrayInputStream(streamString))).build();
     dataWriter.write(fileAwareInputStream);
     dataWriter.commit();
 
@@ -161,8 +203,8 @@ public class FileAwareInputStreamDataWriterTest {
 
     FileAwareInputStreamDataWriter dataWriter = new 
FileAwareInputStreamDataWriter(state, 1, 0);
 
-    FileAwareInputStream fileAwareInputStream = new FileAwareInputStream(cf, 
StreamUtils.convertStream(
-        new ByteArrayInputStream(streamString)));
+    FileAwareInputStream fileAwareInputStream = 
FileAwareInputStream.builder().file(cf)
+        .inputStream(StreamUtils.convertStream(new 
ByteArrayInputStream(streamString))).build();
     dataWriter.write(fileAwareInputStream);
     dataWriter.commit();
 
@@ -217,8 +259,8 @@ public class FileAwareInputStreamDataWriterTest {
 
     FileAwareInputStreamDataWriter dataWriter = new 
FileAwareInputStreamDataWriter(state, 1, 0);
 
-    FileAwareInputStream fileAwareInputStream = new FileAwareInputStream(cf, 
StreamUtils.convertStream(
-        new ByteArrayInputStream(streamString)));
+    FileAwareInputStream fileAwareInputStream = 
FileAwareInputStream.builder().file(cf)
+        .inputStream(StreamUtils.convertStream(new 
ByteArrayInputStream(streamString))).build();
     dataWriter.write(fileAwareInputStream);
     dataWriter.commit();
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriterTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriterTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriterTest.java
index d2406bf..baa1c61 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriterTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriterTest.java
@@ -121,7 +121,8 @@ public class TarArchiveInputStreamDataWriterTest {
         CopyableFileUtils.getTestCopyableFile(filePath, new Path(testTempPath, 
newFileName).toString(), newFileName,
             ownerAndPermission);
 
-    FileAwareInputStream fileAwareInputStream = new FileAwareInputStream(cf, 
fs.open(new Path(fullPath)));
+    FileAwareInputStream fileAwareInputStream = 
FileAwareInputStream.builder().file(cf)
+        .inputStream(fs.open(new Path(fullPath))).build();
 
     Iterable<FileAwareInputStream> iterable =
         converter.convertRecord("outputSchema", fileAwareInputStream, new 
WorkUnitState());

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-utility/src/main/java/org/apache/gobblin/util/io/StreamCopier.java
----------------------------------------------------------------------
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/io/StreamCopier.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/io/StreamCopier.java
index 48917bb..3b25cf3 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/io/StreamCopier.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/io/StreamCopier.java
@@ -26,12 +26,12 @@ import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 
+import javax.annotation.concurrent.NotThreadSafe;
+
 import com.codahale.metrics.Meter;
 
 import org.apache.gobblin.util.limiter.Limiter;
 
-import javax.annotation.concurrent.NotThreadSafe;
-
 
 /**
  * A class that copies an {@link InputStream} to an {@link OutputStream} in a 
configurable way.
@@ -44,6 +44,8 @@ public class StreamCopier {
 
   private final ReadableByteChannel inputChannel;
   private final WritableByteChannel outputChannel;
+
+  private final Long maxBytes;
   private int bufferSize = DEFAULT_BUFFER_SIZE;
   private Meter copySpeedMeter;
 
@@ -51,12 +53,21 @@ public class StreamCopier {
   private volatile boolean copied = false;
 
   public StreamCopier(InputStream inputStream, OutputStream outputStream) {
-    this(Channels.newChannel(inputStream), Channels.newChannel(outputStream));
+    this(inputStream, outputStream, null);
+  }
+
+  public StreamCopier(InputStream inputStream, OutputStream outputStream, Long 
maxBytes) {
+    this(Channels.newChannel(inputStream), Channels.newChannel(outputStream), 
maxBytes);
   }
 
   public StreamCopier(ReadableByteChannel inputChannel, WritableByteChannel 
outputChannel) {
+    this(inputChannel, outputChannel, null);
+  }
+
+  public StreamCopier(ReadableByteChannel inputChannel, WritableByteChannel 
outputChannel, Long maxBytes) {
     this.inputChannel = inputChannel;
     this.outputChannel = outputChannel;
+    this.maxBytes = maxBytes;
   }
 
   /**
@@ -84,7 +95,8 @@ public class StreamCopier {
   }
 
   /**
-   * Execute the copy of bytes from the input to the output stream.
+   * Execute the copy of bytes from the input to the output stream. If 
maxBytes is specified, limits the number of
+   * bytes copied to maxBytes.
    * Note: this method should only be called once. Further calls will throw a 
{@link IllegalStateException}.
    * @return Number of bytes copied.
    */
@@ -96,19 +108,28 @@ public class StreamCopier {
     this.copied = true;
 
     try {
-      long bytesRead = 0;
-      long totalBytesRead = 0;
+      long numBytes = 0;
+      long totalBytes = 0;
 
       final ByteBuffer buffer = ByteBuffer.allocateDirect(this.bufferSize);
-      while ((bytesRead = fillBufferFromInputChannel(buffer)) != -1) {
-        totalBytesRead += bytesRead;
+      // Only keep copying if we've read less than maxBytes (if maxBytes 
exists)
+      while ((this.maxBytes == null || this.maxBytes > totalBytes) &&
+          (numBytes = fillBufferFromInputChannel(buffer)) != -1) {
+        totalBytes += numBytes;
         // flip the buffer to be written
         buffer.flip();
+
+        // If we've read more than maxBytes, discard enough bytes to only 
write maxBytes.
+        if (this.maxBytes != null && totalBytes > this.maxBytes) {
+          buffer.limit(buffer.limit() - (int) (totalBytes - this.maxBytes));
+          totalBytes = this.maxBytes;
+        }
+
         this.outputChannel.write(buffer);
         // Clear if empty
         buffer.compact();
         if (this.copySpeedMeter != null) {
-          this.copySpeedMeter.mark(bytesRead);
+          this.copySpeedMeter.mark(numBytes);
         }
       }
       // Done writing, now flip to read again
@@ -118,7 +139,7 @@ public class StreamCopier {
         this.outputChannel.write(buffer);
       }
 
-      return totalBytesRead;
+      return totalBytes;
     } finally {
       if (this.closeChannelsOnComplete) {
         this.inputChannel.close();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-utility/src/test/java/org/apache/gobblin/util/io/StreamCopierTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/io/StreamCopierTest.java
 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/io/StreamCopierTest.java
index afde7dd..d2a8357 100644
--- 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/io/StreamCopierTest.java
+++ 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/io/StreamCopierTest.java
@@ -27,8 +27,6 @@ import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
 import com.google.common.base.Charsets;
 
-import org.apache.gobblin.util.limiter.CountBasedLimiter;
-
 
 public class StreamCopierTest {
 
@@ -59,6 +57,19 @@ public class StreamCopierTest {
   }
 
   @Test
+  public void testBlockCopy() throws Exception {
+    String testString = "This is a string";
+    ByteArrayInputStream inputStream = new 
ByteArrayInputStream(testString.getBytes(Charsets.UTF_8));
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+    long maxBytes = 4L;
+    new StreamCopier(inputStream, outputStream, maxBytes).copy();
+
+    Assert.assertEquals(new String(outputStream.toByteArray(), Charsets.UTF_8),
+        testString.substring(0, (int) maxBytes));
+  }
+
+  @Test
   public void testCopyMeter() throws Exception {
     String testString = "This is a string";
     Meter meter = new MetricRegistry().meter("my.meter");

Reply via email to