This is an automated email from the ASF dual-hosted git repository.
mbod pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new e8effb5 HIVE-25849: Disable insert overwrite for bucket partitioned Iceberg tables (Marton Bod, reviewed by Adam Szita and Peter Vary)
e8effb5 is described below
commit e8effb585db81edd809d1cb783ce22b079caa264
Author: Marton Bod <[email protected]>
AuthorDate: Fri Jan 7 16:34:04 2022 +0100
HIVE-25849: Disable insert overwrite for bucket partitioned Iceberg tables (Marton Bod, reviewed by Adam Szita and Peter Vary)
---
.../apache/iceberg/mr/hive/HiveIcebergStorageHandler.java | 13 +++++++++++++
.../java/org/apache/iceberg/mr/hive/IcebergTableUtil.java | 4 ++++
.../apache/iceberg/mr/hive/TestHiveIcebergInserts.java | 15 +++++++++++++++
.../hadoop/hive/ql/metadata/HiveStorageHandler.java | 11 +++++++++++
.../org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java | 12 ++++++++++++
5 files changed, 55 insertions(+)
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 60f9d42..676cb6a 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -48,11 +48,13 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicListDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import
org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -369,6 +371,17 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
return new URI(ICEBERG_URI_PREFIX + table.location());
}
+  @Override
+  public void validateSinkDesc(FileSinkDesc sinkDesc) throws SemanticException {
+    HiveStorageHandler.super.validateSinkDesc(sinkDesc);
+    if (sinkDesc.getInsertOverwrite()) {
+      Table table = IcebergTableUtil.getTable(conf, sinkDesc.getTableInfo().getProperties());
+      if (IcebergTableUtil.isBucketed(table)) {
+        throw new SemanticException("Cannot perform insert overwrite query on bucket partitioned Iceberg table.");
+      }
+    }
+  }
+
   private void setCommonJobConf(JobConf jobConf) {
     jobConf.set("tez.mrreader.config.update.properties", "hive.io.file.readcolumn.names,hive.io.file.readcolumn.ids");
   }
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index d201099..9a1f316 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -194,4 +194,8 @@ public class IcebergTableUtil {
updatePartitionSpec.commit();
}
+
+  public static boolean isBucketed(Table table) {
+    return table.spec().fields().stream().anyMatch(f -> f.transform().toString().startsWith("bucket["));
+  }
}
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
index 5222361..9b8def1 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.mr.hive;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -169,6 +170,20 @@ public class TestHiveIcebergInserts extends
HiveIcebergStorageHandlerWithEngineB
HiveIcebergTestUtils.validateData(table, expected, 0);
}
+  @Test
+  public void testInsertOverwriteBucketPartitionedTableThrowsError() {
+    TableIdentifier target = TableIdentifier.of("default", "target");
+    PartitionSpec spec = PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+        .bucket("last_name", 16).identity("customer_id").build();
+    testTables.createTable(shell, target.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        spec, fileFormat, ImmutableList.of());
+
+    AssertHelpers.assertThrows("IOW should not work on bucket partitioned table", IllegalArgumentException.class,
+        "Cannot perform insert overwrite query on bucket partitioned Iceberg table",
+        () -> shell.executeStatement(
+            testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, target, true)));
+  }
+
/**
* Testing map-reduce inserts.
* @throws IOException If there is an underlying IOException
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index b2f5a1a..0e38574 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -31,7 +31,9 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import
org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
@@ -352,4 +354,13 @@ public interface HiveStorageHandler extends Configurable {
return new URI(this.getClass().getSimpleName().toLowerCase() + "://" +
HiveCustomStorageHandlerUtils.getTablePropsForCustomStorageHandler(tableProperties));
}
+
+  /**
+   * Validates whether the sink operation is permitted for the specific storage handler, based
+   * on information contained in the sinkDesc.
+   * @param sinkDesc The sink descriptor
+   * @throws SemanticException if the sink operation is not allowed
+   */
+  default void validateSinkDesc(FileSinkDesc sinkDesc) throws SemanticException {
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index bbf1258..d1d5ee3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -12587,6 +12587,18 @@ public class SemanticAnalyzer extends
BaseSemanticAnalyzer {
}
}
+    // validate if this sink operation is allowed for non-native tables
+    if (sinkOp instanceof FileSinkOperator) {
+      FileSinkOperator fileSinkOperator = (FileSinkOperator) sinkOp;
+      Optional<HiveStorageHandler> handler = Optional.ofNullable(fileSinkOperator)
+          .map(FileSinkOperator::getConf)
+          .map(FileSinkDesc::getTable)
+          .map(Table::getStorageHandler);
+      if (handler.isPresent()) {
+        handler.get().validateSinkDesc(fileSinkOperator.getConf());
+      }
+    }
+
// Check query results cache
// In the case that row or column masking/filtering was required, we do
not support caching.
// TODO: Enable caching for queries with masking/filtering