bvaradar commented on a change in pull request #727: Ensure log files are
consistently ordered when scanning
URL: https://github.com/apache/incubator-hudi/pull/727#discussion_r292727010
##########
File path:
hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
##########
@@ -122,58 +152,89 @@ public void testReader(boolean partitioned) throws
Exception {
Schema schema =
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
HoodieTestUtils.initTableType(hadoopConf,
basePath.getRoot().getAbsolutePath(),
HoodieTableType.MERGE_ON_READ);
- String commitTime = "100";
+ String baseInstant = "100";
File partitionDir =
- partitioned ? InputFormatTestUtil.prepareParquetDataset(basePath,
schema, 1, 100, commitTime)
- : InputFormatTestUtil.prepareNonPartitionedParquetDataset(basePath,
schema, 1, 100, commitTime);
- InputFormatTestUtil.commit(basePath, commitTime);
+ partitioned ? InputFormatTestUtil.prepareParquetDataset(basePath,
schema, 1, 100, baseInstant)
+ : InputFormatTestUtil.prepareNonPartitionedParquetDataset(basePath,
schema, 1, 100, baseInstant);
+ InputFormatTestUtil.commit(basePath, baseInstant);
// Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
- // update files or generate new log file
- String newCommitTime = "101";
- HoodieLogFormat.Writer writer = writeLogFile(partitionDir, schema,
"fileid0", commitTime,
- newCommitTime, 100);
- long size = writer.getCurrentSize();
- writer.close();
- assertTrue("block - size should be > 0", size > 0);
-
- //create a split with baseFile (parquet file written earlier) and new log
file(s)
- String logFilePath = writer.getLogFile().getPath().toString();
- HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
- new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + commitTime +
".parquet"), 0, 1,
- jobConf), basePath.getRoot().getPath(),
Arrays.asList(logFilePath), newCommitTime);
-
- //create a RecordReader to be used by HoodieRealtimeRecordReader
- RecordReader<Void, ArrayWritable> reader =
- new MapredParquetInputFormat().getRecordReader(
- new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()),
(String[]) null),
- jobConf, null);
- JobConf jobConf = new JobConf();
- List<Schema.Field> fields = schema.getFields();
- String names = fields.stream().map(f ->
f.name().toString()).collect(Collectors.joining(","));
- String postions = fields.stream().map(f -> String.valueOf(f.pos()))
- .collect(Collectors.joining(","));
- jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
- jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions);
- if (partitioned) {
- jobConf.set("partition_columns", "datestr");
- }
+ List<Pair<String, Integer>> logVersionsWithAction = new ArrayList<>();
+ logVersionsWithAction.add(Pair.of(HoodieTimeline.DELTA_COMMIT_ACTION, 1));
+ logVersionsWithAction.add(Pair.of(HoodieTimeline.DELTA_COMMIT_ACTION, 2));
+ // TODO: HUDI-154 Once Hive 2.x PR (PR-674) is merged, enable this change
+ // logVersionsWithAction.add(Pair.of(HoodieTimeline.ROLLBACK_ACTION, 3));
Review comment:
I tested this code by rebasing against Nishith's PR. We can enable this once
both PRs are merged.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services