Blazer-007 commented on code in PR #4140:
URL: https://github.com/apache/gobblin/pull/4140#discussion_r2324561765


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionCopyableFile.java:
##########
@@ -0,0 +1,43 @@
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.util.SerializationUtil;
+
+import lombok.AccessLevel;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyableFile;
+
+
+/**
+ * An extension of {@link CopyableFile} that includes a base64-encoded Iceberg 
{@link DataFile}.
+ */
+@Getter
+@Setter
+@NoArgsConstructor(access = AccessLevel.PROTECTED)
+@EqualsAndHashCode(callSuper = true)
+@Slf4j
+public class IcebergPartitionCopyableFile extends CopyableFile {
+
+  private String base64EncodedDataFile;
+
+  public String getBase64EncodedDataFile() {
+    return this.base64EncodedDataFile;
+  }

Review Comment:
   This explicit getter can be removed; the class-level `@Getter` already generates it.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java:
##########
@@ -186,29 +190,30 @@ private Path addUUIDToPath(String filePathStr) {
     return new Path(fileDir, newFileName);
   }
 
-  private Map<Path, FileStatus> calcSrcFileStatusByDestFilePath(Map<Path, 
DataFile> destDataFileBySrcPath)
-      throws IOException {
+  private Map<Path, FileStatus> calcSrcFileStatusByDestFilePath(
+      Map<Path, DataFile> destDataFileBySrcPath,
+      Map<Path, Path> destPathToSrcPath) throws IOException {
     Function<Path, FileStatus> getFileStatus = 
CheckedExceptionFunction.wrapToTunneled(this.sourceFs::getFileStatus);
-    Map<Path, FileStatus> srcFileStatusByDestFilePath = new 
ConcurrentHashMap<>();
+    final Map<Path, FileStatus> srcFileStatusByDestFilePath = new 
ConcurrentHashMap<>();
     try {
-      srcFileStatusByDestFilePath = destDataFileBySrcPath.entrySet()
+      destDataFileBySrcPath.entrySet()
           .parallelStream()
-          .collect(Collectors.toConcurrentMap(entry -> new 
Path(entry.getValue().path().toString()),
-              entry -> getFileStatus.apply(entry.getKey())));
+          .forEach(entry -> {
+                Path destPath = new Path(entry.getValue().path().toString());
+                destPathToSrcPath.put(destPath, entry.getKey());
+                srcFileStatusByDestFilePath.put(destPath, 
getFileStatus.apply(entry.getKey()));
+          });

Review Comment:
   Nit: let's move this logic into the caller itself and see if we can parallelize 
that for-loop to reduce runtime.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java:
##########
@@ -351,12 +355,16 @@ private void 
publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition,
     // ensure every successful state is committed
     // WARNING: this MUST NOT run before the WU is actually executed--hence 
NOT YET for post-publish steps!
     // (that's because `WorkUnitState::getWorkingState()` returns 
`WorkingState.SUCCESSFUL` merely when the overall job succeeded--even for WUs 
yet to execute)
+    List<DataFile> icebergDataFiles = new ArrayList<>();
     for (WorkUnitState wus : statesHelper.getNonPostPublishStates()) {
       if (wus.getWorkingState() == WorkingState.SUCCESSFUL) {
         wus.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
       }
       CopyEntity copyEntity = CopySource.deserializeCopyEntity(wus);
-      if (copyEntity instanceof CopyableFile) {
+      if (copyEntity instanceof CopyableFile || copyEntity instanceof 
IcebergPartitionCopyableFile) {
+        if (copyEntity instanceof IcebergPartitionCopyableFile) {
+          icebergDataFiles.add(((IcebergPartitionCopyableFile) 
copyEntity).getDataFile());

Review Comment:
   Let's add a comment describing why this is done.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionCopyableFile.java:
##########
@@ -0,0 +1,43 @@
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.util.SerializationUtil;
+
+import lombok.AccessLevel;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyableFile;
+
+
+/**
+ * An extension of {@link CopyableFile} that includes a base64-encoded Iceberg 
{@link DataFile}.
+ */
+@Getter
+@Setter
+@NoArgsConstructor(access = AccessLevel.PROTECTED)
+@EqualsAndHashCode(callSuper = true)
+@Slf4j
+public class IcebergPartitionCopyableFile extends CopyableFile {
+
+  private String base64EncodedDataFile;
+
+  public String getBase64EncodedDataFile() {
+    return this.base64EncodedDataFile;
+  }
+
+  public IcebergPartitionCopyableFile(CopyableFile copyableFile, DataFile 
dataFile) {
+    super(copyableFile.getOrigin(), copyableFile.getDestination(), 
copyableFile.getDestinationOwnerAndPermission(),
+        copyableFile.getAncestorsOwnerAndPermission(), 
copyableFile.getChecksum(), copyableFile.getPreserve(),
+        copyableFile.getFileSet(), copyableFile.getOriginTimestamp(), 
copyableFile.getUpstreamTimestamp(),
+        copyableFile.getAdditionalMetadata(), copyableFile.datasetOutputPath, 
copyableFile.getDataFileVersionStrategy());
+    this.base64EncodedDataFile = SerializationUtil.serializeToBase64(dataFile);
+  }
+
+  public DataFile getDataFile() {
+    return SerializationUtil.deserializeFromBase64(base64EncodedDataFile);
+  }

Review Comment:
   Nit: let's add Javadoc here too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to