This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ac4a8104f [GOBBLIN-1698] Fast fail during work unit generation based
on config. (#3542)
ac4a8104f is described below
commit ac4a8104f7826fc5661b958cf8fc338648d1ac0e
Author: meethngala <[email protected]>
AuthorDate: Tue Sep 13 13:32:41 2022 -0400
[GOBBLIN-1698] Fast fail during work unit generation based on config.
(#3542)
* fast fail during work unit generation based on config.
* [GOBBLIN-1690] Added logging to ORC writer
Closes #3543 from rdsr/master
* [GOBBLIN-1678] Refactor git flowgraph component to be extensible (#3536)
* Refactor git flowgraph component to be extensible
* Move files to appropriate modules
* Cleanup and add javadocs
* Cleanup, add missing javadocs
* Address review and import order
* Fix findbugs
* Use java sort instead of collections
* Add GMCE topic explicitly to hive commit event (#3547)
* [GOBBLIN-1689] Decouple compiler from scheduler in warm standby mode
(#3544)
* address comments
* use connectionmanager when httpclient is not closeable
* [GOBBLIN-1689] Decouple compiler from scheduler in warm standby mode
* add orchestrator as listener before service start
* fix code style
* address comments
* fix test case to test orchestrator as one listener of flow spec
* remove unintentional change
* remove unused import
* address comments
* fix typo
Co-authored-by: Zihan Li <[email protected]>
* fast fail during work unit generation based on config.
Co-authored-by: Meeth Gala <[email protected]>
Co-authored-by: Ratandeep <[email protected]>
Co-authored-by: William Lo <[email protected]>
Co-authored-by: Jack Moseley <[email protected]>
Co-authored-by: Zihan Li <[email protected]>
Co-authored-by: Zihan Li <[email protected]>
---
.../gobblin/configuration/ConfigurationKeys.java | 2 ++
.../gobblin/data/management/copy/CopySource.java | 4 +++
.../data/management/copy/CopySourceTest.java | 40 ++++++++++++++++++++++
3 files changed, 46 insertions(+)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 464a575ba..00817be63 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -296,6 +296,8 @@ public class ConfigurationKeys {
public static final String WORK_UNIT_STATE_ACTUAL_HIGH_WATER_MARK_KEY =
"workunit.state.actual.high.water.mark";
public static final String WORK_UNIT_DATE_PARTITION_KEY =
"workunit.source.date.partition";
public static final String WORK_UNIT_DATE_PARTITION_NAME =
"workunit.source.date.partitionName";
+ public static final String WORK_UNIT_GENERATOR_FAILURE_IS_FATAL =
"workunit.generator.failure.is.fatal";
+ public static final boolean DEFAULT_WORK_UNIT_FAST_FAIL_ENABLED = true;
/**
* Task execution properties.
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index 175d0b311..2773f280f 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -214,6 +214,7 @@ public class CopySource extends AbstractSource<String,
FileAwareInputStream> {
failJobIfAllRequestsRejected(allocator, prioritizedFileSets);
String filesetWuGeneratorAlias =
state.getProp(ConfigurationKeys.COPY_SOURCE_FILESET_WU_GENERATOR_CLASS,
FileSetWorkUnitGenerator.class.getName());
+ boolean shouldWuGeneratorFailureBeFatal =
state.getPropAsBoolean(ConfigurationKeys.WORK_UNIT_GENERATOR_FAILURE_IS_FATAL,
ConfigurationKeys.DEFAULT_WORK_UNIT_FAST_FAIL_ENABLED);
Iterator<Callable<Void>> callableIterator =
Iterators.transform(prioritizedFileSets, new
Function<FileSet<CopyEntity>, Callable<Void>>() {
@Nullable
@@ -239,6 +240,9 @@ public class CopySource extends AbstractSource<String,
FileAwareInputStream> {
future.get();
} catch (ExecutionException exc) {
log.error("Failed to get work units for dataset.", exc.getCause());
+ if (shouldWuGeneratorFailureBeFatal) {
+ throw new RuntimeException("Failed to get work units for
dataset.", exc.getCause());
+ }
}
}
} catch (InterruptedException ie) {
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
index 68f683ca6..879f53f77 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.gobblin.data.management.copy;
+import com.google.common.base.Optional;
+import com.google.common.collect.SetMultimap;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
@@ -28,6 +30,9 @@ import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
+import
org.apache.gobblin.data.management.copy.watermark.CopyableFileWatermarkGenerator;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
import org.apache.hadoop.fs.FileSystem;
@@ -339,4 +344,39 @@ public class CopySourceTest {
Assert.assertEquals(datasetPaths.contains(tempDirRoot +
"/targetPath/testDB/table" + i), true);
}
}
+
+ @Test (expectedExceptions = RuntimeException.class)
+ public void testGetWorkUnitsExecutionFastFailure() {
+
+ SourceState state = new SourceState();
+
+ state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, "file:///");
+ state.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, "file:///");
+ state.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/target/dir");
+ state.setProp(DatasetUtils.DATASET_PROFILE_CLASS_KEY,
+ TestCopyablePartitionableDatasedFinder.class.getCanonicalName());
+ state.setProp(ConfigurationKeys.COPY_SOURCE_FILESET_WU_GENERATOR_CLASS,
AlwaysThrowsMockedFileSetWorkUnitGenerator.class.getName());
+ state.setProp(ConfigurationKeys.WORK_UNIT_GENERATOR_FAILURE_IS_FATAL,
ConfigurationKeys.DEFAULT_WORK_UNIT_FAST_FAIL_ENABLED);
+
+ CopySource source = new CopySource();
+ // throws the runtime exception after encountering a failure generating
the work units
+ List<WorkUnit> workunits = source.getWorkunits(state);
+ Assert.assertNull(workunits);
+ }
+
+ class AlwaysThrowsMockedFileSetWorkUnitGenerator extends
CopySource.FileSetWorkUnitGenerator {
+
+ public AlwaysThrowsMockedFileSetWorkUnitGenerator(CopyableDatasetBase
copyableDataset, FileSet<CopyEntity> fileSet, State state,
+ FileSystem targetFs, SetMultimap<FileSet<CopyEntity>, WorkUnit>
workUnitList,
+ Optional<CopyableFileWatermarkGenerator> watermarkGenerator, long
minWorkUnitWeight,
+ Optional<LineageInfo> lineageInfo) {
+ super(copyableDataset, fileSet, state, targetFs, workUnitList,
watermarkGenerator, minWorkUnitWeight,
+ lineageInfo);
+ }
+
+ @Override
+ public Void call(){
+ throw new RuntimeException("boom!");
+ }
+ }
}