This is an automated email from the ASF dual-hosted git repository.
vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 6849776cf6 [GOBBLIN-2225] Use random id for Workunit if recovery
helper is not initialised (#4134)
6849776cf6 is described below
commit 6849776cf605fc30051830daaa333623972bf5e9
Author: thisisArjit <[email protected]>
AuthorDate: Wed Sep 3 12:46:41 2025 +0530
[GOBBLIN-2225] Use random id for Workunit if recovery helper is not
initialised (#4134)
* Add unique id to CopyEntity instead of computing it from serialised JSON
string
---
.../gobblin/data/management/copy/CopyEntity.java | 4 +++-
.../gobblin/data/management/copy/CopySource.java | 18 +++++++++++++++---
2 files changed, 18 insertions(+), 4 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyEntity.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyEntity.java
index cd4b97e9d9..4790a9b4c1 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyEntity.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyEntity.java
@@ -118,7 +118,9 @@ public class CopyEntity implements HasGuid {
*/
public static String getSerializedWithNewPackage(String serialized) {
serialized = serialized.replace("\"gobblin.data.management.",
"\"org.apache.gobblin.data.management.");
- log.debug("Serialized updated copy entity: " + serialized);
+ if (log.isDebugEnabled()) {
+ log.debug("Serialized updated copy entity: " + serialized);
+ }
return serialized;
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index 5f9a75db1e..df8bf9bfe5 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -19,10 +19,12 @@ package org.apache.gobblin.data.management.copy;
import java.io.IOException;
import java.net.URI;
+import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -54,6 +56,7 @@ import
org.apache.gobblin.data.management.copy.extractor.EmptyExtractor;
import
org.apache.gobblin.data.management.copy.extractor.FileAwareInputStreamExtractor;
import
org.apache.gobblin.data.management.copy.prioritization.FileSetComparator;
import
org.apache.gobblin.data.management.copy.publisher.CopyEventSubmitterHelper;
+import org.apache.gobblin.data.management.copy.recovery.RecoveryHelper;
import org.apache.gobblin.data.management.copy.replication.ConfigBasedDataset;
import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
import
org.apache.gobblin.data.management.copy.watermark.CopyableFileWatermarkGenerator;
@@ -372,6 +375,10 @@ public class CopySource extends AbstractSource<String,
FileAwareInputStream> {
Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY,
CopyConfiguration.COPY_PREFIX, extractId);
List<WorkUnit> workUnitsForPartition = Lists.newArrayList();
+ // use random guid for workunits if recovery is disabled. Getting
deterministic guid is expensive as it
+ // serialises entire workunit to generate guid
+ boolean useRandomGuidForWorkUnit = !isRecoveryEnabled(state);
+ log.info("Using " + (useRandomGuidForWorkUnit ? "random" :
"deterministic") + " guids for workunits");
long fileSize;
for (CopyEntity copyEntity : fileSet.getFiles()) {
@@ -400,7 +407,7 @@ public class CopySource extends AbstractSource<String,
FileAwareInputStream> {
workUnit.setProp(SlaEventKeys.PARTITION_KEY,
copyEntity.getFileSet());
setWorkUnitWeight(workUnit, copyEntity, minWorkUnitWeight);
setWorkUnitWatermark(workUnit, watermarkGenerator, copyEntity);
- computeAndSetWorkUnitGuid(workUnit, copyEntity);
+ computeAndSetWorkUnitGuid(workUnit, copyEntity,
useRandomGuidForWorkUnit);
addLineageInfo(copyEntity, workUnit);
if (copyEntity instanceof CopyableFile) {
CopyableFile castedCopyEntity = (CopyableFile) copyEntity;
@@ -427,6 +434,10 @@ public class CopySource extends AbstractSource<String,
FileAwareInputStream> {
}
}
+ private static boolean isRecoveryEnabled(State state) {
+ return state.contains(RecoveryHelper.PERSIST_DIR_KEY);
+ }
+
private void setWorkUnitWatermark(WorkUnit workUnit,
Optional<CopyableFileWatermarkGenerator> watermarkGenerator,
CopyEntity copyEntity)
throws IOException {
@@ -514,11 +525,12 @@ public class CopySource extends AbstractSource<String,
FileAwareInputStream> {
workUnit.setProp(WORK_UNIT_WEIGHT, Long.toString(weight));
}
- private static void computeAndSetWorkUnitGuid(WorkUnit workUnit, CopyEntity
copyEntity)
+ private static void computeAndSetWorkUnitGuid(WorkUnit workUnit, CopyEntity
copyEntity, boolean useRandomGuidForWorkUnit)
throws IOException {
Guid guid =
Guid.fromStrings(workUnit.contains(ConfigurationKeys.CONVERTER_CLASSES_KEY) ?
workUnit
.getProp(ConfigurationKeys.CONVERTER_CLASSES_KEY) : "");
- setWorkUnitGuid(workUnit, guid.append(copyEntity.guid()));
+ Guid copyEntityGuid = useRandomGuidForWorkUnit ? new
Guid(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8)) :
copyEntity.guid();
+ setWorkUnitGuid(workUnit, guid.append(copyEntityGuid));
}
/**