This is an automated email from the ASF dual-hosted git repository.
garyli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 6f7ff7e  [HUDI-1722] Fix NPE when hive beeline/spark-sql queries a specified field on a MOR table (#2722)
6f7ff7e is described below
commit 6f7ff7e8ca3a83d858561dd08f8f093787caa9b2
Author: xiarixiaoyao <[email protected]>
AuthorDate: Wed May 12 20:52:37 2021 +0800
    [HUDI-1722] Fix NPE when hive beeline/spark-sql queries a specified field on a MOR table (#2722)
---
 .../realtime/HoodieParquetRealtimeInputFormat.java |  4 +-
 .../utils/HoodieRealtimeInputFormatUtils.java      | 14 +++++
 .../TestHoodieCombineHiveInputFormat.java          | 67 ++++++++++++++++++++++
 .../hudi/hadoop/testutils/InputFormatTestUtil.java | 27 +++++++++
 4 files changed, 110 insertions(+), 2 deletions(-)
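
For orientation before the patch itself, here is a minimal sketch of the new guard condition; the names mirror the patch, the surrounding getRecordReader plumbing is omitted, and the snippet is illustrative rather than literal commit content:

    // Before: hoodie projection columns were added only while HOODIE_READ_COLUMNS_PROP was unset.
    // After: they are also (re-)added when a realtime split carries delta log files but the
    // required _hoodie_* metadata fields are missing from the configured read columns.
    boolean canAddProjection =
        jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null
            || (!realtimeSplit.getDeltaLogPaths().isEmpty()
                && !HoodieRealtimeInputFormatUtils.requiredProjectionFieldsExistInConf(jobConf));
    if (canAddProjection) {
      synchronized (jobConf) {
        // add _hoodie_record_key, _hoodie_commit_time, _hoodie_partition_path to the projection
      }
    }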
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index d8f0a01..6b827d3 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -85,12 +85,12 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
     // risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible
     // latency incurred here due to the synchronization since get record reader is called once per split before the
     // actual heavy lifting of reading the parquet files happens.
-    if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null) {
+    if (HoodieRealtimeInputFormatUtils.canAddProjectionToJobConf(realtimeSplit, jobConf)) {
       synchronized (jobConf) {
         LOG.info(
             "Before adding Hoodie columns, Projections :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
                 + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
-        if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null) {
+        if (HoodieRealtimeInputFormatUtils.canAddProjectionToJobConf(realtimeSplit, jobConf)) {
           // Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table;
           // In this case, the projection fields get removed. Looking at HiveInputFormat implementation, in some cases
           // hoodie additional projection columns are reset after calling setConf and only natural projections
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
index ce770ba..db8de64 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
@@ -18,6 +18,7 @@
package org.apache.hudi.hadoop.utils;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -218,6 +220,18 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
     addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_PARTITION_PATH_COL_POS);
   }
+  public static boolean requiredProjectionFieldsExistInConf(Configuration configuration) {
+    String readColNames = configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "");
+    return readColNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+        && readColNames.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
+        && readColNames.contains(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
+  }
+
+  public static boolean canAddProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf) {
+    return jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null
+        || (!realtimeSplit.getDeltaLogPaths().isEmpty() && !HoodieRealtimeInputFormatUtils.requiredProjectionFieldsExistInConf(jobConf));
+  }
+
   /**
    * Hive will append read columns' ids to old columns' ids during getRecordReader. In some cases, e.g. SELECT COUNT(*),
    * the read columns' id is an empty string and Hive will combine it with Hoodie required projection ids and becomes
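
As a quick, hypothetical illustration of the helper added above (not part of the patch; "fare" is just an example data column), a configuration that projects only a data column fails the check, while one that also lists the three metadata fields passes:

    Configuration conf = new Configuration();
    conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "fare");
    // false: none of _hoodie_record_key, _hoodie_commit_time, _hoodie_partition_path are projected
    boolean hasMetaFields = HoodieRealtimeInputFormatUtils.requiredProjectionFieldsExistInConf(conf);

    conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
        "_hoodie_commit_time,_hoodie_record_key,_hoodie_partition_path,fare");
    // true: all three required metadata fields are present in the read columns
    hasMetaFields = HoodieRealtimeInputFormatUtils.requiredProjectionFieldsExistInConf(conf);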
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java
index a5b1931..5ec32d7 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java
@@ -157,6 +157,73 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
}
@Test
+  public void testMultiReaderRealtimeCombineHoodieInputFormat() throws Exception {
+    // test for HUDI-1722
+    Configuration conf = new Configuration();
+    // initial commit
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
+    HoodieTestUtils.init(hadoopConf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
+    String commitTime = "100";
+    final int numRecords = 1000;
+    // Create 3 parquet files with 1000 records each
+    File partitionDir = InputFormatTestUtil.prepareParquetTable(tempDir, schema, 3, numRecords, commitTime);
+    InputFormatTestUtil.commit(tempDir, commitTime);
+
+    String newCommitTime = "101";
+    // to trigger the bug of HUDI-1722, only update fileid2:
+    // insert 1000 update records into the log file of fileid2, so that
+    // fileid0 and fileid1 have no log files while fileid2 has one
+    HoodieLogFormat.Writer writer =
+        InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid2", commitTime, newCommitTime,
+            numRecords, numRecords, 0);
+    writer.close();
+
+    TableDesc tblDesc = Utilities.defaultTd;
+    // Set the input format
+    tblDesc.setInputFileFormatClass(HoodieParquetRealtimeInputFormat.class);
+    PartitionDesc partDesc = new PartitionDesc(tblDesc, null);
+    LinkedHashMap<Path, PartitionDesc> pt = new LinkedHashMap<>();
+    LinkedHashMap<Path, ArrayList<String>> tableAlias = new LinkedHashMap<>();
+    ArrayList<String> alias = new ArrayList<>();
+    alias.add(tempDir.toAbsolutePath().toString());
+    tableAlias.put(new Path(tempDir.toAbsolutePath().toString()), alias);
+    pt.put(new Path(tempDir.toAbsolutePath().toString()), partDesc);
+
+    MapredWork mrwork = new MapredWork();
+    mrwork.getMapWork().setPathToPartitionInfo(pt);
+    mrwork.getMapWork().setPathToAliases(tableAlias);
+    Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString());
+    Utilities.setMapRedWork(conf, mrwork, mapWorkPath);
+    jobConf = new JobConf(conf);
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+    jobConf.set(HAS_MAP_WORK, "true");
+    // The following config tells Hive to choose ExecMapper to read the MAP_WORK
+    jobConf.set(MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
+    // set SPLIT_MAXSIZE large enough to create one split for the 3 file groups
+    jobConf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, "128000000");
+
+    HoodieCombineHiveInputFormat combineHiveInputFormat = new HoodieCombineHiveInputFormat();
+    String tripsHiveColumnTypes = "double,string,string,string,double,double,double,double,double";
+    InputFormatTestUtil.setProjectFieldsForInputFormat(jobConf, schema, tripsHiveColumnTypes);
+    InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 1);
+    // Since SPLIT_MAXSIZE is large enough, only 1 split containing all 3 file groups should be created
+    assertEquals(1, splits.length);
+    RecordReader<NullWritable, ArrayWritable> recordReader =
+        combineHiveInputFormat.getRecordReader(splits[0], jobConf, null);
+    NullWritable nullWritable = recordReader.createKey();
+    ArrayWritable arrayWritable = recordReader.createValue();
+    int counter = 0;
+    while (recordReader.next(nullWritable, arrayWritable)) {
+      // read over all the splits
+      counter++;
+    }
+    // should read out 3000 records: fileid0, fileid1, fileid2 contain 1000 records each
+    assertEquals(3000, counter);
+    recordReader.close();
+  }
+
+ @Test
@Disabled
public void testHoodieRealtimeCombineHoodieInputFormat() throws Exception {
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index 60092f2..f49ce06 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -326,6 +326,33 @@ public class InputFormatTestUtil {
return writer;
}
+  public static void setProjectFieldsForInputFormat(JobConf jobConf,
+      Schema schema, String hiveColumnTypes) {
+    List<Schema.Field> fields = schema.getFields();
+    String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));
+    String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
+    Configuration conf = HoodieTestUtils.getDefaultHadoopConf();
+
+    String hiveColumnNames = fields.stream().filter(field -> !field.name().equalsIgnoreCase("datestr"))
+        .map(Schema.Field::name).collect(Collectors.joining(","));
+    hiveColumnNames = hiveColumnNames + ",datestr";
+    String modifiedHiveColumnTypes = HoodieAvroUtils.addMetadataColumnTypes(hiveColumnTypes);
+    modifiedHiveColumnTypes = modifiedHiveColumnTypes + ",string";
+    jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
+    jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, modifiedHiveColumnTypes);
+    // skip the hoodie meta columns and project only one original column to trigger HUDI-1722
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names.split(",")[5]);
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions.split(",")[5]);
+    jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr");
+    conf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
+    // skip the hoodie meta columns and project only one original column to trigger HUDI-1722
+    conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names.split(",")[5]);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions.split(",")[5]);
+    conf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr");
+    conf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, modifiedHiveColumnTypes);
+    jobConf.addResource(conf);
+  }
+
public static void setPropsForInputFormat(JobConf jobConf,
Schema schema, String hiveColumnTypes) {
List<Schema.Field> fields = schema.getFields();