This is an automated email from the ASF dual-hosted git repository.

garyli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f7ff7e  [HUDI-1722] Fix NPE when querying specified fields on a MOR table via Hive beeline/spark-sql (#2722)
6f7ff7e is described below

commit 6f7ff7e8ca3a83d858561dd08f8f093787caa9b2
Author: xiarixiaoyao <[email protected]>
AuthorDate: Wed May 12 20:52:37 2021 +0800

    [HUDI-1722] Fix NPE when querying specified fields on a MOR table via Hive beeline/spark-sql (#2722)
---
 .../realtime/HoodieParquetRealtimeInputFormat.java |  4 +-
 .../utils/HoodieRealtimeInputFormatUtils.java      | 14 +++++
 .../TestHoodieCombineHiveInputFormat.java          | 67 ++++++++++++++++++++++
 .../hudi/hadoop/testutils/InputFormatTestUtil.java | 27 +++++++++
 4 files changed, 110 insertions(+), 2 deletions(-)

diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index d8f0a01..6b827d3 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -85,12 +85,12 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
     // risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible
     // latency incurred here due to the synchronization since getRecordReader is called once per split before the
     // actual heavy lifting of reading the parquet files happens.
-    if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null) {
+    if (HoodieRealtimeInputFormatUtils.canAddProjectionToJobConf(realtimeSplit, jobConf)) {
       synchronized (jobConf) {
         LOG.info(
             "Before adding Hoodie columns, Projections :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
                 + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
-        if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null) {
+        if (HoodieRealtimeInputFormatUtils.canAddProjectionToJobConf(realtimeSplit, jobConf)) {
           // Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table;
           // In this case, the projection fields get removed. Looking at the HiveInputFormat implementation, in some cases
           // hoodie additional projection columns are reset after calling setConf and only natural projections
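
The hunk above keeps the method's double-checked pattern: the predicate is evaluated once without the lock and again inside synchronized (jobConf), so only the first record reader per JobConf pays for the mutation; the fix merely swaps the plain null check for canAddProjectionToJobConf, which can also fire for splits carrying delta log files. A minimal standalone sketch of that locking pattern, assuming only hadoop-mapred on the classpath (the marker property and the projected column list are illustrative placeholders, not Hudi's real values):

import org.apache.hadoop.mapred.JobConf;

public class ProjectionGuardSketch {
  // Placeholder marker, standing in for HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP.
  private static final String COLUMNS_SET_MARKER = "example.read.columns.set";

  static void ensureProjection(JobConf jobConf) {
    if (jobConf.get(COLUMNS_SET_MARKER) == null) {      // cheap, unsynchronized first check
      synchronized (jobConf) {                          // serialize readers sharing this JobConf
        if (jobConf.get(COLUMNS_SET_MARKER) == null) {  // re-check: another thread may have won
          // illustrative projection mutation
          jobConf.set("hive.io.file.readcolumn.names", "_hoodie_record_key,field1");
          jobConf.set(COLUMNS_SET_MARKER, "true");      // later calls skip the synchronized body
        }
      }
    }
  }

  public static void main(String[] args) {
    ensureProjection(new JobConf());
  }
}
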
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
index ce770ba..db8de64 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.hadoop.utils;
 
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hudi.hadoop.realtime.RealtimeSplit;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -218,6 +220,18 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
     addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_PARTITION_PATH_COL_POS);
   }
 
+  public static boolean requiredProjectionFieldsExistInConf(Configuration configuration) {
+    String readColNames = configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "");
+    return readColNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+        && readColNames.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
+        && readColNames.contains(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
+  }
+
+  public static boolean canAddProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf) {
+    return jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null
+        || (!realtimeSplit.getDeltaLogPaths().isEmpty() && !requiredProjectionFieldsExistInConf(jobConf));
+  }
+
   /**
   * Hive will append read columns' ids to old columns' ids during getRecordReader. In some cases, e.g. SELECT COUNT(*),
   * the read columns' id is an empty string and Hive will combine it with Hoodie required projection ids and becomes
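
These two helpers carry the fix. The old guard fired only while HOODIE_READ_COLUMNS_PROP was unset, so once any split had populated it, a later split in the same job that carried delta log files was left without the three hoodie meta columns its merge reader needs, hence the NPE. canAddProjectionToJobConf also fires for such splits whenever the required meta fields are absent from the read-column list. A hedged sketch of the check's behavior, assuming hudi-hadoop-mr (with its hive-serde dependency) on the classpath:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;

public class ProjectionCheckSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Only one natural column projected (the HUDI-1722 scenario): the meta
    // fields are missing, so a split with delta log files may re-add them.
    conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "field1");
    System.out.println(
        HoodieRealtimeInputFormatUtils.requiredProjectionFieldsExistInConf(conf)); // false

    // All three meta columns present: nothing left to re-add.
    conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
        "_hoodie_commit_time,_hoodie_record_key,_hoodie_partition_path,field1");
    System.out.println(
        HoodieRealtimeInputFormatUtils.requiredProjectionFieldsExistInConf(conf)); // true
  }
}
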
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java
index a5b1931..5ec32d7 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java
@@ -157,6 +157,73 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
   }
 
   @Test
+  public void testMultiReaderRealtimeCombineHoodieInputFormat() throws Exception {
+    // test for HUDI-1722
+    Configuration conf = new Configuration();
+    // initial commit
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
+    HoodieTestUtils.init(hadoopConf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
+    String commitTime = "100";
+    final int numRecords = 1000;
+    // Create 3 parquet files with 1000 records each
+    File partitionDir = InputFormatTestUtil.prepareParquetTable(tempDir, schema, 3, numRecords, commitTime);
+    InputFormatTestUtil.commit(tempDir, commitTime);
+
+    String newCommitTime = "101";
+    // to trigger the bug of HUDI-1722, only update fileid2
+    // insert 1000 update records to log file 2
+    // now fileid0, fileid1 has no log files, fileid2 has log file
+    HoodieLogFormat.Writer writer =
+            InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid2", commitTime, newCommitTime,
+                    numRecords, numRecords, 0);
+    writer.close();
+
+    TableDesc tblDesc = Utilities.defaultTd;
+    // Set the input format
+    tblDesc.setInputFileFormatClass(HoodieParquetRealtimeInputFormat.class);
+    PartitionDesc partDesc = new PartitionDesc(tblDesc, null);
+    LinkedHashMap<Path, PartitionDesc> pt = new LinkedHashMap<>();
+    LinkedHashMap<Path, ArrayList<String>> tableAlias = new LinkedHashMap<>();
+    ArrayList<String> alias = new ArrayList<>();
+    alias.add(tempDir.toAbsolutePath().toString());
+    tableAlias.put(new Path(tempDir.toAbsolutePath().toString()), alias);
+    pt.put(new Path(tempDir.toAbsolutePath().toString()), partDesc);
+
+    MapredWork mrwork = new MapredWork();
+    mrwork.getMapWork().setPathToPartitionInfo(pt);
+    mrwork.getMapWork().setPathToAliases(tableAlias);
+    Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString());
+    Utilities.setMapRedWork(conf, mrwork, mapWorkPath);
+    jobConf = new JobConf(conf);
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+    jobConf.set(HAS_MAP_WORK, "true");
+    // The following config tells Hive to choose ExecMapper to read the MAP_WORK
+    jobConf.set(MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
+    // set SPLIT_MAXSIZE large enough to create one split for the 3 file groups
+    jobConf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, "128000000");
+
+    HoodieCombineHiveInputFormat combineHiveInputFormat = new HoodieCombineHiveInputFormat();
+    String tripsHiveColumnTypes = "double,string,string,string,double,double,double,double,double";
+    InputFormatTestUtil.setProjectFieldsForInputFormat(jobConf, schema, tripsHiveColumnTypes);
+    InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 1);
+    // since SPLIT_MAXSIZE is large enough, we should create only 1 split containing all 3 file groups
+    assertEquals(1, splits.length);
+    RecordReader<NullWritable, ArrayWritable> recordReader =
+            combineHiveInputFormat.getRecordReader(splits[0], jobConf, null);
+    NullWritable nullWritable = recordReader.createKey();
+    ArrayWritable arrayWritable = recordReader.createValue();
+    int counter = 0;
+    while (recordReader.next(nullWritable, arrayWritable)) {
+      // read over all the splits
+      counter++;
+    }
+    // should read out all 3 file groups (fileid0, fileid1, fileid2), each containing 1000 records
+    assertEquals(3000, counter);
+    recordReader.close();
+  }
+
+  @Test
   @Disabled
   public void testHoodieRealtimeCombineHoodieInputFormat() throws Exception {
 
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index 60092f2..f49ce06 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -326,6 +326,33 @@ public class InputFormatTestUtil {
     return writer;
   }
 
+  public static void setProjectFieldsForInputFormat(JobConf jobConf,
+      Schema schema, String hiveColumnTypes) {
+    List<Schema.Field> fields = schema.getFields();
+    String names = fields.stream().map(f -> f.name()).collect(Collectors.joining(","));
+    String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
+    Configuration conf = HoodieTestUtils.getDefaultHadoopConf();
+
+    String hiveColumnNames = fields.stream().filter(field -> !field.name().equalsIgnoreCase("datestr"))
+        .map(Schema.Field::name).collect(Collectors.joining(","));
+    hiveColumnNames = hiveColumnNames + ",datestr";
+    String modifiedHiveColumnTypes = HoodieAvroUtils.addMetadataColumnTypes(hiveColumnTypes);
+    modifiedHiveColumnTypes = modifiedHiveColumnTypes + ",string";
+    jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
+    jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, modifiedHiveColumnTypes);
+    // skip the hoodie meta columns and project only one original column to trigger HUDI-1722
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names.split(",")[5]);
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions.split(",")[5]);
+    jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr");
+    conf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
+    // skip the hoodie meta columns and project only one original column to trigger HUDI-1722
+    conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names.split(",")[5]);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions.split(",")[5]);
+    conf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr");
+    conf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, modifiedHiveColumnTypes);
+    jobConf.addResource(conf);
+  }
+
   public static void setPropsForInputFormat(JobConf jobConf,
       Schema schema, String hiveColumnTypes) {
     List<Schema.Field> fields = schema.getFields();
