This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 15cd0522d71 [HUDI-5308] Hive3 query returns null when the where clause 
has a partition field (#7355)
15cd0522d71 is described below

commit 15cd0522d7139b8dfa84c9f45040287ce409476a
Author: Manu <[email protected]>
AuthorDate: Wed May 10 16:43:35 2023 +0800

    [HUDI-5308] Hive3 query returns null when the where clause has a partition 
field (#7355)
    
    * Partition query in hive3 returns null for Hive 3.x.
---
 .../hudi/hadoop/HoodieParquetInputFormat.java      |  5 +++-
 .../realtime/HoodieHFileRealtimeInputFormat.java   |  2 +-
 .../realtime/HoodieParquetRealtimeInputFormat.java | 22 +++++++++++++----
 .../utils/HoodieRealtimeInputFormatUtils.java      | 28 +++++++++++-----------
 .../utils/HoodieRealtimeRecordReaderUtils.java     |  4 +---
 .../org/apache/hudi/integ/ITTestHoodieSanity.java  | 19 +++++++++++++++
 6 files changed, 57 insertions(+), 23 deletions(-)

diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
index dcb4b513fe5..c3588ef4e30 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
@@ -21,7 +21,7 @@ package org.apache.hudi.hadoop;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
-
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.io.ArrayWritable;
@@ -31,6 +31,8 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -84,6 +86,7 @@ public class HoodieParquetInputFormat extends 
HoodieParquetInputFormatBase {
       LOG.debug("EMPLOYING DEFAULT RECORD READER - " + split);
     }
 
+    HoodieRealtimeInputFormatUtils.addProjectionField(job, 
job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "").split("/"));
     return getRecordReaderInternal(split, job, reporter);
   }
 
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
index 10a7864dfbd..c7655abbbf3 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
@@ -73,7 +73,7 @@ public class HoodieHFileRealtimeInputFormat extends 
HoodieMergeOnReadTableInputF
           // For e:g _hoodie_record_key would be missing and merge step would 
throw exceptions.
           // TO fix this, hoodie columns are appended late at the time 
record-reader gets built instead of construction
           // time.
-          HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf, 
Option.empty(), Option.empty());
+          HoodieRealtimeInputFormatUtils.addVirtualKeysProjection(jobConf, 
Option.empty());
 
           this.conf = jobConf;
           this.conf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, 
"true");
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index d474ee3cef6..c3d2c0d63b5 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -41,6 +41,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 /**
  * Input Format, that provides a real-time view of data in a Hoodie table.
@@ -69,7 +72,7 @@ public class HoodieParquetRealtimeInputFormat extends 
HoodieParquetInputFormat {
     // add preCombineKey
     HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(realtimeSplit.getBasePath()).build();
     HoodieTableConfig tableConfig = metaClient.getTableConfig();
-    addProjectionToJobConf(realtimeSplit, jobConf, 
metaClient.getTableConfig().getPreCombineField());
+    addProjectionToJobConf(realtimeSplit, jobConf, tableConfig);
     LOG.info("Creating record reader with readCols :" + 
jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
         + ", Ids :" + 
jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
 
@@ -82,7 +85,7 @@ public class HoodieParquetRealtimeInputFormat extends 
HoodieParquetInputFormat {
         super.getRecordReader(split, jobConf, reporter));
   }
 
-  void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf 
jobConf, String preCombineKey) {
+  void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf 
jobConf, HoodieTableConfig tableConfig) {
     // Hive on Spark invokes multiple getRecordReaders from different threads 
in the same spark task (and hence the
     // same JVM) unlike Hive on MR. Due to this, accesses to JobConf, which is 
shared across all threads, is at the
     // risk of experiencing race conditions. Hence, we synchronize on the 
JobConf object here. There is negligible
@@ -101,10 +104,21 @@ public class HoodieParquetRealtimeInputFormat extends 
HoodieParquetInputFormat {
           // For e:g _hoodie_record_key would be missing and merge step would 
throw exceptions.
           // TO fix this, hoodie columns are appended late at the time 
record-reader gets built instead of construction
           // time.
+          List<String> fieldsToAdd = new ArrayList<>();
           if (!realtimeSplit.getDeltaLogPaths().isEmpty()) {
-            
HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf, 
realtimeSplit.getVirtualKeyInfo(),
-                StringUtils.isNullOrEmpty(preCombineKey) ? Option.empty() : 
Option.of(preCombineKey));
+            HoodieRealtimeInputFormatUtils.addVirtualKeysProjection(jobConf, 
realtimeSplit.getVirtualKeyInfo());
+            String preCombineKey = tableConfig.getPreCombineField();
+            if (!StringUtils.isNullOrEmpty(preCombineKey)) {
+              fieldsToAdd.add(preCombineKey);
+            }
           }
+
+          Option<String[]> partitions = tableConfig.getPartitionFields();
+          if (partitions.isPresent()) {
+            fieldsToAdd.addAll(Arrays.asList(partitions.get()));
+          }
+          HoodieRealtimeInputFormatUtils.addProjectionField(jobConf, 
fieldsToAdd.toArray(new String[0]));
+
           jobConf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true");
           setConf(jobConf);
         }
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
index 004ed135fe9..b992d568fea 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
@@ -71,7 +71,7 @@ public class HoodieRealtimeInputFormatUtils extends 
HoodieInputFormatUtils {
       readColIdsPrefix = "";
     }
 
-    if (!readColNames.contains(fieldName)) {
+    if (!Arrays.asList(readColNames.split(",")).contains(fieldName)) {
       // If not already in the list - then add it
       conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, 
readColNamesPrefix + fieldName);
       conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, 
readColIdsPrefix + fieldIndex);
@@ -84,7 +84,19 @@ public class HoodieRealtimeInputFormatUtils extends 
HoodieInputFormatUtils {
     return conf;
   }
 
-  public static void addRequiredProjectionFields(Configuration configuration, 
Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo, Option<String> 
preCombineKeyOpt) {
+  public static void addProjectionField(Configuration conf, String[] 
fieldName) {
+    if (fieldName.length > 0) {
+      List<String> columnNameList = 
Arrays.stream(conf.get(serdeConstants.LIST_COLUMNS).split(",")).collect(Collectors.toList());
+      Arrays.stream(fieldName).forEach(field -> {
+        int index = columnNameList.indexOf(field);
+        if (index != -1) {
+          addProjectionField(conf, field, index);
+        }
+      });
+    }
+  }
+
+  public static void addVirtualKeysProjection(Configuration configuration, 
Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo) {
     // Need this to do merge records in HoodieRealtimeRecordReader
     if (!hoodieVirtualKeyInfo.isPresent()) {
       addProjectionField(configuration, 
HoodieRecord.RECORD_KEY_METADATA_FIELD, 
HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS);
@@ -97,18 +109,6 @@ public class HoodieRealtimeInputFormatUtils extends 
HoodieInputFormatUtils {
         addProjectionField(configuration, 
hoodieVirtualKey.getPartitionPathField().get(), 
hoodieVirtualKey.getPartitionPathFieldIndex().get());
       }
     }
-
-    if (preCombineKeyOpt.isPresent()) {
-      // infer col pos
-      String preCombineKey = preCombineKeyOpt.get();
-      List<String> columnNameList = 
Arrays.stream(configuration.get(serdeConstants.LIST_COLUMNS).split(",")).collect(Collectors.toList());
-      int pos = columnNameList.indexOf(preCombineKey);
-      if (pos != -1) {
-        addProjectionField(configuration, preCombineKey, pos);
-        LOG.info(String.format("add preCombineKey: %s to project columns with 
position %s", preCombineKey, pos));
-      }
-    }
-
   }
 
   public static boolean requiredProjectionFieldsExistInConf(Configuration 
configuration, Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo) {
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index 32365689788..19441e17a59 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -257,10 +257,8 @@ public class HoodieRealtimeRecordReaderUtils {
     String[] fieldOrdersWithDups = fieldOrderCsv.isEmpty() ? new String[0] : 
fieldOrderCsv.split(",");
     Set<String> fieldOrdersSet = new 
LinkedHashSet<>(Arrays.asList(fieldOrdersWithDups));
     String[] fieldOrders = fieldOrdersSet.toArray(new String[0]);
-    List<String> fieldNames = fieldNameCsv.isEmpty() ? new ArrayList<>() : 
Arrays.stream(fieldNameCsv.split(","))
-        .filter(fn -> 
!partitioningFields.contains(fn)).collect(Collectors.toList());
+    List<String> fieldNames = fieldNameCsv.isEmpty() ? new ArrayList<>() : 
Arrays.stream(fieldNameCsv.split(",")).collect(Collectors.toList());
     Set<String> fieldNamesSet = new LinkedHashSet<>(fieldNames);
-    // Hive does not provide ids for partitioning fields, so check for lengths 
excluding that.
     if (fieldNamesSet.size() != fieldOrders.length) {
       throw new HoodieException(String
           .format("Error ordering fields for storage read. #fieldNames: %d, 
#fieldPositions: %d",
diff --git 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
index a050d7eb88b..562c69b7221 100644
--- 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
+++ 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
@@ -27,6 +27,9 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -192,6 +195,17 @@ public class ITTestHoodieSanity extends ITTestBase {
       stdOutErr = executeHiveCommand("select count(1) from " + 
roTableName.get());
       assertEquals(80, Integer.parseInt(lastLine(stdOutErr.getLeft()).trim()),
           "Expecting 80 rows to be present in the snapshot table");
+
+      if (partitionType != PartitionType.NON_PARTITIONED) {
+        // Verify queries with partition field predicates, some partitions may 
be empty, so we query all the partitions.
+        String[] partitions = getPartitions(roTableName.get());
+        assertTrue(partitions.length > 0);
+        String partitionClause = partitionType == 
PartitionType.SINGLE_KEY_PARTITIONED
+            ? 
Arrays.stream(partitions).map(String::trim).collect(Collectors.joining(" or "))
+            : Arrays.stream(partitions).map(par -> String.join(" and ", 
par.trim().split("/"))).collect(Collectors.joining(" or "));
+        stdOutErr = executeHiveCommand("select * from " + roTableName.get() + 
" where " + partitionClause);
+        assertTrue(stdOutErr.getLeft().split("\n").length > 0, "Expecting at 
least one row to be present, but got " + stdOutErr);
+      }
     }
 
     // Make the HDFS dataset non-hoodie and run the same query; Checks for 
interoperability with non-hoodie tables
@@ -209,6 +223,11 @@ public class ITTestHoodieSanity extends ITTestBase {
         "Expecting 280 rows to be present in the new table");
   }
 
+  private String[] getPartitions(String tableName) throws Exception {
+    Pair<String, String> stdOutErr = executeHiveCommand("show partitions " + 
tableName);
+    return stdOutErr.getLeft().split("\n");
+  }
+
   private static String lastLine(String output) {
     String[] lines = output.split("\n");
     return lines[lines.length - 1];

Reply via email to