This is an automated email from the ASF dual-hosted git repository.
vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 877a0f8f1a Change data files from byte[] to list of base64 encoded
string (#4131)
877a0f8f1a is described below
commit 877a0f8f1a728ec70eb02f1a5e504ee62768f16c
Author: thisisArjit <[email protected]>
AuthorDate: Tue Aug 26 19:44:51 2025 +0530
Change data files from byte[] to list of base64 encoded string (#4131)
---
.../copy/iceberg/IcebergOverwritePartitionsStep.java | 20 +++++++++++++++-----
.../copy/iceberg/IcebergPartitionDataset.java | 19 ++++++++++++++-----
.../iceberg/IcebergOverwritePartitionsStepTest.java | 19 ++++++++++++++-----
3 files changed, 43 insertions(+), 15 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
index dffcbccb27..13fb5d07b3 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.data.management.copy.iceberg;
import java.io.IOException;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
@@ -58,7 +59,8 @@ import static
org.apache.gobblin.util.retry.RetryerFactory.RetryType;
public class IcebergOverwritePartitionsStep implements CommitStep {
private final String destTableIdStr;
private final Properties properties;
- private final byte[] serializedDataFiles;
+ // Data files are kept as a list of base64-encoded strings for optimized
deserialization.
+ private final List<String> base64EncodedDataFiles;
private final String partitionColName;
private final String partitionValue;
public static final String OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX =
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX +
@@ -72,14 +74,14 @@ public class IcebergOverwritePartitionsStep implements
CommitStep {
* Constructs an {@code IcebergOverwritePartitionsStep} with the specified
parameters.
*
* @param destTableIdStr the identifier of the destination table as a string
- * @param serializedDataFiles [from List<DataFiles>] the serialized data
files to be used for replacing partitions
+ * @param base64EncodedDataFiles [from List<DataFile>] the base64-encoded serialized data
files to be used for replacing partitions
* @param properties the properties containing configuration
*/
- public IcebergOverwritePartitionsStep(String destTableIdStr, String
partitionColName, String partitionValue, byte[] serializedDataFiles, Properties
properties) {
+ public IcebergOverwritePartitionsStep(String destTableIdStr, String
partitionColName, String partitionValue, List<String> base64EncodedDataFiles,
Properties properties) {
this.destTableIdStr = destTableIdStr;
this.partitionColName = partitionColName;
this.partitionValue = partitionValue;
- this.serializedDataFiles = serializedDataFiles;
+ this.base64EncodedDataFiles = base64EncodedDataFiles;
this.properties = properties;
}
@@ -101,7 +103,7 @@ public class IcebergOverwritePartitionsStep implements
CommitStep {
// our copying. any new data written in the meanwhile to THE SAME
partitions we are about to overwrite will be
// clobbered and replaced by the copy entities from our execution.
IcebergTable destTable =
createDestinationCatalog().openTable(TableIdentifier.parse(this.destTableIdStr));
- List<DataFile> dataFiles =
SerializationUtil.deserializeFromBytes(this.serializedDataFiles);
+ List<DataFile> dataFiles = getDataFiles();
try {
log.info("~{}~ Starting partition overwrite - partition: {}; value: {};
numDataFiles: {}; path[0]: {}",
this.destTableIdStr,
@@ -138,6 +140,14 @@ public class IcebergOverwritePartitionsStep implements
CommitStep {
}
}
+ private List<DataFile> getDataFiles() {
+ List<DataFile> dataFiles = new ArrayList<>(base64EncodedDataFiles.size());
+ for (String base64EncodedDataFile : base64EncodedDataFiles) {
+
dataFiles.add(SerializationUtil.deserializeFromBase64(base64EncodedDataFile));
+ }
+ return dataFiles;
+ }
+
protected IcebergCatalog createDestinationCatalog() throws IOException {
return IcebergDatasetFinder.createIcebergCatalog(this.properties,
IcebergDatasetFinder.CatalogLocation.DESTINATION);
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
index 42582f09e3..658f4a265d 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -136,7 +137,7 @@ public class IcebergPartitionDataset extends IcebergDataset
{
private Map<Path, DataFile> calcDestDataFileBySrcPath(List<DataFile>
srcDataFiles)
throws IcebergTable.TableNotFoundException {
String fileSet = this.getFileSetId();
- Map<Path, DataFile> destDataFileBySrcPath = Maps.newHashMap();
+ Map<Path, DataFile> destDataFileBySrcPath = new
ConcurrentHashMap<>(srcDataFiles.size());
if (srcDataFiles.isEmpty()) {
log.warn("~{}~ found no data files for partition col : {} with partition
value : {} to copy", fileSet,
this.partitionColumnName, this.partitionColValue);
@@ -188,11 +189,11 @@ public class IcebergPartitionDataset extends
IcebergDataset {
private Map<Path, FileStatus> calcSrcFileStatusByDestFilePath(Map<Path,
DataFile> destDataFileBySrcPath)
throws IOException {
Function<Path, FileStatus> getFileStatus =
CheckedExceptionFunction.wrapToTunneled(this.sourceFs::getFileStatus);
- Map<Path, FileStatus> srcFileStatusByDestFilePath = Maps.newHashMap();
+ Map<Path, FileStatus> srcFileStatusByDestFilePath = new
ConcurrentHashMap<>();
try {
srcFileStatusByDestFilePath = destDataFileBySrcPath.entrySet()
- .stream()
- .collect(Collectors.toMap(entry -> new
Path(entry.getValue().path().toString()),
+ .parallelStream()
+ .collect(Collectors.toConcurrentMap(entry -> new
Path(entry.getValue().path().toString()),
entry -> getFileStatus.apply(entry.getKey())));
} catch (CheckedExceptionFunction.WrappedIOException wrapper) {
wrapper.rethrowWrapped();
@@ -201,7 +202,7 @@ public class IcebergPartitionDataset extends IcebergDataset
{
}
private PostPublishStep createOverwritePostPublishStep(List<DataFile>
destDataFiles) {
- byte[] serializedDataFiles =
SerializationUtil.serializeToBytes(destDataFiles);
+ List<String> serializedDataFiles =
getBase64EncodedDataFiles(destDataFiles);
IcebergOverwritePartitionsStep icebergOverwritePartitionStep = new
IcebergOverwritePartitionsStep(
this.getDestIcebergTable().getTableId().toString(),
@@ -214,6 +215,14 @@ public class IcebergPartitionDataset extends
IcebergDataset {
return new PostPublishStep(this.getFileSetId(), Maps.newHashMap(),
icebergOverwritePartitionStep, 0);
}
+ private List<String> getBase64EncodedDataFiles(List<DataFile> destDataFiles)
{
+ List<String> base64EncodedDataFiles = new
ArrayList<>(destDataFiles.size());
+ for (DataFile dataFile : destDataFiles) {
+
base64EncodedDataFiles.add(SerializationUtil.serializeToBase64(dataFile));
+ }
+ return base64EncodedDataFiles;
+ }
+
private Predicate<StructLike> createPartitionFilterPredicate() throws
IOException {
//TODO: Refactor it later using factory or other way to support different
types of filter predicate
// Also take into consideration creation of Expression Filter to be used
in overwrite api
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
index 6e273ca2d6..e6a626fca7 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.data.management.copy.iceberg;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@@ -43,7 +44,7 @@ public class IcebergOverwritePartitionsStepTest {
private IcebergTable mockIcebergTable;
private IcebergCatalog mockIcebergCatalog;
private Properties mockProperties;
- private byte[] serializedDummyDataFiles;
+ private List<String> base64EncodedDataFiles;
private IcebergOverwritePartitionsStep spyIcebergOverwritePartitionsStep;
@BeforeMethod
@@ -52,16 +53,24 @@ public class IcebergOverwritePartitionsStepTest {
mockIcebergCatalog = Mockito.mock(IcebergCatalog.class);
mockProperties = new Properties();
- List<DataFile> dummyDataFiles = createDummyDataFiles();
- serializedDummyDataFiles =
SerializationUtil.serializeToBytes(dummyDataFiles);
+ base64EncodedDataFiles = getEncodedDummyDataFiles();
spyIcebergOverwritePartitionsStep = Mockito.spy(new
IcebergOverwritePartitionsStep(destTableIdStr,
- testPartitionColName, testPartitionColValue, serializedDummyDataFiles,
mockProperties));
+ testPartitionColName, testPartitionColValue, base64EncodedDataFiles,
mockProperties));
Mockito.when(mockIcebergCatalog.openTable(Mockito.any(TableIdentifier.class))).thenReturn(mockIcebergTable);
Mockito.doReturn(mockIcebergCatalog).when(spyIcebergOverwritePartitionsStep).createDestinationCatalog();
}
+ private List<String> getEncodedDummyDataFiles() {
+ List<DataFile> dummyDataFiles = createDummyDataFiles();
+ List<String> base64EncodedDataFiles = new
ArrayList<>(dummyDataFiles.size());
+ for (DataFile dataFile : dummyDataFiles) {
+
base64EncodedDataFiles.add(SerializationUtil.serializeToBase64(dataFile));
+ }
+ return base64EncodedDataFiles;
+ }
+
@Test
public void testNeverIsCompleted() {
Assert.assertFalse(spyIcebergOverwritePartitionsStep.isCompleted());
@@ -116,7 +125,7 @@ public class IcebergOverwritePartitionsStepTest {
mockProperties.setProperty(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX
+ "." + RETRY_TIMES,
Integer.toString(retryCount));
spyIcebergOverwritePartitionsStep = Mockito.spy(new
IcebergOverwritePartitionsStep(destTableIdStr,
- testPartitionColName, testPartitionColValue, serializedDummyDataFiles,
mockProperties));
+ testPartitionColName, testPartitionColValue, base64EncodedDataFiles,
mockProperties));
Mockito.when(mockIcebergCatalog.openTable(Mockito.any(TableIdentifier.class))).thenReturn(mockIcebergTable);
Mockito.doReturn(mockIcebergCatalog).when(spyIcebergOverwritePartitionsStep).createDestinationCatalog();
try {