This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4d991a9bd [GOBBLIN-1734] make DestinationDatasetHandler work on
streaming sources (#3592)
4d991a9bd is described below
commit 4d991a9bd872a98bc52dda2ec366ad7d77847f9a
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Tue Oct 25 15:10:21 2022 -0500
[GOBBLIN-1734] make DestinationDatasetHandler work on streaming sources
(#3592)
* make DestinationDatasetHandler work on streaming sources
* switch DestinationDatasetHandlerService from a list of work units to a
WorkUnitStream
* fix unit test
---
.../destination/DestinationDatasetHandler.java | 13 +++++++++--
.../DestinationDatasetHandlerService.java | 25 ++++++++--------------
.../destination/TestDestinationDatasetHandler.java | 13 ++++++-----
.../gobblin/runtime/AbstractJobLauncher.java | 12 +++++++----
4 files changed, 34 insertions(+), 29 deletions(-)
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandler.java
b/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandler.java
index 758634d9f..fb4cedd8b 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandler.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandler.java
@@ -21,17 +21,26 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitStream;
+
/**
- * Performs work related to initializing the target environment before the
files are written and published
+ * Performs work related to initializing the target environment before the
files are written and published.
+ * Implementations should be aware that a {@link WorkUnitStream} may be of
streaming type.
*/
public interface DestinationDatasetHandler extends Closeable {
/**
* Handle destination setup before workunits are sent to writer and publisher
+ * This method is deprecated in favor of {@link #handle(WorkUnitStream)}.
* @param workUnits
*/
- void handle(Collection<WorkUnit> workUnits) throws IOException;
+ @Deprecated
+ default void handle(Collection<WorkUnit> workUnits) throws IOException {}
+
+ default WorkUnitStream handle(WorkUnitStream workUnitStream) throws
IOException {
+ return workUnitStream;
+ }
/**
* Perform cleanup if needed
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandlerService.java
b/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandlerService.java
index 41c768da0..08c824e9d 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandlerService.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandlerService.java
@@ -20,14 +20,12 @@ package org.apache.gobblin.destination;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.metrics.event.EventSubmitter;
-import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;
-import org.apache.gobblin.util.JobLauncherUtils;
/**
@@ -52,21 +50,16 @@ public class DestinationDatasetHandlerService implements
Closeable {
* Executes handlers
* @param workUnitStream
*/
- public void executeHandlers(WorkUnitStream workUnitStream) {
- if (handlers.size() > 0) {
- if (workUnitStream.isSafeToMaterialize()) {
- Collection<WorkUnit> workUnits =
JobLauncherUtils.flattenWorkUnits(workUnitStream.getMaterializedWorkUnitCollection());
- for (DestinationDatasetHandler handler : this.handlers) {
- try {
- handler.handle(workUnits);
- } catch (IOException e) {
- throw new RuntimeException(String.format("Handler %s failed to
execute", handler.getClass().getName()), e);
- }
- }
- } else {
- throw new
RuntimeException(DestinationDatasetHandlerService.class.getName() + " does not
support work unit streams");
+ public WorkUnitStream executeHandlers(WorkUnitStream workUnitStream) {
+ for (DestinationDatasetHandler handler : this.handlers) {
+ try {
+ workUnitStream = handler.handle(workUnitStream);
+ } catch (IOException e) {
+ throw new RuntimeException(String.format("Handler %s failed to
execute", handler.getClass().getName()), e);
}
}
+
+ return workUnitStream;
}
diff --git
a/gobblin-core/src/test/java/org/apache/gobblin/destination/TestDestinationDatasetHandler.java
b/gobblin-core/src/test/java/org/apache/gobblin/destination/TestDestinationDatasetHandler.java
index 102c38cda..ae87ec570 100644
---
a/gobblin-core/src/test/java/org/apache/gobblin/destination/TestDestinationDatasetHandler.java
+++
b/gobblin-core/src/test/java/org/apache/gobblin/destination/TestDestinationDatasetHandler.java
@@ -18,23 +18,22 @@
package org.apache.gobblin.destination;
import java.io.IOException;
-import java.util.Collection;
+
import org.apache.gobblin.configuration.SourceState;
-import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitStream;
public class TestDestinationDatasetHandler implements
DestinationDatasetHandler {
public static String TEST_COUNTER_KEY = "counter";
- private Boolean canCleanUp;
public TestDestinationDatasetHandler(SourceState state, Boolean canCleanUp){
- this.canCleanUp = canCleanUp;
}
@Override
- public void handle(Collection<WorkUnit> workUnits) {
- for (WorkUnit wu: workUnits) {
+ public WorkUnitStream handle(WorkUnitStream workUnitSteam) {
+ return workUnitSteam.transform(wu -> {
wu.setProp(TEST_COUNTER_KEY, wu.getPropAsInt(TEST_COUNTER_KEY, 0) + 1);
- }
+ return wu;
+ });
}
@Override
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 30bfca5ce..84cab042e 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -508,10 +508,6 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
}
}
- // Perform work needed before writing is done
- Boolean canCleanUp =
this.canCleanStagingData(this.jobContext.getJobState());
- closer.register(new DestinationDatasetHandlerService(jobState,
canCleanUp, this.eventSubmitter))
- .executeHandlers(workUnitStream);
//Initialize writer and converter(s)
closer.register(WriterInitializerFactory.newInstace(jobState,
workUnitStream)).initialize();
closer.register(ConverterInitializerFactory.newInstance(jobState,
workUnitStream)).initialize();
@@ -544,6 +540,14 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_PREPARATION);
// Add task ids
workUnitStream = prepareWorkUnits(workUnitStream, jobState);
+
+ // Perform work needed before writing is done
+ Boolean canCleanUp =
this.canCleanStagingData(this.jobContext.getJobState());
+ try (DestinationDatasetHandlerService
destinationDatasetHandlerService =
+ new DestinationDatasetHandlerService(jobState, canCleanUp,
this.eventSubmitter)) {
+ workUnitStream =
destinationDatasetHandlerService.executeHandlers(workUnitStream);
+ }
+
// Remove skipped workUnits from the list of work units to execute.
workUnitStream = workUnitStream.filter(new
SkippedWorkUnitsFilter(jobState));
// Add surviving tasks to jobState