This is an automated email from the ASF dual-hosted git repository.

sbadhya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 1aeaff2057a HIVE-27991: Utilise FanoutWriters when inserting records in an Iceberg table when the records are unsorted (#4988) (Sourabh Badhya reviewed by Butao Zhang)
1aeaff2057a is described below

commit 1aeaff2057a2f4c241f8bcc53a2a529e6e7f45d4
Author: Sourabh Badhya <[email protected]>
AuthorDate: Mon Jan 22 19:30:40 2024 +0530

    HIVE-27991: Utilise FanoutWriters when inserting records in an Iceberg table when the records are unsorted (#4988) (Sourabh Badhya reviewed by Butao Zhang)
---
 .../iceberg/mr/hive/HiveIcebergOutputFormat.java      |  1 +
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java    |  1 +
 .../mr/hive/writer/HiveIcebergRecordWriter.java       | 14 +++++++++++---
 .../apache/iceberg/mr/hive/writer/WriterBuilder.java  |  9 ++++++++-
 .../positive/ctas_iceberg_partitioned_orc.q.out       |  2 +-
 .../apache/hadoop/hive/ql/exec/FileSinkOperator.java  |  3 +++
 .../hadoop/hive/ql/plan/DynamicPartitionCtx.java      | 10 ++++++++++
 .../authorization/HiveCustomStorageHandlerUtils.java  | 19 +++++++++++++++++++
 8 files changed, 54 insertions(+), 5 deletions(-)
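
For context on the change: Iceberg's ClusteredDataWriter assumes incoming records arrive clustered by partition and keeps a single data file open at a time, while FanoutDataWriter keeps one open writer per partition it has seen, trading memory for order-independence. A minimal sketch of the selection this commit wires up (the factory and IO arguments are assumed to be prepared the same way WriterBuilder prepares them):

    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.io.ClusteredDataWriter;
    import org.apache.iceberg.io.DataWriteResult;
    import org.apache.iceberg.io.FanoutDataWriter;
    import org.apache.iceberg.io.FileIO;
    import org.apache.iceberg.io.FileWriterFactory;
    import org.apache.iceberg.io.OutputFileFactory;
    import org.apache.iceberg.io.PartitioningWriter;

    final class WriterChoiceSketch {
      // Sorted input -> clustered writer (one open file, minimal memory).
      // Unsorted input -> fanout writer (one open writer per partition).
      static PartitioningWriter<Record, DataWriteResult> choose(boolean sortedInput,
          FileWriterFactory<Record> writerFactory, OutputFileFactory fileFactory,
          FileIO io, long targetFileSize) {
        if (sortedInput) {
          return new ClusteredDataWriter<>(writerFactory, fileFactory, io, targetFileSize);
        }
        return new FanoutDataWriter<>(writerFactory, fileFactory, io, targetFileSize);
      }
    }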

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index c356898c65e..508e3459269 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -77,6 +77,7 @@ public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Co
         .attemptID(taskAttemptID)
         .poolSize(poolSize)
         .operation(HiveCustomStorageHandlerUtils.getWriteOperation(jc, tableName))
+        .isFanoutEnabled(!HiveCustomStorageHandlerUtils.getWriteOperationIsSorted(jc, tableName))
         .build();
   }
 
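Note the negation above: the JobConf flag records whether the rows are sorted, while the writer option asks for fanout. Since getWriteOperationIsSorted() (added below in HiveCustomStorageHandlerUtils) parses an absent key as false, fanout is the effective default for plain inserts. A hedged restatement:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;

    final class FanoutDecisionSketch {
      // An unset key parses to false ("not sorted"), so fanout stays enabled
      // unless the planner explicitly recorded a sorted write for this table.
      static boolean useFanout(Configuration jc, String tableName) {
        return !HiveCustomStorageHandlerUtils.getWriteOperationIsSorted(jc, tableName);
      }
    }
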
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index d07b820e6d0..cd50aa929c0 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -702,6 +702,7 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
 
       addCustomSortExpr(table, hmsTable, writeOperation, customSortExprs, getSortTransformSpec(table));
     }
+    dpCtx.setHasCustomSortExprs(!customSortExprs.isEmpty());
 
     return dpCtx;
   }
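
The flag set here is the planner-side source of truth that FileSinkOperator later copies into the JobConf (see the FileSinkOperator hunk below). A minimal restatement of the contract, with the expression list type left as a wildcard:

    import java.util.List;
    import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;

    final class SortFlagSketch {
      // A non-empty customSortExprs list (e.g. derived from an Iceberg sort
      // order or partition transform) means rows reach the writer clustered.
      static void markSortedness(DynamicPartitionCtx dpCtx, List<?> customSortExprs) {
        dpCtx.setHasCustomSortExprs(customSortExprs != null && !customSortExprs.isEmpty());
      }
    }
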
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
index 26c4e0947d0..b43376ec7f9 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
@@ -29,9 +29,11 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.ClusteredDataWriter;
 import org.apache.iceberg.io.DataWriteResult;
+import org.apache.iceberg.io.FanoutDataWriter;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitioningWriter;
 import org.apache.iceberg.mr.hive.FilesForCommit;
 import org.apache.iceberg.mr.mapred.Container;
 
@@ -41,9 +43,8 @@ class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
 
   HiveIcebergRecordWriter(Schema schema, Map<Integer, PartitionSpec> specs, int currentSpecId,
       FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory, FileIO io,
-      long targetFileSize) {
-    super(schema, specs, io,
-        new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, targetFileSize));
+      long targetFileSize, boolean fanoutEnabled) {
+    super(schema, specs, io, getIcebergDataWriter(fileWriterFactory, fileFactory, io, targetFileSize, fanoutEnabled));
     this.currentSpecId = currentSpecId;
   }
 
@@ -58,4 +59,11 @@ class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
     List<DataFile> dataFiles = ((DataWriteResult) writer.result()).dataFiles();
     return FilesForCommit.onlyData(dataFiles);
   }
+
+  private static PartitioningWriter getIcebergDataWriter(FileWriterFactory<Record> fileWriterFactory,
+      OutputFileFactory fileFactory, FileIO io,
+      long targetFileSize, boolean fanoutEnabled) {
+    return fanoutEnabled ? new FanoutDataWriter<>(fileWriterFactory, fileFactory, io, targetFileSize)
+      : new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, targetFileSize);
+  }
 }
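
To make the behavioural difference concrete, here is a hedged sketch (record and partition plumbing assumed) of the write pattern that a clustered writer rejects: it closes partition A's file once partition B arrives, whereas a fanout writer keeps both writers open:

    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.StructLike;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.io.DataWriteResult;
    import org.apache.iceberg.io.PartitioningWriter;

    final class UnsortedWriteSketch {
      static void writeUnsorted(PartitioningWriter<Record, DataWriteResult> writer,
          PartitionSpec spec, Record rowA, StructLike partA, Record rowB, StructLike partB) {
        writer.write(rowA, spec, partA);
        writer.write(rowB, spec, partB);  // a clustered writer closes partA's file here
        writer.write(rowA, spec, partA);  // clustered: IllegalStateException; fanout: fine
      }
    }
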
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java
index 2f7177b6381..c68bf8fb1d6 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java
@@ -45,6 +45,8 @@ public class WriterBuilder {
   private String queryId;
   private int poolSize;
   private Operation operation;
+  private boolean fanoutEnabled;
+
   // A task may write multiple output files using multiple writers. Each of them must have a unique operationId.
   private static AtomicInteger operationNum = new AtomicInteger(0);
 
@@ -85,6 +87,11 @@ public class WriterBuilder {
     return this;
   }
 
+  public WriterBuilder isFanoutEnabled(boolean isFanoutEnabled) {
+    this.fanoutEnabled = isFanoutEnabled;
+    return this;
+  }
+
   public HiveIcebergWriter build() {
     Map<String, String> properties = table.properties();
 
@@ -133,7 +140,7 @@ public class WriterBuilder {
           break;
         case OTHER:
           writer = new HiveIcebergRecordWriter(dataSchema, specs, currentSpecId, writerFactory, outputFileFactory,
-            io, targetFileSize);
+            io, targetFileSize, fanoutEnabled);
           break;
         default:
           // Update and Merge should be split into inserts and deletes
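
Note that the new flag only affects the plain-insert (OTHER) branch above; delete and update writers are unchanged. A hedged usage sketch of the finished builder (construction of the builder itself, and visibility of these types from the caller's package, assumed):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
    import org.apache.iceberg.mr.hive.writer.HiveIcebergWriter;
    import org.apache.iceberg.mr.hive.writer.WriterBuilder;

    final class BuilderUsageSketch {
      // Mirrors the HiveIcebergOutputFormat call site: sortedness read from
      // the JobConf is inverted into the fanout option before build().
      static HiveIcebergWriter finish(WriterBuilder builder, Configuration jc, String tableName) {
        return builder
            .isFanoutEnabled(!HiveCustomStorageHandlerUtils.getWriteOperationIsSorted(jc, tableName))
            .build();
      }
    }
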
diff --git a/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_partitioned_orc.q.out b/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_partitioned_orc.q.out
index d0c2aef2bf1..f6a407b35be 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_partitioned_orc.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_partitioned_orc.q.out
@@ -333,5 +333,5 @@ POSTHOOK: query: select * from tbl_ice
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-1      one     3
 1      two     4
+1      one     3
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index d7903747ae9..e05e36e60be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE;
 import static org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils.setWriteOperation;
+import static org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils.setWriteOperationIsSorted;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -633,6 +634,8 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
 
       jc = new JobConf(hconf);
       setWriteOperation(jc, getConf().getTableInfo().getTableName(), getConf().getWriteOperation());
+      setWriteOperationIsSorted(jc, getConf().getTableInfo().getTableName(),
+              dpCtx != null && dpCtx.hasCustomSortExprs());
 
       try {
         createHiveOutputFormat(jc);
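
This is the hand-off point: planner-side knowledge from DynamicPartitionCtx is published into the task-side JobConf, keyed by table name. A minimal sketch of the flow (class and method placement hypothetical):

    import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
    import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
    import org.apache.hadoop.mapred.JobConf;

    final class SortednessHandoffSketch {
      // A null dpCtx (no dynamic partitioning) counts as unsorted, which in
      // turn enables fanout writers on the Iceberg side.
      static void publish(JobConf jc, String tableName, DynamicPartitionCtx dpCtx) {
        HiveCustomStorageHandlerUtils.setWriteOperationIsSorted(jc, tableName,
            dpCtx != null && dpCtx.hasCustomSortExprs());
      }
    }
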
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
index 3497f3120cd..580b4499217 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
@@ -52,6 +52,7 @@ public class DynamicPartitionCtx implements Serializable {
   private String defaultPartName; // default partition name in case of null or empty value
   private int maxPartsPerNode;    // maximum dynamic partitions created per mapper/reducer
   private Pattern whiteListPattern;
+  private boolean hasCustomSortExprs = false;
   /**
    * Expressions describing a custom way of sorting the table before write. Expressions can reference simple
    * column descriptions or a tree of expressions containing more columns and UDFs.
@@ -147,6 +148,7 @@ public class DynamicPartitionCtx implements Serializable {
     this.maxPartsPerNode = dp.maxPartsPerNode;
     this.whiteListPattern = dp.whiteListPattern;
     this.customSortExpressions = dp.customSortExpressions;
+    this.hasCustomSortExprs = dp.customSortExpressions != null && !dp.customSortExpressions.isEmpty();
     this.customSortOrder = dp.customSortOrder;
     this.customSortNullOrder = dp.customSortNullOrder;
   }
@@ -258,4 +260,12 @@ public class DynamicPartitionCtx implements Serializable {
   public void setCustomSortNullOrder(List<Integer> customSortNullOrder) {
     this.customSortNullOrder = customSortNullOrder;
   }
+
+  public boolean hasCustomSortExprs() {
+    return hasCustomSortExprs;
+  }
+
+  public void setHasCustomSortExprs(boolean hasCustomSortExprs) {
+    this.hasCustomSortExprs = hasCustomSortExprs;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/HiveCustomStorageHandlerUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/HiveCustomStorageHandlerUtils.java
index 8be4cfc5b8f..2f2f7d781c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/HiveCustomStorageHandlerUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/HiveCustomStorageHandlerUtils.java
@@ -24,6 +24,7 @@ import java.util.Optional;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 
@@ -31,6 +32,7 @@ public class HiveCustomStorageHandlerUtils {
 
     public static final String WRITE_OPERATION_CONFIG_PREFIX = "file.sink.write.operation.";
 
+    public static final String WRITE_OPERATION_IS_SORTED = "file.sink.write.operation.sorted.";
 
     public static String getTablePropsForCustomStorageHandler(Map<String, String> tableProperties) {
         StringBuilder properties = new StringBuilder();
@@ -71,4 +73,21 @@ public class HiveCustomStorageHandlerUtils {
 
         conf.set(WRITE_OPERATION_CONFIG_PREFIX + tableName, operation.name());
     }
+
+    public static void setWriteOperationIsSorted(Configuration conf, String tableName, boolean isSorted) {
+        if (conf == null || tableName == null) {
+            return;
+        }
+
+        conf.set(WRITE_OPERATION_IS_SORTED + tableName, Boolean.toString(isSorted));
+    }
+
+    public static boolean getWriteOperationIsSorted(Configuration conf, String tableName) {
+        if (conf == null || tableName == null) {
+            return false;
+        }
+
+        String isSortedString = conf.get(WRITE_OPERATION_IS_SORTED + tableName);
+        return Boolean.parseBoolean(isSortedString);
+    }
 }
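
A hedged round-trip of the two new helpers (table name hypothetical), showing the per-table key layout and the null-safe default:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;

    final class ConfigRoundTripSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        String table = "default.tbl_ice";  // hypothetical table name

        // Absent key: Boolean.parseBoolean(null) is false -> treated as unsorted.
        boolean before = HiveCustomStorageHandlerUtils.getWriteOperationIsSorted(conf, table);

        // Stored under "file.sink.write.operation.sorted." + table.
        HiveCustomStorageHandlerUtils.setWriteOperationIsSorted(conf, table, true);
        boolean after = HiveCustomStorageHandlerUtils.getWriteOperationIsSorted(conf, table);

        System.out.println(before + " -> " + after);  // prints: false -> true
      }
    }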
