This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ac4a8104f [GOBBLIN-1698] Fast fail during work unit generation based on config. (#3542)
ac4a8104f is described below

commit ac4a8104f7826fc5661b958cf8fc338648d1ac0e
Author: meethngala <[email protected]>
AuthorDate: Tue Sep 13 13:32:41 2022 -0400

    [GOBBLIN-1698] Fast fail during work unit generation based on config. (#3542)
    
    * fast fail during work unit generation based on config.
    
    * [GOBBLIN-1690] Added logging to ORC writer
    
    Closes #3543 from rdsr/master
    
    * [GOBBLIN-1678] Refactor git flowgraph component to be extensible (#3536)
    
    * Refactor git flowgraph component to be extensible
    
    * Move files to appropriate modules
    
    * Cleanup and add javadocs
    
    * Cleanup, add missing javadocs
    
    * Address review and import order
    
    * Fix findbugs
    
    * Use java sort instead of collections
    
    * Add GMCE topic explicitly to hive commit event (#3547)
    
    * [GOBBLIN-1689] Decouple compiler from scheduler in warm standby mode (#3544)
    
    * address comments
    
    * use connectionmanager when httpclient is not closeable
    
    * [GOBBLIN-1689] Decouple compiler from scheduler in warm standby mode
    
    * add orchestrator as listener before service start
    
    * fix code style
    
    * address comments
    
    * fix test case to test orchestrator as one listener of flow spec
    
    * remove unintentional change
    
    * remove unused import
    
    * address comments
    
    * fix typo
    
    Co-authored-by: Zihan Li <[email protected]>
    
    * fast fail during work unit generation based on config.
    
    Co-authored-by: Meeth Gala <[email protected]>
    Co-authored-by: Ratandeep <[email protected]>
    Co-authored-by: William Lo <[email protected]>
    Co-authored-by: Jack Moseley <[email protected]>
    Co-authored-by: Zihan Li <[email protected]>
    Co-authored-by: Zihan Li <[email protected]>
---
 .../gobblin/configuration/ConfigurationKeys.java   |  2 ++
 .../gobblin/data/management/copy/CopySource.java   |  4 +++
 .../data/management/copy/CopySourceTest.java       | 40 ++++++++++++++++++++++
 3 files changed, 46 insertions(+)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 464a575ba..00817be63 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -296,6 +296,8 @@ public class ConfigurationKeys {
   public static final String WORK_UNIT_STATE_ACTUAL_HIGH_WATER_MARK_KEY = 
"workunit.state.actual.high.water.mark";
   public static final String WORK_UNIT_DATE_PARTITION_KEY = 
"workunit.source.date.partition";
   public static final String WORK_UNIT_DATE_PARTITION_NAME = 
"workunit.source.date.partitionName";
+  public static final String WORK_UNIT_GENERATOR_FAILURE_IS_FATAL = 
"workunit.generator.failure.is.fatal";
+  public static final boolean DEFAULT_WORK_UNIT_FAST_FAIL_ENABLED = true;
 
   /**
    * Task execution properties.
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index 175d0b311..2773f280f 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -214,6 +214,7 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
       failJobIfAllRequestsRejected(allocator, prioritizedFileSets);
 
       String filesetWuGeneratorAlias = 
state.getProp(ConfigurationKeys.COPY_SOURCE_FILESET_WU_GENERATOR_CLASS, 
FileSetWorkUnitGenerator.class.getName());
+      boolean shouldWuGeneratorFailureBeFatal = 
state.getPropAsBoolean(ConfigurationKeys.WORK_UNIT_GENERATOR_FAILURE_IS_FATAL, 
ConfigurationKeys.DEFAULT_WORK_UNIT_FAST_FAIL_ENABLED);
       Iterator<Callable<Void>> callableIterator =
           Iterators.transform(prioritizedFileSets, new 
Function<FileSet<CopyEntity>, Callable<Void>>() {
             @Nullable
@@ -239,6 +240,9 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
             future.get();
           } catch (ExecutionException exc) {
             log.error("Failed to get work units for dataset.", exc.getCause());
+            if (shouldWuGeneratorFailureBeFatal) {
+              throw new RuntimeException("Failed to get work units for 
dataset.", exc.getCause());
+            }
           }
         }
       } catch (InterruptedException ie) {
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
index 68f683ca6..879f53f77 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
@@ -16,6 +16,8 @@
  */
 
 package org.apache.gobblin.data.management.copy;
+import com.google.common.base.Optional;
+import com.google.common.collect.SetMultimap;
 import com.google.common.io.Files;
 import java.io.File;
 import java.io.IOException;
@@ -28,6 +30,9 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.configuration.State;
+import 
org.apache.gobblin.data.management.copy.watermark.CopyableFileWatermarkGenerator;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
 import org.apache.hadoop.fs.FileSystem;
@@ -339,4 +344,39 @@ public class CopySourceTest {
       Assert.assertEquals(datasetPaths.contains(tempDirRoot + 
"/targetPath/testDB/table" + i), true);
     }
   }
+
+  @Test (expectedExceptions = RuntimeException.class)
+  public void testGetWorkUnitsExecutionFastFailure() {
+
+    SourceState state = new SourceState();
+
+    state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, "file:///");
+    state.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, "file:///");
+    state.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/target/dir");
+    state.setProp(DatasetUtils.DATASET_PROFILE_CLASS_KEY,
+        TestCopyablePartitionableDatasedFinder.class.getCanonicalName());
+    state.setProp(ConfigurationKeys.COPY_SOURCE_FILESET_WU_GENERATOR_CLASS, 
AlwaysThrowsMockedFileSetWorkUnitGenerator.class.getName());
+    state.setProp(ConfigurationKeys.WORK_UNIT_GENERATOR_FAILURE_IS_FATAL, 
ConfigurationKeys.DEFAULT_WORK_UNIT_FAST_FAIL_ENABLED);
+
+    CopySource source = new CopySource();
+    // throws the runtime exception after encountering a failure generating 
the work units
+    List<WorkUnit> workunits = source.getWorkunits(state);
+    Assert.assertNull(workunits);
+  }
+
+  class AlwaysThrowsMockedFileSetWorkUnitGenerator extends 
CopySource.FileSetWorkUnitGenerator {
+
+    public AlwaysThrowsMockedFileSetWorkUnitGenerator(CopyableDatasetBase 
copyableDataset, FileSet<CopyEntity> fileSet, State state,
+        FileSystem targetFs, SetMultimap<FileSet<CopyEntity>, WorkUnit> 
workUnitList,
+        Optional<CopyableFileWatermarkGenerator> watermarkGenerator, long 
minWorkUnitWeight,
+        Optional<LineageInfo> lineageInfo) {
+      super(copyableDataset, fileSet, state, targetFs, workUnitList, 
watermarkGenerator, minWorkUnitWeight,
+          lineageInfo);
+    }
+
+    @Override
+    public Void call(){
+      throw new RuntimeException("boom!");
+    }
+  }
 }

Reply via email to