codope commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r739354289



##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -161,6 +148,82 @@
     return rtSplits.toArray(new InputSplit[0]);
   }
 
+  // get IncrementalRealtimeSplits
+  public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, 
Stream<FileSplit> fileSplits) throws IOException {
+    List<InputSplit> rtSplits = new ArrayList<>();
+    List<FileSplit> fileSplitList = fileSplits.collect(Collectors.toList());
+    Set<Path> partitionSet = fileSplitList.stream().map(f -> 
f.getPath().getParent()).collect(Collectors.toSet());
+    Map<Path, HoodieTableMetaClient> partitionsToMetaClient = 
getTableMetaClientByPartitionPath(conf, partitionSet);
+    // Pre process tableConfig from first partition to fetch virtual key info
+    Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
+    if (partitionSet.size() > 0) {
+      hoodieVirtualKeyInfo = 
getHoodieVirtualKeyInfo(partitionsToMetaClient.get(partitionSet.iterator().next()));
+    }
+    Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo = 
hoodieVirtualKeyInfo;
+    fileSplitList.stream().forEach(s -> {
+      // deal with incremental query.
+      try {
+        if (s instanceof BaseFileWithLogsSplit) {
+          BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s;
+          if (bs.getBelongToIncrementalSplit()) {
+            rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), 
bs.getDeltaLogPaths(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
+          }
+        } else if (s instanceof RealtimeBootstrapBaseFileSplit) {
+          rtSplits.add(s);
+        }
+      } catch (IOException e) {
+        throw new HoodieIOException("Error creating hoodie real time split ", 
e);
+      }
+    });
+    LOG.info("Returning a total splits of " + rtSplits.size());
+    return rtSplits.toArray(new InputSplit[0]);
+  }
+
+  public static Option<HoodieVirtualKeyInfo> 
getHoodieVirtualKeyInfo(HoodieTableMetaClient metaClient) {
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    if (!tableConfig.populateMetaFields()) {
+      TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);
+      try {
+        MessageType parquetSchema = 
tableSchemaResolver.getTableParquetSchema();
+        return Option.of(new 
HoodieVirtualKeyInfo(tableConfig.getRecordKeyFieldProp(),
+            tableConfig.getPartitionFieldProp(), 
parquetSchema.getFieldIndex(tableConfig.getRecordKeyFieldProp()),
+            parquetSchema.getFieldIndex(tableConfig.getPartitionFieldProp())));
+      } catch (Exception exception) {
+        throw new HoodieException("Fetching table schema failed with exception 
", exception);
+      }
+    }
+    return Option.empty();
+  }
+
+  public static boolean isIncrementalQuerySplits(List<FileSplit> fileSplits) {
+    if (fileSplits == null || fileSplits.size() == 0) {
+      return false;
+    }
+    return fileSplits.stream().anyMatch(s -> {
+      if (s instanceof BaseFileWithLogsSplit) {
+        BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s;
+        return bs.getBelongToIncrementalSplit();
+      } else {
+        return s instanceof RealtimeBootstrapBaseFileSplit;
+      }
+    });
+  }
+
+  public static RealtimeBootstrapBaseFileSplit 
createRealimeBootstrapBaseFileSplit(

Review comment:
       minor: rename to createReal**t**imeBootstrapBaseFileSplit

##########
File path: 
hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
##########
@@ -535,4 +548,211 @@ public void 
testSchemaEvolutionAndRollbackBlockInLastLogFile(ExternalSpillableMa
         arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, true)
     );
   }
+
+  @Test
+  public void testIncremetalWithOnlylog() throws Exception {
+    // initial commit
+    Schema schema = 
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
+    HoodieTestUtils.init(hadoopConf, basePath.toString(), 
HoodieTableType.MERGE_ON_READ);
+    String instantTime = "100";
+    final int numRecords = 1000;
+    File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, 
schema, 1, numRecords, instantTime,
+        HoodieTableType.MERGE_ON_READ);
+    //FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime);
+    createDeltaCommitFile(basePath, instantTime,"2016/05/01", 
"2016/05/01/fileid0_1-0-1_100.parquet", "fileid0");
+    // Add the paths
+    FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
+
+    // insert new records to log file
+    try {
+      String newCommitTime = "102";
+      HoodieLogFormat.Writer writer =
+          InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, 
schema, "fileid0", instantTime, newCommitTime,
+              numRecords, numRecords, 0);
+      writer.close();
+      createDeltaCommitFile(basePath, newCommitTime,"2016/05/01", 
"2016/05/01/.fileid0_100.log.1_1-0-1", "fileid0");
+
+      InputFormatTestUtil.setupIncremental(baseJobConf, "101", 1);
+
+      HoodieParquetRealtimeInputFormat inputFormat =  new 
HoodieParquetRealtimeInputFormat();
+      inputFormat.setConf(baseJobConf);
+      InputSplit[] splits = inputFormat.getSplits(baseJobConf, 1);
+      assertTrue(splits.length == 1);
+      JobConf newJobConf = new JobConf(baseJobConf);
+      List<Schema.Field> fields = schema.getFields();
+      setHiveColumnNameProps(fields, newJobConf, false);
+      RecordReader<NullWritable, ArrayWritable> reader  = 
inputFormat.getRecordReader(splits[0], newJobConf, Reporter.NULL);
+      // use reader to read log file.
+      NullWritable key = reader.createKey();
+      ArrayWritable value = reader.createValue();
+      while (reader.next(key, value)) {
+        Writable[] values = value.get();
+        // check if the record written is with latest commit, here "101"

Review comment:
       minor: let's fix the comment here and at #L640. The latest commit time is 
not necessarily always "101". For example, here it is "102" (the newCommitTime), 
and in the test below it should be "100". More importantly, for the person 
reading the code, it's better to make it clear why the latest commit time should 
be "100" or any other value that is expected in the assertion.

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -61,9 +85,183 @@
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException 
{
 
-    Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, 
numSplits)).map(is -> (FileSplit) is);
+    List<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, 
numSplits)).map(is -> (FileSplit) is).collect(Collectors.toList());
+
+    boolean isIncrementalSplits = 
HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits);
+
+    return isIncrementalSplits ? 
HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, 
fileSplits.stream()) : HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, 
fileSplits.stream());
+  }
+
+  /**
+   * Keep the logic of mor_incr_view as same as spark datasource.
+   * Step1: Get list of commits to be fetched based on start commit and max 
commits(for snapshot max commits is -1).
+   * Step2: Get list of affected files status for these affected file status.
+   * Step3: Construct HoodieTableFileSystemView based on those affected file 
status.
+   *        a. Filter affected partitions based on inputPaths.
+   *        b. Get list of fileGroups based on affected partitions by 
fsView.getAllFileGroups.
+   * Step4: Set input paths based on filtered affected partition paths. 
changes that amony original input paths passed to
+   *        this method. some partitions did not have commits as part of the 
trimmed down list of commits and hence we need this step.
+   * Step5: Find candidate fileStatus, since when we get baseFileStatus from 
HoodieTableFileSystemView,
+   *        the BaseFileStatus will missing file size information.
+   *        We should use candidate fileStatus to update the size information 
for BaseFileStatus.
+   * Step6: For every file group from step3(b)
+   *        Get 1st available base file from all file slices. then we use 
candidate file status to update the baseFileStatus,
+   *        and construct RealTimeFileStatus and add it to result along with 
log files.
+   *        If file group just has log files, construct RealTimeFileStatus and 
add it to result.
+   * TODO: unify the incremental view code between hive/spark-sql and spark 
datasource
+   */
+  @Override
+  protected List<FileStatus> listStatusForIncrementalMode(
+      JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> 
inputPaths) throws IOException {
+    List<FileStatus> result = new ArrayList<>();
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+
+    // step1
+    Option<HoodieTimeline> timeline = 
HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return result;
+    }
+    String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(jobContext, 
tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get 
all the commits.
+    Integer maxCommits = HoodieHiveUtils.readMaxCommits(jobContext, tableName);
+    HoodieTimeline commitsTimelineToReturn = 
timeline.get().findInstantsAfter(lastIncrementalTs, maxCommits);
+    Option<List<HoodieInstant>> commitsToCheck = 
Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList()));
+    if (!commitsToCheck.isPresent()) {
+      return result;
+    }
+    // step2
+    commitsToCheck.get().sort(HoodieInstant::compareTo);
+    List<HoodieCommitMetadata> metadataList = commitsToCheck
+        .get().stream().map(instant -> {
+          try {
+            return HoodieInputFormatUtils.getCommitMetadata(instant, 
commitsTimelineToReturn);
+          } catch (IOException e) {
+            throw new HoodieException(String.format("cannot get metadata for 
instant: %s", instant));
+          }
+        }).collect(Collectors.toList());
+
+    // build fileGroup from fsView
+    List<FileStatus> affectedFileStatus = Arrays.asList(HoodieInputFormatUtils
+        .listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), 
metadataList));
+    // step3
+    HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, 
affectedFileStatus.toArray(new FileStatus[0]));
+    // build fileGroup from fsView
+    Path basePath = new Path(tableMetaClient.getBasePath());
+    // filter affectedPartition by inputPaths
+    List<String> affectedPartition = 
HoodieInputFormatUtils.getWritePartitionPaths(metadataList).stream()
+        .filter(k -> k.isEmpty() ? inputPaths.contains(basePath) : 
inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList());
+    if (affectedPartition.isEmpty()) {
+      return result;
+    }
+    List<HoodieFileGroup> fileGroups = affectedPartition.stream()

Review comment:
       This can be a costly operation as we need to take a read lock every time. 
Would caching file groups by partition be helpful? 

##########
File path: 
hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
##########
@@ -535,4 +548,211 @@ public void 
testSchemaEvolutionAndRollbackBlockInLastLogFile(ExternalSpillableMa
         arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, true)
     );
   }
+
+  @Test
+  public void testIncremetalWithOnlylog() throws Exception {

Review comment:
       minor: rename to testIncreme**n**talWithOnlylog (incremental spelling 
here and at #L602)

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -61,9 +85,183 @@
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException 
{
 
-    Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, 
numSplits)).map(is -> (FileSplit) is);
+    List<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, 
numSplits)).map(is -> (FileSplit) is).collect(Collectors.toList());
+
+    boolean isIncrementalSplits = 
HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits);
+
+    return isIncrementalSplits ? 
HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, 
fileSplits.stream()) : HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, 
fileSplits.stream());
+  }
+
+  /**
+   * Keep the logic of mor_incr_view as same as spark datasource.
+   * Step1: Get list of commits to be fetched based on start commit and max 
commits(for snapshot max commits is -1).
+   * Step2: Get list of affected files status for these affected file status.
+   * Step3: Construct HoodieTableFileSystemView based on those affected file 
status.
+   *        a. Filter affected partitions based on inputPaths.
+   *        b. Get list of fileGroups based on affected partitions by 
fsView.getAllFileGroups.
+   * Step4: Set input paths based on filtered affected partition paths. 
changes that amony original input paths passed to
+   *        this method. some partitions did not have commits as part of the 
trimmed down list of commits and hence we need this step.
+   * Step5: Find candidate fileStatus, since when we get baseFileStatus from 
HoodieTableFileSystemView,
+   *        the BaseFileStatus will missing file size information.
+   *        We should use candidate fileStatus to update the size information 
for BaseFileStatus.
+   * Step6: For every file group from step3(b)
+   *        Get 1st available base file from all file slices. then we use 
candidate file status to update the baseFileStatus,
+   *        and construct RealTimeFileStatus and add it to result along with 
log files.
+   *        If file group just has log files, construct RealTimeFileStatus and 
add it to result.
+   * TODO: unify the incremental view code between hive/spark-sql and spark 
datasource
+   */
+  @Override
+  protected List<FileStatus> listStatusForIncrementalMode(
+      JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> 
inputPaths) throws IOException {
+    List<FileStatus> result = new ArrayList<>();
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+
+    // step1
+    Option<HoodieTimeline> timeline = 
HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return result;
+    }
+    String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(jobContext, 
tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get 
all the commits.
+    Integer maxCommits = HoodieHiveUtils.readMaxCommits(jobContext, tableName);

Review comment:
       Can we reuse `HoodieInputFormatUtils#getCommitsForIncrementalQuery`?

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -61,9 +85,183 @@
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException 
{
 
-    Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, 
numSplits)).map(is -> (FileSplit) is);
+    List<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, 
numSplits)).map(is -> (FileSplit) is).collect(Collectors.toList());
+
+    boolean isIncrementalSplits = 
HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits);
+
+    return isIncrementalSplits ? 
HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, 
fileSplits.stream()) : HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, 
fileSplits.stream());
+  }
+
+  /**
+   * Keep the logic of mor_incr_view as same as spark datasource.
+   * Step1: Get list of commits to be fetched based on start commit and max 
commits(for snapshot max commits is -1).
+   * Step2: Get list of affected files status for these affected file status.
+   * Step3: Construct HoodieTableFileSystemView based on those affected file 
status.
+   *        a. Filter affected partitions based on inputPaths.
+   *        b. Get list of fileGroups based on affected partitions by 
fsView.getAllFileGroups.
+   * Step4: Set input paths based on filtered affected partition paths. 
changes that amony original input paths passed to
+   *        this method. some partitions did not have commits as part of the 
trimmed down list of commits and hence we need this step.
+   * Step5: Find candidate fileStatus, since when we get baseFileStatus from 
HoodieTableFileSystemView,
+   *        the BaseFileStatus will missing file size information.
+   *        We should use candidate fileStatus to update the size information 
for BaseFileStatus.
+   * Step6: For every file group from step3(b)
+   *        Get 1st available base file from all file slices. then we use 
candidate file status to update the baseFileStatus,
+   *        and construct RealTimeFileStatus and add it to result along with 
log files.
+   *        If file group just has log files, construct RealTimeFileStatus and 
add it to result.
+   * TODO: unify the incremental view code between hive/spark-sql and spark 
datasource
+   */
+  @Override
+  protected List<FileStatus> listStatusForIncrementalMode(
+      JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> 
inputPaths) throws IOException {
+    List<FileStatus> result = new ArrayList<>();
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+
+    // step1
+    Option<HoodieTimeline> timeline = 
HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return result;
+    }
+    String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(jobContext, 
tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get 
all the commits.
+    Integer maxCommits = HoodieHiveUtils.readMaxCommits(jobContext, tableName);
+    HoodieTimeline commitsTimelineToReturn = 
timeline.get().findInstantsAfter(lastIncrementalTs, maxCommits);
+    Option<List<HoodieInstant>> commitsToCheck = 
Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList()));
+    if (!commitsToCheck.isPresent()) {
+      return result;
+    }
+    // step2
+    commitsToCheck.get().sort(HoodieInstant::compareTo);
+    List<HoodieCommitMetadata> metadataList = commitsToCheck
+        .get().stream().map(instant -> {
+          try {
+            return HoodieInputFormatUtils.getCommitMetadata(instant, 
commitsTimelineToReturn);
+          } catch (IOException e) {
+            throw new HoodieException(String.format("cannot get metadata for 
instant: %s", instant));
+          }
+        }).collect(Collectors.toList());
+
+    // build fileGroup from fsView
+    List<FileStatus> affectedFileStatus = Arrays.asList(HoodieInputFormatUtils
+        .listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), 
metadataList));
+    // step3
+    HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, 
affectedFileStatus.toArray(new FileStatus[0]));
+    // build fileGroup from fsView
+    Path basePath = new Path(tableMetaClient.getBasePath());
+    // filter affectedPartition by inputPaths
+    List<String> affectedPartition = 
HoodieInputFormatUtils.getWritePartitionPaths(metadataList).stream()
+        .filter(k -> k.isEmpty() ? inputPaths.contains(basePath) : 
inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList());
+    if (affectedPartition.isEmpty()) {
+      return result;
+    }
+    List<HoodieFileGroup> fileGroups = affectedPartition.stream()
+        .flatMap(partitionPath -> 
fsView.getAllFileGroups(partitionPath)).collect(Collectors.toList());
+    // step4
+    setInputPaths(job, affectedPartition.stream()
+        .map(p -> p.isEmpty() ? basePath.toString() : new Path(basePath, 
p).toString()).collect(Collectors.joining(",")));
+
+    // step5
+    // find all file status in partitionPaths.
+    FileStatus[] fileStatuses = getStatus(job);
+    Map<String, FileStatus> candidateFileStatus = new HashMap<>();
+    for (int i = 0; i < fileStatuses.length; i++) {
+      String key = fileStatuses[i].getPath().toString();
+      candidateFileStatus.put(key, fileStatuses[i]);
+    }
+
+    String maxCommitTime = fsView.getLastInstant().get().getTimestamp();
+    // step6
+    result.addAll(collectAllIncrementalFiles(fileGroups, maxCommitTime, 
basePath.toString(), candidateFileStatus));
+    return result;
+  }
+
+  private List<FileStatus> collectAllIncrementalFiles(List<HoodieFileGroup> 
fileGroups, String maxCommitTime, String basePath, Map<String, FileStatus> 
candidateFileStatus) {
+    List<FileStatus> result = new ArrayList<>();
+    fileGroups.stream().forEach(f -> {
+      try {
+        List<FileSlice> baseFiles = f.getAllFileSlices().filter(slice -> 
slice.getBaseFile().isPresent()).collect(Collectors.toList());
+        if (!baseFiles.isEmpty()) {
+          FileStatus baseFileStatus = 
HoodieInputFormatUtils.getFileStatus(baseFiles.get(0).getBaseFile().get());
+          String baseFilePath = baseFileStatus.getPath().toUri().toString();
+          if (!candidateFileStatus.containsKey(baseFilePath)) {
+            throw new HoodieException("Error obtaining fileStatus for file: " 
+ baseFilePath);
+          }
+          // We cannot use baseFileStatus.getPath() here, since 
baseFileStatus.getPath() missing file size information.
+          // So we use candidateFileStatus.get(baseFileStatus.getPath()) to 
get a correct path.
+          RealtimeFileStatus fileStatus = new 
RealtimeFileStatus(candidateFileStatus.get(baseFilePath));
+          fileStatus.setMaxCommitTime(maxCommitTime);
+          fileStatus.setBelongToIncrementalFileStatus(true);
+          fileStatus.setBasePath(basePath.toString());

Review comment:
       `toString()` is redundant as `basePath` is already a string. Same on 
#L211.

##########
File path: 
hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
##########
@@ -535,4 +548,211 @@ public void 
testSchemaEvolutionAndRollbackBlockInLastLogFile(ExternalSpillableMa
         arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, true)
     );
   }
+
+  @Test
+  public void testIncremetalWithOnlylog() throws Exception {
+    // initial commit
+    Schema schema = 
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
+    HoodieTestUtils.init(hadoopConf, basePath.toString(), 
HoodieTableType.MERGE_ON_READ);
+    String instantTime = "100";
+    final int numRecords = 1000;
+    File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, 
schema, 1, numRecords, instantTime,
+        HoodieTableType.MERGE_ON_READ);
+    //FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime);
+    createDeltaCommitFile(basePath, instantTime,"2016/05/01", 
"2016/05/01/fileid0_1-0-1_100.parquet", "fileid0");
+    // Add the paths
+    FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
+
+    // insert new records to log file
+    try {
+      String newCommitTime = "102";
+      HoodieLogFormat.Writer writer =
+          InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, 
schema, "fileid0", instantTime, newCommitTime,
+              numRecords, numRecords, 0);
+      writer.close();
+      createDeltaCommitFile(basePath, newCommitTime,"2016/05/01", 
"2016/05/01/.fileid0_100.log.1_1-0-1", "fileid0");
+
+      InputFormatTestUtil.setupIncremental(baseJobConf, "101", 1);
+
+      HoodieParquetRealtimeInputFormat inputFormat =  new 
HoodieParquetRealtimeInputFormat();
+      inputFormat.setConf(baseJobConf);
+      InputSplit[] splits = inputFormat.getSplits(baseJobConf, 1);
+      assertTrue(splits.length == 1);
+      JobConf newJobConf = new JobConf(baseJobConf);
+      List<Schema.Field> fields = schema.getFields();
+      setHiveColumnNameProps(fields, newJobConf, false);
+      RecordReader<NullWritable, ArrayWritable> reader  = 
inputFormat.getRecordReader(splits[0], newJobConf, Reporter.NULL);
+      // use reader to read log file.
+      NullWritable key = reader.createKey();
+      ArrayWritable value = reader.createValue();
+      while (reader.next(key, value)) {
+        Writable[] values = value.get();
+        // check if the record written is with latest commit, here "101"
+        assertEquals(newCommitTime, values[0].toString());
+        key = reader.createKey();
+        value = reader.createValue();
+      }
+      reader.close();
+    } catch (IOException e) {
+      throw new HoodieException(e.getMessage(), e);
+    }
+  }
+
+  @Test
+  public void testIncremetalWithReplace() throws Exception {

Review comment:
       +1 for this test.
   Shall we also validate the record reader with compaction? In that case, the 
latest commit time should match the compaction commit, right? Is that already 
covered in these tests?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to