Blazer-007 commented on code in PR #4140:
URL: https://github.com/apache/gobblin/pull/4140#discussion_r2324561765
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionCopyableFile.java:
##########
@@ -0,0 +1,43 @@
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.util.SerializationUtil;
+
+import lombok.AccessLevel;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyableFile;
+
+
+/**
+ * An extension of {@link CopyableFile} that includes a base64-encoded Iceberg {@link DataFile}.
+ */
+@Getter
+@Setter
+@NoArgsConstructor(access = AccessLevel.PROTECTED)
+@EqualsAndHashCode(callSuper = true)
+@Slf4j
+public class IcebergPartitionCopyableFile extends CopyableFile {
+
+  private String base64EncodedDataFile;
+
+  public String getBase64EncodedDataFile() {
+    return this.base64EncodedDataFile;
+  }

Review Comment:
   This can be removed; the class-level `@Getter` already generates it.

##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java:
##########
@@ -186,29 +190,30 @@ private Path addUUIDToPath(String filePathStr) {
     return new Path(fileDir, newFileName);
   }
 
-  private Map<Path, FileStatus> calcSrcFileStatusByDestFilePath(Map<Path, DataFile> destDataFileBySrcPath)
-      throws IOException {
+  private Map<Path, FileStatus> calcSrcFileStatusByDestFilePath(
+      Map<Path, DataFile> destDataFileBySrcPath,
+      Map<Path, Path> destPathToSrcPath) throws IOException {
     Function<Path, FileStatus> getFileStatus = CheckedExceptionFunction.wrapToTunneled(this.sourceFs::getFileStatus);
-    Map<Path, FileStatus> srcFileStatusByDestFilePath = new ConcurrentHashMap<>();
+    final Map<Path, FileStatus> srcFileStatusByDestFilePath = new ConcurrentHashMap<>();
     try {
-      srcFileStatusByDestFilePath = destDataFileBySrcPath.entrySet()
+      destDataFileBySrcPath.entrySet()
          .parallelStream()
-          .collect(Collectors.toConcurrentMap(entry -> new Path(entry.getValue().path().toString()),
-              entry -> getFileStatus.apply(entry.getKey())));
+          .forEach(entry -> {
+            Path destPath = new Path(entry.getValue().path().toString());
+            destPathToSrcPath.put(destPath, entry.getKey());
+            srcFileStatusByDestFilePath.put(destPath, getFileStatus.apply(entry.getKey()));
+          });

Review Comment:
   nit: Let's move this logic to the caller itself and see if we can parallelize that for-loop to reduce runtime.
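
   To illustrate, one way the caller could build both maps itself with parallel streams (a sketch only -- the caller is not shown in this hunk, so the enclosing method is assumed; `destDataFileBySrcPath`, `getFileStatus`, and the Hadoop/Gobblin types are the ones already used in the diff):

   ```java
   // Sketch: derive the dest->src mapping and the source FileStatus lookup in the caller,
   // instead of threading an extra output map through calcSrcFileStatusByDestFilePath.
   Function<Path, FileStatus> getFileStatus =
       CheckedExceptionFunction.wrapToTunneled(this.sourceFs::getFileStatus);

   Map<Path, Path> destPathToSrcPath = destDataFileBySrcPath.entrySet()
       .parallelStream()
       .collect(Collectors.toConcurrentMap(
           entry -> new Path(entry.getValue().path().toString()),  // dest path recorded in the DataFile
           Map.Entry::getKey));                                     // original source path

   Map<Path, FileStatus> srcFileStatusByDestFilePath = destPathToSrcPath.entrySet()
       .parallelStream()
       .collect(Collectors.toConcurrentMap(
           Map.Entry::getKey,                                       // dest path
           entry -> getFileStatus.apply(entry.getValue())));        // FileStatus from the source FS
   ```

   If duplicate destination paths are possible, `toConcurrentMap` would need a merge function, and the `try`/`catch` that unwraps the tunneled `IOException` is omitted here for brevity.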
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java:
##########
@@ -351,12 +355,16 @@ private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition,
     // ensure every successful state is committed
     // WARNING: this MUST NOT run before the WU is actually executed--hence NOT YET for post-publish steps!
     // (that's because `WorkUnitState::getWorkingState()` returns `WorkingState.SUCCESSFUL` merely when the overall job succeeded--even for WUs yet to execute)
+    List<DataFile> icebergDataFiles = new ArrayList<>();
     for (WorkUnitState wus : statesHelper.getNonPostPublishStates()) {
       if (wus.getWorkingState() == WorkingState.SUCCESSFUL) {
         wus.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
       }
       CopyEntity copyEntity = CopySource.deserializeCopyEntity(wus);
-      if (copyEntity instanceof CopyableFile) {
+      if (copyEntity instanceof CopyableFile || copyEntity instanceof IcebergPartitionCopyableFile) {
+        if (copyEntity instanceof IcebergPartitionCopyableFile) {
+          icebergDataFiles.add(((IcebergPartitionCopyableFile) copyEntity).getDataFile());

Review Comment:
   Let's add a comment describing why this is done (one possible shape is sketched at the end of this message).

##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionCopyableFile.java:
##########
@@ -0,0 +1,43 @@
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.util.SerializationUtil;
+
+import lombok.AccessLevel;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyableFile;
+
+
+/**
+ * An extension of {@link CopyableFile} that includes a base64-encoded Iceberg {@link DataFile}.
+ */
+@Getter
+@Setter
+@NoArgsConstructor(access = AccessLevel.PROTECTED)
+@EqualsAndHashCode(callSuper = true)
+@Slf4j
+public class IcebergPartitionCopyableFile extends CopyableFile {
+
+  private String base64EncodedDataFile;
+
+  public String getBase64EncodedDataFile() {
+    return this.base64EncodedDataFile;
+  }
+
+  public IcebergPartitionCopyableFile(CopyableFile copyableFile, DataFile dataFile) {
+    super(copyableFile.getOrigin(), copyableFile.getDestination(), copyableFile.getDestinationOwnerAndPermission(),
+        copyableFile.getAncestorsOwnerAndPermission(), copyableFile.getChecksum(), copyableFile.getPreserve(),
+        copyableFile.getFileSet(), copyableFile.getOriginTimestamp(), copyableFile.getUpstreamTimestamp(),
+        copyableFile.getAdditionalMetadata(), copyableFile.datasetOutputPath, copyableFile.getDataFileVersionStrategy());
+    this.base64EncodedDataFile = SerializationUtil.serializeToBase64(dataFile);
+  }
+
+  public DataFile getDataFile() {
+    return SerializationUtil.deserializeFromBase64(base64EncodedDataFile);
+  }

Review Comment:
   nit: let's add Javadoc here too (a possible wording is sketched at the end of this message).

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use
the URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
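
For the CopyDataPublisher comment above, the requested comment might read along these lines. The rationale it states is an assumption drawn from how this PR appears to use the collected files; please adjust it to the actual intent:

```java
// Same lines as the CopyDataPublisher hunk above, with the requested comment added.
CopyEntity copyEntity = CopySource.deserializeCopyEntity(wus);
if (copyEntity instanceof CopyableFile || copyEntity instanceof IcebergPartitionCopyableFile) {
  if (copyEntity instanceof IcebergPartitionCopyableFile) {
    // Collect the Iceberg DataFile carried by this copy entity so that, once the physical files
    // have been published, the partition's data files can be committed to the destination
    // Iceberg table together (assumed intent -- adjust if the actual reason differs).
    icebergDataFiles.add(((IcebergPartitionCopyableFile) copyEntity).getDataFile());
  }
}
```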
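
And for the Javadoc nit on IcebergPartitionCopyableFile, a possible shape for the two new members (the wording is illustrative only; in particular, the note about why the DataFile is base64-encoded is inferred from the class description in the diff):

```java
/**
 * Creates an {@link IcebergPartitionCopyableFile} by copying every field of the given
 * {@link CopyableFile} and attaching the supplied Iceberg {@link DataFile}, stored base64-encoded
 * so it can travel as a plain String field when the copy entity is serialized.
 */
public IcebergPartitionCopyableFile(CopyableFile copyableFile, DataFile dataFile) {
  super(copyableFile.getOrigin(), copyableFile.getDestination(), copyableFile.getDestinationOwnerAndPermission(),
      copyableFile.getAncestorsOwnerAndPermission(), copyableFile.getChecksum(), copyableFile.getPreserve(),
      copyableFile.getFileSet(), copyableFile.getOriginTimestamp(), copyableFile.getUpstreamTimestamp(),
      copyableFile.getAdditionalMetadata(), copyableFile.datasetOutputPath, copyableFile.getDataFileVersionStrategy());
  this.base64EncodedDataFile = SerializationUtil.serializeToBase64(dataFile);
}

/**
 * Deserializes and returns the Iceberg {@link DataFile} carried by this copyable file.
 */
public DataFile getDataFile() {
  return SerializationUtil.deserializeFromBase64(base64EncodedDataFile);
}
```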