This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 60e8a274263 HIVE-26556: Iceberg: Properties set in HiveIcebergSerde are not propagated to jobconf (#3617). (Ayush Saxena, reviewed by Adam Szita)
60e8a274263 is described below
commit 60e8a274263a1705f75fa3f068ca4eceefd98b18
Author: Ayush Saxena <[email protected]>
AuthorDate: Tue Oct 11 11:37:59 2022 +0530
    HIVE-26556: Iceberg: Properties set in HiveIcebergSerde are not propagated to jobconf (#3617). (Ayush Saxena, reviewed by Adam Szita)
---
.../org/apache/iceberg/mr/hive/HiveIcebergSerDe.java | 20 +++++++++++++++-----
.../iceberg/mr/hive/TestHiveIcebergSelects.java | 1 -
.../apache/hadoop/hive/ql/parse/CalcitePlanner.java | 7 ++++++-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 6 ++++--
.../java/org/apache/hadoop/hive/serde2/SerDe.java | 10 ++++++++++
5 files changed, 35 insertions(+), 9 deletions(-)
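In short, the fix stops HiveIcebergSerDe from writing its settings into the Configuration handed to initialize(), and instead stages them in a map that the planner later flushes into the job-level HiveConf through a new SerDe hook. A minimal sketch of the pattern follows (class name and structure are simplified for illustration, not a verbatim excerpt of the patch):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.conf.HiveConf;

    // Sketch of the propagation pattern (names and structure simplified).
    class JobConfBufferingSketch {
      // Settings staged during initialize() instead of being written to the
      // Configuration instance handed to the SerDe.
      private final Map<String, String> jobConf = new HashMap<>();

      void initialize() {
        jobConf.put(HiveConf.ConfVars.HIVEOPTSORTDYNAMICPARTITIONTHRESHOLD.varname, "1");
        jobConf.put(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE.varname, "nonstrict");
      }

      // Called by the planner once the job-level HiveConf is available, so the
      // staged settings actually reach the jobconf.
      void handleJobLevelConfiguration(HiveConf conf) {
        jobConf.forEach(conf::set);
      }
    }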
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
index 4c9eeb8f8da..ed076cdc472 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -77,6 +77,7 @@ public class HiveIcebergSerDe extends AbstractSerDe {
   private Collection<String> partitionColumns;
   private Map<ObjectInspector, Deserializer> deserializers = Maps.newHashMapWithExpectedSize(1);
   private Container<Record> row = new Container<>();
+  private Map<String, String> jobConf = Maps.newHashMap();
 
   @Override
   public void initialize(@Nullable Configuration configuration, Properties serDeProperties,
@@ -130,13 +131,14 @@ public class HiveIcebergSerDe extends AbstractSerDe {
       }
     }
 
-    this.projectedSchema = projectedSchema(configuration, serDeProperties.getProperty(Catalogs.NAME), tableSchema);
+    this.projectedSchema =
+        projectedSchema(configuration, serDeProperties.getProperty(Catalogs.NAME), tableSchema, jobConf);
 
     // Currently ClusteredWriter is used which requires that records are ordered by partition keys.
     // Here we ensure that SortedDynPartitionOptimizer will kick in and do the sorting.
     // TODO: remove once we have both Fanout and ClusteredWriter available: HIVE-25948
-    HiveConf.setIntVar(configuration, HiveConf.ConfVars.HIVEOPTSORTDYNAMICPARTITIONTHRESHOLD, 1);
-    HiveConf.setVar(configuration, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+    jobConf.put(HiveConf.ConfVars.HIVEOPTSORTDYNAMICPARTITIONTHRESHOLD.varname, "1");
+    jobConf.put(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE.varname, "nonstrict");
 
     try {
       this.inspector = IcebergObjectInspector.create(projectedSchema);
@@ -145,7 +147,8 @@ public class HiveIcebergSerDe extends AbstractSerDe {
     }
   }
 
-  private static Schema projectedSchema(Configuration configuration, String tableName, Schema tableSchema) {
+  private static Schema projectedSchema(Configuration configuration, String tableName, Schema tableSchema,
+      Map<String, String> jobConfs) {
     Context.Operation operation = HiveCustomStorageHandlerUtils.getWriteOperation(configuration, tableName);
     if (operation != null) {
       switch (operation) {
@@ -159,7 +162,7 @@ public class HiveIcebergSerDe extends AbstractSerDe {
           throw new IllegalArgumentException("Unsupported operation " + operation);
       }
     } else {
-      configuration.setBoolean(InputFormatConfig.CASE_SENSITIVE, false);
+      jobConfs.put(InputFormatConfig.CASE_SENSITIVE, "false");
       String[] selectedColumns = ColumnProjectionUtils.getReadColumnNames(configuration);
       // When same table is joined multiple times, it is possible some selected columns are duplicated,
       // in this case wrong recordStructField position leads wrong value or ArrayIndexOutOfBoundException
@@ -257,6 +260,13 @@ public class HiveIcebergSerDe extends AbstractSerDe {
     return null;
   }
 
+  @Override
+  public void handleJobLevelConfiguration(HiveConf conf) {
+    for (Map.Entry<String, String> confs : jobConf.entrySet()) {
+      conf.set(confs.getKey(), confs.getValue());
+    }
+  }
+
   @Override
   public Object deserialize(Writable writable) {
     return ((Container<?>) writable).get();
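Net effect of the hunks above: initialize() no longer mutates the passed-in Configuration (which, per the issue title, was not being propagated to the jobconf); everything is staged in the jobConf map and copied into the job-level HiveConf by handleJobLevelConfiguration.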
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
index 6ab6e3e4ffb..d46b02d1b2c 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
@@ -189,7 +189,6 @@ public class TestHiveIcebergSelects extends HiveIcebergStorageHandlerWithEngineB
   @Test
   public void testScanTableCaseInsensitive() throws IOException {
-    shell.setHiveSessionValue(InputFormatConfig.CASE_SENSITIVE, false);
     testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA_WITH_UPPERCASE,
         fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
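The test no longer needs to set InputFormatConfig.CASE_SENSITIVE at the session level: the SerDe now stages that property itself (see the projectedSchema hunk above), and it reaches the conf through the new hook.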
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index 056e1c04b97..0683586e86c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -314,6 +314,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTFInline;
import org.apache.hadoop.hive.ql.util.DirectionUtils;
import org.apache.hadoop.hive.ql.util.NullOrdering;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -2901,8 +2902,12 @@ public class CalcitePlanner extends SemanticAnalyzer {
       // Virtual Cols
       // 3.1 Add Column info for non partion cols (Object Inspector fields)
-      StructObjectInspector rowObjectInspector = (StructObjectInspector) tabMetaData.getDeserializer()
+      final Deserializer deserializer = tabMetaData.getDeserializer();
+      StructObjectInspector rowObjectInspector = (StructObjectInspector) deserializer
           .getObjectInspector();
+
+      deserializer.handleJobLevelConfiguration(conf);
+
       List<? extends StructField> fields =
           rowObjectInspector.getAllStructFieldRefs();
       ColumnInfo colInfo;
       String colName;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 70b1d5b4a1a..0f07fa4dbd4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -11568,8 +11568,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       }
     }
     // Obtain inspector for schema
-    StructObjectInspector rowObjectInspector = (StructObjectInspector) tab
-        .getDeserializer().getObjectInspector();
+    final Deserializer deserializer = tab.getDeserializer();
+    StructObjectInspector rowObjectInspector = (StructObjectInspector) deserializer.getObjectInspector();
+
+    deserializer.handleJobLevelConfiguration(conf);
     List<? extends StructField> fields = rowObjectInspector
         .getAllStructFieldRefs();
     for (int i = 0; i < fields.size(); i++) {
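Both analysis paths get the same treatment: CalcitePlanner (the CBO path) above and SemanticAnalyzer here call handleJobLevelConfiguration right after obtaining the table's deserializer, so the staged settings are applied regardless of which planner handles the query.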
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/SerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/SerDe.java
index 3b710a9196c..cf947c0aa93 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/SerDe.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/SerDe.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hive.serde2;
 
+import org.apache.hadoop.hive.conf.HiveConf;
+
 /**
  * A Hive Serializer/Deserializer.
  */
@@ -29,4 +31,12 @@ public interface SerDe {
    * @return {@link SerDeStats} object; or in case not supported: null
    */
   SerDeStats getSerDeStats();
+
+  /**
+   * Adds SerDe specific configurations to job conf.
+   * @param conf the job conf.
+   */
+  default void handleJobLevelConfiguration(HiveConf conf) {
+    // Do nothing
+  }
 }
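Since handleJobLevelConfiguration is a default method, existing SerDe implementations compile unchanged and opt in by overriding it. A hedged sketch against the interface as shown above (ExampleSerDe and the property key are illustrative, not part of the patch):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.serde2.SerDe;
    import org.apache.hadoop.hive.serde2.SerDeStats;

    // Illustrative only: a SerDe that publishes a job-level setting via the new hook.
    public class ExampleSerDe implements SerDe {
      @Override
      public SerDeStats getSerDeStats() {
        return null; // stats not supported in this sketch
      }

      @Override
      public void handleJobLevelConfiguration(HiveConf conf) {
        conf.set("example.scan.case.sensitive", "false"); // hypothetical property key
      }
    }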