[GOBBLIN-379] Submit an event when DistCp job resource requirements exceed a 
hard bound

Closes #2257 from sv2000/gobblin-379


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/457ede26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/457ede26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/457ede26

Branch: refs/heads/0.12.0
Commit: 457ede26da2c7f693d4fe321d4c50b0e25e0d22d
Parents: bde5bb1
Author: suvasude <[email protected]>
Authored: Thu Feb 8 11:37:21 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Thu Feb 8 11:37:21 2018 -0800

----------------------------------------------------------------------
 .../data/management/copy/CopyConfiguration.java |  17 ++-
 .../data/management/copy/CopySource.java        | 105 ++++++++++++++-----
 .../data/management/copy/CopySourceTest.java    |  84 +++++++++++++++
 .../ConcurrentBoundedPriorityIterable.java      |  80 +++++++++-----
 .../PriorityIterableBasedRequestAllocator.java  |  71 +++++++++----
 .../RequestAllocatorConfig.java                 |  21 +++-
 .../ConcurrentBoundedPriorityIterableTest.java  |  21 ++--
 7 files changed, 309 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
index 211ad13..c4d07e2 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
@@ -23,6 +23,8 @@ import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
 
+import 
org.apache.gobblin.util.request_allocation.ConcurrentBoundedPriorityIterable;
+import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -65,6 +67,13 @@ public class CopyConfiguration {
 
   public static final String ABORT_ON_SINGLE_DATASET_FAILURE = COPY_PREFIX + 
".abortOnSingleDatasetFailure";
 
+  /*
+   * Config to store different classes of rejected requests. Possible values 
are "all","none", or "min" (default).
+   */
+  public static final String STORE_REJECTED_REQUESTS_KEY = COPY_PREFIX + 
".store.rejected.requests";
+  public static final String DEFAULT_STORE_REJECTED_REQUESTS =
+      RequestAllocatorConfig.StoreRejectedRequestsConfig.MIN.name();
+
   /**
    * User supplied directory where files should be published. This value is 
identical for all datasets in the distcp job.
    */
@@ -81,6 +90,7 @@ public class CopyConfiguration {
   private final FileSystem targetFs;
   private final Optional<FileSetComparator> prioritizer;
   private final ResourcePool maxToCopy;
+  private final String storeRejectedRequestsSetting;
 
   private final Config config;
 
@@ -114,8 +124,8 @@ public class CopyConfiguration {
       if (properties.containsKey(PRIORITIZER_ALIAS_KEY)) {
         try {
           this.prioritizer = 
Optional.of(GobblinConstructorUtils.<FileSetComparator>invokeLongestConstructor(
-              new ClassAliasResolver(FileSetComparator.class).resolveClass(
-                  properties.getProperty(PRIORITIZER_ALIAS_KEY)), properties));
+              new ClassAliasResolver(FileSetComparator.class)
+                  
.resolveClass(properties.getProperty(PRIORITIZER_ALIAS_KEY)), properties));
         } catch (ReflectiveOperationException roe) {
           throw new RuntimeException("Could not build prioritizer.", roe);
         }
@@ -124,6 +134,9 @@ public class CopyConfiguration {
       }
       this.maxToCopy = 
CopyResourcePool.fromConfig(ConfigUtils.getConfigOrEmpty(this.config, 
MAX_COPY_PREFIX));
 
+      this.storeRejectedRequestsSetting =
+          
properties.getProperty(CopyConfiguration.STORE_REJECTED_REQUESTS_KEY, 
DEFAULT_STORE_REJECTED_REQUESTS);
+
       this.abortOnSingleDatasetFailure = false;
       if (this.config.hasPath(ABORT_ON_SINGLE_DATASET_FAILURE)) {
         this.abortOnSingleDatasetFailure = 
this.config.getBoolean(ABORT_ON_SINGLE_DATASET_FAILURE);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index 615d6ad..3355f3d 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -29,6 +29,9 @@ import java.util.concurrent.Future;
 
 import javax.annotation.Nullable;
 
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 
@@ -37,6 +40,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Predicates;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimaps;
@@ -62,6 +66,7 @@ import org.apache.gobblin.dataset.IterableDatasetFinder;
 import org.apache.gobblin.dataset.IterableDatasetFinderImpl;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -84,13 +89,11 @@ import org.apache.gobblin.util.guid.Guid;
 import org.apache.gobblin.util.request_allocation.GreedyAllocator;
 import org.apache.gobblin.util.request_allocation.HierarchicalAllocator;
 import org.apache.gobblin.util.request_allocation.HierarchicalPrioritizer;
+import 
org.apache.gobblin.util.request_allocation.PriorityIterableBasedRequestAllocator;
 import org.apache.gobblin.util.request_allocation.RequestAllocator;
 import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
 import org.apache.gobblin.util.request_allocation.RequestAllocatorUtils;
 
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
 
 /**
  * {@link org.apache.gobblin.source.Source} that generates work units from 
{@link org.apache.gobblin.data.management.copy.CopyableDataset}s.
@@ -112,11 +115,21 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
   public static final String SIMULATE = CopyConfiguration.COPY_PREFIX + 
".simulate";
   public static final String MAX_SIZE_MULTI_WORKUNITS = 
CopyConfiguration.COPY_PREFIX + ".binPacking.maxSizePerBin";
   public static final String MAX_WORK_UNITS_PER_BIN = 
CopyConfiguration.COPY_PREFIX + ".binPacking.maxWorkUnitsPerBin";
+  public static final String 
REQUESTS_EXCEEDING_AVAILABLE_RESOURCE_POOL_EVENT_NAME =
+      "RequestsExceedingAvailableResourcePoolEvent";
+  public static final String REQUESTS_DROPPED_EVENT_NAME = 
"RequestsDroppedEvent";
+  public static final String 
REQUESTS_REJECTED_DUE_TO_INSUFFICIENT_EVICTION_EVENT_NAME =
+      "RequestsRejectedDueToInsufficientEvictionEvent";
+  public static final String REQUESTS_REJECTED_WITH_LOW_PRIORITY_EVENT_NAME = 
"RequestsRejectedWithLowPriorityEvent";
+  public static final String FILESET_NAME = "fileset.name";
+  public static final String FILESET_TOTAL_ENTITIES = "fileset.total.entities";
+  public static final String FILESET_TOTAL_SIZE_IN_BYTES = 
"fileset.total.size";
 
   private static final String WORK_UNIT_WEIGHT = CopyConfiguration.COPY_PREFIX 
+ ".workUnitWeight";
   private final WorkUnitWeighter weighter = new 
FieldWeighter(WORK_UNIT_WEIGHT);
 
   public MetricContext metricContext;
+  public EventSubmitter eventSubmitter;
 
   protected Optional<LineageInfo> lineageInfo;
 
@@ -145,16 +158,17 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
 
     try {
 
-      DeprecationUtils.renameDeprecatedKeys(state, 
CopyConfiguration.MAX_COPY_PREFIX + "." + CopyResourcePool.ENTITIES_KEY,
-          Lists.newArrayList(MAX_FILES_COPIED_KEY));
+      DeprecationUtils
+          .renameDeprecatedKeys(state, CopyConfiguration.MAX_COPY_PREFIX + "." 
+ CopyResourcePool.ENTITIES_KEY,
+              Lists.newArrayList(MAX_FILES_COPIED_KEY));
 
       final FileSystem sourceFs = HadoopUtils.getSourceFileSystem(state);
       final FileSystem targetFs = HadoopUtils.getWriterFileSystem(state, 1, 0);
       state.setProp(SlaEventKeys.SOURCE_URI, sourceFs.getUri());
       state.setProp(SlaEventKeys.DESTINATION_URI, targetFs.getUri());
 
-      log.info("Identified source file system at {} and target file system at 
{}.",
-          sourceFs.getUri(), targetFs.getUri());
+      log.info("Identified source file system at {} and target file system at 
{}.", sourceFs.getUri(),
+          targetFs.getUri());
 
       long maxSizePerBin = state.getPropAsLong(MAX_SIZE_MULTI_WORKUNITS, 0);
       long maxWorkUnitsPerMultiWorkUnit = 
state.getPropAsLong(MAX_WORK_UNITS_PER_BIN, 50);
@@ -165,26 +179,31 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
 
       final CopyConfiguration copyConfiguration = 
CopyConfiguration.builder(targetFs, state.getProperties()).build();
 
+      this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, 
CopyConfiguration.COPY_PREFIX).build();
       DatasetsFinder<CopyableDatasetBase> datasetFinder = DatasetUtils
           .instantiateDatasetFinder(state.getProperties(), sourceFs, 
DEFAULT_DATASET_PROFILE_CLASS_KEY,
-              new EventSubmitter.Builder(this.metricContext, 
CopyConfiguration.COPY_PREFIX).build(), state);
+              this.eventSubmitter, state);
 
       IterableDatasetFinder<CopyableDatasetBase> iterableDatasetFinder =
           datasetFinder instanceof IterableDatasetFinder ? 
(IterableDatasetFinder<CopyableDatasetBase>) datasetFinder
               : new IterableDatasetFinderImpl<>(datasetFinder);
 
-      Iterator<CopyableDatasetRequestor> requestorIteratorWithNulls =
-          Iterators.transform(iterableDatasetFinder.getDatasetsIterator(),
+      Iterator<CopyableDatasetRequestor> requestorIteratorWithNulls = Iterators
+          .transform(iterableDatasetFinder.getDatasetsIterator(),
               new CopyableDatasetRequestor.Factory(targetFs, 
copyConfiguration, log));
-      Iterator<CopyableDatasetRequestor> requestorIterator = 
Iterators.filter(requestorIteratorWithNulls,
-          Predicates.<CopyableDatasetRequestor>notNull());
+      Iterator<CopyableDatasetRequestor> requestorIterator =
+          Iterators.filter(requestorIteratorWithNulls, 
Predicates.<CopyableDatasetRequestor>notNull());
 
       final SetMultimap<FileSet<CopyEntity>, WorkUnit> workUnitsMap =
           Multimaps.<FileSet<CopyEntity>, WorkUnit>synchronizedSetMultimap(
               HashMultimap.<FileSet<CopyEntity>, WorkUnit>create());
 
       RequestAllocator<FileSet<CopyEntity>> allocator = 
createRequestAllocator(copyConfiguration, maxThreads);
-      Iterator<FileSet<CopyEntity>> prioritizedFileSets = 
allocator.allocateRequests(requestorIterator, copyConfiguration.getMaxToCopy());
+      Iterator<FileSet<CopyEntity>> prioritizedFileSets =
+          allocator.allocateRequests(requestorIterator, 
copyConfiguration.getMaxToCopy());
+
+      //Submit alertable events for unfulfilled requests
+      submitUnfulfilledRequestEvents(allocator);
 
       Iterator<Callable<Void>> callableIterator =
           Iterators.transform(prioritizedFileSets, new 
Function<FileSet<CopyEntity>, Callable<Void>>() {
@@ -197,8 +216,7 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
           });
 
       try {
-        List<Future<Void>> futures = new IteratorExecutor<>(callableIterator,
-            maxThreads,
+        List<Future<Void>> futures = new IteratorExecutor<>(callableIterator, 
maxThreads,
             ExecutorsUtils.newDaemonThreadFactory(Optional.of(log), 
Optional.of("Copy-file-listing-pool-%d")))
             .execute();
 
@@ -231,8 +249,8 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
         return Lists.newArrayList();
       }
 
-      List<? extends WorkUnit> workUnits =
-          new 
WorstFitDecreasingBinPacking(maxSizePerBin).pack(Lists.newArrayList(workUnitsMap.values()),
 this.weighter);
+      List<? extends WorkUnit> workUnits = new 
WorstFitDecreasingBinPacking(maxSizePerBin)
+          .pack(Lists.newArrayList(workUnitsMap.values()), this.weighter);
       log.info(String.format(
           "Bin packed work units. Initial work units: %d, packed work units: 
%d, max weight per bin: %d, "
               + "max work units per bin: %d.", workUnitsMap.size(), 
workUnits.size(), maxSizePerBin,
@@ -243,11 +261,42 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
     }
   }
 
-  private RequestAllocator<FileSet<CopyEntity>> 
createRequestAllocator(CopyConfiguration copyConfiguration, int maxThreads) {
-    Optional<FileSetComparator> prioritizer = 
copyConfiguration.getPrioritizer();
+  private void submitUnfulfilledRequestEventsHelper(List<FileSet<CopyEntity>> 
fileSetList, String eventName) {
+    for (FileSet<CopyEntity> fileSet : fileSetList) {
+      GobblinTrackingEvent event =
+          
GobblinTrackingEvent.newBuilder().setName(eventName).setNamespace(CopySource.class.getName()).setMetadata(
+              ImmutableMap.<String, String>builder()
+                  .put(ConfigurationKeys.DATASET_URN_KEY, 
fileSet.getDataset().getUrn())
+                  .put(FILESET_TOTAL_ENTITIES, 
Integer.toString(fileSet.getTotalEntities()))
+                  .put(FILESET_TOTAL_SIZE_IN_BYTES, 
Long.toString(fileSet.getTotalSizeInBytes()))
+                  .put(FILESET_NAME, fileSet.getName()).build()).build();
+      this.metricContext.submitEvent(event);
+    }
+  }
 
+  private void 
submitUnfulfilledRequestEvents(RequestAllocator<FileSet<CopyEntity>> allocator) 
{
+    if 
(PriorityIterableBasedRequestAllocator.class.isAssignableFrom(allocator.getClass()))
 {
+      PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>> 
priorityIterableBasedRequestAllocator =
+          (PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>>) 
allocator;
+      submitUnfulfilledRequestEventsHelper(
+          
priorityIterableBasedRequestAllocator.getRequestsExceedingAvailableResourcePool(),
+          REQUESTS_EXCEEDING_AVAILABLE_RESOURCE_POOL_EVENT_NAME);
+      submitUnfulfilledRequestEventsHelper(
+          
priorityIterableBasedRequestAllocator.getRequestsRejectedDueToInsufficientEviction(),
+          REQUESTS_REJECTED_DUE_TO_INSUFFICIENT_EVICTION_EVENT_NAME);
+      
submitUnfulfilledRequestEventsHelper(priorityIterableBasedRequestAllocator.getRequestsRejectedWithLowPriority(),
+          REQUESTS_REJECTED_WITH_LOW_PRIORITY_EVENT_NAME);
+      
submitUnfulfilledRequestEventsHelper(priorityIterableBasedRequestAllocator.getRequestsDropped(),
+          REQUESTS_DROPPED_EVENT_NAME);
+    }
+  }
+
+  private RequestAllocator<FileSet<CopyEntity>> 
createRequestAllocator(CopyConfiguration copyConfiguration,
+      int maxThreads) {
+    Optional<FileSetComparator> prioritizer = 
copyConfiguration.getPrioritizer();
     RequestAllocatorConfig.Builder<FileSet<CopyEntity>> configBuilder =
         RequestAllocatorConfig.builder(new 
FileSetResourceEstimator()).allowParallelization(maxThreads)
+            
.storeRejectedRequests(copyConfiguration.getStoreRejectedRequestsSetting())
             
.withLimitedScopeConfig(copyConfiguration.getPrioritizationConfig());
 
     if (!prioritizer.isPresent()) {
@@ -323,9 +372,8 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
        * a DatasetFinder. Consequently, the source and destination dataset for 
the CopyableFile lineage are expected
        * to be set by the same logic
        */
-      if (lineageInfo.isPresent() &&
-          copyableFile.getSourceDataset() != null &&
-          copyableFile.getDestinationDataset() != null) {
+      if (lineageInfo.isPresent() && copyableFile.getSourceDataset() != null
+          && copyableFile.getDestinationDataset() != null) {
         lineageInfo.get().setSource(copyableFile.getSourceDataset(), workUnit);
       }
     }
@@ -350,7 +398,8 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
     return new EmptyExtractor<>("empty");
   }
 
-  protected Extractor<String, FileAwareInputStream> 
extractorForCopyableFile(FileSystem fs, CopyableFile cf, WorkUnitState state)
+  protected Extractor<String, FileAwareInputStream> 
extractorForCopyableFile(FileSystem fs, CopyableFile cf,
+      WorkUnitState state)
       throws IOException {
     return new FileAwareInputStreamExtractor(fs, cf, state);
   }
@@ -365,7 +414,8 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
   @Deprecated
   protected FileSystem getSourceFileSystem(State state)
       throws IOException {
-    Configuration conf = HadoopUtils.getConfFromState(state, 
Optional.of(ConfigurationKeys.SOURCE_FILEBASED_ENCRYPTED_CONFIG_PATH));
+    Configuration conf =
+        HadoopUtils.getConfFromState(state, 
Optional.of(ConfigurationKeys.SOURCE_FILEBASED_ENCRYPTED_CONFIG_PATH));
     String uri = state.getProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, 
ConfigurationKeys.LOCAL_FS_URI);
     return 
HadoopUtils.getOptionallyThrottledFileSystem(FileSystem.get(URI.create(uri), 
conf), state);
   }
@@ -456,11 +506,12 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
     return 
CopyableDatasetMetadata.deserialize(state.getProp(SERIALIZED_COPYABLE_DATASET));
   }
 
-  private void setWorkUnitWatermark(WorkUnit workUnit, 
Optional<CopyableFileWatermarkGenerator> watermarkGenerator, CopyEntity 
copyEntity)
+  private void setWorkUnitWatermark(WorkUnit workUnit, 
Optional<CopyableFileWatermarkGenerator> watermarkGenerator,
+      CopyEntity copyEntity)
       throws IOException {
-    if (copyEntity instanceof  CopyableFile) {
+    if (copyEntity instanceof CopyableFile) {
       Optional<WatermarkInterval> watermarkIntervalOptional =
-          
CopyableFileWatermarkHelper.getCopyableFileWatermark((CopyableFile)copyEntity, 
watermarkGenerator);
+          CopyableFileWatermarkHelper.getCopyableFileWatermark((CopyableFile) 
copyEntity, watermarkGenerator);
       if (watermarkIntervalOptional.isPresent()) {
         workUnit.setWatermarkInterval(watermarkIntervalOptional.get());
       }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
index d9e5368..377b2cf 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
@@ -17,19 +17,41 @@
 
 package org.apache.gobblin.data.management.copy;
 
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Iterator;
 import java.util.List;
 
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
+import org.apache.hadoop.fs.FileSystem;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterators;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.data.management.dataset.DatasetUtils;
+import org.apache.gobblin.data.management.partition.CopyableDatasetRequestor;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetsFinder;
+import org.apache.gobblin.dataset.IterableDatasetFinder;
+import org.apache.gobblin.dataset.IterableDatasetFinderImpl;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.source.workunit.Extract;
 import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.JobLauncherUtils;
+import 
org.apache.gobblin.util.request_allocation.PriorityIterableBasedRequestAllocator;
 
 
+@Slf4j
 public class CopySourceTest {
 
   @Test
@@ -106,4 +128,66 @@ public class CopySourceTest {
     Assert.assertNotNull(extractBelow);
   }
 
+  @Test
+  public void testSubmitUnfulfilledRequestEvents()
+      throws IOException, NoSuchMethodException, InvocationTargetException, 
IllegalAccessException {
+    SourceState state = new SourceState();
+
+    state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, "file:///");
+    state.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, "file:///");
+    state.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/target/dir");
+    state.setProp(DatasetUtils.DATASET_PROFILE_CLASS_KEY,
+        TestCopyablePartitionableDatasedFinder.class.getCanonicalName());
+    state.setProp(CopySource.MAX_CONCURRENT_LISTING_SERVICES, 2);
+    state.setProp(CopyConfiguration.MAX_COPY_PREFIX + ".size", "50");
+    state.setProp(CopyConfiguration.MAX_COPY_PREFIX + ".copyEntities", 2);
+    state.setProp(CopyConfiguration.STORE_REJECTED_REQUESTS_KEY,
+        
RequestAllocatorConfig.StoreRejectedRequestsConfig.ALL.name().toLowerCase());
+    state.setProp(ConfigurationKeys.METRICS_CUSTOM_BUILDERS, 
"org.apache.gobblin.metrics.ConsoleEventReporterFactory");
+
+    CopySource source = new CopySource();
+
+    final FileSystem sourceFs = HadoopUtils.getSourceFileSystem(state);
+    final FileSystem targetFs = HadoopUtils.getWriterFileSystem(state, 1, 0);
+
+    int maxThreads = state
+        .getPropAsInt(CopySource.MAX_CONCURRENT_LISTING_SERVICES, 
CopySource.DEFAULT_MAX_CONCURRENT_LISTING_SERVICES);
+
+    final CopyConfiguration copyConfiguration = 
CopyConfiguration.builder(targetFs, state.getProperties()).build();
+
+    MetricContext metricContext = Instrumented.getMetricContext(state, 
CopySource.class);
+    EventSubmitter eventSubmitter = new EventSubmitter.Builder(metricContext, 
CopyConfiguration.COPY_PREFIX).build();
+    DatasetsFinder<CopyableDatasetBase> datasetFinder = DatasetUtils
+        .instantiateDatasetFinder(state.getProperties(), sourceFs, 
CopySource.DEFAULT_DATASET_PROFILE_CLASS_KEY,
+            eventSubmitter, state);
+
+    IterableDatasetFinder<CopyableDatasetBase> iterableDatasetFinder =
+        datasetFinder instanceof IterableDatasetFinder ? 
(IterableDatasetFinder<CopyableDatasetBase>) datasetFinder
+            : new IterableDatasetFinderImpl<>(datasetFinder);
+
+    Iterator<CopyableDatasetRequestor> requestorIteratorWithNulls = Iterators
+        .transform(iterableDatasetFinder.getDatasetsIterator(),
+            new CopyableDatasetRequestor.Factory(targetFs, copyConfiguration, 
log));
+    Iterator<CopyableDatasetRequestor> requestorIterator =
+        Iterators.filter(requestorIteratorWithNulls, 
Predicates.<CopyableDatasetRequestor>notNull());
+
+    Method m = CopySource.class.getDeclaredMethod("createRequestAllocator", 
CopyConfiguration.class, int.class);
+    m.setAccessible(true);
+    PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>> allocator =
+        (PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>>) 
m.invoke(source, copyConfiguration, maxThreads);
+    Iterator<FileSet<CopyEntity>> prioritizedFileSets =
+        allocator.allocateRequests(requestorIterator, 
copyConfiguration.getMaxToCopy());
+    List<FileSet<CopyEntity>> fileSetList = 
allocator.getRequestsExceedingAvailableResourcePool();
+    Assert.assertEquals(fileSetList.size(), 2);
+
+    FileSet<CopyEntity> fileSet = fileSetList.get(0);
+    Assert.assertEquals(fileSet.getDataset().getUrn(), "/test");
+    Assert.assertEquals(fileSet.getTotalEntities(), 5);
+    Assert.assertEquals(fileSet.getTotalSizeInBytes(), 50);
+
+    fileSet = fileSetList.get(1);
+    Assert.assertEquals(fileSet.getDataset().getUrn(), "/test");
+    Assert.assertEquals(fileSet.getTotalEntities(), 5);
+    Assert.assertEquals(fileSet.getTotalSizeInBytes(), 50);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterable.java
----------------------------------------------------------------------
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterable.java
 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterable.java
index c1a4505..bcfcf29 100644
--- 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterable.java
+++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterable.java
@@ -22,15 +22,15 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.TreeSet;
 
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
 import org.slf4j.Logger;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
 
 /**
  * A concurrent bounded priority {@link Iterable}. Given a {@link 
ResourcePool}, a {@link ResourceEstimator}, and a
@@ -65,12 +65,24 @@ public class ConcurrentBoundedPriorityIterable<T> 
implements Iterable<AllocatedR
   private int requestsRefused = 0;
   private int requestsEvicted = 0;
 
+  //These are for submitting alertable events
+  private String storeRejectedRequestsSetting;
+  @Getter
+  private List<T> requestsExceedingAvailableResourcePool = 
Lists.newArrayList();
+  @Getter
+  private List<T> requestsRejectedWithLowPriority = Lists.newArrayList();
+  @Getter
+  private List<T> requestsRejectedDueToInsufficientEviction = 
Lists.newArrayList();
+  @Getter
+  private List<T> requestsDropped = Lists.newArrayList();
+
   // These are ResourceRequirements for temporary use to avoid instantiation 
costs
   private final ResourceRequirement candidateRequirement;
   private final ResourceRequirement tmpRequirement;
   private final ResourceRequirement reuse;
 
-  public ConcurrentBoundedPriorityIterable(final Comparator<? super T> 
prioritizer, ResourceEstimator<T> resourceEstimator, ResourcePool pool) {
+  public ConcurrentBoundedPriorityIterable(final Comparator<? super T> 
prioritizer,
+      ResourceEstimator<T> resourceEstimator, String 
storeRejectedRequestsSetting, ResourcePool pool) {
 
     this.estimator = resourceEstimator;
     this.resourcePool = pool;
@@ -79,6 +91,8 @@ public class ConcurrentBoundedPriorityIterable<T> implements 
Iterable<AllocatedR
     this.allDifferentComparator = new AllDifferentComparator();
     this.elements = new TreeSet<>(this.allDifferentComparator);
 
+    this.storeRejectedRequestsSetting = storeRejectedRequestsSetting;
+
     this.currentRequirement = 
this.resourcePool.getResourceRequirementBuilder().zero().build();
     this.maxResourceRequirement = new 
ResourceRequirement(this.currentRequirement);
 
@@ -94,7 +108,8 @@ public class ConcurrentBoundedPriorityIterable<T> implements 
Iterable<AllocatedR
    */
   private class AllDifferentComparator implements 
Comparator<AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T>> {
     @Override
-    public int 
compare(AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> t1, 
AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> t2) {
+    public int 
compare(AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> t1,
+        AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> t2) {
       int providedComparison = 
ConcurrentBoundedPriorityIterable.this.comparator.compare(t1.getT(), t2.getT());
       if (providedComparison != 0) {
         return providedComparison;
@@ -111,12 +126,13 @@ public class ConcurrentBoundedPriorityIterable<T> 
implements Iterable<AllocatedR
    */
   public boolean add(T t) {
     if (this.closed) {
-      throw new 
RuntimeException(ConcurrentBoundedPriorityIterable.class.getSimpleName() + " is 
no longer accepting requests!");
+      throw new RuntimeException(
+          ConcurrentBoundedPriorityIterable.class.getSimpleName() + " is no 
longer accepting requests!");
     }
 
-    AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T>
-        newElement = new 
AllocatedRequestsIteratorBase.RequestWithResourceRequirement<>(t,
-        this.estimator.estimateRequirement(t, this.resourcePool));
+    AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> newElement 
=
+        new AllocatedRequestsIteratorBase.RequestWithResourceRequirement<>(t,
+            this.estimator.estimateRequirement(t, this.resourcePool));
     boolean addedWorkunits = addImpl(newElement);
     if (!addedWorkunits) {
       this.rejectedElement = true;
@@ -132,21 +148,30 @@ public class ConcurrentBoundedPriorityIterable<T> 
implements Iterable<AllocatedR
     if 
(this.resourcePool.exceedsHardBound(newElement.getResourceRequirement(), 
false)) {
       // item does not fit even in empty pool
       log.warn(String.format("Request %s is larger than the available resource 
pool. If the pool is not expanded, "
-          + "it will never be selected. Request: %s.", newElement.getT(),
+              + "it will never be selected. Request: %s.", newElement.getT(),
           
this.resourcePool.stringifyRequirement(newElement.getResourceRequirement())));
+      if (!this.storeRejectedRequestsSetting
+          
.equalsIgnoreCase(RequestAllocatorConfig.StoreRejectedRequestsConfig.NONE.name()))
 {
+        this.requestsExceedingAvailableResourcePool.add(newElement.getT());
+      }
       this.requestsRefused++;
       return false;
     }
 
-    ResourceRequirement candidateRequirement =
-        ResourceRequirement.add(this.currentRequirement, 
newElement.getResourceRequirement(), this.candidateRequirement);
+    ResourceRequirement candidateRequirement = ResourceRequirement
+        .add(this.currentRequirement, newElement.getResourceRequirement(), 
this.candidateRequirement);
 
     if (this.resourcePool.exceedsHardBound(candidateRequirement, false)) {
 
       if (this.comparator.compare(this.elements.last().getT(), 
newElement.getT()) <= 0) {
-        log.debug("Request {} does not fit in resource pool and is lower 
priority than current lowest priority request. "
-            + "Rejecting", newElement.getT());
+        log.debug(
+            "Request {} does not fit in resource pool and is lower priority 
than current lowest priority request. "
+                + "Rejecting", newElement.getT());
         this.requestsRefused++;
+        if (this.storeRejectedRequestsSetting
+            
.equalsIgnoreCase(RequestAllocatorConfig.StoreRejectedRequestsConfig.ALL.name()))
 {
+          this.requestsRejectedWithLowPriority.add(newElement.getT());
+        }
         return false;
       }
 
@@ -154,11 +179,15 @@ public class ConcurrentBoundedPriorityIterable<T> 
implements Iterable<AllocatedR
 
       this.currentRequirement.copyInto(this.tmpRequirement);
 
-      for (AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> 
dropCandidate : this.elements.descendingSet()) {
+      for (AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> 
dropCandidate : this.elements
+          .descendingSet()) {
         if (this.comparator.compare(dropCandidate.getT(), newElement.getT()) 
<= 0) {
-          log.debug("Cannot evict enough requests to fit request {}. "
-              + "Rejecting", newElement.getT());
+          log.debug("Cannot evict enough requests to fit request {}. " + 
"Rejecting", newElement.getT());
           this.requestsRefused++;
+          if (this.storeRejectedRequestsSetting
+              
.equalsIgnoreCase(RequestAllocatorConfig.StoreRejectedRequestsConfig.ALL.name()))
 {
+            
this.requestsRejectedDueToInsufficientEviction.add(newElement.getT());
+          }
           return false;
         }
         this.tmpRequirement.subtract(dropCandidate.getResourceRequirement());
@@ -172,6 +201,10 @@ public class ConcurrentBoundedPriorityIterable<T> 
implements Iterable<AllocatedR
       for (AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> 
drop : toDrop) {
         log.debug("Evicting request {}.", drop.getT());
         this.requestsEvicted++;
+        if (this.storeRejectedRequestsSetting
+            
.equalsIgnoreCase(RequestAllocatorConfig.StoreRejectedRequestsConfig.ALL.name()))
 {
+          this.requestsDropped.add(drop.getT());
+        }
         this.elements.remove(drop);
         this.currentRequirement.subtract(drop.getResourceRequirement());
       }
@@ -205,13 +238,13 @@ public class ConcurrentBoundedPriorityIterable<T> 
implements Iterable<AllocatedR
     StringBuilder messageBuilder = new StringBuilder("Statistics for ").
         
append(ConcurrentBoundedPriorityIterable.class.getSimpleName()).append(": {");
     messageBuilder.append(this.resourcePool).append(", ");
-    messageBuilder.append("totalResourcesUsed: 
").append(this.resourcePool.stringifyRequirement(this.currentRequirement))
-        .append(", ");
-    messageBuilder.append("maxRequirementPerDimension: 
").append(this.resourcePool.stringifyRequirement(this.maxResourceRequirement))
-        .append(", ");
+    messageBuilder.append("totalResourcesUsed: ")
+        
.append(this.resourcePool.stringifyRequirement(this.currentRequirement)).append(",
 ");
+    messageBuilder.append("maxRequirementPerDimension: ")
+        
.append(this.resourcePool.stringifyRequirement(this.maxResourceRequirement)).append(",
 ");
     messageBuilder.append("requestsOffered: 
").append(this.requestsOffered).append(", ");
-    messageBuilder.append("requestsAccepted: ").append(this.requestsOffered - 
this.requestsEvicted - this.requestsRefused)
-        .append(", ");
+    messageBuilder.append("requestsAccepted: ")
+        .append(this.requestsOffered - this.requestsEvicted - 
this.requestsRefused).append(", ");
     messageBuilder.append("requestsRefused: 
").append(this.requestsRefused).append(", ");
     messageBuilder.append("requestsEvicted: ").append(this.requestsEvicted);
     messageBuilder.append("}");
@@ -228,5 +261,4 @@ public class ConcurrentBoundedPriorityIterable<T> 
implements Iterable<AllocatedR
     this.closed = true;
     return this.elements.iterator();
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java
----------------------------------------------------------------------
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java
 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java
index 876a4eb..f5be73e 100644
--- 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java
+++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java
@@ -22,6 +22,9 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 
+import lombok.AccessLevel;
+import lombok.Getter;
+
 import org.slf4j.Logger;
 
 import com.google.common.base.Function;
@@ -32,22 +35,35 @@ import org.apache.gobblin.util.Either;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.executors.IteratorExecutor;
 
-import lombok.AccessLevel;
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-
 
-@AllArgsConstructor
 public abstract class PriorityIterableBasedRequestAllocator<T extends 
Request<T>> implements RequestAllocator<T> {
 
   private final Logger log;
   @Getter(value = AccessLevel.PROTECTED)
   private final RequestAllocatorConfig<T> configuration;
 
+  //These are for submitting alertable events
+  @Getter
+  private List<T> requestsExceedingAvailableResourcePool;
+  @Getter
+  private List<T> requestsRejectedWithLowPriority;
+  @Getter
+  private List<T> requestsRejectedDueToInsufficientEviction;
+  @Getter
+  private List<T> requestsDropped;
+
+  public PriorityIterableBasedRequestAllocator(Logger log, 
RequestAllocatorConfig<T> configuration) {
+    this.log = log;
+    this.configuration = configuration;
+  }
+
   @Override
-  public AllocatedRequestsIterator<T> allocateRequests(Iterator<? extends 
Requestor<T>> requestors, ResourcePool resourcePool) {
+  public AllocatedRequestsIterator<T> allocateRequests(Iterator<? extends 
Requestor<T>> requestors,
+      ResourcePool resourcePool) {
     final ConcurrentBoundedPriorityIterable<T> iterable =
-        new 
ConcurrentBoundedPriorityIterable<>(this.configuration.getPrioritizer(), 
this.configuration.getResourceEstimator(), resourcePool);
+        new 
ConcurrentBoundedPriorityIterable<>(this.configuration.getPrioritizer(),
+            this.configuration.getResourceEstimator(), 
this.configuration.getStoreRejectedRequestsSetting(),
+            resourcePool);
 
     final Iterator<T> joinIterator = getJoinIterator(requestors, iterable);
 
@@ -57,36 +73,47 @@ public abstract class 
PriorityIterableBasedRequestAllocator<T extends Request<T>
       }
     } else {
 
-      IteratorExecutor<Void> executor = new 
IteratorExecutor<>(Iterators.transform(joinIterator, new Function<T, 
Callable<Void>>() {
-        @Override
-        public Callable<Void> apply(final T input) {
-          return new Callable<Void>() {
+      IteratorExecutor<Void> executor =
+          new IteratorExecutor<>(Iterators.transform(joinIterator, new 
Function<T, Callable<Void>>() {
             @Override
-            public Void call()
-                throws Exception {
-              iterable.add(input);
-              return null;
+            public Callable<Void> apply(final T input) {
+              return new Callable<Void>() {
+                @Override
+                public Void call()
+                    throws Exception {
+                  iterable.add(input);
+                  return null;
+                }
+              };
             }
-          };
-        }
-      }), this.configuration.getAllowedThreads(),
-          ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("request-allocator-%d")));
+          }), this.configuration.getAllowedThreads(),
+              ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("request-allocator-%d")));
 
       try {
         List<Either<Void, ExecutionException>> results = 
executor.executeAndGetResults();
         IteratorExecutor.logFailures(results, log, 10);
       } catch (InterruptedException ie) {
         log.error("Request allocation was interrupted.");
-        return new 
AllocatedRequestsIteratorBase<>(Iterators.<AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T>>emptyIterator(),
-            resourcePool);
+        return new AllocatedRequestsIteratorBase<>(
+            
Iterators.<AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T>>emptyIterator(),
 resourcePool);
       }
     }
 
     iterable.logStatistics(Optional.of(this.log));
+
+    //Get all requests rejected/dropped
+    getRejectedAndDroppedRequests(iterable);
+
     return new AllocatedRequestsIteratorBase<>(iterable.iterator(), 
resourcePool);
   }
 
+  public void 
getRejectedAndDroppedRequests(ConcurrentBoundedPriorityIterable<T> iterable) {
+    requestsExceedingAvailableResourcePool = 
iterable.getRequestsExceedingAvailableResourcePool();
+    requestsRejectedWithLowPriority = 
iterable.getRequestsRejectedWithLowPriority();
+    requestsRejectedDueToInsufficientEviction = 
iterable.getRequestsRejectedDueToInsufficientEviction();
+    requestsDropped = iterable.getRequestsDropped();
+  }
+
   protected abstract Iterator<T> getJoinIterator(Iterator<? extends 
Requestor<T>> requestors,
       ConcurrentBoundedPriorityIterable<T> requestIterable);
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/RequestAllocatorConfig.java
----------------------------------------------------------------------
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/RequestAllocatorConfig.java
 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/RequestAllocatorConfig.java
index b33070b..5a3051a 100644
--- 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/RequestAllocatorConfig.java
+++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/RequestAllocatorConfig.java
@@ -20,12 +20,12 @@ package org.apache.gobblin.util.request_allocation;
 import java.io.Serializable;
 import java.util.Comparator;
 
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
 
 @AllArgsConstructor
 @Getter
@@ -34,6 +34,11 @@ public class RequestAllocatorConfig<T extends Request<T>> {
   private final ResourceEstimator<T> resourceEstimator;
   private final int allowedThreads;
   private Config limitedScopeConfig;
+  private String storeRejectedRequestsSetting;
+
+  public enum StoreRejectedRequestsConfig {
+    ALL, MIN, NONE
+  }
 
   public static <T extends Request<T>> Builder<T> builder(ResourceEstimator<T> 
resourceEstimator) {
     return new Builder<>(resourceEstimator);
@@ -44,6 +49,7 @@ public class RequestAllocatorConfig<T extends Request<T>> {
     private final ResourceEstimator<T> resourceEstimator;
     private int allowedThreads = 1;
     private Config limitedScopeConfig;
+    private String storeRejectedRequestsSetting = 
StoreRejectedRequestsConfig.MIN.name();
 
     public Builder(ResourceEstimator<T> resourceEstimator) {
       this.resourceEstimator = resourceEstimator;
@@ -68,11 +74,17 @@ public class RequestAllocatorConfig<T extends Request<T>> {
       return this;
     }
 
+    public Builder<T> storeRejectedRequests(String 
storeRejectedRequestsSetting) {
+      this.storeRejectedRequestsSetting = storeRejectedRequestsSetting;
+      return this;
+    }
+
     public RequestAllocatorConfig<T> build() {
       if (this.limitedScopeConfig == null) {
         this.limitedScopeConfig = ConfigFactory.empty();
       }
-      return new RequestAllocatorConfig<>(this.prioritizer, 
this.resourceEstimator, this.allowedThreads, this.limitedScopeConfig);
+      return new RequestAllocatorConfig<>(this.prioritizer, 
this.resourceEstimator, this.allowedThreads,
+          this.limitedScopeConfig, this.storeRejectedRequestsSetting);
     }
   }
 
@@ -82,5 +94,4 @@ public class RequestAllocatorConfig<T extends Request<T>> {
       return 0;
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterableTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterableTest.java
 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterableTest.java
index 58a56ea..2145bc0 100644
--- 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterableTest.java
+++ 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterableTest.java
@@ -32,10 +32,12 @@ public class ConcurrentBoundedPriorityIterableTest {
   public static final String MEMORY = "memory";
 
   @Test
-  public void test() throws Exception {
+  public void test()
+      throws Exception {
 
-    ConcurrentBoundedPriorityIterable<String> iterable = new 
ConcurrentBoundedPriorityIterable<>(new MyComparator(),
-        new MyEstimator(), ResourcePool.builder().maxResource(MEMORY, 
100.).build());
+    ConcurrentBoundedPriorityIterable<String> iterable =
+        new ConcurrentBoundedPriorityIterable<>(new MyComparator(), new 
MyEstimator(), "min",
+            ResourcePool.builder().maxResource(MEMORY, 100.).build());
 
     // doesn't fit
     Assert.assertFalse(iterable.add("a-500"));
@@ -57,8 +59,8 @@ public class ConcurrentBoundedPriorityIterableTest {
     Assert.assertTrue(iterable.add("b-50"));
 
     // Check items
-    List<String> items = 
Lists.newArrayList(Iterators.transform(iterable.iterator(),
-        new AllocatedRequestsIteratorBase.TExtractor<String>()));
+    List<String> items = Lists
+        .newArrayList(Iterators.transform(iterable.iterator(), new 
AllocatedRequestsIteratorBase.TExtractor<String>()));
     Assert.assertEquals(items.size(), 2);
     Assert.assertEquals(items.get(0), "b-50");
     Assert.assertEquals(items.get(1), "d-50");
@@ -66,15 +68,15 @@ public class ConcurrentBoundedPriorityIterableTest {
     iterable.reopen();
     // a high priority that won't fit even with evictions should not evict 
anything
     Assert.assertFalse(iterable.add("c-500"));
-    items = Lists.newArrayList(Iterators.transform(iterable.iterator(),
-        new AllocatedRequestsIteratorBase.TExtractor<String>()));
+    items = Lists
+        .newArrayList(Iterators.transform(iterable.iterator(), new 
AllocatedRequestsIteratorBase.TExtractor<String>()));
     Assert.assertEquals(items.size(), 2);
 
     iterable.reopen();
     // even if it is higher priority than everything else
     Assert.assertFalse(iterable.add("a-500"));
-    items = Lists.newArrayList(Iterators.transform(iterable.iterator(),
-        new AllocatedRequestsIteratorBase.TExtractor<String>()));
+    items = Lists
+        .newArrayList(Iterators.transform(iterable.iterator(), new 
AllocatedRequestsIteratorBase.TExtractor<String>()));
     Assert.assertEquals(items.size(), 2);
   }
 
@@ -94,5 +96,4 @@ public class ConcurrentBoundedPriorityIterableTest {
       return 
resourcePool.getResourceRequirementBuilder().setRequirement(MEMORY, 
memory).build();
     }
   }
-
 }
\ No newline at end of file

Reply via email to