This is an automated email from the ASF dual-hosted git repository.
sbadhya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 1aeaff2057a HIVE-27991: Utilise FanoutWriters when inserting records in an Iceberg table when the records are unsorted (#4988) (Sourabh Badhya reviewed by Butao Zhang)
1aeaff2057a is described below
commit 1aeaff2057a2f4c241f8bcc53a2a529e6e7f45d4
Author: Sourabh Badhya <[email protected]>
AuthorDate: Mon Jan 22 19:30:40 2024 +0530
    HIVE-27991: Utilise FanoutWriters when inserting records in an Iceberg table when the records are unsorted (#4988) (Sourabh Badhya reviewed by Butao Zhang)
---
.../iceberg/mr/hive/HiveIcebergOutputFormat.java | 1 +
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 1 +
.../mr/hive/writer/HiveIcebergRecordWriter.java | 14 +++++++++++---
.../apache/iceberg/mr/hive/writer/WriterBuilder.java | 9 ++++++++-
.../positive/ctas_iceberg_partitioned_orc.q.out | 2 +-
.../apache/hadoop/hive/ql/exec/FileSinkOperator.java | 3 +++
.../hadoop/hive/ql/plan/DynamicPartitionCtx.java | 10 ++++++++++
.../authorization/HiveCustomStorageHandlerUtils.java | 19 +++++++++++++++++++
8 files changed, 54 insertions(+), 5 deletions(-)
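
The gist of the change: the Hive write path now tells the Iceberg writer layer whether the incoming records are sorted, and the record writer picks its partitioning writer accordingly. A minimal sketch of the selection, mirroring the getIcebergDataWriter helper added below:

    // Clustered writing assumes records arrive grouped by partition and can
    // close each data file as soon as the next partition starts; fanout keeps
    // one writer open per partition, so unsorted input is safe.
    PartitioningWriter writer = fanoutEnabled
        ? new FanoutDataWriter<>(fileWriterFactory, fileFactory, io, targetFileSize)
        : new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, targetFileSize);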
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index c356898c65e..508e3459269 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -77,6 +77,7 @@ public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Co
         .attemptID(taskAttemptID)
         .poolSize(poolSize)
         .operation(HiveCustomStorageHandlerUtils.getWriteOperation(jc, tableName))
+        .isFanoutEnabled(!HiveCustomStorageHandlerUtils.getWriteOperationIsSorted(jc, tableName))
         .build();
   }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index d07b820e6d0..cd50aa929c0 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -702,6 +702,7 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
       addCustomSortExpr(table, hmsTable, writeOperation, customSortExprs, getSortTransformSpec(table));
     }
+    dpCtx.setHasCustomSortExprs(!customSortExprs.isEmpty());
     return dpCtx;
   }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
index 26c4e0947d0..b43376ec7f9 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
@@ -29,9 +29,11 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.ClusteredDataWriter;
 import org.apache.iceberg.io.DataWriteResult;
+import org.apache.iceberg.io.FanoutDataWriter;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitioningWriter;
 import org.apache.iceberg.mr.hive.FilesForCommit;
 import org.apache.iceberg.mr.mapred.Container;
@@ -41,9 +43,8 @@ class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
   HiveIcebergRecordWriter(Schema schema, Map<Integer, PartitionSpec> specs, int currentSpecId,
       FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory, FileIO io,
-      long targetFileSize) {
-    super(schema, specs, io,
-        new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, targetFileSize));
+      long targetFileSize, boolean fanoutEnabled) {
+    super(schema, specs, io, getIcebergDataWriter(fileWriterFactory, fileFactory, io, targetFileSize, fanoutEnabled));
     this.currentSpecId = currentSpecId;
   }
@@ -58,4 +59,11 @@ class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
     List<DataFile> dataFiles = ((DataWriteResult) writer.result()).dataFiles();
     return FilesForCommit.onlyData(dataFiles);
   }
+
+  private static PartitioningWriter getIcebergDataWriter(FileWriterFactory<Record> fileWriterFactory,
+      OutputFileFactory fileFactory, FileIO io,
+      long targetFileSize, boolean fanoutEnabled) {
+    return fanoutEnabled ? new FanoutDataWriter<>(fileWriterFactory, fileFactory, io, targetFileSize)
+        : new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, targetFileSize);
+  }
 }
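
Design note: ClusteredDataWriter closes each partition's file once records move on to the next partition and fails if a row for an already-closed partition shows up later, so it is only safe when the input is clustered (sorted) by partition. FanoutDataWriter tolerates any partition order at the cost of keeping more writers and files open at once, which is why fanout is enabled only when the records are known to be unsorted.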
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java
index 2f7177b6381..c68bf8fb1d6 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java
@@ -45,6 +45,8 @@ public class WriterBuilder {
   private String queryId;
   private int poolSize;
   private Operation operation;
+  private boolean fanoutEnabled;
+
   // A task may write multiple output files using multiple writers. Each of them must have a unique operationId.
   private static AtomicInteger operationNum = new AtomicInteger(0);
@@ -85,6 +87,11 @@ public class WriterBuilder {
     return this;
   }
 
+  public WriterBuilder isFanoutEnabled(boolean isFanoutEnabled) {
+    this.fanoutEnabled = isFanoutEnabled;
+    return this;
+  }
+
   public HiveIcebergWriter build() {
     Map<String, String> properties = table.properties();
@@ -133,7 +140,7 @@ public class WriterBuilder {
         break;
       case OTHER:
         writer = new HiveIcebergRecordWriter(dataSchema, specs, currentSpecId, writerFactory, outputFileFactory,
-            io, targetFileSize);
+            io, targetFileSize, fanoutEnabled);
         break;
       default:
         // Update and Merge should be splitted to inserts and deletes
diff --git a/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_partitioned_orc.q.out b/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_partitioned_orc.q.out
index d0c2aef2bf1..f6a407b35be 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_partitioned_orc.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_partitioned_orc.q.out
@@ -333,5 +333,5 @@ POSTHOOK: query: select * from tbl_ice
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-1	one	3
 1	two	4
+1	one	3
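
(The swapped rows above appear to be a side effect of the new write path: with fanout writing, data files are no longer emitted in clustered sort order, so this unordered SELECT returns the rows in a different order.)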
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index d7903747ae9..e05e36e60be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE;
 import static org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils.setWriteOperation;
+import static org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils.setWriteOperationIsSorted;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -633,6 +634,8 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       jc = new JobConf(hconf);
       setWriteOperation(jc, getConf().getTableInfo().getTableName(), getConf().getWriteOperation());
+      setWriteOperationIsSorted(jc, getConf().getTableInfo().getTableName(),
+          dpCtx != null && dpCtx.hasCustomSortExprs());
 
       try {
         createHiveOutputFormat(jc);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
index 3497f3120cd..580b4499217 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
@@ -52,6 +52,7 @@ public class DynamicPartitionCtx implements Serializable {
   private String defaultPartName; // default partition name in case of null or empty value
   private int maxPartsPerNode; // maximum dynamic partitions created per mapper/reducer
   private Pattern whiteListPattern;
+  private boolean hasCustomSortExprs = false;
   /**
    * Expressions describing a custom way of sorting the table before write. Expressions can reference simple
    * column descriptions or a tree of expressions containing more columns and UDFs.
@@ -147,6 +148,7 @@ public class DynamicPartitionCtx implements Serializable {
     this.maxPartsPerNode = dp.maxPartsPerNode;
     this.whiteListPattern = dp.whiteListPattern;
     this.customSortExpressions = dp.customSortExpressions;
+    this.hasCustomSortExprs = dp.customSortExpressions != null && !dp.customSortExpressions.isEmpty();
     this.customSortOrder = dp.customSortOrder;
     this.customSortNullOrder = dp.customSortNullOrder;
   }
@@ -258,4 +260,12 @@ public class DynamicPartitionCtx implements Serializable {
   public void setCustomSortNullOrder(List<Integer> customSortNullOrder) {
     this.customSortNullOrder = customSortNullOrder;
   }
+
+  public boolean hasCustomSortExprs() {
+    return hasCustomSortExprs;
+  }
+
+  public void setHasCustomSortExprs(boolean hasCustomSortExprs) {
+    this.hasCustomSortExprs = hasCustomSortExprs;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/HiveCustomStorageHandlerUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/HiveCustomStorageHandlerUtils.java
index 8be4cfc5b8f..2f2f7d781c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/HiveCustomStorageHandlerUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/HiveCustomStorageHandlerUtils.java
@@ -24,6 +24,7 @@ import java.util.Optional;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -31,6 +32,7 @@ public class HiveCustomStorageHandlerUtils {
   public static final String WRITE_OPERATION_CONFIG_PREFIX = "file.sink.write.operation.";
+  public static final String WRITE_OPERATION_IS_SORTED = "file.sink.write.operation.sorted.";
 
   public static String getTablePropsForCustomStorageHandler(Map<String, String> tableProperties) {
     StringBuilder properties = new StringBuilder();
@@ -71,4 +73,21 @@ public class HiveCustomStorageHandlerUtils {
     conf.set(WRITE_OPERATION_CONFIG_PREFIX + tableName, operation.name());
   }
+
+  public static void setWriteOperationIsSorted(Configuration conf, String tableName, boolean isSorted) {
+    if (conf == null || tableName == null) {
+      return;
+    }
+
+    conf.set(WRITE_OPERATION_IS_SORTED + tableName, Boolean.toString(isSorted));
+  }
+
+  public static boolean getWriteOperationIsSorted(Configuration conf, String tableName) {
+    if (conf == null || tableName == null) {
+      return false;
+    }
+
+    String isSortedString = conf.get(WRITE_OPERATION_IS_SORTED + tableName);
+    return Boolean.parseBoolean(isSortedString);
+  }
 }
}