This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d991a9bd [GOBBLIN-1734] make DestinationDatasetHandler work on streaming sources (#3592)
4d991a9bd is described below

commit 4d991a9bd872a98bc52dda2ec366ad7d77847f9a
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Tue Oct 25 15:10:21 2022 -0500

    [GOBBLIN-1734] make DestinationDatasetHandler work on streaming sources (#3592)
    
    * make DestinationDatasetHandler work on streaming sources
    
    * switch DestinationDatasetHandlerService from list of workunits to workunitstream
    
    * fix unit test
---
 .../destination/DestinationDatasetHandler.java     | 13 +++++++++--
 .../DestinationDatasetHandlerService.java          | 25 ++++++++--------------
 .../destination/TestDestinationDatasetHandler.java | 13 ++++++-----
 .../gobblin/runtime/AbstractJobLauncher.java       | 12 +++++++----
 4 files changed, 34 insertions(+), 29 deletions(-)

diff --git a/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandler.java b/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandler.java
index 758634d9f..fb4cedd8b 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandler.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandler.java
@@ -21,17 +21,26 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
 import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitStream;
+
 
 /**
- * Performs work related to initializing the target environment before the files are written and published
+ * Performs work related to initializing the target environment before the files are written and published.
+ * Implementations should be aware that a {@link WorkUnitStream} may be of streaming type.
  */
 public interface DestinationDatasetHandler extends Closeable {
 
   /**
    * Handle destination setup before workunits are sent to writer and publisher
+   * This method is deprecated in favor of {@link #handle(WorkUnitStream)}.
    * @param workUnits
    */
-  void handle(Collection<WorkUnit> workUnits) throws IOException;
+  @Deprecated
+  default void handle(Collection<WorkUnit> workUnits) throws IOException {}
+
+  default WorkUnitStream handle(WorkUnitStream workUnitStream) throws IOException {
+    return workUnitStream;
+  }
 
   /**
    * Perform cleanup if needed
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandlerService.java b/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandlerService.java
index 41c768da0..08c824e9d 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandlerService.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandlerService.java
@@ -20,14 +20,12 @@ package org.apache.gobblin.destination;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.metrics.event.EventSubmitter;
-import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnitStream;
-import org.apache.gobblin.util.JobLauncherUtils;
 
 
 /**
@@ -52,21 +50,16 @@ public class DestinationDatasetHandlerService implements Closeable {
    * Executes handlers
    * @param workUnitStream
    */
-  public void executeHandlers(WorkUnitStream workUnitStream) {
-    if (handlers.size() > 0) {
-      if (workUnitStream.isSafeToMaterialize()) {
-        Collection<WorkUnit> workUnits = JobLauncherUtils.flattenWorkUnits(workUnitStream.getMaterializedWorkUnitCollection());
-          for (DestinationDatasetHandler handler : this.handlers) {
-            try {
-              handler.handle(workUnits);
-            } catch (IOException e) {
-              throw new RuntimeException(String.format("Handler %s failed to execute", handler.getClass().getName()), e);
-            }
-          }
-      } else {
-        throw new RuntimeException(DestinationDatasetHandlerService.class.getName() + " does not support work unit streams");
+  public WorkUnitStream executeHandlers(WorkUnitStream workUnitStream) {
+    for (DestinationDatasetHandler handler : this.handlers) {
+      try {
+       workUnitStream = handler.handle(workUnitStream);
+      } catch (IOException e) {
+        throw new RuntimeException(String.format("Handler %s failed to execute", handler.getClass().getName()), e);
       }
     }
+
+    return workUnitStream;
   }
 
 
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/destination/TestDestinationDatasetHandler.java b/gobblin-core/src/test/java/org/apache/gobblin/destination/TestDestinationDatasetHandler.java
index 102c38cda..ae87ec570 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/destination/TestDestinationDatasetHandler.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/destination/TestDestinationDatasetHandler.java
@@ -18,23 +18,22 @@
 package org.apache.gobblin.destination;
 
 import java.io.IOException;
-import java.util.Collection;
+
 import org.apache.gobblin.configuration.SourceState;
-import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitStream;
 
 
 public class TestDestinationDatasetHandler implements DestinationDatasetHandler {
   public static String TEST_COUNTER_KEY = "counter";
-  private Boolean canCleanUp;
   public TestDestinationDatasetHandler(SourceState state, Boolean canCleanUp){
-    this.canCleanUp = canCleanUp;
   }
 
   @Override
-  public void handle(Collection<WorkUnit> workUnits) {
-    for (WorkUnit wu: workUnits) {
+  public WorkUnitStream handle(WorkUnitStream workUnitSteam) {
+    return workUnitSteam.transform(wu -> {
       wu.setProp(TEST_COUNTER_KEY, wu.getPropAsInt(TEST_COUNTER_KEY, 0) + 1);
-    }
+      return wu;
+    });
   }
 
   @Override
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 30bfca5ce..84cab042e 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -508,10 +508,6 @@ public abstract class AbstractJobLauncher implements JobLauncher {
           }
         }
 
-        // Perform work needed before writing is done
-        Boolean canCleanUp = this.canCleanStagingData(this.jobContext.getJobState());
-        closer.register(new DestinationDatasetHandlerService(jobState, canCleanUp, this.eventSubmitter))
-            .executeHandlers(workUnitStream);
         //Initialize writer and converter(s)
         closer.register(WriterInitializerFactory.newInstace(jobState, workUnitStream)).initialize();
         closer.register(ConverterInitializerFactory.newInstance(jobState, workUnitStream)).initialize();
@@ -544,6 +540,14 @@ public abstract class AbstractJobLauncher implements JobLauncher {
               this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_PREPARATION);
           // Add task ids
           workUnitStream = prepareWorkUnits(workUnitStream, jobState);
+
+          // Perform work needed before writing is done
+          Boolean canCleanUp = this.canCleanStagingData(this.jobContext.getJobState());
+          try (DestinationDatasetHandlerService destinationDatasetHandlerService =
+              new DestinationDatasetHandlerService(jobState, canCleanUp, this.eventSubmitter)) {
+            workUnitStream = destinationDatasetHandlerService.executeHandlers(workUnitStream);
+          }
+
           // Remove skipped workUnits from the list of work units to execute.
           workUnitStream = workUnitStream.filter(new SkippedWorkUnitsFilter(jobState));
           // Add surviving tasks to jobState

Reply via email to