This is an automated email from the ASF dual-hosted git repository.

vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 877a0f8f1a Change data files from byte[] to list of base64 encoded 
string (#4131)
877a0f8f1a is described below

commit 877a0f8f1a728ec70eb02f1a5e504ee62768f16c
Author: thisisArjit <[email protected]>
AuthorDate: Tue Aug 26 19:44:51 2025 +0530

    Change data files from byte[] to list of base64 encoded string (#4131)
---
 .../copy/iceberg/IcebergOverwritePartitionsStep.java | 20 +++++++++++++++-----
 .../copy/iceberg/IcebergPartitionDataset.java        | 19 ++++++++++++++-----
 .../iceberg/IcebergOverwritePartitionsStepTest.java  | 19 ++++++++++++++-----
 3 files changed, 43 insertions(+), 15 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
index dffcbccb27..13fb5d07b3 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.data.management.copy.iceberg;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
@@ -58,7 +59,8 @@ import static 
org.apache.gobblin.util.retry.RetryerFactory.RetryType;
 public class IcebergOverwritePartitionsStep implements CommitStep {
   private final String destTableIdStr;
   private final Properties properties;
-  private final byte[] serializedDataFiles;
+  // Data files are kept as a list of base64 encoded strings for optimised 
de-serialization.
+  private final List<String> base64EncodedDataFiles;
   private final String partitionColName;
   private final String partitionValue;
   public static final String OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX = 
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX +
@@ -72,14 +74,14 @@ public class IcebergOverwritePartitionsStep implements 
CommitStep {
   * Constructs an {@code IcebergOverwritePartitionsStep} with the specified 
parameters.
    *
    * @param destTableIdStr the identifier of the destination table as a string
-   * @param serializedDataFiles [from List<DataFiles>] the serialized data 
files to be used for replacing partitions
+   * @param base64EncodedDataFiles [from List<DataFiles>] the serialized data 
files to be used for replacing partitions
    * @param properties the properties containing configuration
    */
-  public IcebergOverwritePartitionsStep(String destTableIdStr, String 
partitionColName, String partitionValue, byte[] serializedDataFiles, Properties 
properties) {
+  public IcebergOverwritePartitionsStep(String destTableIdStr, String 
partitionColName, String partitionValue, List<String> base64EncodedDataFiles, 
Properties properties) {
     this.destTableIdStr = destTableIdStr;
     this.partitionColName = partitionColName;
     this.partitionValue = partitionValue;
-    this.serializedDataFiles = serializedDataFiles;
+    this.base64EncodedDataFiles = base64EncodedDataFiles;
     this.properties = properties;
   }
 
@@ -101,7 +103,7 @@ public class IcebergOverwritePartitionsStep implements 
CommitStep {
     // our copying. any new data written in the meanwhile to THE SAME 
partitions we are about to overwrite will be
     // clobbered and replaced by the copy entities from our execution.
     IcebergTable destTable = 
createDestinationCatalog().openTable(TableIdentifier.parse(this.destTableIdStr));
-    List<DataFile> dataFiles = 
SerializationUtil.deserializeFromBytes(this.serializedDataFiles);
+    List<DataFile> dataFiles = getDataFiles();
     try {
       log.info("~{}~ Starting partition overwrite - partition: {}; value: {}; 
numDataFiles: {}; path[0]: {}",
           this.destTableIdStr,
@@ -138,6 +140,14 @@ public class IcebergOverwritePartitionsStep implements 
CommitStep {
     }
   }
 
+  private List<DataFile> getDataFiles() {
+    List<DataFile> dataFiles = new ArrayList<>(base64EncodedDataFiles.size());
+    for (String base64EncodedDataFile : base64EncodedDataFiles) {
+      
dataFiles.add(SerializationUtil.deserializeFromBase64(base64EncodedDataFile));
+    }
+    return dataFiles;
+  }
+
   protected IcebergCatalog createDestinationCatalog() throws IOException {
     return IcebergDatasetFinder.createIcebergCatalog(this.properties, 
IcebergDatasetFinder.CatalogLocation.DESTINATION);
   }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
index 42582f09e3..658f4a265d 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -136,7 +137,7 @@ public class IcebergPartitionDataset extends IcebergDataset 
{
   private Map<Path, DataFile> calcDestDataFileBySrcPath(List<DataFile> 
srcDataFiles)
       throws IcebergTable.TableNotFoundException {
     String fileSet = this.getFileSetId();
-    Map<Path, DataFile> destDataFileBySrcPath = Maps.newHashMap();
+    Map<Path, DataFile> destDataFileBySrcPath = new 
ConcurrentHashMap<>(srcDataFiles.size());
     if (srcDataFiles.isEmpty()) {
       log.warn("~{}~ found no data files for partition col : {} with partition 
value : {} to copy", fileSet,
           this.partitionColumnName, this.partitionColValue);
@@ -188,11 +189,11 @@ public class IcebergPartitionDataset extends 
IcebergDataset {
   private Map<Path, FileStatus> calcSrcFileStatusByDestFilePath(Map<Path, 
DataFile> destDataFileBySrcPath)
       throws IOException {
     Function<Path, FileStatus> getFileStatus = 
CheckedExceptionFunction.wrapToTunneled(this.sourceFs::getFileStatus);
-    Map<Path, FileStatus> srcFileStatusByDestFilePath = Maps.newHashMap();
+    Map<Path, FileStatus> srcFileStatusByDestFilePath = new 
ConcurrentHashMap<>();
     try {
       srcFileStatusByDestFilePath = destDataFileBySrcPath.entrySet()
-          .stream()
-          .collect(Collectors.toMap(entry -> new 
Path(entry.getValue().path().toString()),
+          .parallelStream()
+          .collect(Collectors.toConcurrentMap(entry -> new 
Path(entry.getValue().path().toString()),
               entry -> getFileStatus.apply(entry.getKey())));
     } catch (CheckedExceptionFunction.WrappedIOException wrapper) {
       wrapper.rethrowWrapped();
@@ -201,7 +202,7 @@ public class IcebergPartitionDataset extends IcebergDataset 
{
   }
 
   private PostPublishStep createOverwritePostPublishStep(List<DataFile> 
destDataFiles) {
-    byte[] serializedDataFiles = 
SerializationUtil.serializeToBytes(destDataFiles);
+    List<String> serializedDataFiles = 
getBase64EncodedDataFiles(destDataFiles);
 
     IcebergOverwritePartitionsStep icebergOverwritePartitionStep = new 
IcebergOverwritePartitionsStep(
         this.getDestIcebergTable().getTableId().toString(),
@@ -214,6 +215,14 @@ public class IcebergPartitionDataset extends 
IcebergDataset {
     return new PostPublishStep(this.getFileSetId(), Maps.newHashMap(), 
icebergOverwritePartitionStep, 0);
   }
 
+  private List<String> getBase64EncodedDataFiles(List<DataFile> destDataFiles) 
{
+    List<String> base64EncodedDataFiles = new 
ArrayList<>(destDataFiles.size());
+    for (DataFile dataFile : destDataFiles) {
+      
base64EncodedDataFiles.add(SerializationUtil.serializeToBase64(dataFile));
+    }
+    return base64EncodedDataFiles;
+  }
+
   private Predicate<StructLike> createPartitionFilterPredicate() throws 
IOException {
     //TODO: Refactor it later using factory or other way to support different 
types of filter predicate
     // Also take into consideration creation of Expression Filter to be used 
in overwrite api
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
index 6e273ca2d6..e6a626fca7 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.data.management.copy.iceberg;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
@@ -43,7 +44,7 @@ public class IcebergOverwritePartitionsStepTest {
   private IcebergTable mockIcebergTable;
   private IcebergCatalog mockIcebergCatalog;
   private Properties mockProperties;
-  private byte[] serializedDummyDataFiles;
+  private List<String> base64EncodedDataFiles;
   private IcebergOverwritePartitionsStep spyIcebergOverwritePartitionsStep;
 
   @BeforeMethod
@@ -52,16 +53,24 @@ public class IcebergOverwritePartitionsStepTest {
     mockIcebergCatalog = Mockito.mock(IcebergCatalog.class);
     mockProperties = new Properties();
 
-    List<DataFile> dummyDataFiles = createDummyDataFiles();
-    serializedDummyDataFiles = 
SerializationUtil.serializeToBytes(dummyDataFiles);
+    base64EncodedDataFiles = getEncodedDummyDataFiles();
 
     spyIcebergOverwritePartitionsStep = Mockito.spy(new 
IcebergOverwritePartitionsStep(destTableIdStr,
-        testPartitionColName, testPartitionColValue, serializedDummyDataFiles, 
mockProperties));
+        testPartitionColName, testPartitionColValue, base64EncodedDataFiles, 
mockProperties));
 
     
Mockito.when(mockIcebergCatalog.openTable(Mockito.any(TableIdentifier.class))).thenReturn(mockIcebergTable);
     
Mockito.doReturn(mockIcebergCatalog).when(spyIcebergOverwritePartitionsStep).createDestinationCatalog();
   }
 
+  private List<String> getEncodedDummyDataFiles() {
+    List<DataFile> dummyDataFiles = createDummyDataFiles();
+    List<String> base64EncodedDataFiles = new 
ArrayList<>(dummyDataFiles.size());
+    for (DataFile dataFile : dummyDataFiles) {
+      
base64EncodedDataFiles.add(SerializationUtil.serializeToBase64(dataFile));
+    }
+    return base64EncodedDataFiles;
+  }
+
   @Test
   public void testNeverIsCompleted() {
     Assert.assertFalse(spyIcebergOverwritePartitionsStep.isCompleted());
@@ -116,7 +125,7 @@ public class IcebergOverwritePartitionsStepTest {
     
mockProperties.setProperty(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX
 + "." + RETRY_TIMES,
         Integer.toString(retryCount));
     spyIcebergOverwritePartitionsStep = Mockito.spy(new 
IcebergOverwritePartitionsStep(destTableIdStr,
-        testPartitionColName, testPartitionColValue, serializedDummyDataFiles, 
mockProperties));
+        testPartitionColName, testPartitionColValue, base64EncodedDataFiles, 
mockProperties));
     
Mockito.when(mockIcebergCatalog.openTable(Mockito.any(TableIdentifier.class))).thenReturn(mockIcebergTable);
     
Mockito.doReturn(mockIcebergCatalog).when(spyIcebergOverwritePartitionsStep).createDestinationCatalog();
     try {

Reply via email to