Repository: incubator-gobblin
Updated Branches: refs/heads/master 6936e0d19 -> 6ef8cddc7
[GOBBLIN-520] Add fileSetWorkUnitGenerator customization for CopySource Closes #2390 from yukuai518/wuGenerator Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6ef8cddc Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6ef8cddc Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6ef8cddc Branch: refs/heads/master Commit: 6ef8cddc7710585a3b222ad8b570e5f77b22f26f Parents: 6936e0d Author: Kuai Yu <[email protected]> Authored: Wed Jun 27 22:26:30 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Wed Jun 27 22:26:38 2018 -0700 ---------------------------------------------------------------------- .../configuration/ConfigurationKeys.java | 5 ++ .../data/management/copy/CopySource.java | 67 ++++++++++++-------- .../src/main/resources/stressTest.conf | 19 ++++++ .../apache/gobblin/util/PropertiesUtils.java | 2 +- 4 files changed, 64 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6ef8cddc/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 50e6020..a56d4b7 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -528,6 +528,11 @@ public class ConfigurationKeys { public static final String SQL_SERVER_CONNECTION_PARAMETERS = "source.querybased.sqlserver.connectionParameters"; /** + * Configuration properties used by the CopySource. 
+ */ + public static final String COPY_SOURCE_FILESET_WU_GENERATOR_CLASS = "copy.source.fileset.wu.generator.class"; + + /** * Configuration properties used by the FileBasedExtractor */ public static final String SOURCE_FILEBASED_DATA_DIRECTORY = "source.filebased.data.directory"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6ef8cddc/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java index 3355f3d..b1ce7a6 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java @@ -46,6 +46,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Multimaps; import com.google.common.collect.SetMultimap; +import org.apache.gobblin.annotation.Alias; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.SourceState; import org.apache.gobblin.configuration.State; @@ -78,6 +79,7 @@ import org.apache.gobblin.source.extractor.extract.AbstractSource; import org.apache.gobblin.source.workunit.Extract; import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.gobblin.source.workunit.WorkUnitWeighter; +import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ExecutorsUtils; import org.apache.gobblin.util.HadoopUtils; import org.apache.gobblin.util.WriterUtils; @@ -86,6 +88,7 @@ import org.apache.gobblin.util.binpacking.WorstFitDecreasingBinPacking; import org.apache.gobblin.util.deprecation.DeprecationUtils; import org.apache.gobblin.util.executors.IteratorExecutor; import org.apache.gobblin.util.guid.Guid; +import 
org.apache.gobblin.util.reflection.GobblinConstructorUtils; import org.apache.gobblin.util.request_allocation.GreedyAllocator; import org.apache.gobblin.util.request_allocation.HierarchicalAllocator; import org.apache.gobblin.util.request_allocation.HierarchicalPrioritizer; @@ -205,13 +208,19 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> { //Submit alertable events for unfulfilled requests submitUnfulfilledRequestEvents(allocator); + String filesetWuGeneratorAlias = state.getProp(ConfigurationKeys.COPY_SOURCE_FILESET_WU_GENERATOR_CLASS, FileSetWorkUnitGenerator.class.getName()); Iterator<Callable<Void>> callableIterator = Iterators.transform(prioritizedFileSets, new Function<FileSet<CopyEntity>, Callable<Void>>() { @Nullable @Override public Callable<Void> apply(FileSet<CopyEntity> input) { - return new FileSetWorkUnitGenerator((CopyableDatasetBase) input.getDataset(), input, state, workUnitsMap, - watermarkGenerator, minWorkUnitWeight); + try { + return GobblinConstructorUtils.<FileSetWorkUnitGenerator>invokeLongestConstructor( + new ClassAliasResolver(FileSetWorkUnitGenerator.class).resolveClass(filesetWuGeneratorAlias), + input.getDataset(), input, state, workUnitsMap, watermarkGenerator, minWorkUnitWeight, lineageInfo); + } catch (Exception e) { + throw new RuntimeException("Cannot create workunits generator", e); + } } }); @@ -315,15 +324,17 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> { /** * {@link Runnable} to generate copy listing for one {@link CopyableDataset}. 
*/ + @Alias("FileSetWorkUnitGenerator") @AllArgsConstructor - private class FileSetWorkUnitGenerator implements Callable<Void> { + public static class FileSetWorkUnitGenerator implements Callable<Void> { - private final CopyableDatasetBase copyableDataset; - private final FileSet<CopyEntity> fileSet; - private final State state; - private final SetMultimap<FileSet<CopyEntity>, WorkUnit> workUnitList; - private final Optional<CopyableFileWatermarkGenerator> watermarkGenerator; - private final long minWorkUnitWeight; + protected final CopyableDatasetBase copyableDataset; + protected final FileSet<CopyEntity> fileSet; + protected final State state; + protected final SetMultimap<FileSet<CopyEntity>, WorkUnit> workUnitList; + protected final Optional<CopyableFileWatermarkGenerator> watermarkGenerator; + protected final long minWorkUnitWeight; + protected final Optional<LineageInfo> lineageInfo; @Override public Void call() { @@ -362,19 +373,31 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> { ioe); } } - } - private void addLineageInfo(CopyEntity copyEntity, WorkUnit workUnit) { - if (copyEntity instanceof CopyableFile) { - CopyableFile copyableFile = (CopyableFile) copyEntity; + private void setWorkUnitWatermark(WorkUnit workUnit, Optional<CopyableFileWatermarkGenerator> watermarkGenerator, + CopyEntity copyEntity) + throws IOException { + if (copyEntity instanceof CopyableFile) { + Optional<WatermarkInterval> watermarkIntervalOptional = + CopyableFileWatermarkHelper.getCopyableFileWatermark((CopyableFile) copyEntity, watermarkGenerator); + if (watermarkIntervalOptional.isPresent()) { + workUnit.setWatermarkInterval(watermarkIntervalOptional.get()); + } + } + } + + private void addLineageInfo(CopyEntity copyEntity, WorkUnit workUnit) { + if (copyEntity instanceof CopyableFile) { + CopyableFile copyableFile = (CopyableFile) copyEntity; /* * In Gobblin Distcp, the source and target path info of a CopyableFile are determined by its dataset 
found by * a DatasetFinder. Consequently, the source and destination dataset for the CopyableFile lineage are expected * to be set by the same logic */ - if (lineageInfo.isPresent() && copyableFile.getSourceDataset() != null - && copyableFile.getDestinationDataset() != null) { - lineageInfo.get().setSource(copyableFile.getSourceDataset(), workUnit); + if (lineageInfo.isPresent() && copyableFile.getSourceDataset() != null + && copyableFile.getDestinationDataset() != null) { + lineageInfo.get().setSource(copyableFile.getSourceDataset(), workUnit); + } } } } @@ -505,16 +528,4 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> { public static CopyableDatasetMetadata deserializeCopyableDataset(State state) { return CopyableDatasetMetadata.deserialize(state.getProp(SERIALIZED_COPYABLE_DATASET)); } - - private void setWorkUnitWatermark(WorkUnit workUnit, Optional<CopyableFileWatermarkGenerator> watermarkGenerator, - CopyEntity copyEntity) - throws IOException { - if (copyEntity instanceof CopyableFile) { - Optional<WatermarkInterval> watermarkIntervalOptional = - CopyableFileWatermarkHelper.getCopyableFileWatermark((CopyableFile) copyEntity, watermarkGenerator); - if (watermarkIntervalOptional.isPresent()) { - workUnit.setWatermarkInterval(watermarkIntervalOptional.get()); - } - } - } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6ef8cddc/gobblin-example/src/main/resources/stressTest.conf ---------------------------------------------------------------------- diff --git a/gobblin-example/src/main/resources/stressTest.conf b/gobblin-example/src/main/resources/stressTest.conf new file mode 100644 index 0000000..594a080 --- /dev/null +++ b/gobblin-example/src/main/resources/stressTest.conf @@ -0,0 +1,19 @@ +job.name=stressTest1 +job.group=GobblinSamples + +stressTest.numWorkUnits=2 +stressTest.numRecords=20000 +stressTest.computeTimeMicro=900 +stressTest.sleepTimeMicro=2100 + 
+source.class=org.apache.gobblin.util.test.StressTestingSource + +writer.builder.class=org.apache.gobblin.writer.test.GobblinTestEventBusWriter$Builder +writer.output.format=txt +data.publisher.type=org.apache.gobblin.publisher.NoopPublisher + +# Work paths +state.store.enabled=false + +# Miscellaneous +job.lock.enabled=false \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6ef8cddc/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java index 4ab6db8..c41273c 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java @@ -62,7 +62,7 @@ public class PropertiesUtils { } public static long getPropAsLong(Properties properties, String key, long defaultValue) { - return Long.valueOf(properties.getProperty(key, Long.toString(defaultValue))); + return Long.parseLong(properties.getProperty(key, Long.toString(defaultValue))); } /**
