[GOBBLIN-379] Submit an event when DistCp job resource requirements exceed a hard bound
Closes #2257 from sv2000/gobblin-379 Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/457ede26 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/457ede26 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/457ede26 Branch: refs/heads/0.12.0 Commit: 457ede26da2c7f693d4fe321d4c50b0e25e0d22d Parents: bde5bb1 Author: suvasude <[email protected]> Authored: Thu Feb 8 11:37:21 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Thu Feb 8 11:37:21 2018 -0800 ---------------------------------------------------------------------- .../data/management/copy/CopyConfiguration.java | 17 ++- .../data/management/copy/CopySource.java | 105 ++++++++++++++----- .../data/management/copy/CopySourceTest.java | 84 +++++++++++++++ .../ConcurrentBoundedPriorityIterable.java | 80 +++++++++----- .../PriorityIterableBasedRequestAllocator.java | 71 +++++++++---- .../RequestAllocatorConfig.java | 21 +++- .../ConcurrentBoundedPriorityIterableTest.java | 21 ++-- 7 files changed, 309 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java index 211ad13..c4d07e2 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java @@ -23,6 +23,8 @@ import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import org.apache.gobblin.util.request_allocation.ConcurrentBoundedPriorityIterable; +import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -65,6 +67,13 @@ public class CopyConfiguration { public static final String ABORT_ON_SINGLE_DATASET_FAILURE = COPY_PREFIX + ".abortOnSingleDatasetFailure"; + /* + * Config to store different classes of rejected requests. Possible values are "all","none", or "min" (default). + */ + public static final String STORE_REJECTED_REQUESTS_KEY = COPY_PREFIX + ".store.rejected.requests"; + public static final String DEFAULT_STORE_REJECTED_REQUESTS = + RequestAllocatorConfig.StoreRejectedRequestsConfig.MIN.name(); + /** * User supplied directory where files should be published. This value is identical for all datasets in the distcp job. */ @@ -81,6 +90,7 @@ public class CopyConfiguration { private final FileSystem targetFs; private final Optional<FileSetComparator> prioritizer; private final ResourcePool maxToCopy; + private final String storeRejectedRequestsSetting; private final Config config; @@ -114,8 +124,8 @@ public class CopyConfiguration { if (properties.containsKey(PRIORITIZER_ALIAS_KEY)) { try { this.prioritizer = Optional.of(GobblinConstructorUtils.<FileSetComparator>invokeLongestConstructor( - new ClassAliasResolver(FileSetComparator.class).resolveClass( - properties.getProperty(PRIORITIZER_ALIAS_KEY)), properties)); + new ClassAliasResolver(FileSetComparator.class) + .resolveClass(properties.getProperty(PRIORITIZER_ALIAS_KEY)), properties)); } catch (ReflectiveOperationException roe) { throw new RuntimeException("Could not build prioritizer.", roe); } @@ -124,6 +134,9 @@ public class CopyConfiguration { } this.maxToCopy = CopyResourcePool.fromConfig(ConfigUtils.getConfigOrEmpty(this.config, MAX_COPY_PREFIX)); + this.storeRejectedRequestsSetting = + properties.getProperty(CopyConfiguration.STORE_REJECTED_REQUESTS_KEY, DEFAULT_STORE_REJECTED_REQUESTS); + this.abortOnSingleDatasetFailure = false; if (this.config.hasPath(ABORT_ON_SINGLE_DATASET_FAILURE)) { this.abortOnSingleDatasetFailure = this.config.getBoolean(ABORT_ON_SINGLE_DATASET_FAILURE); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java index 615d6ad..3355f3d 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java @@ -29,6 +29,9 @@ import java.util.concurrent.Future; import javax.annotation.Nullable; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -37,6 +40,7 @@ import com.google.common.base.Optional; import com.google.common.base.Predicates; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Multimaps; @@ -62,6 +66,7 @@ import org.apache.gobblin.dataset.IterableDatasetFinder; import org.apache.gobblin.dataset.IterableDatasetFinderImpl; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.metrics.GobblinMetrics; +import org.apache.gobblin.metrics.GobblinTrackingEvent; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.metrics.event.EventSubmitter; @@ -84,13 +89,11 @@ import org.apache.gobblin.util.guid.Guid; import org.apache.gobblin.util.request_allocation.GreedyAllocator; import org.apache.gobblin.util.request_allocation.HierarchicalAllocator; import org.apache.gobblin.util.request_allocation.HierarchicalPrioritizer; +import org.apache.gobblin.util.request_allocation.PriorityIterableBasedRequestAllocator; import org.apache.gobblin.util.request_allocation.RequestAllocator; import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig; import org.apache.gobblin.util.request_allocation.RequestAllocatorUtils; -import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; - /** * {@link org.apache.gobblin.source.Source} that generates work units from {@link org.apache.gobblin.data.management.copy.CopyableDataset}s. @@ -112,11 +115,21 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> { public static final String SIMULATE = CopyConfiguration.COPY_PREFIX + ".simulate"; public static final String MAX_SIZE_MULTI_WORKUNITS = CopyConfiguration.COPY_PREFIX + ".binPacking.maxSizePerBin"; public static final String MAX_WORK_UNITS_PER_BIN = CopyConfiguration.COPY_PREFIX + ".binPacking.maxWorkUnitsPerBin"; + public static final String REQUESTS_EXCEEDING_AVAILABLE_RESOURCE_POOL_EVENT_NAME = + "RequestsExceedingAvailableResourcePoolEvent"; + public static final String REQUESTS_DROPPED_EVENT_NAME = "RequestsDroppedEvent"; + public static final String REQUESTS_REJECTED_DUE_TO_INSUFFICIENT_EVICTION_EVENT_NAME = + "RequestsRejectedDueToInsufficientEvictionEvent"; + public static final String REQUESTS_REJECTED_WITH_LOW_PRIORITY_EVENT_NAME = "RequestsRejectedWithLowPriorityEvent"; + public static final String FILESET_NAME = "fileset.name"; + public static final String FILESET_TOTAL_ENTITIES = "fileset.total.entities"; + public static final String FILESET_TOTAL_SIZE_IN_BYTES = "fileset.total.size"; private static final String WORK_UNIT_WEIGHT = CopyConfiguration.COPY_PREFIX + ".workUnitWeight"; private final WorkUnitWeighter weighter = new FieldWeighter(WORK_UNIT_WEIGHT); public MetricContext metricContext; + public EventSubmitter eventSubmitter; protected Optional<LineageInfo> lineageInfo; @@ -145,16 +158,17 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> { try { - DeprecationUtils.renameDeprecatedKeys(state, CopyConfiguration.MAX_COPY_PREFIX + "." + CopyResourcePool.ENTITIES_KEY, - Lists.newArrayList(MAX_FILES_COPIED_KEY)); + DeprecationUtils + .renameDeprecatedKeys(state, CopyConfiguration.MAX_COPY_PREFIX + "." + CopyResourcePool.ENTITIES_KEY, + Lists.newArrayList(MAX_FILES_COPIED_KEY)); final FileSystem sourceFs = HadoopUtils.getSourceFileSystem(state); final FileSystem targetFs = HadoopUtils.getWriterFileSystem(state, 1, 0); state.setProp(SlaEventKeys.SOURCE_URI, sourceFs.getUri()); state.setProp(SlaEventKeys.DESTINATION_URI, targetFs.getUri()); - log.info("Identified source file system at {} and target file system at {}.", - sourceFs.getUri(), targetFs.getUri()); + log.info("Identified source file system at {} and target file system at {}.", sourceFs.getUri(), + targetFs.getUri()); long maxSizePerBin = state.getPropAsLong(MAX_SIZE_MULTI_WORKUNITS, 0); long maxWorkUnitsPerMultiWorkUnit = state.getPropAsLong(MAX_WORK_UNITS_PER_BIN, 50); @@ -165,26 +179,31 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> { final CopyConfiguration copyConfiguration = CopyConfiguration.builder(targetFs, state.getProperties()).build(); + this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, CopyConfiguration.COPY_PREFIX).build(); DatasetsFinder<CopyableDatasetBase> datasetFinder = DatasetUtils .instantiateDatasetFinder(state.getProperties(), sourceFs, DEFAULT_DATASET_PROFILE_CLASS_KEY, - new EventSubmitter.Builder(this.metricContext, CopyConfiguration.COPY_PREFIX).build(), state); + this.eventSubmitter, state); IterableDatasetFinder<CopyableDatasetBase> iterableDatasetFinder = datasetFinder instanceof IterableDatasetFinder ? (IterableDatasetFinder<CopyableDatasetBase>) datasetFinder : new IterableDatasetFinderImpl<>(datasetFinder); - Iterator<CopyableDatasetRequestor> requestorIteratorWithNulls = - Iterators.transform(iterableDatasetFinder.getDatasetsIterator(), + Iterator<CopyableDatasetRequestor> requestorIteratorWithNulls = Iterators + .transform(iterableDatasetFinder.getDatasetsIterator(), new CopyableDatasetRequestor.Factory(targetFs, copyConfiguration, log)); - Iterator<CopyableDatasetRequestor> requestorIterator = Iterators.filter(requestorIteratorWithNulls, - Predicates.<CopyableDatasetRequestor>notNull()); + Iterator<CopyableDatasetRequestor> requestorIterator = + Iterators.filter(requestorIteratorWithNulls, Predicates.<CopyableDatasetRequestor>notNull()); final SetMultimap<FileSet<CopyEntity>, WorkUnit> workUnitsMap = Multimaps.<FileSet<CopyEntity>, WorkUnit>synchronizedSetMultimap( HashMultimap.<FileSet<CopyEntity>, WorkUnit>create()); RequestAllocator<FileSet<CopyEntity>> allocator = createRequestAllocator(copyConfiguration, maxThreads); - Iterator<FileSet<CopyEntity>> prioritizedFileSets = allocator.allocateRequests(requestorIterator, copyConfiguration.getMaxToCopy()); + Iterator<FileSet<CopyEntity>> prioritizedFileSets = + allocator.allocateRequests(requestorIterator, copyConfiguration.getMaxToCopy()); + + //Submit alertable events for unfulfilled requests + submitUnfulfilledRequestEvents(allocator); Iterator<Callable<Void>> callableIterator = Iterators.transform(prioritizedFileSets, new Function<FileSet<CopyEntity>, Callable<Void>>() { @@ -197,8 +216,7 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> { }); try { - List<Future<Void>> futures = new IteratorExecutor<>(callableIterator, - maxThreads, + List<Future<Void>> futures = new IteratorExecutor<>(callableIterator, maxThreads, ExecutorsUtils.newDaemonThreadFactory(Optional.of(log), Optional.of("Copy-file-listing-pool-%d"))) .execute(); @@ -231,8 +249,8 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> { return Lists.newArrayList(); } - List<? extends WorkUnit> workUnits = - new WorstFitDecreasingBinPacking(maxSizePerBin).pack(Lists.newArrayList(workUnitsMap.values()), this.weighter); + List<? extends WorkUnit> workUnits = new WorstFitDecreasingBinPacking(maxSizePerBin) + .pack(Lists.newArrayList(workUnitsMap.values()), this.weighter); log.info(String.format( "Bin packed work units. Initial work units: %d, packed work units: %d, max weight per bin: %d, " + "max work units per bin: %d.", workUnitsMap.size(), workUnits.size(), maxSizePerBin, @@ -243,11 +261,42 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> { } } - private RequestAllocator<FileSet<CopyEntity>> createRequestAllocator(CopyConfiguration copyConfiguration, int maxThreads) { - Optional<FileSetComparator> prioritizer = copyConfiguration.getPrioritizer(); + private void submitUnfulfilledRequestEventsHelper(List<FileSet<CopyEntity>> fileSetList, String eventName) { + for (FileSet<CopyEntity> fileSet : fileSetList) { + GobblinTrackingEvent event = + GobblinTrackingEvent.newBuilder().setName(eventName).setNamespace(CopySource.class.getName()).setMetadata( + ImmutableMap.<String, String>builder() + .put(ConfigurationKeys.DATASET_URN_KEY, fileSet.getDataset().getUrn()) + .put(FILESET_TOTAL_ENTITIES, Integer.toString(fileSet.getTotalEntities())) + .put(FILESET_TOTAL_SIZE_IN_BYTES, Long.toString(fileSet.getTotalSizeInBytes())) + .put(FILESET_NAME, fileSet.getName()).build()).build(); + this.metricContext.submitEvent(event); + } + } + private void submitUnfulfilledRequestEvents(RequestAllocator<FileSet<CopyEntity>> allocator) { + if (PriorityIterableBasedRequestAllocator.class.isAssignableFrom(allocator.getClass())) { + PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>> priorityIterableBasedRequestAllocator = + (PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>>) allocator; + submitUnfulfilledRequestEventsHelper( + priorityIterableBasedRequestAllocator.getRequestsExceedingAvailableResourcePool(), + REQUESTS_EXCEEDING_AVAILABLE_RESOURCE_POOL_EVENT_NAME); + submitUnfulfilledRequestEventsHelper( + priorityIterableBasedRequestAllocator.getRequestsRejectedDueToInsufficientEviction(), + REQUESTS_REJECTED_DUE_TO_INSUFFICIENT_EVICTION_EVENT_NAME); + submitUnfulfilledRequestEventsHelper(priorityIterableBasedRequestAllocator.getRequestsRejectedWithLowPriority(), + REQUESTS_REJECTED_WITH_LOW_PRIORITY_EVENT_NAME); + submitUnfulfilledRequestEventsHelper(priorityIterableBasedRequestAllocator.getRequestsDropped(), + REQUESTS_DROPPED_EVENT_NAME); + } + } + + private RequestAllocator<FileSet<CopyEntity>> createRequestAllocator(CopyConfiguration copyConfiguration, + int maxThreads) { + Optional<FileSetComparator> prioritizer = copyConfiguration.getPrioritizer(); RequestAllocatorConfig.Builder<FileSet<CopyEntity>> configBuilder = RequestAllocatorConfig.builder(new FileSetResourceEstimator()).allowParallelization(maxThreads) + .storeRejectedRequests(copyConfiguration.getStoreRejectedRequestsSetting()) .withLimitedScopeConfig(copyConfiguration.getPrioritizationConfig()); if (!prioritizer.isPresent()) { @@ -323,9 +372,8 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> { * a DatasetFinder. Consequently, the source and destination dataset for the CopyableFile lineage are expected * to be set by the same logic */ - if (lineageInfo.isPresent() && - copyableFile.getSourceDataset() != null && - copyableFile.getDestinationDataset() != null) { + if (lineageInfo.isPresent() && copyableFile.getSourceDataset() != null + && copyableFile.getDestinationDataset() != null) { lineageInfo.get().setSource(copyableFile.getSourceDataset(), workUnit); } } @@ -350,7 +398,8 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> { return new EmptyExtractor<>("empty"); } - protected Extractor<String, FileAwareInputStream> extractorForCopyableFile(FileSystem fs, CopyableFile cf, WorkUnitState state) + protected Extractor<String, FileAwareInputStream> extractorForCopyableFile(FileSystem fs, CopyableFile cf, + WorkUnitState state) throws IOException { return new FileAwareInputStreamExtractor(fs, cf, state); } @@ -365,7 +414,8 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> { @Deprecated protected FileSystem getSourceFileSystem(State state) throws IOException { - Configuration conf = HadoopUtils.getConfFromState(state, Optional.of(ConfigurationKeys.SOURCE_FILEBASED_ENCRYPTED_CONFIG_PATH)); + Configuration conf = + HadoopUtils.getConfFromState(state, Optional.of(ConfigurationKeys.SOURCE_FILEBASED_ENCRYPTED_CONFIG_PATH)); String uri = state.getProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, ConfigurationKeys.LOCAL_FS_URI); return HadoopUtils.getOptionallyThrottledFileSystem(FileSystem.get(URI.create(uri), conf), state); } @@ -456,11 +506,12 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> { return CopyableDatasetMetadata.deserialize(state.getProp(SERIALIZED_COPYABLE_DATASET)); } - private void setWorkUnitWatermark(WorkUnit workUnit, Optional<CopyableFileWatermarkGenerator> watermarkGenerator, CopyEntity copyEntity) + private void setWorkUnitWatermark(WorkUnit workUnit, Optional<CopyableFileWatermarkGenerator> watermarkGenerator, + CopyEntity copyEntity) throws IOException { - if (copyEntity instanceof CopyableFile) { + if (copyEntity instanceof CopyableFile) { Optional<WatermarkInterval> watermarkIntervalOptional = - CopyableFileWatermarkHelper.getCopyableFileWatermark((CopyableFile)copyEntity, watermarkGenerator); + CopyableFileWatermarkHelper.getCopyableFileWatermark((CopyableFile) copyEntity, watermarkGenerator); if (watermarkIntervalOptional.isPresent()) { workUnit.setWatermarkInterval(watermarkIntervalOptional.get()); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java index d9e5368..377b2cf 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java @@ -17,19 +17,41 @@ package org.apache.gobblin.data.management.copy; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Iterator; import java.util.List; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig; +import org.apache.hadoop.fs.FileSystem; import org.testng.Assert; import org.testng.annotations.Test; +import com.google.common.base.Predicates; +import com.google.common.collect.Iterators; + import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.SourceState; import org.apache.gobblin.data.management.dataset.DatasetUtils; +import org.apache.gobblin.data.management.partition.CopyableDatasetRequestor; +import org.apache.gobblin.data.management.partition.FileSet; +import org.apache.gobblin.dataset.DatasetsFinder; +import org.apache.gobblin.dataset.IterableDatasetFinder; +import org.apache.gobblin.dataset.IterableDatasetFinderImpl; +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.event.EventSubmitter; import org.apache.gobblin.source.workunit.Extract; import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.util.HadoopUtils; import org.apache.gobblin.util.JobLauncherUtils; +import org.apache.gobblin.util.request_allocation.PriorityIterableBasedRequestAllocator; +@Slf4j public class CopySourceTest { @Test @@ -106,4 +128,66 @@ public class CopySourceTest { Assert.assertNotNull(extractBelow); } + @Test + public void testSubmitUnfulfilledRequestEvents() + throws IOException, NoSuchMethodException, InvocationTargetException, IllegalAccessException { + SourceState state = new SourceState(); + + state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, "file:///"); + state.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, "file:///"); + state.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/target/dir"); + state.setProp(DatasetUtils.DATASET_PROFILE_CLASS_KEY, + TestCopyablePartitionableDatasedFinder.class.getCanonicalName()); + state.setProp(CopySource.MAX_CONCURRENT_LISTING_SERVICES, 2); + state.setProp(CopyConfiguration.MAX_COPY_PREFIX + ".size", "50"); + state.setProp(CopyConfiguration.MAX_COPY_PREFIX + ".copyEntities", 2); + state.setProp(CopyConfiguration.STORE_REJECTED_REQUESTS_KEY, + RequestAllocatorConfig.StoreRejectedRequestsConfig.ALL.name().toLowerCase()); + state.setProp(ConfigurationKeys.METRICS_CUSTOM_BUILDERS, "org.apache.gobblin.metrics.ConsoleEventReporterFactory"); + + CopySource source = new CopySource(); + + final FileSystem sourceFs = HadoopUtils.getSourceFileSystem(state); + final FileSystem targetFs = HadoopUtils.getWriterFileSystem(state, 1, 0); + + int maxThreads = state + .getPropAsInt(CopySource.MAX_CONCURRENT_LISTING_SERVICES, CopySource.DEFAULT_MAX_CONCURRENT_LISTING_SERVICES); + + final CopyConfiguration copyConfiguration = CopyConfiguration.builder(targetFs, state.getProperties()).build(); + + MetricContext metricContext = Instrumented.getMetricContext(state, CopySource.class); + EventSubmitter eventSubmitter = new EventSubmitter.Builder(metricContext, CopyConfiguration.COPY_PREFIX).build(); + DatasetsFinder<CopyableDatasetBase> datasetFinder = DatasetUtils + .instantiateDatasetFinder(state.getProperties(), sourceFs, CopySource.DEFAULT_DATASET_PROFILE_CLASS_KEY, + eventSubmitter, state); + + IterableDatasetFinder<CopyableDatasetBase> iterableDatasetFinder = + datasetFinder instanceof IterableDatasetFinder ? (IterableDatasetFinder<CopyableDatasetBase>) datasetFinder + : new IterableDatasetFinderImpl<>(datasetFinder); + + Iterator<CopyableDatasetRequestor> requestorIteratorWithNulls = Iterators + .transform(iterableDatasetFinder.getDatasetsIterator(), + new CopyableDatasetRequestor.Factory(targetFs, copyConfiguration, log)); + Iterator<CopyableDatasetRequestor> requestorIterator = + Iterators.filter(requestorIteratorWithNulls, Predicates.<CopyableDatasetRequestor>notNull()); + + Method m = CopySource.class.getDeclaredMethod("createRequestAllocator", CopyConfiguration.class, int.class); + m.setAccessible(true); + PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>> allocator = + (PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>>) m.invoke(source, copyConfiguration, maxThreads); + Iterator<FileSet<CopyEntity>> prioritizedFileSets = + allocator.allocateRequests(requestorIterator, copyConfiguration.getMaxToCopy()); + List<FileSet<CopyEntity>> fileSetList = allocator.getRequestsExceedingAvailableResourcePool(); + Assert.assertEquals(fileSetList.size(), 2); + + FileSet<CopyEntity> fileSet = fileSetList.get(0); + Assert.assertEquals(fileSet.getDataset().getUrn(), "/test"); + Assert.assertEquals(fileSet.getTotalEntities(), 5); + Assert.assertEquals(fileSet.getTotalSizeInBytes(), 50); + + fileSet = fileSetList.get(1); + Assert.assertEquals(fileSet.getDataset().getUrn(), "/test"); + Assert.assertEquals(fileSet.getTotalEntities(), 5); + Assert.assertEquals(fileSet.getTotalSizeInBytes(), 50); + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterable.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterable.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterable.java index c1a4505..bcfcf29 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterable.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterable.java @@ -22,15 +22,15 @@ import java.util.Iterator; import java.util.List; import java.util.TreeSet; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + import org.slf4j.Logger; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.collect.Lists; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; - /** * A concurrent bounded priority {@link Iterable}. Given a {@link ResourcePool}, a {@link ResourceEstimator}, and a @@ -65,12 +65,24 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR private int requestsRefused = 0; private int requestsEvicted = 0; + //These are for submitting alertable events + private String storeRejectedRequestsSetting; + @Getter + private List<T> requestsExceedingAvailableResourcePool = Lists.newArrayList(); + @Getter + private List<T> requestsRejectedWithLowPriority = Lists.newArrayList(); + @Getter + private List<T> requestsRejectedDueToInsufficientEviction = Lists.newArrayList(); + @Getter + private List<T> requestsDropped = Lists.newArrayList(); + // These are ResourceRequirements for temporary use to avoid instantiation costs private final ResourceRequirement candidateRequirement; private final ResourceRequirement tmpRequirement; private final ResourceRequirement reuse; - public ConcurrentBoundedPriorityIterable(final Comparator<? super T> prioritizer, ResourceEstimator<T> resourceEstimator, ResourcePool pool) { + public ConcurrentBoundedPriorityIterable(final Comparator<? super T> prioritizer, + ResourceEstimator<T> resourceEstimator, String storeRejectedRequestsSetting, ResourcePool pool) { this.estimator = resourceEstimator; this.resourcePool = pool; @@ -79,6 +91,8 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR this.allDifferentComparator = new AllDifferentComparator(); this.elements = new TreeSet<>(this.allDifferentComparator); + this.storeRejectedRequestsSetting = storeRejectedRequestsSetting; + this.currentRequirement = this.resourcePool.getResourceRequirementBuilder().zero().build(); this.maxResourceRequirement = new ResourceRequirement(this.currentRequirement); @@ -94,7 +108,8 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR */ private class AllDifferentComparator implements Comparator<AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T>> { @Override - public int compare(AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> t1, AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> t2) { + public int compare(AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> t1, + AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> t2) { int providedComparison = ConcurrentBoundedPriorityIterable.this.comparator.compare(t1.getT(), t2.getT()); if (providedComparison != 0) { return providedComparison; @@ -111,12 +126,13 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR */ public boolean add(T t) { if (this.closed) { - throw new RuntimeException(ConcurrentBoundedPriorityIterable.class.getSimpleName() + " is no longer accepting requests!"); + throw new RuntimeException( + ConcurrentBoundedPriorityIterable.class.getSimpleName() + " is no longer accepting requests!"); } - AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> - newElement = new AllocatedRequestsIteratorBase.RequestWithResourceRequirement<>(t, - this.estimator.estimateRequirement(t, this.resourcePool)); + AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> newElement = + new AllocatedRequestsIteratorBase.RequestWithResourceRequirement<>(t, + this.estimator.estimateRequirement(t, this.resourcePool)); boolean addedWorkunits = addImpl(newElement); if (!addedWorkunits) { this.rejectedElement = true; @@ -132,21 +148,30 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR if (this.resourcePool.exceedsHardBound(newElement.getResourceRequirement(), false)) { // item does not fit even in empty pool log.warn(String.format("Request %s is larger than the available resource pool. If the pool is not expanded, " - + "it will never be selected. Request: %s.", newElement.getT(), + + "it will never be selected. Request: %s.", newElement.getT(), this.resourcePool.stringifyRequirement(newElement.getResourceRequirement()))); + if (!this.storeRejectedRequestsSetting + .equalsIgnoreCase(RequestAllocatorConfig.StoreRejectedRequestsConfig.NONE.name())) { + this.requestsExceedingAvailableResourcePool.add(newElement.getT()); + } this.requestsRefused++; return false; } - ResourceRequirement candidateRequirement = - ResourceRequirement.add(this.currentRequirement, newElement.getResourceRequirement(), this.candidateRequirement); + ResourceRequirement candidateRequirement = ResourceRequirement + .add(this.currentRequirement, newElement.getResourceRequirement(), this.candidateRequirement); if (this.resourcePool.exceedsHardBound(candidateRequirement, false)) { if (this.comparator.compare(this.elements.last().getT(), newElement.getT()) <= 0) { - log.debug("Request {} does not fit in resource pool and is lower priority than current lowest priority request. " - + "Rejecting", newElement.getT()); + log.debug( + "Request {} does not fit in resource pool and is lower priority than current lowest priority request. " + + "Rejecting", newElement.getT()); this.requestsRefused++; + if (this.storeRejectedRequestsSetting + .equalsIgnoreCase(RequestAllocatorConfig.StoreRejectedRequestsConfig.ALL.name())) { + this.requestsRejectedWithLowPriority.add(newElement.getT()); + } return false; } @@ -154,11 +179,15 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR this.currentRequirement.copyInto(this.tmpRequirement); - for (AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> dropCandidate : this.elements.descendingSet()) { + for (AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> dropCandidate : this.elements + .descendingSet()) { if (this.comparator.compare(dropCandidate.getT(), newElement.getT()) <= 0) { - log.debug("Cannot evict enough requests to fit request {}. " - + "Rejecting", newElement.getT()); + log.debug("Cannot evict enough requests to fit request {}. " + "Rejecting", newElement.getT()); this.requestsRefused++; + if (this.storeRejectedRequestsSetting + .equalsIgnoreCase(RequestAllocatorConfig.StoreRejectedRequestsConfig.ALL.name())) { + this.requestsRejectedDueToInsufficientEviction.add(newElement.getT()); + } return false; } this.tmpRequirement.subtract(dropCandidate.getResourceRequirement()); @@ -172,6 +201,10 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR for (AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> drop : toDrop) { log.debug("Evicting request {}.", drop.getT()); this.requestsEvicted++; + if (this.storeRejectedRequestsSetting + .equalsIgnoreCase(RequestAllocatorConfig.StoreRejectedRequestsConfig.ALL.name())) { + this.requestsDropped.add(drop.getT()); + } this.elements.remove(drop); this.currentRequirement.subtract(drop.getResourceRequirement()); } @@ -205,13 +238,13 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR StringBuilder messageBuilder = new StringBuilder("Statistics for "). append(ConcurrentBoundedPriorityIterable.class.getSimpleName()).append(": {"); messageBuilder.append(this.resourcePool).append(", "); - messageBuilder.append("totalResourcesUsed: ").append(this.resourcePool.stringifyRequirement(this.currentRequirement)) - .append(", "); - messageBuilder.append("maxRequirementPerDimension: ").append(this.resourcePool.stringifyRequirement(this.maxResourceRequirement)) - .append(", "); + messageBuilder.append("totalResourcesUsed: ") + .append(this.resourcePool.stringifyRequirement(this.currentRequirement)).append(", "); + messageBuilder.append("maxRequirementPerDimension: ") + .append(this.resourcePool.stringifyRequirement(this.maxResourceRequirement)).append(", "); messageBuilder.append("requestsOffered: ").append(this.requestsOffered).append(", "); - messageBuilder.append("requestsAccepted: ").append(this.requestsOffered - this.requestsEvicted - this.requestsRefused) - .append(", "); + messageBuilder.append("requestsAccepted: ") + .append(this.requestsOffered - this.requestsEvicted - this.requestsRefused).append(", "); messageBuilder.append("requestsRefused: ").append(this.requestsRefused).append(", "); messageBuilder.append("requestsEvicted: ").append(this.requestsEvicted); messageBuilder.append("}"); @@ -228,5 +261,4 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR this.closed = true; return this.elements.iterator(); } - } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java index 876a4eb..f5be73e 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java @@ -22,6 +22,9 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import lombok.AccessLevel; +import lombok.Getter; + import org.slf4j.Logger; import com.google.common.base.Function; @@ -32,22 +35,35 @@ import org.apache.gobblin.util.Either; import org.apache.gobblin.util.ExecutorsUtils; import org.apache.gobblin.util.executors.IteratorExecutor; -import lombok.AccessLevel; -import lombok.AllArgsConstructor; -import lombok.Getter; - -@AllArgsConstructor public abstract class PriorityIterableBasedRequestAllocator<T extends Request<T>> implements RequestAllocator<T> { private final Logger log; @Getter(value = AccessLevel.PROTECTED) private final RequestAllocatorConfig<T> configuration; + //These are for submitting alertable events + @Getter + private List<T> requestsExceedingAvailableResourcePool; + @Getter + private List<T> requestsRejectedWithLowPriority; + @Getter + private List<T> requestsRejectedDueToInsufficientEviction; + @Getter + private List<T> requestsDropped; + + public PriorityIterableBasedRequestAllocator(Logger log, RequestAllocatorConfig<T> configuration) { + this.log = log; + this.configuration = configuration; + } + @Override - public AllocatedRequestsIterator<T> allocateRequests(Iterator<? extends Requestor<T>> requestors, ResourcePool resourcePool) { + public AllocatedRequestsIterator<T> allocateRequests(Iterator<? extends Requestor<T>> requestors, + ResourcePool resourcePool) { final ConcurrentBoundedPriorityIterable<T> iterable = - new ConcurrentBoundedPriorityIterable<>(this.configuration.getPrioritizer(), this.configuration.getResourceEstimator(), resourcePool); + new ConcurrentBoundedPriorityIterable<>(this.configuration.getPrioritizer(), + this.configuration.getResourceEstimator(), this.configuration.getStoreRejectedRequestsSetting(), + resourcePool); final Iterator<T> joinIterator = getJoinIterator(requestors, iterable); @@ -57,36 +73,47 @@ public abstract class PriorityIterableBasedRequestAllocator<T extends Request<T> } } else { - IteratorExecutor<Void> executor = new IteratorExecutor<>(Iterators.transform(joinIterator, new Function<T, Callable<Void>>() { - @Override - public Callable<Void> apply(final T input) { - return new Callable<Void>() { + IteratorExecutor<Void> executor = + new IteratorExecutor<>(Iterators.transform(joinIterator, new Function<T, Callable<Void>>() { @Override - public Void call() - throws Exception { - iterable.add(input); - return null; + public Callable<Void> apply(final T input) { + return new Callable<Void>() { + @Override + public Void call() + throws Exception { + iterable.add(input); + return null; + } + }; } - }; - } - }), this.configuration.getAllowedThreads(), - ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("request-allocator-%d"))); + }), this.configuration.getAllowedThreads(), + ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("request-allocator-%d"))); try { List<Either<Void, ExecutionException>> results = executor.executeAndGetResults(); IteratorExecutor.logFailures(results, log, 10); } catch (InterruptedException ie) { log.error("Request allocation was interrupted."); - return new AllocatedRequestsIteratorBase<>(Iterators.<AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T>>emptyIterator(), - resourcePool); + return new AllocatedRequestsIteratorBase<>( + Iterators.<AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T>>emptyIterator(), resourcePool); } } iterable.logStatistics(Optional.of(this.log)); + + //Get all requests rejected/dropped + getRejectedAndDroppedRequests(iterable); + return new AllocatedRequestsIteratorBase<>(iterable.iterator(), resourcePool); } + public void getRejectedAndDroppedRequests(ConcurrentBoundedPriorityIterable<T> iterable) { + requestsExceedingAvailableResourcePool = iterable.getRequestsExceedingAvailableResourcePool(); + requestsRejectedWithLowPriority = iterable.getRequestsRejectedWithLowPriority(); + requestsRejectedDueToInsufficientEviction = iterable.getRequestsRejectedDueToInsufficientEviction(); + requestsDropped = iterable.getRequestsDropped(); + } + protected abstract Iterator<T> getJoinIterator(Iterator<? extends Requestor<T>> requestors, ConcurrentBoundedPriorityIterable<T> requestIterable); - } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/RequestAllocatorConfig.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/RequestAllocatorConfig.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/RequestAllocatorConfig.java index b33070b..5a3051a 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/RequestAllocatorConfig.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/RequestAllocatorConfig.java @@ -20,12 +20,12 @@ package org.apache.gobblin.util.request_allocation; import java.io.Serializable; import java.util.Comparator; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - import lombok.AllArgsConstructor; import lombok.Getter; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + @AllArgsConstructor @Getter @@ -34,6 +34,11 @@ public class RequestAllocatorConfig<T extends Request<T>> { private final ResourceEstimator<T> resourceEstimator; private final int allowedThreads; private Config limitedScopeConfig; + private String storeRejectedRequestsSetting; + + public enum StoreRejectedRequestsConfig { + ALL, MIN, NONE + } public static <T extends Request<T>> Builder<T> builder(ResourceEstimator<T> resourceEstimator) { return new Builder<>(resourceEstimator); @@ -44,6 +49,7 @@ public class RequestAllocatorConfig<T extends Request<T>> { private final ResourceEstimator<T> resourceEstimator; private int allowedThreads = 1; private Config limitedScopeConfig; + private String storeRejectedRequestsSetting = StoreRejectedRequestsConfig.MIN.name(); public Builder(ResourceEstimator<T> resourceEstimator) { this.resourceEstimator = resourceEstimator; @@ -68,11 +74,17 @@ public class RequestAllocatorConfig<T extends Request<T>> { return this; } + public Builder<T> storeRejectedRequests(String storeRejectedRequestsSetting) { + this.storeRejectedRequestsSetting = storeRejectedRequestsSetting; + return this; + } + public RequestAllocatorConfig<T> build() { if (this.limitedScopeConfig == null) { this.limitedScopeConfig = ConfigFactory.empty(); } - return new RequestAllocatorConfig<>(this.prioritizer, this.resourceEstimator, this.allowedThreads, this.limitedScopeConfig); + return new RequestAllocatorConfig<>(this.prioritizer, this.resourceEstimator, this.allowedThreads, + this.limitedScopeConfig, this.storeRejectedRequestsSetting); } } @@ -82,5 +94,4 @@ public class RequestAllocatorConfig<T extends Request<T>> { return 0; } } - } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterableTest.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterableTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterableTest.java index 58a56ea..2145bc0 100644 --- a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterableTest.java +++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterableTest.java @@ -32,10 +32,12 @@ public class ConcurrentBoundedPriorityIterableTest { public static final String MEMORY = "memory"; @Test - public void test() throws Exception { + public void test() + throws Exception { - ConcurrentBoundedPriorityIterable<String> iterable = new ConcurrentBoundedPriorityIterable<>(new MyComparator(), - new MyEstimator(), ResourcePool.builder().maxResource(MEMORY, 100.).build()); + ConcurrentBoundedPriorityIterable<String> iterable = + new ConcurrentBoundedPriorityIterable<>(new MyComparator(), new MyEstimator(), "min", + ResourcePool.builder().maxResource(MEMORY, 100.).build()); // doesn't fit Assert.assertFalse(iterable.add("a-500")); @@ -57,8 +59,8 @@ public class ConcurrentBoundedPriorityIterableTest { Assert.assertTrue(iterable.add("b-50")); // Check items - List<String> items = Lists.newArrayList(Iterators.transform(iterable.iterator(), - new AllocatedRequestsIteratorBase.TExtractor<String>())); + List<String> items = Lists + .newArrayList(Iterators.transform(iterable.iterator(), new AllocatedRequestsIteratorBase.TExtractor<String>())); Assert.assertEquals(items.size(), 2); Assert.assertEquals(items.get(0), "b-50"); Assert.assertEquals(items.get(1), "d-50"); @@ -66,15 +68,15 @@ public class ConcurrentBoundedPriorityIterableTest { iterable.reopen(); // a high priority that won't fit even with evictions should not evict anything Assert.assertFalse(iterable.add("c-500")); - items = Lists.newArrayList(Iterators.transform(iterable.iterator(), - new AllocatedRequestsIteratorBase.TExtractor<String>())); + items = Lists + .newArrayList(Iterators.transform(iterable.iterator(), new AllocatedRequestsIteratorBase.TExtractor<String>())); Assert.assertEquals(items.size(), 2); iterable.reopen(); // even if it is higher priority than everything else Assert.assertFalse(iterable.add("a-500")); - items = Lists.newArrayList(Iterators.transform(iterable.iterator(), - new AllocatedRequestsIteratorBase.TExtractor<String>())); + items = Lists + .newArrayList(Iterators.transform(iterable.iterator(), new AllocatedRequestsIteratorBase.TExtractor<String>())); Assert.assertEquals(items.size(), 2); } @@ -94,5 +96,4 @@ public class ConcurrentBoundedPriorityIterableTest { return resourcePool.getResourceRequirementBuilder().setRequirement(MEMORY, memory).build(); } } - } \ No newline at end of file
