Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 6e848e7d3 -> cba369929
[GOBBLIN-598] Add DistcpFileSplitter to allow for block level distcp

[GOBBLIN-598] Add DistcpFileSplitter to allow for block level distcp

Fix task state bug for merging in block distcp

Address review comments, and add javadoc/comments

AllowSplit and ADL modifications

Closes #2461 from cshen98/distcp1

Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/cba36992
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/cba36992
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/cba36992

Branch: refs/heads/master
Commit: cba3699299c7a4013fc4c193caebe87c1cac5b76
Parents: 6e848e7
Author: Carl Shen <[email protected]>
Authored: Mon Oct 22 08:39:46 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Mon Oct 22 08:39:46 2018 -0700

----------------------------------------------------------------------
 .../data/management/copy/CopySource.java        |  10 +-
 .../data/management/copy/CopyableFile.java      |  16 ++
 .../management/copy/FileAwareInputStream.java   |  21 +-
 .../copy/converter/DistcpConverter.java         |   2 +-
 .../FileAwareInputStreamExtractor.java          |  31 ++-
 .../copy/publisher/CopyDataPublisher.java       |  14 +-
 .../copy/splitter/DistcpFileSplitter.java       | 243 +++++++++++++++++++
 .../writer/FileAwareInputStreamDataWriter.java  |  66 +++--
 .../writer/TarArchiveInputStreamDataWriter.java |   5 +-
 .../copy/converter/DecryptConverterTest.java    |   8 +-
 .../copy/converter/UnGzipConverterTest.java     |  10 +-
 .../copy/splitter/DistcpFileSplitterTest.java   | 142 +++++++++++
 .../FileAwareInputStreamDataWriterTest.java     |  56 ++++-
 .../TarArchiveInputStreamDataWriterTest.java    |   3 +-
 .../apache/gobblin/util/io/StreamCopier.java    |  41 +++-
 .../gobblin/util/io/StreamCopierTest.java       |  15 +-
 16 files changed, 612 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index 9ae9b45..a2d9dc2 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -55,6 +55,7 @@
 import org.apache.gobblin.data.management.copy.extractor.EmptyExtractor;
 import org.apache.gobblin.data.management.copy.extractor.FileAwareInputStreamExtractor;
 import org.apache.gobblin.data.management.copy.prioritization.FileSetComparator;
 import org.apache.gobblin.data.management.copy.publisher.CopyEventSubmitterHelper;
+import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
 import org.apache.gobblin.data.management.copy.watermark.CopyableFileWatermarkGenerator;
 import org.apache.gobblin.data.management.copy.watermark.CopyableFileWatermarkHelper;
 import org.apache.gobblin.data.management.dataset.DatasetUtils;
@@ -217,7 +218,7 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
       try {
         return GobblinConstructorUtils.<FileSetWorkUnitGenerator>invokeLongestConstructor(
             new ClassAliasResolver(FileSetWorkUnitGenerator.class).resolveClass(filesetWuGeneratorAlias),
-            input.getDataset(), input, state, workUnitsMap, watermarkGenerator, minWorkUnitWeight, lineageInfo);
+            input.getDataset(), input, state, targetFs, workUnitsMap, watermarkGenerator, minWorkUnitWeight, lineageInfo);
       } catch (Exception e) {
         throw new RuntimeException("Cannot create workunits generator", e);
       }
@@ -335,6 +336,7 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
     protected final CopyableDatasetBase copyableDataset;
     protected final FileSet<CopyEntity> fileSet;
     protected final State state;
+    protected final FileSystem targetFs;
     protected final SetMultimap<FileSet<CopyEntity>, WorkUnit> workUnitList;
     protected final Optional<CopyableFileWatermarkGenerator> watermarkGenerator;
     protected final long minWorkUnitWeight;
@@ -365,8 +367,12 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
           setWorkUnitWeight(workUnit, copyEntity, minWorkUnitWeight);
           setWorkUnitWatermark(workUnit, watermarkGenerator, copyEntity);
           computeAndSetWorkUnitGuid(workUnit);
-          workUnitsForPartition.add(workUnit);
           addLineageInfo(copyEntity, workUnit);
+          if (copyEntity instanceof CopyableFile && DistcpFileSplitter.allowSplit(this.state, this.targetFs)) {
+            workUnitsForPartition.addAll(DistcpFileSplitter.splitFile((CopyableFile) copyEntity, workUnit, this.targetFs));
+          } else {
+            workUnitsForPartition.add(workUnit);
+          }
         }

         this.workUnitList.putAll(this.fileSet, workUnitsForPartition);
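For reference, splitting stays off unless a job opts in and passes the allowSplit() gate defined in DistcpFileSplitter further down. A minimal sketch of a State that would satisfy that gate — the property constants and gate conditions come from this patch, while the surrounding setup (the bare State, targetFs) is hypothetical:

    import org.apache.gobblin.configuration.ConfigurationKeys;
    import org.apache.gobblin.configuration.State;
    import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
    import org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder;

    State state = new State();
    // "gobblin.copy.split.enabled" -- splitting is disabled by default.
    state.setProp(DistcpFileSplitter.SPLIT_ENABLED, Boolean.TRUE.toString());
    // "gobblin.copy.file.max.split.size" -- cap each split at 1 GiB; splitFile()
    // rounds the effective split length down to a block-size multiple.
    state.setProp(DistcpFileSplitter.MAX_SPLIT_SIZE_KEY, Long.toString(1L << 30));
    // The gate also checks the writer builder and, if set, the converter list.
    state.setProp(ConfigurationKeys.WRITER_BUILDER_CLASS,
        FileAwareInputStreamDataWriterBuilder.class.getName());
    // True only if, additionally, targetFs.getUri().getScheme() is "hdfs" or "adl".
    boolean canSplit = DistcpFileSplitter.allowSplit(state, targetFs);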
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index 9ad918c..6c1093f 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -352,6 +352,22 @@ public class CopyableFile extends CopyEntity implements File {
   }

   /**
+   * @return desired block size for destination file.
+   */
+  public long getBlockSize(FileSystem targetFs) {
+    return getPreserve().preserve(PreserveAttributes.Option.BLOCK_SIZE) ?
+        getOrigin().getBlockSize() : targetFs.getDefaultBlockSize(this.destination);
+  }
+
+  /**
+   * @return desired replication for destination file.
+   */
+  public short getReplication(FileSystem targetFs) {
+    return getPreserve().preserve(PreserveAttributes.Option.REPLICATION) ?
+        getOrigin().getReplication() : targetFs.getDefaultReplication(this.destination);
+  }
+
+  /**
    * Generates a replicable guid to uniquely identify the origin of this {@link CopyableFile}.
    * @return a guid uniquely identifying the origin file.
    */
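These two accessors centralize the preserve-attribute fallback that FileAwareInputStreamDataWriter previously computed inline (see its diff below). A short sketch of the effective semantics, with copyableFile and targetFs standing in for real instances:

    // The origin's value wins only when the attribute is in the preserve list;
    // otherwise the destination filesystem's default for the destination path applies.
    long blockSize = copyableFile.getBlockSize(targetFs);      // origin block size, or targetFs default
    short replication = copyableFile.getReplication(targetFs); // origin replication, or targetFs default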
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/FileAwareInputStream.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/FileAwareInputStream.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/FileAwareInputStream.java
index b399ec1..8d43c36 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/FileAwareInputStream.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/FileAwareInputStream.java
@@ -19,19 +19,36 @@
 package org.apache.gobblin.data.management.copy;

 import java.io.InputStream;

-import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.NonNull;
 import lombok.Getter;

+import com.google.common.base.Optional;
+
+import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
+
+
 /**
  * A wrapper to {@link InputStream} that represents an entity to be copied. The enclosed {@link CopyableFile} instance
  * contains file Metadata like permission, destination path etc. required by the writers and converters.
+ * The enclosed {@link DistcpFileSplitter.Split} object indicates whether the {@link InputStream} to be copied is a
+ * block of the {@link CopyableFile} or not. If it is present, the {@link InputStream} should already be at the start
+ * position of the specified split/block.
  */
-@AllArgsConstructor
 @Getter
 public class FileAwareInputStream {

   private CopyableFile file;
   private InputStream inputStream;
+  private Optional<DistcpFileSplitter.Split> split = Optional.absent();
+
+  @Builder(toBuilder = true)
+  public FileAwareInputStream(@NonNull CopyableFile file, @NonNull InputStream inputStream,
+      Optional<DistcpFileSplitter.Split> split) {
+    this.file = file;
+    this.inputStream = inputStream;
+    this.split = split == null ? Optional.<DistcpFileSplitter.Split>absent() : split;
+  }

   @Override
   public String toString() {


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/converter/DistcpConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/converter/DistcpConverter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/converter/DistcpConverter.java
index ee6f221..f5f122e 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/converter/DistcpConverter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/converter/DistcpConverter.java
@@ -85,7 +85,7 @@ public abstract class DistcpConverter extends Converter<String, String, FileAwar
     modifyExtensionAtDestination(fileAwareInputStream.getFile());
     try {
       InputStream newInputStream = inputStreamTransformation().apply(fileAwareInputStream.getInputStream());
-      return new SingleRecordIterable<>(new FileAwareInputStream(fileAwareInputStream.getFile(), newInputStream));
+      return new SingleRecordIterable<>(fileAwareInputStream.toBuilder().inputStream(newInputStream).build());
     } catch (RuntimeException re) {
       throw new DataConversionException(re);
     }
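With @Builder(toBuilder = true), call sites name the fields they set, and converters can swap just the stream while the file metadata and any split survive the rebuild. A usage sketch (copyableFile, stream, and split are assumed to be in scope):

    FileAwareInputStream fais = FileAwareInputStream.builder()
        .file(copyableFile)
        .inputStream(stream)
        .split(Optional.of(split))  // optional; the constructor maps a null builder value to absent
        .build();

    // toBuilder() copies file and split, so wrapping the stream keeps the split metadata
    // intact -- exactly what DistcpConverter.convertRecord() relies on above.
    FileAwareInputStream rewrapped =
        fais.toBuilder().inputStream(new BufferedInputStream(fais.getInputStream())).build();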
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java
index 4b212bf..9863a98 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java
@@ -17,21 +17,24 @@

 package org.apache.gobblin.data.management.copy.extractor;

+import com.google.common.base.Optional;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.data.management.copy.CopyableFile;
 import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
 import org.apache.gobblin.source.extractor.DataRecordException;
 import org.apache.gobblin.source.extractor.Extractor;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.io.EmptyInputStream;
 import org.apache.gobblin.util.io.MeteredInputStream;

-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-

 /**
  * An implementation of {@link Extractor} that extracts {@link InputStream}s. This extractor is suitable for copy jobs
@@ -81,11 +84,21 @@ public class FileAwareInputStreamExtractor implements Extractor<String, FileAwar
           this.state == null ? HadoopUtils.newConfiguration() : HadoopUtils.getConfFromState(this.state);
       FileSystem fsFromFile = this.file.getOrigin().getPath().getFileSystem(conf);
       this.recordRead = true;
+      FileAwareInputStream.FileAwareInputStreamBuilder builder = FileAwareInputStream.builder().file(this.file);
       if (this.file.getFileStatus().isDirectory()) {
-        return new FileAwareInputStream(this.file, EmptyInputStream.instance);
+        return builder.inputStream(EmptyInputStream.instance).build();
+      }
+
+      FSDataInputStream dataInputStream = fsFromFile.open(this.file.getFileStatus().getPath());
+      if (this.state != null && DistcpFileSplitter.isSplitWorkUnit(this.state)) {
+        Optional<DistcpFileSplitter.Split> split = DistcpFileSplitter.getSplit(this.state);
+        builder.split(split);
+        if (split.isPresent()) {
+          dataInputStream.seek(split.get().getLowPosition());
+        }
       }
-      return new FileAwareInputStream(this.file,
-          MeteredInputStream.builder().in(fsFromFile.open(this.file.getFileStatus().getPath())).build());
+      builder.inputStream(MeteredInputStream.builder().in(dataInputStream).build());
+      return builder.build();
     }
     return null;
   }
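The extractor positions the stream before handing it to the writer, so a split task only ever reads its own byte range. A condensed sketch of that contract (standalone, not the extractor itself; fs, path, and workUnitState are assumed to be in scope):

    FSDataInputStream in = fs.open(path);
    Optional<DistcpFileSplitter.Split> split = DistcpFileSplitter.getSplit(workUnitState);
    if (split.isPresent()) {
      // Start at the split boundary; the writer then bounds the copy at
      // highPosition - lowPosition bytes via StreamCopier's maxBytes (see below).
      in.seek(split.get().getLowPosition());
    }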
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
index ec7b1a0..cfd715a 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
@@ -17,10 +17,6 @@

 package org.apache.gobblin.data.management.copy.publisher;

-
-import org.apache.gobblin.configuration.SourceState;
-import org.apache.gobblin.metrics.event.lineage.LineageInfo;
-import org.apache.gobblin.metrics.event.sla.SlaEventKeys;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
@@ -29,6 +25,8 @@
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;

+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;

@@ -41,6 +39,7 @@
 import com.google.common.collect.Multimap;

 import org.apache.gobblin.commit.CommitStep;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.configuration.WorkUnitState.WorkingState;
@@ -53,18 +52,20 @@
 import org.apache.gobblin.data.management.copy.entities.CommitStepCopyEntity;
 import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
 import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
 import org.apache.gobblin.data.management.copy.recovery.RecoveryHelper;
+import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
 import org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriter;
 import org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
+import org.apache.gobblin.metrics.event.sla.SlaEventKeys;
 import org.apache.gobblin.publisher.DataPublisher;
 import org.apache.gobblin.publisher.UnpublishedHandling;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.WriterUtils;

-import lombok.extern.slf4j.Slf4j;

 /**
  * A {@link DataPublisher} to {@link org.apache.gobblin.data.management.copy.CopyEntity}s from task output to final destination.
@@ -185,6 +186,9 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl
         .deserialize(datasetWorkUnitStates.iterator().next().getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
     Path datasetWriterOutputPath = new Path(this.writerOutputDir, datasetAndPartition.identifier());

+    log.info("Merging all split work units.");
+    DistcpFileSplitter.mergeAllSplitWorkUnits(this.fs, datasetWorkUnitStates);
+
     log.info(String.format("[%s] Publishing fileSet from %s for dataset %s", datasetAndPartition.identifier(),
         datasetWriterOutputPath, metadata.getDatasetURN()));
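The publish-time merge relies on FileSystem.concat(), which is why splits must land on block boundaries: on HDFS, concat is effectively a metadata operation that requires the parts to share a block size and, at least in older releases, every part except the last to end on a full block. A sketch of the primitive with hypothetical part names following the __PART%d__ convention used below:

    Path target = new Path(parent, "myfile");
    // Rename the first part onto the final name, then splice the remaining parts on, in order.
    fs.rename(new Path(parent, "myfile.__PART0__"), target);
    fs.concat(target, new Path[] {
        new Path(parent, "myfile.__PART1__"),
        new Path(parent, "myfile.__PART2__")});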
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/splitter/DistcpFileSplitter.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/splitter/DistcpFileSplitter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/splitter/DistcpFileSplitter.java
new file mode 100644
index 0000000..688bb6b
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/splitter/DistcpFileSplitter.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.splitter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.math3.util.ArithmeticUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.IdentityConverter;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopySource;
+import org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriter;
+import org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.guid.Guid;
+
+
+/**
+ * Helper class for splitting files for distcp. The property flag gobblin.copy.split.enabled should be used to enable
+ * splitting of files (which is disabled by default). Splitting should only be used if the distcp job uses only the
+ * IdentityConverter and should not be used for distcp jobs that require decryption/ungzipping.
+ */
+@Slf4j
+public class DistcpFileSplitter {
+
+  public static final String SPLIT_ENABLED = CopyConfiguration.COPY_PREFIX + ".split.enabled";
+  public static final String MAX_SPLIT_SIZE_KEY = CopyConfiguration.COPY_PREFIX + ".file.max.split.size";
+
+  public static final long DEFAULT_MAX_SPLIT_SIZE = Long.MAX_VALUE;
+  public static final Set<String> KNOWN_SCHEMES_SUPPORTING_CONCAT = Sets.newHashSet("hdfs", "adl");
+
+  /**
+   * A split for a distcp file. Represents a section of a file; split should be aligned to block boundaries.
+   */
+  @Data
+  public static class Split {
+    private final long lowPosition;
+    private final long highPosition;
+    private final int splitNumber;
+    private final int totalSplits;
+    private final String partName;
+
+    public final boolean isLastSplit() {
+      return this.splitNumber == this.totalSplits - 1;
+    }
+  }
+
+  private static final String SPLIT_KEY = CopyConfiguration.COPY_PREFIX + ".file.splitter.split";
+  private static final Gson GSON = new Gson();
+
+  /**
+   * Split an input {@link CopyableFile} into multiple splits aligned with block boundaries.
+   *
+   * @param file {@link CopyableFile} to split.
+   * @param workUnit {@link WorkUnit} generated for this file.
+   * @param targetFs destination {@link FileSystem} where file is to be copied.
+   * @return a list of {@link WorkUnit}, each for a split of this file.
+   * @throws IOException
+   */
+  public static Collection<WorkUnit> splitFile(CopyableFile file, WorkUnit workUnit, FileSystem targetFs)
+      throws IOException {
+    long len = file.getFileStatus().getLen();
+    // get lcm of source and target block size so that split aligns with block boundaries for both extract and write
+    long blockSize = ArithmeticUtils.lcm(file.getFileStatus().getBlockSize(), file.getBlockSize(targetFs));
+    long maxSplitSize = workUnit.getPropAsLong(MAX_SPLIT_SIZE_KEY, DEFAULT_MAX_SPLIT_SIZE);
+
+    if (maxSplitSize < blockSize) {
+      log.warn(String.format("Max split size must be at least block size. Adjusting to %d.", blockSize));
+      maxSplitSize = blockSize;
+    }
+    if (len < maxSplitSize) {
+      return Lists.newArrayList(workUnit);
+    }
+
+    Collection<WorkUnit> newWorkUnits = Lists.newArrayList();
+
+    long lengthPerSplit = (maxSplitSize / blockSize) * blockSize;
+    int splits = (int) (len / lengthPerSplit + 1);
+
+    for (int i = 0; i < splits; i++) {
+      WorkUnit newWorkUnit = WorkUnit.copyOf(workUnit);
+
+      long lowPos = lengthPerSplit * i;
+      long highPos = Math.min(lengthPerSplit * (i + 1), len);
+
+      Split split = new Split(lowPos, highPos, i, splits,
+          String.format("%s.__PART%d__", file.getDestination().getName(), i));
+      String serializedSplit = GSON.toJson(split);
+
+      newWorkUnit.setProp(SPLIT_KEY, serializedSplit);
+
+      Guid oldGuid = CopySource.getWorkUnitGuid(newWorkUnit).get();
+      Guid newGuid = oldGuid.append(Guid.fromStrings(serializedSplit));
+
+      CopySource.setWorkUnitGuid(newWorkUnit, newGuid);
+      newWorkUnits.add(newWorkUnit);
+    }
+    return newWorkUnits;
+  }
+
+  /**
+   * Finds all split work units in the input collection and merges the file parts into the expected output files.
+   * @param fs {@link FileSystem} where file parts exist.
+   * @param workUnits Collection of {@link WorkUnitState}s possibly containing split work units.
+   * @return The collection of {@link WorkUnitState}s where split work units for each file have been merged.
+   * @throws IOException
+   */
+  public static Collection<WorkUnitState> mergeAllSplitWorkUnits(FileSystem fs, Collection<WorkUnitState> workUnits)
+      throws IOException {
+    ListMultimap<CopyableFile, WorkUnitState> splitWorkUnitsMap = ArrayListMultimap.create();
+    for (WorkUnitState workUnit : workUnits) {
+      if (isSplitWorkUnit(workUnit)) {
+        CopyableFile copyableFile = (CopyableFile) CopySource.deserializeCopyEntity(workUnit);
+        splitWorkUnitsMap.put(copyableFile, workUnit);
+      }
+    }
+
+    for (CopyableFile file : splitWorkUnitsMap.keySet()) {
+      log.info(String.format("Merging split file %s.", file.getDestination()));
+
+      WorkUnitState oldWorkUnit = splitWorkUnitsMap.get(file).get(0);
+      Path outputDir = FileAwareInputStreamDataWriter.getOutputDir(oldWorkUnit);
+      CopyEntity.DatasetAndPartition datasetAndPartition =
+          file.getDatasetAndPartition(CopySource.deserializeCopyableDataset(oldWorkUnit));
+      Path parentPath = FileAwareInputStreamDataWriter.getOutputFilePath(file, outputDir, datasetAndPartition)
+          .getParent();
+
+      WorkUnitState newWorkUnit = mergeSplits(fs, file, splitWorkUnitsMap.get(file), parentPath);
+
+      for (WorkUnitState wu : splitWorkUnitsMap.get(file)) {
+        // Set to committed so that task states will not fail
+        wu.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
+        workUnits.remove(wu);
+      }
+      workUnits.add(newWorkUnit);
+    }
+    return workUnits;
+  }
+
+  /**
+   * Merges all the splits for a given file.
+   * Should be called on the target/destination file system (after blocks have been copied to targetFs).
+   * @param fs {@link FileSystem} where file parts exist.
+   * @param file {@link CopyableFile} to merge.
+   * @param workUnits {@link WorkUnitState}s for all parts of this file.
+   * @param parentPath {@link Path} where the parts of the file are located.
+   * @return a {@link WorkUnit} equivalent to the distcp work unit if the file had not been split.
+   * @throws IOException
+   */
+  private static WorkUnitState mergeSplits(FileSystem fs, CopyableFile file, Collection<WorkUnitState> workUnits,
+      Path parentPath) throws IOException {
+
+    log.info(String.format("File %s was written in %d parts. Merging.", file.getDestination(), workUnits.size()));
+    Path[] parts = new Path[workUnits.size()];
+    for (WorkUnitState workUnit : workUnits) {
+      if (!isSplitWorkUnit(workUnit)) {
+        throw new IOException("Not a split work unit.");
+      }
+      Split split = getSplit(workUnit).get();
+      parts[split.getSplitNumber()] = new Path(parentPath, split.getPartName());
+    }
+
+    Path target = new Path(parentPath, file.getDestination().getName());
+
+    fs.rename(parts[0], target);
+    fs.concat(target, Arrays.copyOfRange(parts, 1, parts.length));
+
+    WorkUnitState finalWorkUnit = workUnits.iterator().next();
+    finalWorkUnit.removeProp(SPLIT_KEY);
+    return finalWorkUnit;
+  }
+
+  /**
+   * @return whether the {@link WorkUnit} is a split work unit.
+   */
+  public static boolean isSplitWorkUnit(State workUnit) {
+    return workUnit.contains(SPLIT_KEY);
+  }
+
+  /**
+   * @return the {@link Split} object contained in the {@link WorkUnit}.
+   */
+  public static Optional<Split> getSplit(State workUnit) {
+    return workUnit.contains(SPLIT_KEY) ? Optional.of(GSON.fromJson(workUnit.getProp(SPLIT_KEY), Split.class))
+        : Optional.<Split>absent();
+  }
+
+  /**
+   * @param state {@link State} containing properties for a job.
+   * @param targetFs destination {@link FileSystem} where file is to be copied
+   * @return whether to allow for splitting of work units based on the filesystem, state, converter/writer config.
+   */
+  public static boolean allowSplit(State state, FileSystem targetFs) {
+    // Don't allow distcp jobs that use decrypt/ungzip converters or tararchive/encrypt writers to split work units
+    Collection<String> converterClassNames = Collections.emptyList();
+    if (state.contains(ConfigurationKeys.CONVERTER_CLASSES_KEY)) {
+      converterClassNames = state.getPropAsList(ConfigurationKeys.CONVERTER_CLASSES_KEY);
+    }
+
+    return state.getPropAsBoolean(SPLIT_ENABLED, false) &&
+        KNOWN_SCHEMES_SUPPORTING_CONCAT.contains(targetFs.getUri().getScheme()) &&
+        state.getProp(ConfigurationKeys.WRITER_BUILDER_CLASS, "")
+            .equals(FileAwareInputStreamDataWriterBuilder.class.getName()) &&
+        converterClassNames.stream().noneMatch(s -> !s.equals(IdentityConverter.class.getName()));
+  }
+
+}
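To make splitFile()'s arithmetic concrete, a worked example with hypothetical sizes — a 600 MiB file, 128 MiB source blocks, 256 MiB target blocks, and a 512 MiB max split size:

    long mib = 1024L * 1024L;
    long len = 600 * mib;
    // lcm(128 MiB, 256 MiB) = 256 MiB: boundaries align with blocks on BOTH filesystems.
    long blockSize = ArithmeticUtils.lcm(128 * mib, 256 * mib);
    long maxSplitSize = 512 * mib;

    long lengthPerSplit = (maxSplitSize / blockSize) * blockSize; // 512 MiB, block-aligned
    int splits = (int) (len / lengthPerSplit + 1);                // 2

    // Split 0 covers [0 MiB, 512 MiB); split 1 (the last) covers [512 MiB, 600 MiB).
    // Every boundary is a multiple of both block sizes, so once the parts are written
    // with the lcm'ed block size, mergeSplits() can rename part 0 and concat the rest.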
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index bb9819e..f156949 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -27,6 +27,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;

+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
@@ -43,8 +45,6 @@
 import com.google.common.base.Predicate;
 import com.google.common.base.Strings;
 import com.google.common.collect.Iterators;

-import lombok.extern.slf4j.Slf4j;
-
 import org.apache.gobblin.broker.EmptyKey;
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.iface.NotConfiguredException;
@@ -62,8 +62,8 @@
 import org.apache.gobblin.data.management.copy.CopyableDatasetMetadata;
 import org.apache.gobblin.data.management.copy.CopyableFile;
 import org.apache.gobblin.data.management.copy.FileAwareInputStream;
 import org.apache.gobblin.data.management.copy.OwnerAndPermission;
-import org.apache.gobblin.data.management.copy.PreserveAttributes;
 import org.apache.gobblin.data.management.copy.recovery.RecoveryHelper;
+import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
 import org.apache.gobblin.instrumented.writer.InstrumentedDataWriter;
 import org.apache.gobblin.state.ConstructState;
 import org.apache.gobblin.util.FileListUtils;
@@ -183,7 +183,7 @@
     }
     this.actualProcessedCopyableFile = Optional.of(copyableFile);
     this.fs.mkdirs(stagingFile.getParent());
-    writeImpl(fileAwareInputStream.getInputStream(), stagingFile, copyableFile);
+    writeImpl(fileAwareInputStream.getInputStream(), stagingFile, copyableFile, fileAwareInputStream);
     this.filesWritten.incrementAndGet();
   }

@@ -200,17 +200,31 @@
    * @param inputStream {@link FSDataInputStream} whose contents should be written to staging path.
    * @param writeAt {@link Path} at which contents should be written.
    * @param copyableFile {@link org.apache.gobblin.data.management.copy.CopyEntity} that generated this copy operation.
+   * @param record The actual {@link FileAwareInputStream} passed to the write method.
    * @throws IOException
    */
-  protected void writeImpl(InputStream inputStream, Path writeAt, CopyableFile copyableFile)
-      throws IOException {
-
-    final short replication =
-        copyableFile.getPreserve().preserve(PreserveAttributes.Option.REPLICATION) ? copyableFile.getOrigin()
-            .getReplication() : this.fs.getDefaultReplication(writeAt);
-    final long blockSize =
-        copyableFile.getPreserve().preserve(PreserveAttributes.Option.BLOCK_SIZE) ? copyableFile.getOrigin()
-            .getBlockSize() : this.fs.getDefaultBlockSize(writeAt);
+  protected void writeImpl(InputStream inputStream, Path writeAt, CopyableFile copyableFile,
+      FileAwareInputStream record) throws IOException {
+
+    final short replication = copyableFile.getReplication(this.fs);
+    final long blockSize = copyableFile.getBlockSize(this.fs);
+    final long fileSize = copyableFile.getFileStatus().getLen();
+
+    long expectedBytes = fileSize;
+    Long maxBytes = null;
+    // Whether writer must write EXACTLY maxBytes.
+    boolean mustMatchMaxBytes = false;
+
+    if (record.getSplit().isPresent()) {
+      maxBytes = record.getSplit().get().getHighPosition() - record.getSplit().get().getLowPosition();
+      if (record.getSplit().get().isLastSplit()) {
+        expectedBytes = fileSize % blockSize;
+        mustMatchMaxBytes = false;
+      } else {
+        expectedBytes = maxBytes;
+        mustMatchMaxBytes = true;
+      }
+    }

     Predicate<FileStatus> fileStatusAttributesFilter = new Predicate<FileStatus>() {
       @Override
@@ -243,7 +257,7 @@
       ThrottledInputStream throttledInputStream = throttler.throttleInputStream().inputStream(inputStream)
          .sourceURI(copyableFile.getOrigin().getPath().makeQualified(defaultFS.getUri(), defaultFS.getWorkingDirectory()).toUri())
          .targetURI(this.fs.makeQualified(writeAt).toUri()).build();
-      StreamCopier copier = new StreamCopier(throttledInputStream, os).withBufferSize(this.bufferSize);
+      StreamCopier copier = new StreamCopier(throttledInputStream, os, maxBytes).withBufferSize(this.bufferSize);

       log.info("File {}: Starting copy", copyableFile.getOrigin().getPath());

@@ -251,10 +265,9 @@
         copier.withCopySpeedMeter(this.copySpeedMeter);
       }
       long numBytes = copier.copy();
-      long fileSize = copyableFile.getFileStatus().getLen();
-      if (this.checkFileSize && numBytes != fileSize) {
-        throw new IOException(String.format("Number of bytes copied doesn't match filesize for file %s.",
-            copyableFile.getOrigin().getPath()));
+      if ((this.checkFileSize || mustMatchMaxBytes) && numBytes != expectedBytes) {
+        throw new IOException(String.format("Incomplete write: expected %d, wrote %d bytes.",
+            expectedBytes, numBytes));
       }
       this.bytesWritten.addAndGet(numBytes);
       if (isInstrumentationEnabled()) {
@@ -281,6 +294,9 @@
   }

   protected Path getStagingFilePath(CopyableFile file) {
+    if (DistcpFileSplitter.isSplitWorkUnit(this.state)) {
+      return new Path(this.stagingDir, DistcpFileSplitter.getSplit(this.state).get().getPartName());
+    }
     return new Path(this.stagingDir, file.getDestination().getName());
   }

@@ -295,6 +311,16 @@
         PathUtils.withoutLeadingSeparator(destinationWithoutSchemeAndAuthority));
   }

+  public static Path getSplitOutputFilePath(CopyableFile file, Path outputDir,
+      CopyableFile.DatasetAndPartition datasetAndPartition, State workUnit) {
+    if (DistcpFileSplitter.isSplitWorkUnit(workUnit)) {
+      return new Path(getOutputFilePath(file, outputDir, datasetAndPartition).getParent(),
+          DistcpFileSplitter.getSplit(workUnit).get().getPartName());
+    } else {
+      return getOutputFilePath(file, outputDir, datasetAndPartition);
+    }
+  }
+
   public static Path getOutputDir(State state) {
     return new Path(
         state.getProp(ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_OUTPUT_DIR, 1, 0)));
@@ -395,8 +421,8 @@
     CopyableFile copyableFile = this.actualProcessedCopyableFile.get();
     Path stagingFilePath = getStagingFilePath(copyableFile);
-    Path outputFilePath = getOutputFilePath(copyableFile, this.outputDir,
-        copyableFile.getDatasetAndPartition(this.copyableDatasetMetadata));
+    Path outputFilePath = getSplitOutputFilePath(copyableFile, this.outputDir,
+        copyableFile.getDatasetAndPartition(this.copyableDatasetMetadata), this.state);

     log.info(String.format("Committing data from %s to %s", stagingFilePath, outputFilePath));
     try {


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriter.java
index 5e1164d..4f61d4e 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriter.java
@@ -22,7 +22,6 @@
 import org.apache.gobblin.data.management.copy.CopyableFile;
 import org.apache.gobblin.data.management.copy.FileAwareInputStream;
 import org.apache.gobblin.util.FileUtils;
 import org.apache.gobblin.util.io.StreamCopier;
-import org.apache.gobblin.util.io.StreamUtils;

 import java.io.IOException;
 import java.io.InputStream;
@@ -36,7 +35,6 @@
 import lombok.extern.slf4j.Slf4j;

 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -62,7 +60,8 @@ public class TarArchiveInputStreamDataWriter extends FileAwareInputStreamDataWri
    * @see org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriter#write(org.apache.gobblin.data.management.copy.FileAwareInputStream)
    */
   @Override
-  public void writeImpl(InputStream inputStream, Path writeAt, CopyableFile copyableFile) throws IOException {
+  public void writeImpl(InputStream inputStream, Path writeAt, CopyableFile copyableFile, FileAwareInputStream record)
+      throws IOException {
     this.closer.register(inputStream);

     TarArchiveInputStream tarIn = new TarArchiveInputStream(inputStream);
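Continuing the 600 MiB example through writeImpl()'s size check above: a non-final split must write exactly its span, while the final split expects whatever remains past the full block-aligned spans:

    long mib = 1024L * 1024L;
    // Non-final split [0, 512 MiB): maxBytes = 512 MiB and mustMatchMaxBytes == true,
    // so the copy must produce exactly that many bytes.
    long maxBytes = 512 * mib;
    // Final split [512 MiB, 600 MiB): expected bytes are the remainder,
    // 600 MiB % 256 MiB = 88 MiB, which equals 600 MiB - 512 MiB here.
    long expectedBytes = (600 * mib) % (256 * mib);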
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/DecryptConverterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/DecryptConverterTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/DecryptConverterTest.java
index ed4bf39..4e48e9a 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/DecryptConverterTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/DecryptConverterTest.java
@@ -71,8 +71,8 @@ public class DecryptConverterTest {
       String gpgFilePath = url.getFile();
       try (FSDataInputStream gpgFileInput = fs.open(new Path(gpgFilePath))) {
-        FileAwareInputStream fileAwareInputStream =
-            new FileAwareInputStream(CopyableFileUtils.getTestCopyableFile(), gpgFileInput);
+        FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder()
+            .file(CopyableFileUtils.getTestCopyableFile()).inputStream(gpgFileInput).build();

         Iterable<FileAwareInputStream> iterable =
             converter.convertRecord("outputSchema", fileAwareInputStream, workUnitState);
@@ -106,8 +106,8 @@ public class DecryptConverterTest {
       String testFilePath = url.getFile();
       try (FSDataInputStream testFileInput = fs.open(new Path(testFilePath))) {
-        FileAwareInputStream fileAwareInputStream =
-            new FileAwareInputStream(CopyableFileUtils.getTestCopyableFile(), testFileInput);
+        FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder()
+            .file(CopyableFileUtils.getTestCopyableFile()).inputStream(testFileInput).build();
         fileAwareInputStream.getFile().setDestination(new Path("file:///tmp/decrypt-test.txt.insecure_shift"));
         Iterable<FileAwareInputStream> iterable =
             converter.convertRecord("outputSchema", fileAwareInputStream, workUnitState);


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/UnGzipConverterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/UnGzipConverterTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/UnGzipConverterTest.java
index 9f1f8c7..07db779 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/UnGzipConverterTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/UnGzipConverterTest.java
@@ -58,8 +58,8 @@ public class UnGzipConverterTest {
     FileSystem fs = FileSystem.getLocal(new Configuration());
     String fullPath = getClass().getClassLoader().getResource(filePath).getFile();
-    FileAwareInputStream fileAwareInputStream =
-        new FileAwareInputStream(CopyableFileUtils.getTestCopyableFile(filePath), fs.open(new Path(fullPath)));
+    FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder()
+        .file(CopyableFileUtils.getTestCopyableFile(filePath)).inputStream(fs.open(new Path(fullPath))).build();

     Iterable<FileAwareInputStream> iterable =
         converter.convertRecord("outputSchema", fileAwareInputStream, new WorkUnitState());
@@ -80,9 +80,9 @@ public class UnGzipConverterTest {
     String filePath = "unGzipConverterTest/" + fileName;
     String fullPath = getClass().getClassLoader().getResource(filePath).getFile();
-    FileAwareInputStream fileAwareInputStream =
-        new FileAwareInputStream(CopyableFileUtils.getTestCopyableFile(filePath, "/tmp/" + fileName, null, null),
-            fs.open(new Path(fullPath)));
+    FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder()
+        .file(CopyableFileUtils.getTestCopyableFile(filePath, "/tmp/" + fileName, null, null))
+        .inputStream(fs.open(new Path(fullPath))).build();

     Iterable<FileAwareInputStream> iterable =
         converter.convertRecord("outputSchema", fileAwareInputStream, new WorkUnitState());
     FileAwareInputStream out = iterable.iterator().next();
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/splitter/DistcpFileSplitterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/splitter/DistcpFileSplitterTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/splitter/DistcpFileSplitterTest.java
new file mode 100644
index 0000000..98959a7
--- /dev/null
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/splitter/DistcpFileSplitterTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.data.management.copy.splitter;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.CopySource;
+import org.apache.gobblin.data.management.copy.CopyableDatasetMetadata;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.CopyableFileUtils;
+import org.apache.gobblin.data.management.copy.TestCopyableDataset;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.ForkOperatorUtils;
+import org.apache.gobblin.util.guid.Guid;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+
+public class DistcpFileSplitterTest {
+
+  // This test ONLY checks whether mergeAllSplitWorkUnits manipulates the properties of the work units correctly
+  // (i.e. "merging" them within the collection that was passed in).
+  // It does NOT check that the merge in the filesystem has been completed successfully.
+  // This requires testing on an HDFS setup.
+  @Test
+  public void testMergeSplitWorkUnits() throws Exception {
+    long mockFileLen = 12L;
+    long mockBlockSize = 4L;
+    long mockMaxSplitSize = 4L;
+    long expectedSplitSize = (mockMaxSplitSize / mockBlockSize) * mockBlockSize;
+    int expectedSplits = (int) (mockFileLen / expectedSplitSize + 1);
+
+    FileSystem fs = mock(FileSystem.class);
+
+    List<WorkUnitState> splitWorkUnits =
+        createMockSplitWorkUnits(fs, mockFileLen, mockBlockSize, mockMaxSplitSize).stream()
+            .map(wu -> new WorkUnitState(wu)).collect(Collectors.toList());
+    Assert.assertEquals(splitWorkUnits.size(), expectedSplits);
+
+    Collection<WorkUnitState> mergedWorkUnits = DistcpFileSplitter.mergeAllSplitWorkUnits(fs, splitWorkUnits);
+    Assert.assertEquals(mergedWorkUnits.size(), 1);
+  }
+
+  // This test checks whether a work unit has been successfully set up for a split,
+  // but does not check that the split is actually done correctly when input streams are used.
+  @Test
+  public void testSplitFile() throws Exception {
+    long mockFileLen = 12L;
+    long mockBlockSize = 4L;
+    long mockMaxSplitSize = 4L;
+    long expectedSplitSize = (mockMaxSplitSize / mockBlockSize) * mockBlockSize;
+    int expectedSplits = (int) (mockFileLen / expectedSplitSize + 1);
+
+    FileSystem fs = mock(FileSystem.class);
+
+    Collection<WorkUnit> splitWorkUnits = createMockSplitWorkUnits(fs, mockFileLen, mockBlockSize, mockMaxSplitSize);
+    Assert.assertEquals(splitWorkUnits.size(), expectedSplits);
+
+    Set<Integer> splitNums = new HashSet<>();
+    boolean hasLastSplit = false;
+    for (WorkUnit wu : splitWorkUnits) {
+      Optional<DistcpFileSplitter.Split> split = DistcpFileSplitter.getSplit(wu);
+      Assert.assertTrue(split.isPresent());
+      Assert.assertEquals(split.get().getTotalSplits(), expectedSplits);
+      int splitNum = split.get().getSplitNumber();
+      Assert.assertFalse(splitNums.contains(splitNum));
+      splitNums.add(splitNum);
+      Assert.assertEquals(split.get().getLowPosition(), expectedSplitSize * splitNum);
+      if (split.get().isLastSplit()) {
+        hasLastSplit = true;
+        Assert.assertEquals(split.get().getHighPosition(), mockFileLen);
+      } else {
+        Assert.assertEquals(split.get().getHighPosition(), expectedSplitSize * (splitNum + 1));
+      }
+    }
+    Assert.assertTrue(hasLastSplit);
+  }
+
+  private Collection<WorkUnit> createMockSplitWorkUnits(FileSystem fs, long fileLen, long blockSize, long maxSplitSize)
+      throws Exception {
+    FileStatus file = mock(FileStatus.class);
+    when(file.getLen()).thenReturn(fileLen);
+    when(file.getBlockSize()).thenReturn(blockSize);
+
+    URI uri = new URI("hdfs", "dummyhost", "/test", "test");
+    Path path = new Path(uri);
+    when(fs.getUri()).thenReturn(uri);
+
+    CopyableDatasetMetadata cdm = new CopyableDatasetMetadata(new TestCopyableDataset(path));
+
+    CopyableFile cf = CopyableFileUtils.getTestCopyableFile();
+    CopyableFile spy = spy(cf);
+    doReturn(file).when(spy).getFileStatus();
+    doReturn(blockSize).when(spy).getBlockSize(any(FileSystem.class));
+    doReturn(path).when(spy).getDestination();
+
+    WorkUnit wu = WorkUnit.createEmpty();
+    wu.setProp(DistcpFileSplitter.MAX_SPLIT_SIZE_KEY, maxSplitSize);
+    wu.setProp(ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_OUTPUT_DIR, 1, 0),
+        path.toString());
+    CopySource.setWorkUnitGuid(wu, Guid.fromStrings(wu.toString()));
+    CopySource.serializeCopyEntity(wu, cf);
+    CopySource.serializeCopyableDataset(wu, cdm);
+
+    return DistcpFileSplitter.splitFile(spy, wu, fs);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
index 517077e..c2f62b5 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
@@ -26,7 +26,9 @@
 import java.util.Properties;

 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -92,7 +94,8 @@ public class FileAwareInputStreamDataWriterTest {

     FileAwareInputStreamDataWriter dataWriter = new FileAwareInputStreamDataWriter(state, 1, 0);

-    FileAwareInputStream fileAwareInputStream = new FileAwareInputStream(cf, StreamUtils.convertStream(IOUtils.toInputStream(streamString)));
+    FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder().file(cf)
+        .inputStream(StreamUtils.convertStream(IOUtils.toInputStream(streamString))).build();
     dataWriter.write(fileAwareInputStream);
     dataWriter.commit();

     Path writtenFilePath = new Path(new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR),
@@ -101,6 +104,45 @@ public class FileAwareInputStreamDataWriterTest {
   }

   @Test
+  public void testBlockWrite() throws Exception {
+    String streamString = "testContents";
+
+    FileStatus status = fs.getFileStatus(testTempPath);
+    OwnerAndPermission ownerAndPermission =
+        new OwnerAndPermission(status.getOwner(), status.getGroup(), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+    CopyableFile cf = CopyableFileUtils.getTestCopyableFile(ownerAndPermission);
+
+    CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new TestCopyableDataset(new Path("/source")));
+
+    WorkUnitState state = TestUtils.createTestWorkUnitState();
+    state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(testTempPath, "staging").toString());
+    state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(testTempPath, "output").toString());
+    state.setProp(ConfigurationKeys.WRITER_FILE_PATH, RandomStringUtils.randomAlphabetic(5));
+    state.setProp(DistcpFileSplitter.SPLIT_ENABLED, true);
+    CopySource.serializeCopyEntity(state, cf);
+    CopySource.serializeCopyableDataset(state, metadata);
+
+    FileAwareInputStreamDataWriter dataWriter = new FileAwareInputStreamDataWriter(state, 1, 0);
+
+    long splitLen = 4;
+    int splits = (int) (streamString.length() / splitLen + 1);
+    DistcpFileSplitter.Split split = new DistcpFileSplitter.Split(0, splitLen, 0, splits,
+        String.format("%s.__PART%d__", cf.getDestination().getName(), 0));
+    FSDataInputStream dataInputStream = StreamUtils.convertStream(IOUtils.toInputStream(streamString));
+    dataInputStream.seek(split.getLowPosition());
+    FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder().file(cf)
+        .inputStream(dataInputStream)
+        .split(Optional.of(split))
+        .build();
+    dataWriter.write(fileAwareInputStream);
+    dataWriter.commit();
+
+    Path writtenFilePath = new Path(new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR),
+        cf.getDatasetAndPartition(metadata).identifier()), cf.getDestination());
+    Assert.assertEquals(IOUtils.toString(new FileInputStream(writtenFilePath.toString())),
+        streamString.substring(0, (int) splitLen));
+  }
+
+  @Test
   public void testWriteWithEncryption() throws Exception {
     byte[] streamString = "testEncryptedContents".getBytes("UTF-8");

     byte[] expectedContents = new byte[streamString.length];
@@ -126,8 +168,8 @@ public class FileAwareInputStreamDataWriterTest {

     FileAwareInputStreamDataWriter dataWriter = new FileAwareInputStreamDataWriter(state, 1, 0);

-    FileAwareInputStream fileAwareInputStream = new FileAwareInputStream(cf, StreamUtils.convertStream(
-        new ByteArrayInputStream(streamString)));
+    FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder().file(cf)
+        .inputStream(StreamUtils.convertStream(new ByteArrayInputStream(streamString))).build();

     dataWriter.write(fileAwareInputStream);
     dataWriter.commit();
@@ -161,8 +203,8 @@ public class FileAwareInputStreamDataWriterTest {

     FileAwareInputStreamDataWriter dataWriter = new FileAwareInputStreamDataWriter(state, 1, 0);

-    FileAwareInputStream fileAwareInputStream = new FileAwareInputStream(cf, StreamUtils.convertStream(
-        new ByteArrayInputStream(streamString)));
+    FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder().file(cf)
+        .inputStream(StreamUtils.convertStream(new ByteArrayInputStream(streamString))).build();

     dataWriter.write(fileAwareInputStream);
     dataWriter.commit();
@@ -217,8 +259,8 @@ public class FileAwareInputStreamDataWriterTest {

     FileAwareInputStreamDataWriter dataWriter = new FileAwareInputStreamDataWriter(state, 1, 0);

-    FileAwareInputStream fileAwareInputStream = new FileAwareInputStream(cf, StreamUtils.convertStream(
-        new ByteArrayInputStream(streamString)));
+    FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder().file(cf)
+        .inputStream(StreamUtils.convertStream(new ByteArrayInputStream(streamString))).build();

     dataWriter.write(fileAwareInputStream);
     dataWriter.commit();


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriterTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriterTest.java
index d2406bf..baa1c61 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriterTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriterTest.java
@@ -121,7 +121,8 @@ public class TarArchiveInputStreamDataWriterTest {
         CopyableFileUtils.getTestCopyableFile(filePath, new Path(testTempPath, newFileName).toString(),
             newFileName, ownerAndPermission);

-    FileAwareInputStream fileAwareInputStream = new FileAwareInputStream(cf, fs.open(new Path(fullPath)));
+    FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder().file(cf)
+        .inputStream(fs.open(new Path(fullPath))).build();
     Iterable<FileAwareInputStream> iterable =
         converter.convertRecord("outputSchema", fileAwareInputStream, new WorkUnitState());
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-utility/src/main/java/org/apache/gobblin/util/io/StreamCopier.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/io/StreamCopier.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/io/StreamCopier.java
index 48917bb..3b25cf3 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/io/StreamCopier.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/io/StreamCopier.java
@@ -26,12 +26,12 @@
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;

+import javax.annotation.concurrent.NotThreadSafe;
+
 import com.codahale.metrics.Meter;

 import org.apache.gobblin.util.limiter.Limiter;

-import javax.annotation.concurrent.NotThreadSafe;
-

 /**
  * A class that copies an {@link InputStream} to an {@link OutputStream} in a configurable way.
@@ -44,6 +44,8 @@ public class StreamCopier {

   private final ReadableByteChannel inputChannel;
   private final WritableByteChannel outputChannel;
+
+  private final Long maxBytes;
   private int bufferSize = DEFAULT_BUFFER_SIZE;
   private Meter copySpeedMeter;
@@ -51,12 +53,21 @@
   private volatile boolean copied = false;

   public StreamCopier(InputStream inputStream, OutputStream outputStream) {
-    this(Channels.newChannel(inputStream), Channels.newChannel(outputStream));
+    this(inputStream, outputStream, null);
+  }
+
+  public StreamCopier(InputStream inputStream, OutputStream outputStream, Long maxBytes) {
+    this(Channels.newChannel(inputStream), Channels.newChannel(outputStream), maxBytes);
   }

   public StreamCopier(ReadableByteChannel inputChannel, WritableByteChannel outputChannel) {
+    this(inputChannel, outputChannel, null);
+  }
+
+  public StreamCopier(ReadableByteChannel inputChannel, WritableByteChannel outputChannel, Long maxBytes) {
     this.inputChannel = inputChannel;
     this.outputChannel = outputChannel;
+    this.maxBytes = maxBytes;
   }

   /**
@@ -84,7 +95,8 @@
   }

   /**
-   * Execute the copy of bytes from the input to the output stream.
+   * Execute the copy of bytes from the input to the output stream. If maxBytes is specified, limits the number of
+   * bytes copied to maxBytes.
    * Note: this method should only be called once. Further calls will throw a {@link IllegalStateException}.
    * @return Number of bytes copied.
    */
@@ -96,19 +108,28 @@
     this.copied = true;

     try {
-      long bytesRead = 0;
-      long totalBytesRead = 0;
+      long numBytes = 0;
+      long totalBytes = 0;
       final ByteBuffer buffer = ByteBuffer.allocateDirect(this.bufferSize);
-      while ((bytesRead = fillBufferFromInputChannel(buffer)) != -1) {
-        totalBytesRead += bytesRead;
+      // Only keep copying if we've read less than maxBytes (if maxBytes exists)
+      while ((this.maxBytes == null || this.maxBytes > totalBytes) &&
+          (numBytes = fillBufferFromInputChannel(buffer)) != -1) {
+        totalBytes += numBytes;
         // flip the buffer to be written
         buffer.flip();
+
+        // If we've read more than maxBytes, discard enough bytes to only write maxBytes.
+        if (this.maxBytes != null && totalBytes > this.maxBytes) {
+          buffer.limit(buffer.limit() - (int) (totalBytes - this.maxBytes));
+          totalBytes = this.maxBytes;
+        }
+
         this.outputChannel.write(buffer);
         // Clear if empty
         buffer.compact();
         if (this.copySpeedMeter != null) {
-          this.copySpeedMeter.mark(bytesRead);
+          this.copySpeedMeter.mark(numBytes);
         }
       }
       // Done writing, now flip to read again
@@ -118,7 +139,7 @@
         this.outputChannel.write(buffer);
       }

-      return totalBytesRead;
+      return totalBytes;
     } finally {
       if (this.closeChannelsOnComplete) {
         this.inputChannel.close();
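A minimal usage sketch of the bounded copy (values are illustrative):

    byte[] data = "0123456789".getBytes(StandardCharsets.UTF_8);
    ByteArrayOutputStream out = new ByteArrayOutputStream();

    // Copy at most 4 bytes; the copier trims the final buffer so exactly 4 bytes are written.
    long copied = new StreamCopier(new ByteArrayInputStream(data), out, 4L).copy();
    // copied == 4; out now holds "0123"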
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cba36992/gobblin-utility/src/test/java/org/apache/gobblin/util/io/StreamCopierTest.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/io/StreamCopierTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/io/StreamCopierTest.java
index afde7dd..d2a8357 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/io/StreamCopierTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/io/StreamCopierTest.java
@@ -27,8 +27,6 @@
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
 import com.google.common.base.Charsets;

-import org.apache.gobblin.util.limiter.CountBasedLimiter;
-

 public class StreamCopierTest {

@@ -59,6 +57,19 @@ public class StreamCopierTest {
   }

   @Test
+  public void testBlockCopy() throws Exception {
+    String testString = "This is a string";
+    ByteArrayInputStream inputStream = new ByteArrayInputStream(testString.getBytes(Charsets.UTF_8));
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+    long maxBytes = 4L;
+    new StreamCopier(inputStream, outputStream, maxBytes).copy();
+
+    Assert.assertEquals(new String(outputStream.toByteArray(), Charsets.UTF_8),
+        testString.substring(0, (int) maxBytes));
+  }
+
+  @Test
   public void testCopyMeter() throws Exception {
     String testString = "This is a string";
     Meter meter = new MetricRegistry().meter("my.meter");
