szlta commented on code in PR #3225:
URL: https://github.com/apache/hive/pull/3225#discussion_r854074661


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergDeleteWriter.java:
##########
@@ -53,7 +54,8 @@ public class HiveIcebergDeleteWriter extends HiveIcebergWriter {
   public void write(Writable row) throws IOException {
     Record rec = ((Container<Record>) row).get();
     PositionDelete<Record> positionDelete = IcebergAcidUtil.getPositionDelete(rec, rowDataTemplate);
-    writer.write(positionDelete, spec, partition(positionDelete.row()));
+    Integer specId = rec.get(0, Integer.class);

Review Comment:
   I guess this is a meta column at position 0? Could you annotate it with a comment, please?
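   A hedged sketch (not the PR's actual code) of the kind of annotation the comment asks for: give the magic index a named constant and document what lives at position 0 of the serialized record. The constant name and the toy record layout are illustrative assumptions.

```java
// Illustrative only -- models the "meta column at position 0" pattern with a
// plain Object[] instead of an Iceberg Record, so it is runnable standalone.
public class MetaColumnDemo {
  // Hypothetical constant: position 0 of the serialized delete record is
  // assumed to carry the partition spec id of the referenced data file.
  static final int SPEC_ID_META_COL_POS = 0;

  public static void main(String[] args) {
    Object[] rec = {3, "row-payload"};            // spec id 3, then row data
    // Reading through the named constant documents why index 0 is used.
    Integer specId = (Integer) rec[SPEC_ID_META_COL_POS];
    System.out.println("specId=" + specId);
  }
}
```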



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -374,9 +373,12 @@ public DynamicPartitionCtx createDPContext(HiveConf hiveConf, org.apache.hadoop.
       fieldOrderMap.put(fields.get(i).name(), i);
     }
 
+    // deletes already use the bucket values in the partition_struct for sorting, so no need to add the sort expression

Review Comment:
   Is this true for the bucket transform only? Is bucket() special for some reason, or could we avoid sorting by other partition columns (and transform types) as well?



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriter.java:
##########
@@ -101,7 +101,14 @@ public void close(boolean abort) throws IOException {
   }
 
   protected PartitionKey partition(Record row) {
-    currentKey.partition(wrapper.wrap(row));
-    return currentKey;
+    // get partition key for the latest spec
+    return partition(row, specs.size() - 1);

Review Comment:
   Same question for getting the latest spec.



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java:
##########
@@ -37,17 +38,17 @@
 
 class HiveIcebergRecordWriter extends HiveIcebergWriter {
 
-  HiveIcebergRecordWriter(Schema schema, PartitionSpec spec, FileFormat format,
+  HiveIcebergRecordWriter(Schema schema, Map<Integer, PartitionSpec> specs, FileFormat format,
      FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize,
       TaskAttemptID taskAttemptID, String tableName) {
-    super(schema, spec, io, taskAttemptID, tableName,
+    super(schema, specs, io, taskAttemptID, tableName,
         new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, format, targetFileSize));
   }
 
   @Override
   public void write(Writable row) throws IOException {
     Record record = ((Container<Record>) row).get();
-    writer.write(record, spec, partition(record));
+    writer.write(record, specs.get(specs.size() - 1), partition(record));

Review Comment:
   Are we trying to get the latest spec here? If so, it could become problematic if an older spec is reused as the current one.
   E.g., partition evolution could go as follows:
   
   1. initially partitioned by col and col2 -> spec0: identity(col), identity(col2); latest_spec=0
   2. remove col from the spec -> spec1: identity(col2); latest_spec=1
   3. re-add col to the spec -> no new spec is created; latest_spec=0
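   The evolution scenario above can be modeled with plain collections to show why picking the highest spec id diverges from the table's current spec (names and the `currentSpecId` field are illustrative, not the Iceberg API, which tracks the current spec id separately from the spec map):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Minimal model of spec reuse: specs are kept in a map keyed by spec id,
// and the table tracks the current spec id separately.
public class SpecReuseDemo {
  // Picks the spec with the highest id -- analogous to specs.get(specs.size() - 1).
  static String pickByMaxId(Map<Integer, List<String>> specs) {
    int maxId = specs.keySet().stream().max(Integer::compare).orElseThrow();
    return String.join(",", specs.get(maxId));
  }

  public static void main(String[] args) {
    Map<Integer, List<String>> specs = new LinkedHashMap<>();
    specs.put(0, List.of("col", "col2")); // step 1: identity(col), identity(col2)
    specs.put(1, List.of("col2"));        // step 2: col removed from the spec

    // Step 3: re-adding col matches spec 0, so no new spec is created and
    // the table's current spec id goes back to 0.
    int currentSpecId = 0;

    // The max-id lookup returns spec 1, not the spec the table writes with.
    System.out.println("max-id spec: " + pickByMaxId(specs));
    System.out.println("current spec: " + String.join(",", specs.get(currentSpecId)));
  }
}
```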
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

