Blazer-007 commented on code in PR #4142:
URL: https://github.com/apache/gobblin/pull/4142#discussion_r2436688045


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java:
##########
@@ -304,6 +306,13 @@ protected void overwritePartition(List<DataFile> dataFiles, String partitionColN
     } else {
       log.warn("~{}~ No current snapshot found before overwrite", tableId);
     }
+
+    updateSchema(updatedSchema, false);
+    overwritePartition(dataFiles, partitionColName, partitionValue);

Review Comment:
   Should the schema update and the partition overwrite be part of a single table transaction commit?
   Is there any issue with using a transaction here?
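
   To illustrate the difference between two back-to-back commits and one transactional commit, here is a minimal in-memory sketch. Every class in it (`TableState`, `ToyTable`, `ToyTransaction`) is hypothetical and is not the Gobblin or Iceberg API. It only models the idea behind Iceberg's `Table.newTransaction()` and `Transaction.commitTransaction()`: changes are staged and then published in one atomic swap, so a reader never sees the new schema paired with the old data files.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for table metadata: a schema plus one partition's files.
final class TableState {
  final String schema;
  final List<String> dataFiles;

  TableState(String schema, List<String> dataFiles) {
    this.schema = schema;
    this.dataFiles = Collections.unmodifiableList(new ArrayList<>(dataFiles));
  }
}

// Hypothetical toy table; NOT the Iceberg or Gobblin API.
final class ToyTable {
  private final AtomicReference<TableState> current;

  ToyTable(TableState initial) {
    this.current = new AtomicReference<>(initial);
  }

  TableState current() {
    return current.get();
  }

  // Standalone commit: publishes the schema change on its own.
  void updateSchema(String newSchema) {
    current.updateAndGet(s -> new TableState(newSchema, s.dataFiles));
  }

  // Standalone commit: publishes the file replacement on its own.
  void overwritePartition(List<String> newFiles) {
    current.updateAndGet(s -> new TableState(s.schema, newFiles));
  }

  // Starts a transaction based on the state visible right now.
  ToyTransaction newTransaction() {
    return new ToyTransaction(this, current.get());
  }

  boolean compareAndSwap(TableState expected, TableState updated) {
    return current.compareAndSet(expected, updated);
  }
}

// Stages both changes locally and publishes them with a single swap.
final class ToyTransaction {
  private final ToyTable table;
  private final TableState base;
  private String schema;
  private List<String> dataFiles;

  ToyTransaction(ToyTable table, TableState base) {
    this.table = table;
    this.base = base;
    this.schema = base.schema;
    this.dataFiles = base.dataFiles;
  }

  ToyTransaction updateSchema(String newSchema) {
    this.schema = newSchema;
    return this;
  }

  ToyTransaction overwritePartition(List<String> newFiles) {
    this.dataFiles = newFiles;
    return this;
  }

  // Returns false if another writer committed after this transaction started.
  boolean commit() {
    return table.compareAndSwap(base, new TableState(schema, dataFiles));
  }
}
```

   With the two standalone methods, a failure or a concurrent reader between the calls can observe a table whose schema is updated but whose partition still holds the old files. With the staged variant, either both changes become visible or neither does.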



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java:
##########
@@ -192,14 +192,18 @@ private Path addUUIDToPath(String filePathStr) {
   }
 
   private PostPublishStep createOverwritePostPublishStep() {
-    IcebergOverwritePartitionsStep icebergOverwritePartitionStep = new IcebergOverwritePartitionsStep(
-        this.getDestIcebergTable().getTableId().toString(),
-        this.partitionColumnName,
-        this.partitionColValue,
-        this.properties
-    );
-
-    return new PostPublishStep(this.getFileSetId(), Maps.newHashMap(), icebergOverwritePartitionStep, 0);
+    try {
+      IcebergOverwritePartitionsStep icebergOverwritePartitionStep = new IcebergOverwritePartitionsStep(
+          this.getDestIcebergTable().getTableId().toString(),
+          this.getSrcIcebergTable().accessTableMetadata().schema(),
+          this.partitionColumnName,

Review Comment:
   Fetching the schema after the data files have been generated may lead to corruption under very high concurrency, because the source table can change in between. We should capture the schema at the same time we read the manifest files, similar to what is done in full table replication.
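
   A minimal sketch of the race, assuming hypothetical stand-in types (`MetadataVersion` and `CopyPlan` are not Gobblin or Iceberg classes). The `Supplier` plays the role of a call like `accessTableMetadata()`, which may return a newer version each time it is invoked.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for one version of the source table's metadata.
final class MetadataVersion {
  final String schema;
  final List<String> dataFiles;

  MetadataVersion(String schema, List<String> dataFiles) {
    this.schema = schema;
    this.dataFiles = Collections.unmodifiableList(new ArrayList<>(dataFiles));
  }
}

// Hypothetical holder for what a copy job needs: files to copy and the schema to apply.
final class CopyPlan {
  final String schema;
  final List<String> dataFiles;

  private CopyPlan(String schema, List<String> dataFiles) {
    this.schema = schema;
    this.dataFiles = dataFiles;
  }

  // Safe: read the metadata once and derive both values from the same version.
  static CopyPlan fromSingleRead(Supplier<MetadataVersion> metadataReader) {
    MetadataVersion pinned = metadataReader.get();
    return new CopyPlan(pinned.schema, pinned.dataFiles);
  }

  // Racy: files come from an early read, the schema from a later one.
  // If the source table changed in between, the two no longer match.
  static CopyPlan fromTwoReads(Supplier<MetadataVersion> metadataReader) {
    List<String> files = metadataReader.get().dataFiles;
    String schema = metadataReader.get().schema;
    return new CopyPlan(schema, files);
  }
}
```

   In terms of this PR, the idea would be to carry the schema alongside the data files from the point where they are listed, rather than calling `accessTableMetadata()` again when the post-publish step is built.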



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java:
##########
@@ -292,7 +294,7 @@ public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> iceber
    * @param partitionColName the partition column name whose data files are to be replaced
    * @param partitionValue  the partition column value on which data files will be replaced
    */
-  protected void overwritePartition(List<DataFile> dataFiles, String partitionColName, String partitionValue)
+  protected void updateSchemaAndPartition(List<DataFile> dataFiles, Schema updatedSchema, String partitionColName, String partitionValue)

Review Comment:
   Please update the Javadoc here as well, since the method now takes an `updatedSchema` parameter.
   Also, could the method be renamed to `updateSchemaAndOverwritePartition`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.