This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 15cd0522d71 [HUDI-5308] Hive3 query returns null when the where clause
has a partition field (#7355)
15cd0522d71 is described below
commit 15cd0522d7139b8dfa84c9f45040287ce409476a
Author: Manu <[email protected]>
AuthorDate: Wed May 10 16:43:35 2023 +0800
[HUDI-5308] Hive3 query returns null when the where clause has a partition
field (#7355)
* Fix partition-field queries returning null for Hive 3.x.
---
.../hudi/hadoop/HoodieParquetInputFormat.java | 5 +++-
.../realtime/HoodieHFileRealtimeInputFormat.java | 2 +-
.../realtime/HoodieParquetRealtimeInputFormat.java | 22 +++++++++++++----
.../utils/HoodieRealtimeInputFormatUtils.java | 28 +++++++++++-----------
.../utils/HoodieRealtimeRecordReaderUtils.java | 4 +---
.../org/apache/hudi/integ/ITTestHoodieSanity.java | 19 +++++++++++++++
6 files changed, 57 insertions(+), 23 deletions(-)
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
index dcb4b513fe5..c3588ef4e30 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
@@ -21,7 +21,7 @@ package org.apache.hudi.hadoop;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
-
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.io.ArrayWritable;
@@ -31,6 +31,8 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,6 +86,7 @@ public class HoodieParquetInputFormat extends
HoodieParquetInputFormatBase {
LOG.debug("EMPLOYING DEFAULT RECORD READER - " + split);
}
+ HoodieRealtimeInputFormatUtils.addProjectionField(job,
job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "").split("/"));
return getRecordReaderInternal(split, job, reporter);
}
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
index 10a7864dfbd..c7655abbbf3 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
@@ -73,7 +73,7 @@ public class HoodieHFileRealtimeInputFormat extends
HoodieMergeOnReadTableInputF
// For e:g _hoodie_record_key would be missing and merge step would
throw exceptions.
// TO fix this, hoodie columns are appended late at the time
record-reader gets built instead of construction
// time.
- HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf,
Option.empty(), Option.empty());
+ HoodieRealtimeInputFormatUtils.addVirtualKeysProjection(jobConf,
Option.empty());
this.conf = jobConf;
this.conf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP,
"true");
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index d474ee3cef6..c3d2c0d63b5 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -41,6 +41,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
/**
* Input Format, that provides a real-time view of data in a Hoodie table.
@@ -69,7 +72,7 @@ public class HoodieParquetRealtimeInputFormat extends
HoodieParquetInputFormat {
// add preCombineKey
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(realtimeSplit.getBasePath()).build();
HoodieTableConfig tableConfig = metaClient.getTableConfig();
- addProjectionToJobConf(realtimeSplit, jobConf,
metaClient.getTableConfig().getPreCombineField());
+ addProjectionToJobConf(realtimeSplit, jobConf, tableConfig);
LOG.info("Creating record reader with readCols :" +
jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+ ", Ids :" +
jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
@@ -82,7 +85,7 @@ public class HoodieParquetRealtimeInputFormat extends
HoodieParquetInputFormat {
super.getRecordReader(split, jobConf, reporter));
}
- void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf
jobConf, String preCombineKey) {
+ void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf
jobConf, HoodieTableConfig tableConfig) {
// Hive on Spark invokes multiple getRecordReaders from different threads
in the same spark task (and hence the
// same JVM) unlike Hive on MR. Due to this, accesses to JobConf, which is
shared across all threads, is at the
// risk of experiencing race conditions. Hence, we synchronize on the
JobConf object here. There is negligible
@@ -101,10 +104,21 @@ public class HoodieParquetRealtimeInputFormat extends
HoodieParquetInputFormat {
// For e:g _hoodie_record_key would be missing and merge step would
throw exceptions.
// TO fix this, hoodie columns are appended late at the time
record-reader gets built instead of construction
// time.
+ List<String> fieldsToAdd = new ArrayList<>();
if (!realtimeSplit.getDeltaLogPaths().isEmpty()) {
-
HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf,
realtimeSplit.getVirtualKeyInfo(),
- StringUtils.isNullOrEmpty(preCombineKey) ? Option.empty() :
Option.of(preCombineKey));
+ HoodieRealtimeInputFormatUtils.addVirtualKeysProjection(jobConf,
realtimeSplit.getVirtualKeyInfo());
+ String preCombineKey = tableConfig.getPreCombineField();
+ if (!StringUtils.isNullOrEmpty(preCombineKey)) {
+ fieldsToAdd.add(preCombineKey);
+ }
}
+
+ Option<String[]> partitions = tableConfig.getPartitionFields();
+ if (partitions.isPresent()) {
+ fieldsToAdd.addAll(Arrays.asList(partitions.get()));
+ }
+ HoodieRealtimeInputFormatUtils.addProjectionField(jobConf,
fieldsToAdd.toArray(new String[0]));
+
jobConf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true");
setConf(jobConf);
}
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
index 004ed135fe9..b992d568fea 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
@@ -71,7 +71,7 @@ public class HoodieRealtimeInputFormatUtils extends
HoodieInputFormatUtils {
readColIdsPrefix = "";
}
- if (!readColNames.contains(fieldName)) {
+ if (!Arrays.asList(readColNames.split(",")).contains(fieldName)) {
// If not already in the list - then add it
conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
readColNamesPrefix + fieldName);
conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,
readColIdsPrefix + fieldIndex);
@@ -84,7 +84,19 @@ public class HoodieRealtimeInputFormatUtils extends
HoodieInputFormatUtils {
return conf;
}
- public static void addRequiredProjectionFields(Configuration configuration,
Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo, Option<String>
preCombineKeyOpt) {
+ public static void addProjectionField(Configuration conf, String[]
fieldName) {
+ if (fieldName.length > 0) {
+ List<String> columnNameList =
Arrays.stream(conf.get(serdeConstants.LIST_COLUMNS).split(",")).collect(Collectors.toList());
+ Arrays.stream(fieldName).forEach(field -> {
+ int index = columnNameList.indexOf(field);
+ if (index != -1) {
+ addProjectionField(conf, field, index);
+ }
+ });
+ }
+ }
+
+ public static void addVirtualKeysProjection(Configuration configuration,
Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo) {
// Need this to do merge records in HoodieRealtimeRecordReader
if (!hoodieVirtualKeyInfo.isPresent()) {
addProjectionField(configuration,
HoodieRecord.RECORD_KEY_METADATA_FIELD,
HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS);
@@ -97,18 +109,6 @@ public class HoodieRealtimeInputFormatUtils extends
HoodieInputFormatUtils {
addProjectionField(configuration,
hoodieVirtualKey.getPartitionPathField().get(),
hoodieVirtualKey.getPartitionPathFieldIndex().get());
}
}
-
- if (preCombineKeyOpt.isPresent()) {
- // infer col pos
- String preCombineKey = preCombineKeyOpt.get();
- List<String> columnNameList =
Arrays.stream(configuration.get(serdeConstants.LIST_COLUMNS).split(",")).collect(Collectors.toList());
- int pos = columnNameList.indexOf(preCombineKey);
- if (pos != -1) {
- addProjectionField(configuration, preCombineKey, pos);
- LOG.info(String.format("add preCombineKey: %s to project columns with
position %s", preCombineKey, pos));
- }
- }
-
}
public static boolean requiredProjectionFieldsExistInConf(Configuration
configuration, Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo) {
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index 32365689788..19441e17a59 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -257,10 +257,8 @@ public class HoodieRealtimeRecordReaderUtils {
String[] fieldOrdersWithDups = fieldOrderCsv.isEmpty() ? new String[0] :
fieldOrderCsv.split(",");
Set<String> fieldOrdersSet = new
LinkedHashSet<>(Arrays.asList(fieldOrdersWithDups));
String[] fieldOrders = fieldOrdersSet.toArray(new String[0]);
- List<String> fieldNames = fieldNameCsv.isEmpty() ? new ArrayList<>() :
Arrays.stream(fieldNameCsv.split(","))
- .filter(fn ->
!partitioningFields.contains(fn)).collect(Collectors.toList());
+ List<String> fieldNames = fieldNameCsv.isEmpty() ? new ArrayList<>() :
Arrays.stream(fieldNameCsv.split(",")).collect(Collectors.toList());
Set<String> fieldNamesSet = new LinkedHashSet<>(fieldNames);
- // Hive does not provide ids for partitioning fields, so check for lengths
excluding that.
if (fieldNamesSet.size() != fieldOrders.length) {
throw new HoodieException(String
.format("Error ordering fields for storage read. #fieldNames: %d,
#fieldPositions: %d",
diff --git
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
index a050d7eb88b..562c69b7221 100644
---
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
+++
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
@@ -27,6 +27,9 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -192,6 +195,17 @@ public class ITTestHoodieSanity extends ITTestBase {
stdOutErr = executeHiveCommand("select count(1) from " +
roTableName.get());
assertEquals(80, Integer.parseInt(lastLine(stdOutErr.getLeft()).trim()),
"Expecting 80 rows to be present in the snapshot table");
+
+ if (partitionType != PartitionType.NON_PARTITIONED) {
+ // Verify queries with partition field predicates, some partitions may
be empty, so we query all the partitions.
+ String[] partitions = getPartitions(roTableName.get());
+ assertTrue(partitions.length > 0);
+ String partitionClause = partitionType ==
PartitionType.SINGLE_KEY_PARTITIONED
+ ?
Arrays.stream(partitions).map(String::trim).collect(Collectors.joining(" or "))
+ : Arrays.stream(partitions).map(par -> String.join(" and ",
par.trim().split("/"))).collect(Collectors.joining(" or "));
+ stdOutErr = executeHiveCommand("select * from " + roTableName.get() +
" where " + partitionClause);
+ assertTrue(stdOutErr.getLeft().split("\n").length > 0, "Expecting at
least one row to be present, but got " + stdOutErr);
+ }
}
// Make the HDFS dataset non-hoodie and run the same query; Checks for
interoperability with non-hoodie tables
@@ -209,6 +223,11 @@ public class ITTestHoodieSanity extends ITTestBase {
"Expecting 280 rows to be present in the new table");
}
+ private String[] getPartitions(String tableName) throws Exception {
+ Pair<String, String> stdOutErr = executeHiveCommand("show partitions " +
tableName);
+ return stdOutErr.getLeft().split("\n");
+ }
+
private static String lastLine(String output) {
String[] lines = output.split("\n");
return lines[lines.length - 1];