This is an automated email from the ASF dual-hosted git repository.
nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1c943ab Ensure log files are consistently ordered when scanning
1c943ab is described below
commit 1c943ab2305a06467cb039b534ee0915e4ef7453
Author: Balaji Varadarajan <[email protected]>
AuthorDate: Tue Jun 11 19:06:06 2019 -0700
Ensure log files are consistently ordered when scanning
---
.../table/log/HoodieMergedLogRecordScanner.java | 2 +-
.../hadoop/realtime/HoodieRealtimeInputFormat.java | 3 +-
.../realtime/HoodieRealtimeRecordReaderTest.java | 159 ++++++++++++++-------
3 files changed, 113 insertions(+), 51 deletions(-)
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieMergedLogRecordScanner.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieMergedLogRecordScanner.java
index f23c0d8..d99babe 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieMergedLogRecordScanner.java
@@ -110,7 +110,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
if (records.containsKey(key)) {
// Merge and store the merged record. The HoodieRecordPayload implementation is free to decide what should be
// done when a delete (empty payload) is encountered before or after an insert/update.
- HoodieRecordPayload combinedValue = records.get(key).getData().preCombine(hoodieRecord.getData());
+ HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(records.get(key).getData());
records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue));
} else {
// Put the record as is
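
For context: the change above reverses the direction of the preCombine call, so the record encountered later in the scan invokes preCombine against the record already stored for the key. A minimal sketch of why the call direction can matter, assuming a hypothetical SimplePayload class (not a Hudi type) whose preCombine keeps the payload with the greater ordering value and resolves ties to `this`:

    // Illustrative only -- SimplePayload stands in for a HoodieRecordPayload implementation
    // that keeps the payload with the greater ordering value and resolves ties to `this`.
    class SimplePayload {
      final long orderingVal;
      final String value;

      SimplePayload(long orderingVal, String value) {
        this.orderingVal = orderingVal;
        this.value = value;
      }

      SimplePayload preCombine(SimplePayload another) {
        return another.orderingVal > this.orderingVal ? another : this;
      }
    }

    class PreCombineOrderDemo {
      public static void main(String[] args) {
        SimplePayload stored = new SimplePayload(1, "earlier log block");
        SimplePayload incoming = new SimplePayload(1, "later log block");
        // old direction: stored.preCombine(incoming) keeps "earlier log block" on a tie
        // new direction: incoming.preCombine(stored) keeps "later log block" on a tie
        System.out.println(stored.preCombine(incoming).value);
        System.out.println(incoming.preCombine(stored).value);
      }
    }

Under that assumption, and with log files scanned in a consistent order (see the HoodieRealtimeInputFormat change below), ties resolve toward the later-written data.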
diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java
index 748c3ff..6e323ec 100644
--- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java
+++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java
@@ -21,6 +21,7 @@ package com.uber.hoodie.hadoop.realtime;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.uber.hoodie.common.model.FileSlice;
+import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
@@ -125,7 +126,7 @@ public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Conf
List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
dataFileSplits.forEach(split -> {
try {
- List<String> logFilePaths = fileSlice.getLogFiles()
+ List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
.map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
// Get the maxCommit from the last delta or compaction or commit - when
// bootstrapped from COW table
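
For context: the fix sorts the slice's log files with HoodieLogFile.getLogFileComparator() before collecting their paths, so split construction no longer depends on whatever order the filesystem happened to list the files in. A minimal sketch of the idea, assuming a hypothetical name layout fileid0_<version>.log rather than Hudi's real log-file naming:

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;
    import java.util.stream.Collectors;

    class LogFileOrderingDemo {
      // Hypothetical layout "fileid0_<version>.log"; Hudi's real names carry more fields.
      static int version(String name) {
        return Integer.parseInt(name.substring(name.indexOf('_') + 1, name.indexOf(".log")));
      }

      public static void main(String[] args) {
        // A directory listing gives no ordering guarantee, e.g. version 2 may come first.
        List<String> listed = Arrays.asList("fileid0_2.log", "fileid0_1.log", "fileid0_3.log");

        List<String> ordered = listed.stream()
            .sorted(Comparator.comparingInt(LogFileOrderingDemo::version))
            .collect(Collectors.toList());

        System.out.println(ordered); // [fileid0_1.log, fileid0_2.log, fileid0_3.log]
      }
    }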
diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
index 6b64c4e..c9ffa80 100644
--- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
+++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
@@ -21,15 +21,22 @@ package com.uber.hoodie.hadoop.realtime;
import static org.junit.Assert.assertTrue;
import com.google.common.collect.Maps;
+import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.model.HoodieTestUtils;
+import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.log.HoodieLogFormat;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
+import com.uber.hoodie.common.table.log.block.HoodieCommandBlock;
+import com.uber.hoodie.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
+import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.common.util.SchemaTestUtil;
+import com.uber.hoodie.common.util.collection.Pair;
+import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.hadoop.InputFormatTestUtil;
import java.io.File;
import java.io.IOException;
@@ -84,15 +91,38 @@ public class HoodieRealtimeRecordReaderTest {
private HoodieLogFormat.Writer writeLogFile(File partitionDir, Schema schema, String fileId,
String baseCommit, String newCommit, int numberOfRecords)
throws InterruptedException, IOException {
- return writeLogFile(partitionDir, schema, fileId, baseCommit, newCommit, numberOfRecords, 0);
+ return writeLogFile(partitionDir, schema, fileId, baseCommit, newCommit, numberOfRecords, 0, 0);
+ }
+
+ private HoodieLogFormat.Writer writeRollback(File partitionDir, Schema schema, String fileId,
+ String baseCommit, String newCommit, String rolledBackInstant, int logVersion)
+ throws InterruptedException, IOException {
+ HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
+ .onParentPath(new Path(partitionDir.getPath()))
+ .withFileId(fileId).overBaseCommit(baseCommit)
+ .withFs(fs)
+ .withLogVersion(logVersion)
+ .withLogWriteToken("1-0-1")
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+ // generate metadata
+ Map<HeaderMetadataType, String> header = Maps.newHashMap();
+ header.put(HeaderMetadataType.INSTANT_TIME, newCommit);
+ header.put(HeaderMetadataType.TARGET_INSTANT_TIME, rolledBackInstant);
+ header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK
+ .ordinal()));
+ // if update belongs to an existing log file
+ writer = writer.appendBlock(new HoodieCommandBlock(header));
+ return writer;
}
private HoodieLogFormat.Writer writeLogFile(File partitionDir, Schema schema, String fileId,
- String baseCommit, String newCommit, int numberOfRecords, int offset)
+ String baseCommit, String newCommit, int numberOfRecords, int offset, int logVersion)
throws InterruptedException, IOException {
HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(new Path(partitionDir.getPath()))
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId)
+ .withLogVersion(logVersion)
+ .withLogWriteToken("1-0-1")
.overBaseCommit(baseCommit).withFs(fs).build();
List<IndexedRecord> records = new ArrayList<>();
for (int i = offset; i < offset + numberOfRecords; i++) {
@@ -122,58 +152,89 @@ public class HoodieRealtimeRecordReaderTest {
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
HoodieTestUtils.initTableType(hadoopConf, basePath.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ);
- String commitTime = "100";
+ String baseInstant = "100";
File partitionDir =
- partitioned ? InputFormatTestUtil.prepareParquetDataset(basePath, schema, 1, 100, commitTime)
- : InputFormatTestUtil.prepareNonPartitionedParquetDataset(basePath, schema, 1, 100, commitTime);
- InputFormatTestUtil.commit(basePath, commitTime);
+ partitioned ? InputFormatTestUtil.prepareParquetDataset(basePath, schema, 1, 100, baseInstant)
+ : InputFormatTestUtil.prepareNonPartitionedParquetDataset(basePath, schema, 1, 100, baseInstant);
+ InputFormatTestUtil.commit(basePath, baseInstant);
// Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
- // update files or generate new log file
- String newCommitTime = "101";
- HoodieLogFormat.Writer writer = writeLogFile(partitionDir, schema, "fileid0", commitTime,
- newCommitTime, 100);
- long size = writer.getCurrentSize();
- writer.close();
- assertTrue("block - size should be > 0", size > 0);
-
- //create a split with baseFile (parquet file written earlier) and new log file(s)
- String logFilePath = writer.getLogFile().getPath().toString();
- HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
- new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + commitTime + ".parquet"), 0, 1,
- jobConf), basePath.getRoot().getPath(), Arrays.asList(logFilePath), newCommitTime);
-
- //create a RecordReader to be used by HoodieRealtimeRecordReader
- RecordReader<Void, ArrayWritable> reader =
- new MapredParquetInputFormat().getRecordReader(
- new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null),
- jobConf, null);
- JobConf jobConf = new JobConf();
- List<Schema.Field> fields = schema.getFields();
- String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));
- String postions = fields.stream().map(f -> String.valueOf(f.pos()))
- .collect(Collectors.joining(","));
- jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
- jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions);
- if (partitioned) {
- jobConf.set("partition_columns", "datestr");
- }
+ List<Pair<String, Integer>> logVersionsWithAction = new ArrayList<>();
+ logVersionsWithAction.add(Pair.of(HoodieTimeline.DELTA_COMMIT_ACTION, 1));
+ logVersionsWithAction.add(Pair.of(HoodieTimeline.DELTA_COMMIT_ACTION, 2));
+ // TODO: HUDI-154 Once Hive 2.x PR (PR-674) is merged, enable this change
+ // logVersionsWithAction.add(Pair.of(HoodieTimeline.ROLLBACK_ACTION, 3));
+ FileSlice fileSlice = new FileSlice(partitioned ? FSUtils.getRelativePartitionPath(new Path(
+ basePath.getRoot().getAbsolutePath()), new Path(partitionDir.getAbsolutePath())) : "default",
+ baseInstant, "fileid0");
+ logVersionsWithAction.stream().forEach(logVersionWithAction -> {
+ try {
+ // update files or generate new log file
+ int logVersion = logVersionWithAction.getRight();
+ String action = logVersionWithAction.getKey();
+ int baseInstantTs = Integer.parseInt(baseInstant);
+ String instantTime = String.valueOf(baseInstantTs + logVersion);
+ String latestInstant = action.equals(HoodieTimeline.ROLLBACK_ACTION)
+ ? String.valueOf(baseInstantTs + logVersion - 2) : instantTime;
+
+ HoodieLogFormat.Writer writer = null;
+ if (action.equals(HoodieTimeline.ROLLBACK_ACTION)) {
+ writer = writeRollback(partitionDir, schema, "fileid0", baseInstant,
+ instantTime, String.valueOf(baseInstantTs + logVersion - 1), logVersion);
+ } else {
+ writer = writeLogFile(partitionDir, schema, "fileid0", baseInstant,
+ instantTime, 100, 0, logVersion);
+ }
+ long size = writer.getCurrentSize();
+ writer.close();
+ assertTrue("block - size should be > 0", size > 0);
+
+ //create a split with baseFile (parquet file written earlier) and new log file(s)
+ fileSlice.addLogFile(writer.getLogFile());
+ HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
+ new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + baseInstant + ".parquet"), 0, 1,
+ jobConf), basePath.getRoot().getPath(),
+ fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(h -> h.getPath().toString())
+ .collect(Collectors.toList()), instantTime);
+
+ //create a RecordReader to be used by HoodieRealtimeRecordReader
+ RecordReader<Void, ArrayWritable> reader =
+ new MapredParquetInputFormat().getRecordReader(
+ new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null),
+ jobConf, null);
+ JobConf jobConf = new JobConf();
+ List<Schema.Field> fields = schema.getFields();
+ String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));
+ String postions = fields.stream().map(f -> String.valueOf(f.pos()))
+ .collect(Collectors.joining(","));
+ jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
+ jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions);
+ if (partitioned) {
+ jobConf.set("partition_columns", "datestr");
+ }
+
+ //validate record reader compaction
+ HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
+
+ //use reader to read base Parquet File and log file, merge in flight and return latest commit
+ //here all 100 records should be updated, see above
+ Void key = recordReader.createKey();
+ ArrayWritable value = recordReader.createValue();
+ while (recordReader.next(key, value)) {
+ Writable[] values = value.get();
+ //check if the record written is with latest commit, here "101"
+ Assert.assertEquals(latestInstant, values[0].toString());
+ key = recordReader.createKey();
+ value = recordReader.createValue();
+ }
+ } catch (Exception ioe) {
+ throw new HoodieException(ioe.getMessage(), ioe);
+ }
+ });
- //validate record reader compaction
- HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
+ // Add Rollback last version to next log-file
- //use reader to read base Parquet File and log file, merge in flight and return latest commit
- //here all 100 records should be updated, see above
- Void key = recordReader.createKey();
- ArrayWritable value = recordReader.createValue();
- while (recordReader.next(key, value)) {
- Writable[] values = value.get();
- //check if the record written is with latest commit, here "101"
- Assert.assertEquals(values[0].toString(), newCommitTime);
- key = recordReader.createKey();
- value = recordReader.createValue();
- }
}
@Test
@@ -195,7 +256,7 @@ public class HoodieRealtimeRecordReaderTest {
// insert new records to log file
String newCommitTime = "101";
HoodieLogFormat.Writer writer = writeLogFile(partitionDir, schema, "fileid0", commitTime,
- newCommitTime, numRecords, numRecords);
+ newCommitTime, numRecords, numRecords, 0);
long size = writer.getCurrentSize();
writer.close();
assertTrue("block - size should be > 0", size > 0);
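
For context on the test scenario above (log versions written one after another, with the rollback case left as a follow-up under HUDI-154): the sketch below is an illustrative model, not Hudi's scanner, showing why blocks need to be visited in a consistent, version-sorted order for a rollback command block to cancel the blocks of the instant it targets.

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    class Block {
      final int logVersion;
      final String instantTime;
      final boolean isRollback;
      final String targetInstant; // only set for rollback blocks

      Block(int logVersion, String instantTime, boolean isRollback, String targetInstant) {
        this.logVersion = logVersion;
        this.instantTime = instantTime;
        this.isRollback = isRollback;
        this.targetInstant = targetInstant;
      }
    }

    class RollbackScanDemo {
      public static void main(String[] args) {
        List<Block> blocks = new ArrayList<>();
        blocks.add(new Block(2, "102", false, null));
        blocks.add(new Block(1, "101", false, null));
        blocks.add(new Block(3, "103", true, "102")); // rolls back instant 102

        // Sort by log version so the rollback is always seen after the block it cancels,
        // regardless of how the files were listed.
        blocks.sort(Comparator.comparingInt(blk -> blk.logVersion));

        List<Block> valid = new ArrayList<>();
        for (Block b : blocks) {
          if (b.isRollback) {
            valid.removeIf(v -> v.instantTime.equals(b.targetInstant));
          } else {
            valid.add(b);
          }
        }
        valid.forEach(b -> System.out.println("apply block from instant " + b.instantTime)); // only 101
      }
    }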