This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c943ab  Ensure log files are consistently ordered when scanning
1c943ab is described below

commit 1c943ab2305a06467cb039b534ee0915e4ef7453
Author: Balaji Varadarajan <[email protected]>
AuthorDate: Tue Jun 11 19:06:06 2019 -0700

    Ensure log files are consistently ordered when scanning
---
 .../table/log/HoodieMergedLogRecordScanner.java    |   2 +-
 .../hadoop/realtime/HoodieRealtimeInputFormat.java |   3 +-
 .../realtime/HoodieRealtimeRecordReaderTest.java   | 159 ++++++++++++++-------
 3 files changed, 113 insertions(+), 51 deletions(-)

diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieMergedLogRecordScanner.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieMergedLogRecordScanner.java
index f23c0d8..d99babe 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieMergedLogRecordScanner.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieMergedLogRecordScanner.java
@@ -110,7 +110,7 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordScanner
     if (records.containsKey(key)) {
       // Merge and store the merged record. The HoodieRecordPayload 
implementation is free to decide what should be
       // done when a delete (empty payload) is encountered before or after an 
insert/update.
-      HoodieRecordPayload combinedValue = 
records.get(key).getData().preCombine(hoodieRecord.getData());
+      HoodieRecordPayload combinedValue = 
hoodieRecord.getData().preCombine(records.get(key).getData());
       records.put(key, new HoodieRecord<>(new HoodieKey(key, 
hoodieRecord.getPartitionPath()), combinedValue));
     } else {
       // Put the record as is
diff --git 
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java
 
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java
index 748c3ff..6e323ec 100644
--- 
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java
+++ 
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java
@@ -21,6 +21,7 @@ package com.uber.hoodie.hadoop.realtime;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import com.uber.hoodie.common.model.FileSlice;
+import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
@@ -125,7 +126,7 @@ public class HoodieRealtimeInputFormat extends 
HoodieInputFormat implements Conf
           List<FileSplit> dataFileSplits = 
groupedInputSplits.get(fileSlice.getFileId());
           dataFileSplits.forEach(split -> {
             try {
-              List<String> logFilePaths = fileSlice.getLogFiles()
+              List<String> logFilePaths = 
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
                   .map(logFile -> 
logFile.getPath().toString()).collect(Collectors.toList());
               // Get the maxCommit from the last delta or compaction or commit 
- when
               // bootstrapped from COW table
diff --git 
a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
 
b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
index 6b64c4e..c9ffa80 100644
--- 
a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
+++ 
b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
@@ -21,15 +21,22 @@ package com.uber.hoodie.hadoop.realtime;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.Maps;
+import com.uber.hoodie.common.model.FileSlice;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.model.HoodieTestUtils;
+import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.log.HoodieLogFormat;
 import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
+import com.uber.hoodie.common.table.log.block.HoodieCommandBlock;
+import 
com.uber.hoodie.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum;
 import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
+import 
com.uber.hoodie.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.HoodieAvroUtils;
 import com.uber.hoodie.common.util.SchemaTestUtil;
+import com.uber.hoodie.common.util.collection.Pair;
+import com.uber.hoodie.exception.HoodieException;
 import com.uber.hoodie.hadoop.InputFormatTestUtil;
 import java.io.File;
 import java.io.IOException;
@@ -84,15 +91,38 @@ public class HoodieRealtimeRecordReaderTest {
   private HoodieLogFormat.Writer writeLogFile(File partitionDir, Schema 
schema, String fileId,
       String baseCommit, String newCommit, int numberOfRecords)
       throws InterruptedException, IOException {
-    return writeLogFile(partitionDir, schema, fileId, baseCommit, newCommit, 
numberOfRecords, 0);
+    return writeLogFile(partitionDir, schema, fileId, baseCommit, newCommit, 
numberOfRecords, 0, 0);
+  }
+
+  private HoodieLogFormat.Writer writeRollback(File partitionDir, Schema 
schema, String fileId,
+      String baseCommit, String newCommit, String rolledBackInstant, int 
logVersion)
+      throws InterruptedException, IOException {
+    HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
+        .onParentPath(new Path(partitionDir.getPath()))
+        .withFileId(fileId).overBaseCommit(baseCommit)
+        .withFs(fs)
+        .withLogVersion(logVersion)
+        .withLogWriteToken("1-0-1")
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+    // generate metadata
+    Map<HeaderMetadataType, String> header = Maps.newHashMap();
+    header.put(HeaderMetadataType.INSTANT_TIME, newCommit);
+    header.put(HeaderMetadataType.TARGET_INSTANT_TIME, rolledBackInstant);
+    header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE, 
String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK
+        .ordinal()));
+    // if update belongs to an existing log file
+    writer = writer.appendBlock(new HoodieCommandBlock(header));
+    return writer;
   }
 
   private HoodieLogFormat.Writer writeLogFile(File partitionDir, Schema 
schema, String fileId,
-      String baseCommit, String newCommit, int numberOfRecords, int offset)
+      String baseCommit, String newCommit, int numberOfRecords, int offset, 
int logVersion)
       throws InterruptedException, IOException {
     HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
         .onParentPath(new Path(partitionDir.getPath()))
         .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId)
+        .withLogVersion(logVersion)
+        .withLogWriteToken("1-0-1")
         .overBaseCommit(baseCommit).withFs(fs).build();
     List<IndexedRecord> records = new ArrayList<>();
     for (int i = offset; i < offset + numberOfRecords; i++) {
@@ -122,58 +152,89 @@ public class HoodieRealtimeRecordReaderTest {
     Schema schema = 
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
     HoodieTestUtils.initTableType(hadoopConf, 
basePath.getRoot().getAbsolutePath(),
         HoodieTableType.MERGE_ON_READ);
-    String commitTime = "100";
+    String baseInstant = "100";
     File partitionDir =
-        partitioned ? InputFormatTestUtil.prepareParquetDataset(basePath, 
schema, 1, 100, commitTime)
-        : InputFormatTestUtil.prepareNonPartitionedParquetDataset(basePath, 
schema, 1, 100, commitTime);
-    InputFormatTestUtil.commit(basePath, commitTime);
+        partitioned ? InputFormatTestUtil.prepareParquetDataset(basePath, 
schema, 1, 100, baseInstant)
+        : InputFormatTestUtil.prepareNonPartitionedParquetDataset(basePath, 
schema, 1, 100, baseInstant);
+    InputFormatTestUtil.commit(basePath, baseInstant);
     // Add the paths
     FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
 
-    // update files or generate new log file
-    String newCommitTime = "101";
-    HoodieLogFormat.Writer writer = writeLogFile(partitionDir, schema, 
"fileid0", commitTime,
-        newCommitTime, 100);
-    long size = writer.getCurrentSize();
-    writer.close();
-    assertTrue("block - size should be > 0", size > 0);
-
-    //create a split with baseFile (parquet file written earlier) and new log 
file(s)
-    String logFilePath = writer.getLogFile().getPath().toString();
-    HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
-        new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + commitTime + 
".parquet"), 0, 1,
-            jobConf), basePath.getRoot().getPath(), 
Arrays.asList(logFilePath), newCommitTime);
-
-    //create a RecordReader to be used by HoodieRealtimeRecordReader
-    RecordReader<Void, ArrayWritable> reader =
-        new MapredParquetInputFormat().getRecordReader(
-            new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), 
(String[]) null),
-            jobConf, null);
-    JobConf jobConf = new JobConf();
-    List<Schema.Field> fields = schema.getFields();
-    String names = fields.stream().map(f -> 
f.name().toString()).collect(Collectors.joining(","));
-    String postions = fields.stream().map(f -> String.valueOf(f.pos()))
-        .collect(Collectors.joining(","));
-    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
-    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions);
-    if (partitioned) {
-      jobConf.set("partition_columns", "datestr");
-    }
+    List<Pair<String, Integer>> logVersionsWithAction = new ArrayList<>();
+    logVersionsWithAction.add(Pair.of(HoodieTimeline.DELTA_COMMIT_ACTION, 1));
+    logVersionsWithAction.add(Pair.of(HoodieTimeline.DELTA_COMMIT_ACTION, 2));
+    // TODO: HUDI-154 Once Hive 2.x PR (PR-674) is merged, enable this change
+    // logVersionsWithAction.add(Pair.of(HoodieTimeline.ROLLBACK_ACTION, 3));
+    FileSlice fileSlice = new FileSlice(partitioned ? 
FSUtils.getRelativePartitionPath(new Path(
+        basePath.getRoot().getAbsolutePath()), new 
Path(partitionDir.getAbsolutePath())) : "default",
+        baseInstant, "fileid0");
+    logVersionsWithAction.stream().forEach(logVersionWithAction -> {
+      try {
+        // update files or generate new log file
+        int logVersion = logVersionWithAction.getRight();
+        String action = logVersionWithAction.getKey();
+        int baseInstantTs = Integer.parseInt(baseInstant);
+        String instantTime = String.valueOf(baseInstantTs + logVersion);
+        String latestInstant = action.equals(HoodieTimeline.ROLLBACK_ACTION)
+            ? String.valueOf(baseInstantTs + logVersion - 2) : instantTime;
+
+        HoodieLogFormat.Writer writer = null;
+        if (action.equals(HoodieTimeline.ROLLBACK_ACTION)) {
+          writer = writeRollback(partitionDir, schema, "fileid0", baseInstant,
+              instantTime, String.valueOf(baseInstantTs + logVersion - 1), 
logVersion);
+        } else {
+          writer = writeLogFile(partitionDir, schema, "fileid0", baseInstant,
+              instantTime, 100, 0, logVersion);
+        }
+        long size = writer.getCurrentSize();
+        writer.close();
+        assertTrue("block - size should be > 0", size > 0);
+
+        //create a split with baseFile (parquet file written earlier) and new 
log file(s)
+        fileSlice.addLogFile(writer.getLogFile());
+        HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
+            new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + 
baseInstant + ".parquet"), 0, 1,
+                jobConf), basePath.getRoot().getPath(),
+            
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(h -> 
h.getPath().toString())
+                .collect(Collectors.toList()), instantTime);
+
+        //create a RecordReader to be used by HoodieRealtimeRecordReader
+        RecordReader<Void, ArrayWritable> reader =
+            new MapredParquetInputFormat().getRecordReader(
+                new FileSplit(split.getPath(), 0, 
fs.getLength(split.getPath()), (String[]) null),
+                jobConf, null);
+        JobConf jobConf = new JobConf();
+        List<Schema.Field> fields = schema.getFields();
+        String names = fields.stream().map(f -> 
f.name().toString()).collect(Collectors.joining(","));
+        String postions = fields.stream().map(f -> String.valueOf(f.pos()))
+            .collect(Collectors.joining(","));
+        jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
+        jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions);
+        if (partitioned) {
+          jobConf.set("partition_columns", "datestr");
+        }
+
+        //validate record reader compaction
+        HoodieRealtimeRecordReader recordReader = new 
HoodieRealtimeRecordReader(split, jobConf, reader);
+
+        //use reader to read base Parquet File and log file, merge in flight 
and return latest commit
+        //here all 100 records should be updated, see above
+        Void key = recordReader.createKey();
+        ArrayWritable value = recordReader.createValue();
+        while (recordReader.next(key, value)) {
+          Writable[] values = value.get();
+          //check if the record written is with latest commit, here "101"
+          Assert.assertEquals(latestInstant, values[0].toString());
+          key = recordReader.createKey();
+          value = recordReader.createValue();
+        }
+      } catch (Exception ioe) {
+        throw new HoodieException(ioe.getMessage(), ioe);
+      }
+    });
 
-    //validate record reader compaction
-    HoodieRealtimeRecordReader recordReader = new 
HoodieRealtimeRecordReader(split, jobConf, reader);
+    // Add Rollback last version to next log-file
 
-    //use reader to read base Parquet File and log file, merge in flight and 
return latest commit
-    //here all 100 records should be updated, see above
-    Void key = recordReader.createKey();
-    ArrayWritable value = recordReader.createValue();
-    while (recordReader.next(key, value)) {
-      Writable[] values = value.get();
-      //check if the record written is with latest commit, here "101"
-      Assert.assertEquals(values[0].toString(), newCommitTime);
-      key = recordReader.createKey();
-      value = recordReader.createValue();
-    }
   }
 
   @Test
@@ -195,7 +256,7 @@ public class HoodieRealtimeRecordReaderTest {
     // insert new records to log file
     String newCommitTime = "101";
     HoodieLogFormat.Writer writer = writeLogFile(partitionDir, schema, 
"fileid0", commitTime,
-        newCommitTime, numRecords, numRecords);
+        newCommitTime, numRecords, numRecords, 0);
     long size = writer.getCurrentSize();
     writer.close();
     assertTrue("block - size should be > 0", size > 0);

Reply via email to