Repository: incubator-gobblin
Updated Branches:
  refs/heads/master fbf7c9bbd -> df75b13e1


[GOBBLIN-364] Exclude JobState from WorkUnit created by PartitionedFileSourceBase

Closes #2237 from zxcware/file


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/df75b13e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/df75b13e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/df75b13e

Branch: refs/heads/master
Commit: df75b13e1f7ee5776ed18b7aade9014fae8deeea
Parents: fbf7c9b
Author: zhchen <[email protected]>
Authored: Wed Jan 10 11:55:45 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Wed Jan 10 11:55:45 2018 -0800

----------------------------------------------------------------------
 .../source/PartitionedFileSourceBase.java       | 24 ++++++-------
 .../extractor/filebased/FileBasedSource.java    | 17 +++++----
 .../DatePartitionedAvroFileExtractorTest.java   | 36 ++++++++++++++++++++
 3 files changed, 54 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/df75b13e/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
index d317e54..9ec7707 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
@@ -249,15 +249,14 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS
 
   private Extract getExtractForFile(PartitionAwareFileRetriever.FileInfo file,
       String topicName,
-      SourceState partitionState,
+      String namespace,
       Map<Long, Extract> extractMap) {
 
     Extract extract = extractMap.get(file.getWatermarkMsSinceEpoch());
     if (extract == null) {
 
       // Create an extract object for the dayPath
-      extract = partitionState
-          .createExtract(this.tableType, partitionState.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY), topicName);
+      extract = new Extract(this.tableType, namespace, topicName);
 
       LOG.info("Created extract: " + extract.getExtractId() + " for path " + topicName);
       extractMap.put(file.getWatermarkMsSinceEpoch(), extract);
@@ -277,23 +276,20 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS
         retriever.getFilesToProcess(this.lowWaterMark, this.maxFilesPerJob - this.fileCount);
     Collections.sort(filesToPull);
     String topicName = this.sourceDir.getName();
+    String namespace = this.sourceState.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY);
 
-    SourceState partitionState = new SourceState();
-
-    partitionState.addAll(this.sourceState);
-    partitionState.setProp(ConfigurationKeys.SOURCE_ENTITY, topicName);
     Map<Long, Extract> extractMap = new HashMap<>();
     for (PartitionAwareFileRetriever.FileInfo file : filesToPull) {
-      Extract extract = getExtractForFile(file, topicName, partitionState, extractMap);
+      Extract extract = getExtractForFile(file, topicName, namespace, extractMap);
 
       LOG.info("Will process file " + file.getFilePath());
 
-      partitionState.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, file.getFilePath());
-      partitionState.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, file.getWatermarkMsSinceEpoch());
-      partitionState.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, file.getWatermarkMsSinceEpoch());
-      partitionState.setProp(ConfigurationKeys.WORK_UNIT_DATE_PARTITION_KEY, file.getWatermarkMsSinceEpoch());
-
-      WorkUnit singleWorkUnit = partitionState.createWorkUnit(extract);
+      WorkUnit singleWorkUnit = WorkUnit.create(extract);
+      singleWorkUnit.setProp(ConfigurationKeys.SOURCE_ENTITY, topicName);
+      singleWorkUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, file.getFilePath());
+      singleWorkUnit.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, file.getWatermarkMsSinceEpoch());
+      singleWorkUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, file.getWatermarkMsSinceEpoch());
+      singleWorkUnit.setProp(ConfigurationKeys.WORK_UNIT_DATE_PARTITION_KEY, file.getWatermarkMsSinceEpoch());
 
       multiWorkUnitWeightedQueue.addWorkUnit(singleWorkUnit, file.getFileSize());
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/df75b13e/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
index d693f44..46a0de0 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
@@ -168,28 +168,27 @@ public abstract class FileBasedSource<S, D> extends AbstractSource<S, D> {
 
     // Distribute the files across the workunits
     for (int fileOffset = 0; fileOffset < filesToPull.size(); fileOffset += filesPerPartition) {
-      SourceState partitionState = new SourceState();
-      partitionState.addAll(state);
+      // Use extract table name to create extract
+      Extract extract = new Extract(tableType, nameSpaceName, extractTableName);
+      WorkUnit workUnit = WorkUnit.create(extract);
 
       // Eventually these setters should be integrated with framework support for generalized watermark handling
-      partitionState.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_SNAPSHOT,
+      workUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_SNAPSHOT,
           StringUtils.join(effectiveSnapshot, ","));
 
       List<String> partitionFilesToPull = filesToPull.subList(fileOffset,
           fileOffset + filesPerPartition > filesToPull.size() ? filesToPull.size() : fileOffset + filesPerPartition);
-      partitionState.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
+      workUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
           StringUtils.join(partitionFilesToPull, ","));
       if (state.getPropAsBoolean(ConfigurationKeys.SOURCE_FILEBASED_PRESERVE_FILE_NAME, false)) {
         if (partitionFilesToPull.size() != 1) {
           throw new RuntimeException("Cannot preserve the file name if a workunit is given multiple files");
         }
-        partitionState.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR,
-            partitionState.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL));
+        workUnit.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR,
+            workUnit.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL));
       }
 
-      // Use extract table name to create extract
-      Extract extract = partitionState.createExtract(tableType, nameSpaceName, extractTableName);
-      workUnits.add(partitionState.createWorkUnit(extract));
+      workUnits.add(workUnit);
     }
 
     log.info("Total number of work units for the current run: " + (workUnits.size() - previousWorkUnitsForRetry.size()));

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/df75b13e/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java
index 73b0d06..299ab9a 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java
@@ -151,6 +151,42 @@ public class DatePartitionedAvroFileExtractorTest {
   }
 
   @Test
+  public void testJobStateNotCopiedToWorkUnit() {
+
+    DatePartitionedAvroFileSource source = new DatePartitionedAvroFileSource();
+
+    SourceState state = new SourceState();
+    state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, ConfigurationKeys.LOCAL_FS_URI);
+    state.setProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, SOURCE_ENTITY);
+    state.setProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY, OUTPUT_DIR + Path.SEPARATOR + SOURCE_ENTITY);
+    state.setProp(ConfigurationKeys.SOURCE_ENTITY, SOURCE_ENTITY);
+    state.setProp(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS, 2);
+
+    state.setProp("date.partitioned.source.partition.pattern", DATE_PATTERN);
+    state.setProp("date.partitioned.source.min.watermark.value", DateTimeFormat.forPattern(DATE_PATTERN).print(
+        this.startDateTime.minusMinutes(1)));
+    state.setProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, TableType.SNAPSHOT_ONLY);
+    state.setProp("date.partitioned.source.partition.prefix", PREFIX);
+    state.setProp("date.partitioned.source.partition.suffix", SUFFIX);
+
+    String dummyKey = "dummy.job.config";
+    state.setProp(dummyKey, "dummy");
+
+    List<WorkUnit> workunits = source.getWorkunits(state);
+
+    Assert.assertEquals(workunits.size(), 4);
+    for(WorkUnit wu : workunits) {
+      if (wu instanceof MultiWorkUnit) {
+        for (WorkUnit workUnit : ((MultiWorkUnit) wu).getWorkUnits()) {
+          Assert.assertFalse(workUnit.contains(dummyKey));
+        }
+      } else {
+        Assert.assertFalse(wu.contains(dummyKey));
+      }
+    }
+  }
+
+  @Test
   public void testReadPartitionsByMinute() throws IOException, DataRecordException {
 
     DatePartitionedAvroFileSource source = new DatePartitionedAvroFileSource();
