SourabhBadhya commented on code in PR #5961:
URL: https://github.com/apache/hive/pull/5961#discussion_r2217639549


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -1641,6 +1645,12 @@ static void overlayTableProperties(Configuration configuration, TableDesc tableD
 
       // serialize table object into config
       Table serializableTable = SerializableTable.copyOf(table);
+      // set table format-version and write-mode information from tableDesc
+      List<String> writeConfigList = ImmutableList.of(
+          FORMAT_VERSION, DELETE_MODE, UPDATE_MODE, MERGE_MODE);
+      if (IcebergTableUtil.isV2Table(props::getProperty)) {

Review Comment:
   Is it necessary to create this function? Can't we simply pass a map, for which there is already a defined util function, instead of passing a BinaryOperator? Even if we want to use a BinaryOperator, can we remove the isV2Table() which takes in a map? Or is this a publicly defined function which can't be removed?
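   
   For illustration, a minimal sketch of the Map-based alternative (only `isV2Table` comes from the quoted diff; the Guava conversion and the wrapper method are hypothetical):
   
   ```java
   import java.util.Map;
   import java.util.Properties;
   import com.google.common.collect.Maps;
   
   class FormatVersionCheck {
     // Sketch: convert the TableDesc Properties to a Map once and reuse the existing
     // Map-based IcebergTableUtil.isV2Table, instead of passing props::getProperty
     // as a BinaryOperator.
     static boolean isAtLeastV2(Properties props) {
       Map<String, String> propsMap = Maps.fromProperties(props);
       return IcebergTableUtil.isV2Table(propsMap);
     }
   }
   ```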



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java:
##########
@@ -392,27 +396,47 @@ public static void cherryPick(Table table, long snapshotId) {
   }
 
   public static boolean isV2Table(Map<String, String> props) {

Review Comment:
   Maybe you can rename the function to `isAtleastV2Table()` or something along these lines.
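   
   For example, a one-method sketch of the rename (camel-cased here as `isAtLeastV2Table`; the body is unchanged from the quoted diff):
   
   ```java
   // The rename makes the ">= 2" semantics explicit now that V3 tables also pass this check.
   public static boolean isAtLeastV2Table(Map<String, String> props) {
     return IcebergTableUtil.formatVersion(props) >= 2;
   }
   ```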



##########
iceberg/iceberg-handler/src/test/queries/positive/iceberg_v3_deletion_vectors.q:
##########
@@ -0,0 +1,75 @@
+-- Mask random uuid
+--! qt:replace:/(\s+'uuid'=')\S+('\s*)/$1#Masked#$2/
+-- Mask a random snapshot id
+--! qt:replace:/(\s\'current-snapshot-id\'=\')(\d+)(\')/$1#Masked#$3/
+-- Mask added file size
+--! qt:replace:/(\S+\"added-files-size\":\")(\d+)(\")/$1#Masked#$3/
+-- Mask total file size
+--! qt:replace:/(\S+\"total-files-size\":\")(\d+)(\")/$1#Masked#$3/
+-- Mask current-snapshot-timestamp-ms
+--! qt:replace:/(\s\'current-snapshot-timestamp-ms\'=\')(\d+)(\')/$1#Masked#$3/
+-- Mask iceberg version
+--! qt:replace:/("iceberg-version":")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))/$1#Masked#/
+
+-- create an unpartitioned table
+ create table ice01 (id int) Stored by Iceberg stored as ORC
+ TBLPROPERTIES('format-version'='3');
+
+-- check the property value
+show create table ice01;
+
+-- insert some values
+insert into ice01 values (1),(2),(3),(4);
+
+-- check the inserted values
+select * from ice01;
+
+-- delete some values
+delete from ice01 where id>2;
+
+-- check the values, the deleted values shouldn't be there
+select * from ice01 order by id;
+
+-- insert some more data
+ insert into ice01 values (5),(6),(7),(8);
+
+-- check the values, only the deleted values shouldn't be there
+select * from ice01 order by id;
+
+-- delete one value
+delete from ice01 where id=7;
+
+-- check the entries, the deleted entries shouldn't be there.
+select * from ice01 order by id;

Review Comment:
   Can we also display the content offset for each of these delete tests using the position delete metadata tables, so that we are sure deletion vectors are actually being written?
   
   Reference on deletion vectors metadata columns:
   https://github.com/apache/iceberg/commit/026a9b00459ae458eb4c6c620c78742dc43a00e8
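   
   For instance, a sketch of what such a check could look like in the .q file (the `position_deletes` metadata table and its `content_offset`/`content_size_in_bytes` columns follow the referenced Iceberg commit; whether Hive surfaces them exactly like this is an assumption):
   
   ```sql
   -- hypothetical qtest addition: after each delete, confirm DVs are in play by checking
   -- that content_offset is populated in the position-delete metadata
   select file_path, pos, delete_file_path, content_offset, content_size_in_bytes
   from default.ice01.position_deletes;
   ```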



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java:
##########
@@ -120,27 +132,69 @@ public HiveIcebergWriter build() {
 
     HiveIcebergWriter writer;
     boolean isCOW = IcebergTableUtil.isCopyOnWriteMode(operation, table.properties()::getOrDefault);
+
     if (isCOW) {
       writer = new HiveIcebergCopyOnWriteRecordWriter(table, writerFactory, dataFileFactory, context);
     } else {
-      switch (operation) {
-        case DELETE:
-          writer = new HiveIcebergDeleteWriter(table, writerFactory, deleteFileFactory, context);
-          break;
-        case OTHER:
-          writer = new HiveIcebergRecordWriter(table, writerFactory, dataFileFactory, context);
-          break;
-        default:
-          // Update and Merge should be splitted to inserts and deletes
-          throw new IllegalArgumentException("Unsupported operation when creating IcebergRecordWriter: " +
-            operation.name());
-      }
+      writer = switch (operation) {
+        case DELETE ->
+            new HiveIcebergDeleteWriter(table, rewritableDeletes, writerFactory, deleteFileFactory, context);
+        case OTHER ->
+            new HiveIcebergRecordWriter(table, writerFactory, dataFileFactory, context);
+        default ->
+            // Update and Merge should be split into inserts and deletes
+            throw new IllegalArgumentException("Unsupported operation when creating IcebergRecordWriter: " +
+                operation.name());
+      };
     }
 
     WriterRegistry.registerWriter(attemptID, tableName, writer);
     return writer;
   }
 
+  private Map<String, DeleteFileSet> rewritableDeletes() {
+    TableScan scan = table.newScan().caseSensitive(false).ignoreResiduals();

Review Comment:
   Is it right that we are ignoring residual filters? Is the idea here to just use all the rows applicable for deletion?
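   
   For context, a sketch of what such a collection loop typically looks like (modeled on similar logic in Iceberg's Spark integration; only the scan line is from the quoted diff, the rest is an assumption about this PR). `ignoreResiduals()` drops only the per-task residual row filter, so every delete file attached to a matching data file is still collected:
   
   ```java
   // Assumes the enclosing builder's "table" field plus the usual org.apache.iceberg imports
   // (FileScanTask, DeleteFile, CloseableIterable, DeleteFileSet, Maps).
   private Map<String, DeleteFileSet> rewritableDeletes() {
     Map<String, DeleteFileSet> rewritableDeletes = Maps.newHashMap();
     try (CloseableIterable<FileScanTask> tasks =
         table.newScan().caseSensitive(false).ignoreResiduals().planFiles()) {
       for (FileScanTask task : tasks) {
         // every delete file referencing this data file may need to be merged or rewritten
         for (DeleteFile deleteFile : task.deletes()) {
           rewritableDeletes
               .computeIfAbsent(task.file().location(), ignored -> DeleteFileSet.create())
               .add(deleteFile);
         }
       }
     } catch (IOException e) {
       throw new UncheckedIOException(e);
     }
     return rewritableDeletes;
   }
   ```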



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java:
##########
@@ -392,27 +396,47 @@ public static void cherryPick(Table table, long snapshotId) {
   }
 
   public static boolean isV2Table(Map<String, String> props) {
-    return props == null ||
-        "2".equals(props.get(TableProperties.FORMAT_VERSION)) || 
props.get(TableProperties.FORMAT_VERSION) == null;
+    return IcebergTableUtil.formatVersion(props) >= 2;
   }
 
-  public static boolean isCopyOnWriteMode(Context.Operation operation, BinaryOperator<String> props) {
-    String mode = null;
-    switch (operation) {
-      case DELETE:
-        mode = props.apply(TableProperties.DELETE_MODE,
-            TableProperties.DELETE_MODE_DEFAULT);
-        break;
-      case UPDATE:
-        mode = props.apply(TableProperties.UPDATE_MODE,
-            TableProperties.UPDATE_MODE_DEFAULT);
-        break;
-      case MERGE:
-        mode = props.apply(TableProperties.MERGE_MODE,
-            TableProperties.MERGE_MODE_DEFAULT);
-        break;
+  public static boolean isV2Table(BinaryOperator<String> props) {
+    return IcebergTableUtil.formatVersion(props) >= 2;
+  }
+
+  public static Integer formatVersion(Map<String, String> props) {
+    if (props == null) {
+      return 2; // default to V2

Review Comment:
   nit: Maybe you can add a TODO stating that we have to move the default to 3 whenever applicable?
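   
   For example (method body from the quoted diff, TODO comment added):
   
   ```java
   public static Integer formatVersion(Map<String, String> props) {
     if (props == null) {
       // TODO: move the default to 3 once V3 becomes the default format version
       return 2; // default to V2
     }
     return IcebergTableUtil.formatVersion(props::getOrDefault);
   }
   ```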



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java:
##########
@@ -392,27 +396,47 @@ public static void cherryPick(Table table, long snapshotId) {
   }
 
   public static boolean isV2Table(Map<String, String> props) {
-    return props == null ||
-        "2".equals(props.get(TableProperties.FORMAT_VERSION)) || 
props.get(TableProperties.FORMAT_VERSION) == null;
+    return IcebergTableUtil.formatVersion(props) >= 2;
   }
 
-  public static boolean isCopyOnWriteMode(Context.Operation operation, BinaryOperator<String> props) {
-    String mode = null;
-    switch (operation) {
-      case DELETE:
-        mode = props.apply(TableProperties.DELETE_MODE,
-            TableProperties.DELETE_MODE_DEFAULT);
-        break;
-      case UPDATE:
-        mode = props.apply(TableProperties.UPDATE_MODE,
-            TableProperties.UPDATE_MODE_DEFAULT);
-        break;
-      case MERGE:
-        mode = props.apply(TableProperties.MERGE_MODE,
-            TableProperties.MERGE_MODE_DEFAULT);
-        break;
+  public static boolean isV2Table(BinaryOperator<String> props) {
+    return IcebergTableUtil.formatVersion(props) >= 2;
+  }
+
+  public static Integer formatVersion(Map<String, String> props) {
+    if (props == null) {
+      return 2; // default to V2
     }
-    return RowLevelOperationMode.COPY_ON_WRITE.modeName().equalsIgnoreCase(mode);
+    return IcebergTableUtil.formatVersion(props::getOrDefault);
+  }
+
+  private static Integer formatVersion(BinaryOperator<String> props) {

Review Comment:
   Same argument here - should we really create another function which takes in 
a BinaryOperator?



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterBase.java:
##########
@@ -108,19 +115,53 @@ static PartitioningWriter<Record, DataWriteResult> newDataWriter(
   // use a fanout writer if the input is unordered no matter whether fanout writers are enabled
   // clustered writers assume that the position deletes are already ordered by file and position
   static PartitioningWriter<PositionDelete<Record>, DeleteWriteResult> newDeleteWriter(
-      Table table, HiveFileWriterFactory writers, OutputFileFactory files, Context context) {
+      Table table, Map<String, DeleteFileSet> rewritableDeletes, HiveFileWriterFactory writers,
+      OutputFileFactory files, Context context) {
 
+    Function<CharSequence, PositionDeleteIndex> previousDeleteLoader =
+        PreviousDeleteLoader.create(table, rewritableDeletes);
     FileIO io = table.io();
     boolean inputOrdered = context.inputOrdered();
     long targetFileSize = context.targetDeleteFileSize();
     DeleteGranularity deleteGranularity = context.deleteGranularity();
 
-    if (inputOrdered) {
+    if (context.useDVs()) {
+      return new PartitioningDVWriter<>(files, previousDeleteLoader);
+    } else if (inputOrdered && rewritableDeletes == null) {

Review Comment:
   Is it because ClusteredDeleteWriter doesn't support passing deletion vectors?
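   
   For reference, a schematic restatement of the dispatch in the quoted diff that matches this reading (the branch conditions mirror the diff; the comments are interpretation, and the claim that the clustered writer cannot fold in previously written deletes is an assumption about the Iceberg API):
   
   ```java
   // Sketch: which delete writer each combination of flags selects, per the quoted diff.
   enum DeleteWriterKind { DV, CLUSTERED, FANOUT }
   
   static DeleteWriterKind pickDeleteWriter(boolean useDVs, boolean inputOrdered,
       boolean hasRewritableDeletes) {
     if (useDVs) {
       return DeleteWriterKind.DV;        // PartitioningDVWriter writes deletion vectors
     } else if (inputOrdered && !hasRewritableDeletes) {
       return DeleteWriterKind.CLUSTERED; // ordered input and nothing previous to merge
     } else {
       return DeleteWriterKind.FANOUT;    // unordered input, or prior deletes to fold in
     }
   }
   ```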



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

