This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2804f10  [GOBBLIN-1604] Throw exception if there are no allocated 
requests due to lack of res… (#3461)
2804f10 is described below

commit 2804f10124241f85f54e51db513ff6831dd408b9
Author: William Lo <[email protected]>
AuthorDate: Thu Feb 10 11:03:08 2022 -0800

    [GOBBLIN-1604] Throw exception if there are no allocated requests due to 
lack of res… (#3461)
    
    * Throw exception if there are no allocated requests due to lack of 
resources
    
    * Fix typo
---
 .../gobblin/data/management/copy/CopySource.java   |  17 +++-
 .../data/management/copy/CopySourceTest.java       | 108 +++++++++++++++++++++
 2 files changed, 124 insertions(+), 1 deletion(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index 074b86a..175d0b3 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -209,8 +209,9 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
       Iterator<FileSet<CopyEntity>> prioritizedFileSets =
           allocator.allocateRequests(requestorIterator, 
copyConfiguration.getMaxToCopy());
 
-      //Submit alertable events for unfulfilled requests
+      //Submit alertable events for unfulfilled requests and fail if all of 
the allocated requests were rejected due to size
       submitUnfulfilledRequestEvents(allocator);
+      failJobIfAllRequestsRejected(allocator, prioritizedFileSets);
 
       String filesetWuGeneratorAlias = 
state.getProp(ConfigurationKeys.COPY_SOURCE_FILESET_WU_GENERATOR_CLASS, 
FileSetWorkUnitGenerator.class.getName());
       Iterator<Callable<Void>> callableIterator =
@@ -291,6 +292,20 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
     }
   }
 
+  void failJobIfAllRequestsRejected(RequestAllocator<FileSet<CopyEntity>> 
allocator,
+      Iterator<FileSet<CopyEntity>> allocatedRequests) throws IOException {
+    // TODO: we should set job as partial success if there is a mix of 
allocated requests and rejections
+    if 
(PriorityIterableBasedRequestAllocator.class.isAssignableFrom(allocator.getClass()))
 {
+      PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>> 
priorityIterableBasedRequestAllocator =
+          (PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>>) 
allocator;
+      // If there are no allocated items and there are items exceeding the 
available resources, then we can infer all items exceed resources
+      if (!allocatedRequests.hasNext() && 
priorityIterableBasedRequestAllocator.getRequestsExceedingAvailableResourcePool().size()
 > 0) {
+        throw new IOException(String.format("Requested copy datasets are all 
larger than the available resource pool. Try increasing %s and/or %s",
+            CopyConfiguration.MAX_COPY_PREFIX + "." + 
CopyResourcePool.ENTITIES_KEY, CopyConfiguration.MAX_COPY_PREFIX + ".size"));
+      }
+    }
+  }
+
   private void 
submitUnfulfilledRequestEvents(RequestAllocator<FileSet<CopyEntity>> allocator) 
{
     if 
(PriorityIterableBasedRequestAllocator.class.isAssignableFrom(allocator.getClass()))
 {
       PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>> 
priorityIterableBasedRequestAllocator =
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
index 102d6d1..68f683c 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
@@ -196,6 +196,114 @@ public class CopySourceTest {
     Assert.assertEquals(fileSet.getTotalSizeInBytes(), 75);
   }
 
+  @Test(expectedExceptions = IOException.class)
+  public void testFailIfAllAllocationRequestsRejected()
+      throws IOException, NoSuchMethodException, InvocationTargetException, 
IllegalAccessException {
+    SourceState state = new SourceState();
+
+    state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, "file:///");
+    state.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, "file:///");
+    state.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/target/dir");
+    state.setProp(DatasetUtils.DATASET_PROFILE_CLASS_KEY,
+        TestCopyablePartitionableDatasedFinder.class.getCanonicalName());
+    state.setProp(CopySource.MAX_CONCURRENT_LISTING_SERVICES, 2);
+    state.setProp(CopyConfiguration.MAX_COPY_PREFIX + ".size", "50");
+    state.setProp(CopyConfiguration.MAX_COPY_PREFIX + ".copyEntities", "2");
+    state.setProp(CopyConfiguration.STORE_REJECTED_REQUESTS_KEY,
+        
RequestAllocatorConfig.StoreRejectedRequestsConfig.ALL.name().toLowerCase());
+    state.setProp(ConfigurationKeys.METRICS_CUSTOM_BUILDERS, 
"org.apache.gobblin.metrics.ConsoleEventReporterFactory");
+
+    CopySource source = new CopySource();
+
+    final FileSystem sourceFs = HadoopUtils.getSourceFileSystem(state);
+    final FileSystem targetFs = HadoopUtils.getWriterFileSystem(state, 1, 0);
+
+    int maxThreads = state
+        .getPropAsInt(CopySource.MAX_CONCURRENT_LISTING_SERVICES, 
CopySource.DEFAULT_MAX_CONCURRENT_LISTING_SERVICES);
+
+    final CopyConfiguration copyConfiguration = 
CopyConfiguration.builder(targetFs, state.getProperties()).build();
+
+    MetricContext metricContext = Instrumented.getMetricContext(state, 
CopySource.class);
+    EventSubmitter eventSubmitter = new EventSubmitter.Builder(metricContext, 
CopyConfiguration.COPY_PREFIX).build();
+    DatasetsFinder<CopyableDatasetBase> datasetFinder = DatasetUtils
+        .instantiateDatasetFinder(state.getProperties(), sourceFs, 
CopySource.DEFAULT_DATASET_PROFILE_CLASS_KEY,
+            eventSubmitter, state);
+
+    IterableDatasetFinder<CopyableDatasetBase> iterableDatasetFinder =
+        datasetFinder instanceof IterableDatasetFinder ? 
(IterableDatasetFinder<CopyableDatasetBase>) datasetFinder
+            : new IterableDatasetFinderImpl<>(datasetFinder);
+
+    Iterator<CopyableDatasetRequestor> requesterIteratorWithNulls = Iterators
+        .transform(iterableDatasetFinder.getDatasetsIterator(),
+            new CopyableDatasetRequestor.Factory(targetFs, copyConfiguration, 
log));
+    Iterator<CopyableDatasetRequestor> requesterIterator =
+        Iterators.filter(requesterIteratorWithNulls, 
Predicates.<CopyableDatasetRequestor>notNull());
+
+    Method m = CopySource.class.getDeclaredMethod("createRequestAllocator", 
CopyConfiguration.class, int.class);
+    m.setAccessible(true);
+    PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>> allocator =
+        (PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>>) 
m.invoke(source, copyConfiguration, maxThreads);
+    Iterator<FileSet<CopyEntity>> prioritizedFileSets =
+        allocator.allocateRequests(requesterIterator, 
copyConfiguration.getMaxToCopy());
+    List<FileSet<CopyEntity>> fileSetList = 
allocator.getRequestsExceedingAvailableResourcePool();
+    Assert.assertEquals(fileSetList.size(), 2);
+    source.failJobIfAllRequestsRejected(allocator, prioritizedFileSets);
+  }
+
+  @Test
+  public void testPassIfNoAllocationsRejected()
+      throws IOException, NoSuchMethodException, InvocationTargetException, 
IllegalAccessException {
+    SourceState state = new SourceState();
+
+    state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, "file:///");
+    state.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, "file:///");
+    state.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/target/dir");
+    state.setProp(DatasetUtils.DATASET_PROFILE_CLASS_KEY,
+        TestCopyablePartitionableDatasedFinder.class.getCanonicalName());
+    state.setProp(CopySource.MAX_CONCURRENT_LISTING_SERVICES, 2);
+    state.setProp(CopyConfiguration.MAX_COPY_PREFIX + ".size", "100");
+    state.setProp(CopyConfiguration.MAX_COPY_PREFIX + ".copyEntities", "10");
+    state.setProp(CopyConfiguration.STORE_REJECTED_REQUESTS_KEY,
+        
RequestAllocatorConfig.StoreRejectedRequestsConfig.ALL.name().toLowerCase());
+    state.setProp(ConfigurationKeys.METRICS_CUSTOM_BUILDERS, 
"org.apache.gobblin.metrics.ConsoleEventReporterFactory");
+
+    CopySource source = new CopySource();
+
+    final FileSystem sourceFs = HadoopUtils.getSourceFileSystem(state);
+    final FileSystem targetFs = HadoopUtils.getWriterFileSystem(state, 1, 0);
+
+    int maxThreads = state
+        .getPropAsInt(CopySource.MAX_CONCURRENT_LISTING_SERVICES, 
CopySource.DEFAULT_MAX_CONCURRENT_LISTING_SERVICES);
+
+    final CopyConfiguration copyConfiguration = 
CopyConfiguration.builder(targetFs, state.getProperties()).build();
+
+    MetricContext metricContext = Instrumented.getMetricContext(state, 
CopySource.class);
+    EventSubmitter eventSubmitter = new EventSubmitter.Builder(metricContext, 
CopyConfiguration.COPY_PREFIX).build();
+    DatasetsFinder<CopyableDatasetBase> datasetFinder = DatasetUtils
+        .instantiateDatasetFinder(state.getProperties(), sourceFs, 
CopySource.DEFAULT_DATASET_PROFILE_CLASS_KEY,
+            eventSubmitter, state);
+
+    IterableDatasetFinder<CopyableDatasetBase> iterableDatasetFinder =
+        datasetFinder instanceof IterableDatasetFinder ? 
(IterableDatasetFinder<CopyableDatasetBase>) datasetFinder
+            : new IterableDatasetFinderImpl<>(datasetFinder);
+
+    Iterator<CopyableDatasetRequestor> requesterIteratorWithNulls = Iterators
+        .transform(iterableDatasetFinder.getDatasetsIterator(),
+            new CopyableDatasetRequestor.Factory(targetFs, copyConfiguration, 
log));
+    Iterator<CopyableDatasetRequestor> requesterIterator =
+        Iterators.filter(requesterIteratorWithNulls, 
Predicates.<CopyableDatasetRequestor>notNull());
+
+    Method m = CopySource.class.getDeclaredMethod("createRequestAllocator", 
CopyConfiguration.class, int.class);
+    m.setAccessible(true);
+    PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>> allocator =
+        (PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>>) 
m.invoke(source, copyConfiguration, maxThreads);
+    Iterator<FileSet<CopyEntity>> prioritizedFileSets =
+        allocator.allocateRequests(requesterIterator, 
copyConfiguration.getMaxToCopy());
+    List<FileSet<CopyEntity>> fileSetList = 
allocator.getRequestsExceedingAvailableResourcePool();
+    Assert.assertEquals(fileSetList.size(), 0);
+    source.failJobIfAllRequestsRejected(allocator, prioritizedFileSets);
+  }
+
   @Test
   public void testDefaultHiveDatasetShardTempPaths()
       throws IOException, NoSuchMethodException, InvocationTargetException, 
IllegalAccessException {

Reply via email to