xiarixiaoyao commented on a change in pull request #2722:
URL: https://github.com/apache/hudi/pull/2722#discussion_r615510606
##########
File path: hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java
##########
@@ -84,6 +86,73 @@ public void setUp() throws IOException, InterruptedException {
     HoodieTestUtils.init(MiniClusterUtil.configuration, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
   }
+  @Test
+  public void testMultiReaderRealtimeCombineHoodieInputFormat() throws Exception {
+    // test for HUDI-1722
+    Configuration conf = new Configuration();
+    // initial commit
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
+    HoodieTestUtils.init(hadoopConf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
+    String commitTime = "100";
+    final int numRecords = 1000;
+    // Create 3 parquet files with 1000 records each
+    File partitionDir = InputFormatTestUtil.prepareParquetTable(tempDir, schema, 3, numRecords, commitTime);
+    InputFormatTestUtil.commit(tempDir, commitTime);
+
+    String newCommitTime = "101";
+    // To trigger the bug of HUDI-1722, only update fileid2:
+    // insert 1000 update records into log file 2.
+    // Now fileid0 and fileid1 have no log files, while fileid2 has a log file.
+    HoodieLogFormat.Writer writer =
+        InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid2", commitTime, newCommitTime,
+            numRecords, numRecords, 0);
+    writer.close();
+
+    TableDesc tblDesc = Utilities.defaultTd;
+    // Set the input format
+    tblDesc.setInputFileFormatClass(HoodieParquetRealtimeInputFormat.class);
+    PartitionDesc partDesc = new PartitionDesc(tblDesc, null);
+    LinkedHashMap<Path, PartitionDesc> pt = new LinkedHashMap<>();
+    LinkedHashMap<Path, ArrayList<String>> tableAlias = new LinkedHashMap<>();
+    ArrayList<String> alias = new ArrayList<>();
+    alias.add(tempDir.toAbsolutePath().toString());
+    tableAlias.put(new Path(tempDir.toAbsolutePath().toString()), alias);
+    pt.put(new Path(tempDir.toAbsolutePath().toString()), partDesc);
+
+    MapredWork mrwork = new MapredWork();
+    mrwork.getMapWork().setPathToPartitionInfo(pt);
+    mrwork.getMapWork().setPathToAliases(tableAlias);
+    Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString());
+    Utilities.setMapRedWork(conf, mrwork, mapWorkPath);
+    jobConf = new JobConf(conf);
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+    jobConf.set(HAS_MAP_WORK, "true");
+    // The following config tells Hive to choose ExecMapper to read the MAP_WORK
+    jobConf.set(MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
+    // Set SPLIT_MAXSIZE large enough to create one split for all 3 file groups
+    jobConf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, "128000000");
+
+    HoodieCombineHiveInputFormat combineHiveInputFormat = new HoodieCombineHiveInputFormat();
+    String tripsHiveColumnTypes = "double,string,string,string,double,double,double,double,double";
+    InputFormatTestUtil.setProjectFieldsForInputFormat(jobConf, schema, tripsHiveColumnTypes);
+    InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 1);
+    // Since SPLIT_MAXSIZE is large enough, we should create only 1 split covering all 3 file groups
+    assertEquals(1, splits.length);
+    RecordReader<NullWritable, ArrayWritable> recordReader =
Review comment:
Yes, we only create one combine record reader, but that reader holds three RealtimeCompactedRecordReaders.
The creation order of those RealtimeCompactedRecordReaders is what leads to the NPE.
Take this test as an example: the combine reader holds three RealtimeCompactedRecordReaders, call them creader1, creader2, and creader3:
creader1: only has a base file
creader2: only has a base file
creader3: has a base file and a log file
If creader3 is created first, the hoodie additional projection columns are added to the jobConf, and the query succeeds.
However, if creader1 or creader2 is created first, no hoodie additional projection columns are added to the jobConf, and the query fails.
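
To make the order dependence concrete, here is a minimal sketch of the pattern described above. This is not Hudi's actual code: `SplitStub`, `ConfStub`, and `createReader` are made-up names, and the boolean flags stand in for the hoodie projection-column setup on the shared jobConf.

```java
import java.util.List;

// Minimal sketch of the order-dependent initialization described above.
// All names are illustrative placeholders, not real Hudi APIs.
public class ReaderOrderSketch {

  static class SplitStub {
    final boolean hasLogFiles;
    SplitStub(boolean hasLogFiles) { this.hasLogFiles = hasLogFiles; }
  }

  static class ConfStub {
    boolean initialized;            // "this jobConf was already set up"
    boolean hoodieColumnsProjected; // stands in for the hoodie projection columns
  }

  // Buggy pattern: the first reader marks the shared conf as initialized,
  // but only a reader whose split has log files adds the hoodie columns.
  static void createReader(SplitStub split, ConfStub conf) {
    if (!conf.initialized) {
      if (split.hasLogFiles) {
        conf.hoodieColumnsProjected = true;
      }
      conf.initialized = true;
    }
    if (split.hasLogFiles && !conf.hoodieColumnsProjected) {
      // Merging log records needs the hoodie columns; reading without them
      // is what surfaces as the NPE in the real code path.
      throw new NullPointerException("hoodie projection columns missing");
    }
  }

  public static void main(String[] args) {
    ConfStub conf = new ConfStub();
    // creader1 and creader2 (base file only) are created before creader3,
    // so the shared conf is marked initialized without the hoodie columns,
    // and creader3 fails.
    for (SplitStub split : List.of(new SplitStub(false), new SplitStub(false), new SplitStub(true))) {
      createReader(split, conf);
    }
  }
}
```

Reversing the order (the log-file reader first) lets the same loop complete, which is exactly why the result depends on which file group's reader is constructed first.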