szlta commented on code in PR #3225:
URL: https://github.com/apache/hive/pull/3225#discussion_r854074661
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergDeleteWriter.java:
##########
@@ -53,7 +54,8 @@ public class HiveIcebergDeleteWriter extends HiveIcebergWriter {
public void write(Writable row) throws IOException {
Record rec = ((Container<Record>) row).get();
PositionDelete<Record> positionDelete = IcebergAcidUtil.getPositionDelete(rec, rowDataTemplate);
- writer.write(positionDelete, spec, partition(positionDelete.row()));
+ Integer specId = rec.get(0, Integer.class);
Review Comment:
I guess this is a meta column at position 0? Could you annotate it with a comment, please?
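If position 0 does carry the partition spec id meta column, a brief annotation such as the following would help (the comment wording is a guess at the intent, not taken from the patch):

```suggestion
    // Position 0 is assumed to hold the partition spec id meta column
    // populated for ACID delete records (see IcebergAcidUtil)
    Integer specId = rec.get(0, Integer.class);
```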
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -374,9 +373,12 @@ public DynamicPartitionCtx createDPContext(HiveConf hiveConf, org.apache.hadoop.
fieldOrderMap.put(fields.get(i).name(), i);
}
+ // deletes already use the bucket values in the partition_struct for sorting, so no need to add the sort expression
Review Comment:
Is this true for the bucket transform only? Is bucket() special for some reason, or could we avoid sorting according to other partition columns (and transform types) as well?
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriter.java:
##########
@@ -101,7 +101,14 @@ public void close(boolean abort) throws IOException {
}
protected PartitionKey partition(Record row) {
- currentKey.partition(wrapper.wrap(row));
- return currentKey;
+ // get partition key for the latest spec
+ return partition(row, specs.size() - 1);
Review Comment:
Same question for getting the latest spec.
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java:
##########
@@ -37,17 +38,17 @@
class HiveIcebergRecordWriter extends HiveIcebergWriter {
- HiveIcebergRecordWriter(Schema schema, PartitionSpec spec, FileFormat format,
+ HiveIcebergRecordWriter(Schema schema, Map<Integer, PartitionSpec> specs, FileFormat format,
FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize,
TaskAttemptID taskAttemptID, String tableName) {
- super(schema, spec, io, taskAttemptID, tableName,
+ super(schema, specs, io, taskAttemptID, tableName,
new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, format, targetFileSize));
}
@Override
public void write(Writable row) throws IOException {
Record record = ((Container<Record>) row).get();
- writer.write(record, spec, partition(record));
+ writer.write(record, specs.get(specs.size() - 1), partition(record));
Review Comment:
Are we trying to get the latest spec here? If so, it could become problematic if an older spec is reused as the current one. E.g. partition evolution goes as:
1. initially partitioned by col and col2 -> spec0: identity(col), identity(col2); latest_spec=0
2. remove col from spec -> spec1: identity(col2); latest_spec=1
3. re-add col to spec -> no new spec is created, spec0 is reused; latest_spec=0
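The pitfall above can be sketched with plain collections (the class name, the map contents, and `currentSpecId` are hypothetical stand-ins, not the actual Iceberg API):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SpecReuseDemo {
    public static void main(String[] args) {
        // Hypothetical stand-in for Map<Integer, PartitionSpec>:
        // spec id -> partition field description (assumption).
        Map<Integer, String> specs = new LinkedHashMap<>();
        specs.put(0, "identity(col), identity(col2)"); // spec0: initial partitioning
        specs.put(1, "identity(col2)");                // spec1: col removed

        // Step 3 of the scenario: re-adding col matches spec0 exactly, so it
        // is reused -- the table's current spec id becomes 0 again and no new
        // entry is added to the specs map.
        int currentSpecId = 0;

        // specs.size() - 1 == 1 here, which resolves to spec1,
        // not the table's actual current spec:
        System.out.println(specs.get(specs.size() - 1)); // identity(col2)
        System.out.println(specs.get(currentSpecId));    // identity(col), identity(col2)
    }
}
```

This is why looking the spec up by the table's current spec id is safer than assuming the highest-numbered spec is the current one.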
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]