This is an automated email from the ASF dual-hosted git repository.

vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 6849776cf6 [GOBBLIN-2225] Use random id for Workunit if recovery helper is not initialised (#4134)
6849776cf6 is described below

commit 6849776cf605fc30051830daaa333623972bf5e9
Author: thisisArjit <[email protected]>
AuthorDate: Wed Sep 3 12:46:41 2025 +0530

    [GOBBLIN-2225] Use random id for Workunit if recovery helper is not initialised (#4134)
    
    * Add unique id to CopyEntity instead of computing it from serialised json string
    
    * Add unique id to CopyEntity instead of computing it from serialised json string
---
 .../gobblin/data/management/copy/CopyEntity.java       |  4 +++-
 .../gobblin/data/management/copy/CopySource.java       | 18 +++++++++++++++---
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyEntity.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyEntity.java
index cd4b97e9d9..4790a9b4c1 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyEntity.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyEntity.java
@@ -118,7 +118,9 @@ public class CopyEntity implements HasGuid {
    */
   public static String getSerializedWithNewPackage(String serialized) {
     serialized = serialized.replace("\"gobblin.data.management.", 
"\"org.apache.gobblin.data.management.");
-    log.debug("Serialized updated copy entity: " + serialized);
+    if (log.isDebugEnabled()) {
+      log.debug("Serialized updated copy entity: " + serialized);
+    }
     return serialized;
   }
 
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index 5f9a75db1e..df8bf9bfe5 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -19,10 +19,12 @@ package org.apache.gobblin.data.management.copy;
 
 import java.io.IOException;
 import java.net.URI;
+import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -54,6 +56,7 @@ import 
org.apache.gobblin.data.management.copy.extractor.EmptyExtractor;
 import 
org.apache.gobblin.data.management.copy.extractor.FileAwareInputStreamExtractor;
 import 
org.apache.gobblin.data.management.copy.prioritization.FileSetComparator;
 import 
org.apache.gobblin.data.management.copy.publisher.CopyEventSubmitterHelper;
+import org.apache.gobblin.data.management.copy.recovery.RecoveryHelper;
 import org.apache.gobblin.data.management.copy.replication.ConfigBasedDataset;
 import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
 import 
org.apache.gobblin.data.management.copy.watermark.CopyableFileWatermarkGenerator;
@@ -372,6 +375,10 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
 
         Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, 
CopyConfiguration.COPY_PREFIX, extractId);
         List<WorkUnit> workUnitsForPartition = Lists.newArrayList();
+        // use random guid for workunits if recovery is disabled. Getting 
deterministic guid is expensive as it
+        // serialises entire workunit to generate guid
+        boolean useRandomGuidForWorkUnit = !isRecoveryEnabled(state);
+        log.info("Using " + (useRandomGuidForWorkUnit ? "random" : 
"deterministic") + " guids for workunits");
 
         long fileSize;
         for (CopyEntity copyEntity : fileSet.getFiles()) {
@@ -400,7 +407,7 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
           workUnit.setProp(SlaEventKeys.PARTITION_KEY, 
copyEntity.getFileSet());
           setWorkUnitWeight(workUnit, copyEntity, minWorkUnitWeight);
           setWorkUnitWatermark(workUnit, watermarkGenerator, copyEntity);
-          computeAndSetWorkUnitGuid(workUnit, copyEntity);
+          computeAndSetWorkUnitGuid(workUnit, copyEntity, 
useRandomGuidForWorkUnit);
           addLineageInfo(copyEntity, workUnit);
           if (copyEntity instanceof CopyableFile) {
             CopyableFile castedCopyEntity = (CopyableFile) copyEntity;
@@ -427,6 +434,10 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
       }
     }
 
+    private static boolean isRecoveryEnabled(State state) {
+      return state.contains(RecoveryHelper.PERSIST_DIR_KEY);
+    }
+
     private void setWorkUnitWatermark(WorkUnit workUnit, 
Optional<CopyableFileWatermarkGenerator> watermarkGenerator,
         CopyEntity copyEntity)
         throws IOException {
@@ -514,11 +525,12 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
     workUnit.setProp(WORK_UNIT_WEIGHT, Long.toString(weight));
   }
 
-  private static void computeAndSetWorkUnitGuid(WorkUnit workUnit, CopyEntity 
copyEntity)
+  private static void computeAndSetWorkUnitGuid(WorkUnit workUnit, CopyEntity 
copyEntity, boolean useRandomGuidForWorkUnit)
       throws IOException {
     Guid guid = 
Guid.fromStrings(workUnit.contains(ConfigurationKeys.CONVERTER_CLASSES_KEY) ? 
workUnit
         .getProp(ConfigurationKeys.CONVERTER_CLASSES_KEY) : "");
-    setWorkUnitGuid(workUnit, guid.append(copyEntity.guid()));
+    Guid copyEntityGuid = useRandomGuidForWorkUnit ? new 
Guid(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8)) : 
copyEntity.guid();
+    setWorkUnitGuid(workUnit, guid.append(copyEntityGuid));
   }
 
   /**

Reply via email to