danny0405 commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r725764091



##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/PathWithLogFilePath.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * we need to encode additional information in Path to track matching log file 
and base files.
+ * Hence, this weird looking class which tracks an log/base file status
+ */
+public class PathWithLogFilePath extends Path {
+  // a flag to mark this split is produced by incremental query or not.
+  private boolean belongToIncrementalPath = false;
+  // the log files belong this path.
+  private List<String> deltaLogPaths = new ArrayList<>();
+  // max commit time of current path.
+  private String maxCommitTime = "";
+  // the basePath of current hoodie table.
+  private String basePath = "";
+  // the base file belong to this path;
+  private String baseFilePath = "";

Review comment:
       the base file path belonging to this path.
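As an illustration of the pattern under review (a path object that carries extra incremental-query state alongside the location itself), here is a minimal sketch using a plain stand-in class instead of Hadoop's `Path`. All names below are hypothetical, and the `splitable()` rule is one plausible policy, not necessarily the PR's exact logic:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a Hadoop Path subclass that carries
// the extra state needed to pair a base file with its log files.
class TrackedPath {
  private final String uri;                        // the underlying path
  private boolean belongToIncrementalPath = false; // produced by an incremental query?
  private List<String> deltaLogPaths = new ArrayList<>(); // log files belonging to this path
  private String maxCommitTime = "";               // max commit time of this path
  private String basePath = "";                    // base path of the Hudi table
  private String baseFilePath = "";                // base file path belonging to this path

  TrackedPath(String uri) {
    this.uri = uri;
  }

  void setDeltaLogPaths(List<String> paths) {
    this.deltaLogPaths = paths;
  }

  List<String> getDeltaLogPaths() {
    return deltaLogPaths;
  }

  // One plausible rule: a path is splittable only when it has no log files,
  // since splitting a base file that must be merged with logs could lose or
  // duplicate records.
  boolean splitable() {
    return deltaLogPaths.isEmpty();
  }
}
```

The point of the subclass is that the extra state rides along wherever the `Path` is handed off, so downstream code can recover the log/base pairing without a side lookup.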

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class BaseFileWithLogsSplit extends FileSplit {

Review comment:
       `We need to encode additional information in split to track matching 
base and log files. Hence, this weird looking class which tracks a log/base 
file split`
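One reason a custom split class needs care: a `FileSplit` travels serialized between the driver and the tasks, so every extra field has to round-trip through `write`/`readFields`. A minimal sketch of that contract, using plain `java.io` types rather than Hadoop's `Writable`/`Text` — the class and field names are hypothetical:

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the extra state a split like BaseFileWithLogsSplit
// must serialize, mirroring the Writable write/readFields contract.
class SplitExtras {
  boolean belongToIncrementalSplit = false;
  List<String> deltaLogPaths = new ArrayList<>();
  String maxCommitTime = "";

  void write(DataOutput out) throws IOException {
    out.writeBoolean(belongToIncrementalSplit);
    out.writeUTF(maxCommitTime);
    out.writeInt(deltaLogPaths.size()); // length prefix, then each path
    for (String p : deltaLogPaths) {
      out.writeUTF(p);
    }
  }

  void readFields(DataInput in) throws IOException {
    // Must read in exactly the order write() wrote.
    belongToIncrementalSplit = in.readBoolean();
    maxCommitTime = in.readUTF();
    int n = in.readInt();
    deltaLogPaths = new ArrayList<>(n);
    for (int i = 0; i < n; i++) {
      deltaLogPaths.add(in.readUTF());
    }
  }
}
```

If `readFields` consumes fields in a different order than `write` produced them, the stream silently desynchronizes; the field order *is* the contract, which is why a subclass must also call the superclass's `write`/`readFields` first.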

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
##########
@@ -102,97 +102,107 @@ private HoodieMergedLogRecordScanner 
getMergedLogRecordScanner() throws IOExcept
 
   @Override
   public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws 
IOException {
+    // deal with DeltaOnlySplits
+    if (logReader.isPresent()) {
+      return logReader.get().next(aVoid, arrayWritable);
+    }
     // Call the underlying parquetReader.next - which may replace the passed 
in ArrayWritable
     // with a new block of values
-    while (this.parquetReader.next(aVoid, arrayWritable)) {
-      if (!deltaRecordMap.isEmpty()) {
-        String key = arrayWritable.get()[recordKeyIndex].toString();
-        if (deltaRecordMap.containsKey(key)) {
-          // mark the key as handled
-          this.deltaRecordKeys.remove(key);
-          // TODO(NA): Invoke preCombine here by converting arrayWritable to 
Avro. This is required since the
-          // deltaRecord may not be a full record and needs values of columns 
from the parquet
-          Option<GenericRecord> rec = 
buildGenericRecordwithCustomPayload(deltaRecordMap.get(key));
-          // If the record is not present, this is a delete record using an 
empty payload so skip this base record
-          // and move to the next record
-          if (!rec.isPresent()) {
-            continue;
+    boolean result = this.parquetReader.next(aVoid, arrayWritable);
+    if (!result) {
+      // if the result is false, then there are no more records
+      return false;
+    }
+    if (!deltaRecordMap.isEmpty()) {
+      // TODO(VC): Right now, we assume all records in log, have a matching 
base record. (which
+      // would be true until we have a way to index logs too)

Review comment:
       Not only the HBase index: the Flink MOR table can also index log files. We 
need to support both cases: pure log file groups, and base + log file groups.
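For the pure-log-file-group case, the diff's approach is to consult the optional log reader before touching the parquet reader at all. A simplified sketch of that dispatch, with plain iterators standing in for the real record readers (all names here are hypothetical):

```java
import java.util.Iterator;
import java.util.Optional;

// Hypothetical sketch of the dispatch in next(): a split with no base file
// (a pure log file group) is served entirely by the log reader; otherwise
// records come from the base-file reader (and would then be merged with the
// delta record map, omitted here).
class MergedReader {
  private final Optional<Iterator<String>> logReader; // present only for log-only splits
  private final Iterator<String> parquetReader;       // base-file records

  MergedReader(Optional<Iterator<String>> logReader, Iterator<String> parquetReader) {
    this.logReader = logReader;
    this.parquetReader = parquetReader;
  }

  Optional<String> next() {
    if (logReader.isPresent()) {
      // Log-only split: the base-file reader is never consulted.
      Iterator<String> it = logReader.get();
      return it.hasNext() ? Optional.of(it.next()) : Optional.empty();
    }
    return parquetReader.hasNext() ? Optional.of(parquetReader.next()) : Optional.empty();
  }
}
```

The early return is what makes log-indexed file groups (no base file yet) readable at all; without it, an empty parquet reader would end the split immediately.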

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -66,6 +91,139 @@
     return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
+  /**
+   * keep the logical of mor_incr_view as same as spark datasource.
+   * to do: unify the incremental view code between hive/spark-sql and spark 
datasource
+   */
+  @Override
+  protected List<FileStatus> listStatusForIncrementalMode(
+      JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> 
inputPaths) throws IOException {
+    List<FileStatus> result = new ArrayList<>();
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+
+    Option<HoodieTimeline> timeline = 
HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return result;
+    }
+    String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(jobContext, 
tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get 
all the commits.
+    Integer maxCommits = HoodieHiveUtils.readMaxCommits(jobContext, tableName);
+    HoodieTimeline commitsTimelineToReturn = 
timeline.get().findInstantsAfter(lastIncrementalTs, maxCommits);
+    Option<List<HoodieInstant>> commitsToCheck = 
Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList()));
+    if (!commitsToCheck.isPresent()) {
+      return result;
+    }
+    Map<String, HashMap<String, FileStatus>> partitionsWithFileStatus  = 
HoodieInputFormatUtils
+        .listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), 
commitsToCheck.get(), commitsTimelineToReturn);
+    // build fileGroup from fsView
+    List<FileStatus> affectedFileStatus = new ArrayList<>();
+    partitionsWithFileStatus.forEach((key, value) -> value.forEach((k, v) -> 
affectedFileStatus.add(v)));
+    HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, 
affectedFileStatus.toArray(new FileStatus[0]));
+    // build fileGroup from fsView
+    String basePath = tableMetaClient.getBasePath();
+    // filter affectedPartition by inputPaths
+    List<String> affectedPartition = partitionsWithFileStatus.keySet().stream()
+        .filter(k -> k.isEmpty() ? inputPaths.contains(new Path(basePath)) : 
inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList());
+    if (affectedPartition.isEmpty()) {
+      return result;
+    }
+    List<HoodieFileGroup> fileGroups = affectedPartition.stream()
+        .flatMap(partitionPath -> 
fsView.getAllFileGroups(partitionPath)).collect(Collectors.toList());
+    setInputPaths(job, affectedPartition.stream()
+        .map(p -> p.isEmpty() ? basePath : new Path(basePath, 
p).toUri().toString()).collect(Collectors.joining(",")));
+
+    // find all file status in current partitionPath
+    FileStatus[] fileStatuses = getStatus(job);
+    Map<String, FileStatus> candidateFileStatus = new HashMap<>();
+    for (int i = 0; i < fileStatuses.length; i++) {
+      String key = fileStatuses[i].getPath().toString();
+      candidateFileStatus.put(key, fileStatuses[i]);
+    }
+
+    String maxCommitTime = fsView.getLastInstant().get().getTimestamp();
+    fileGroups.stream().forEach(f -> {
+      try {
+        List<FileSlice> baseFiles = f.getAllFileSlices().filter(slice -> 
slice.getBaseFile().isPresent()).collect(Collectors.toList());
+        if (!baseFiles.isEmpty()) {
+          FileStatus baseFileStatus = 
HoodieInputFormatUtils.getFileStatus(baseFiles.get(0).getBaseFile().get());
+          String baseFilePath = baseFileStatus.getPath().toUri().toString();
+          if (!candidateFileStatus.containsKey(baseFilePath)) {
+            throw new HoodieException("Error obtaining fileStatus for file: " 
+ baseFilePath);
+          }
+          RealtimeFileStatus fileStatus = new 
RealtimeFileStatus(candidateFileStatus.get(baseFilePath));
+          fileStatus.setMaxCommitTime(maxCommitTime);
+          fileStatus.setBelongToIncrementalFileStatus(true);
+          fileStatus.setBasePath(basePath);
+          fileStatus.setBaseFilePath(baseFilePath);
+          fileStatus.setDeltaLogPaths(f.getLatestFileSlice().get().getLogFiles().map(l -> l.getPath().toString()).collect(Collectors.toList()));
+          // try to set bootstrapfileStatus
+          if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile 
|| baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
+            fileStatus.setBootStrapFileStatus(baseFileStatus);
+          }
+          result.add(fileStatus);
+        }
+        // add file group which has only logs.
+        if (f.getLatestFileSlice().isPresent() && baseFiles.isEmpty()) {
+          List<FileStatus> logFileStatus = 
f.getLatestFileSlice().get().getLogFiles().map(logFile -> 
logFile.getFileStatus()).collect(Collectors.toList());
+          if (logFileStatus.size() > 0) {
+            RealtimeFileStatus fileStatus = new 
RealtimeFileStatus(logFileStatus.get(0));
+            fileStatus.setBelongToIncrementalFileStatus(true);
+            fileStatus.setDeltaLogPaths(logFileStatus.stream().map(l -> 
l.getPath().toString()).collect(Collectors.toList()));
+            fileStatus.setMaxCommitTime(maxCommitTime);
+            fileStatus.setBasePath(basePath);
+            result.add(fileStatus);
+          }
+        }
+      } catch (IOException e) {
+        throw new HoodieException("Error obtaining data file/log file grouping 
", e);
+      }
+    });
+    return result;
+  }
+
+  @Override
+  protected boolean includeLogFilesForSnapShotView() {
+    return true;
+  }
+
+  @Override
+  protected boolean isSplitable(FileSystem fs, Path filename) {
+    if (filename instanceof PathWithLogFilePath) {
+      return ((PathWithLogFilePath)filename).splitable();
+    }
+    return super.isSplitable(fs, filename);
+  }
+
+  @Override
+  protected FileSplit makeSplit(Path file, long start, long length, String[] 
hosts) {
+    if (file instanceof PathWithLogFilePath) {
+      return doMakeSplitForPathWithLogFilePath((PathWithLogFilePath) file, 
start, length, hosts, null);
+    }
+    return super.makeSplit(file, start, length, hosts);
+  }
+
+  @Override
+  protected FileSplit makeSplit(Path file, long start, long length, String[] 
hosts, String[] inMemoryHosts) {
+    if (file instanceof PathWithLogFilePath) {

Review comment:
       Okay, please add the explanation to the code comments.
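The explanation being requested could note that split construction dispatches on the concrete `Path` type: custom paths decide their own splittability and produce custom splits, while plain paths fall through to the default behavior. A generic sketch of that pattern, with hypothetical stand-in classes:

```java
// Hypothetical stand-ins: a plain path and a path that carries log-file state.
class PathLike { }

class LogAwarePath extends PathLike {
  boolean splitable() {
    // e.g. false for paths whose base file must be read whole and merged
    // with log files.
    return false;
  }
}

class SplitBuilder {
  boolean isSplitable(PathLike p) {
    if (p instanceof LogAwarePath) {
      // Custom paths decide for themselves; splitting a base file whose
      // records must be merged with log files would give wrong results.
      return ((LogAwarePath) p).splitable();
    }
    return true; // default behavior for plain paths
  }
}
```

The same `instanceof` check then governs `makeSplit`, so the custom path type is the single switch that routes a file into the realtime (merge-on-read) code path.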

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieEmptyRecordReader.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordReader;
+
+import java.io.IOException;
+
+public class HoodieEmptyRecordReader implements RecordReader<NullWritable, 
ArrayWritable> {
+  @Override

Review comment:
       Please add class-level documentation (Javadoc).
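For the requested class documentation: the class appears to be a reader that yields no records, usable as a placeholder when a split has nothing readable. A sketch of that behavior against a reduced, hypothetical stand-in for the `RecordReader` interface (the real mapred interface takes a key and a value and has `createKey`/`createValue`/`getPos` as well):

```java
// Hypothetical, reduced stand-in for Hadoop's mapred RecordReader interface.
interface SimpleRecordReader<V> {
  boolean next(V value);   // returns false when there are no more records
  float getProgress();
  void close();
}

// An "empty" reader: immediately reports end-of-input.
class EmptyRecordReader<V> implements SimpleRecordReader<V> {
  @Override
  public boolean next(V value) {
    return false; // never produces a record
  }

  @Override
  public float getProgress() {
    return 1.0f; // nothing to read, so always "done"
  }

  @Override
  public void close() {
    // no resources to release
  }
}
```

A Javadoc along the lines of "a record reader that returns no records, used for splits with no readable data" would capture this in one sentence.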

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -66,6 +91,139 @@
     return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
+  /**
+   * keep the logical of mor_incr_view as same as spark datasource.
+   * to do: unify the incremental view code between hive/spark-sql and spark 
datasource
+   */
+  @Override
+  protected List<FileStatus> listStatusForIncrementalMode(
+      JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> 
inputPaths) throws IOException {
+    List<FileStatus> result = new ArrayList<>();
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+
+    Option<HoodieTimeline> timeline = 
HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return result;
+    }
+    String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(jobContext, 
tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get 
all the commits.
+    Integer maxCommits = HoodieHiveUtils.readMaxCommits(jobContext, tableName);
+    HoodieTimeline commitsTimelineToReturn = 
timeline.get().findInstantsAfter(lastIncrementalTs, maxCommits);
+    Option<List<HoodieInstant>> commitsToCheck = 
Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList()));
+    if (!commitsToCheck.isPresent()) {
+      return result;
+    }
+    Map<String, HashMap<String, FileStatus>> partitionsWithFileStatus  = 
HoodieInputFormatUtils
+        .listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), 
commitsToCheck.get(), commitsTimelineToReturn);
+    // build fileGroup from fsView
+    List<FileStatus> affectedFileStatus = new ArrayList<>();
+    partitionsWithFileStatus.forEach((key, value) -> value.forEach((k, v) -> 
affectedFileStatus.add(v)));
+    HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, 
affectedFileStatus.toArray(new FileStatus[0]));
+    // build fileGroup from fsView
+    String basePath = tableMetaClient.getBasePath();
+    // filter affectedPartition by inputPaths
+    List<String> affectedPartition = partitionsWithFileStatus.keySet().stream()
+        .filter(k -> k.isEmpty() ? inputPaths.contains(new Path(basePath)) : 
inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList());
+    if (affectedPartition.isEmpty()) {
+      return result;
+    }
+    List<HoodieFileGroup> fileGroups = affectedPartition.stream()
+        .flatMap(partitionPath -> 
fsView.getAllFileGroups(partitionPath)).collect(Collectors.toList());
+    setInputPaths(job, affectedPartition.stream()
+        .map(p -> p.isEmpty() ? basePath : new Path(basePath, 
p).toUri().toString()).collect(Collectors.joining(",")));
+
+    // find all file status in current partitionPath
+    FileStatus[] fileStatuses = getStatus(job);
+    Map<String, FileStatus> candidateFileStatus = new HashMap<>();
+    for (int i = 0; i < fileStatuses.length; i++) {
+      String key = fileStatuses[i].getPath().toString();
+      candidateFileStatus.put(key, fileStatuses[i]);
+    }
+
+    String maxCommitTime = fsView.getLastInstant().get().getTimestamp();
+    fileGroups.stream().forEach(f -> {
+      try {
+        List<FileSlice> baseFiles = f.getAllFileSlices().filter(slice -> 
slice.getBaseFile().isPresent()).collect(Collectors.toList());
+        if (!baseFiles.isEmpty()) {
+          FileStatus baseFileStatus = 
HoodieInputFormatUtils.getFileStatus(baseFiles.get(0).getBaseFile().get());
+          String baseFilePath = baseFileStatus.getPath().toUri().toString();
+          if (!candidateFileStatus.containsKey(baseFilePath)) {
+            throw new HoodieException("Error obtaining fileStatus for file: " 
+ baseFilePath);
+          }
+          RealtimeFileStatus fileStatus = new 
RealtimeFileStatus(candidateFileStatus.get(baseFilePath));
+          fileStatus.setMaxCommitTime(maxCommitTime);
+          fileStatus.setBelongToIncrementalFileStatus(true);
+          fileStatus.setBasePath(basePath);
+          fileStatus.setBaseFilePath(baseFilePath);
+          fileStatus.setDeltaLogPaths(f.getLatestFileSlice().get().getLogFiles().map(l -> l.getPath().toString()).collect(Collectors.toList()));

Review comment:
       I'm also confused by this logic: why collect all the file slices here, 
and why do we then take only the first base file?

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/PathWithLogFilePath.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * we need to encode additional information in Path to track matching log file 
and base files.
+ * Hence, this weird looking class which tracks an log/base file status
+ */
+public class PathWithLogFilePath extends Path {
+
+  private boolean belongToIncrementalPath = false;
+  private List<String> deltaLogPaths = new ArrayList<>();

Review comment:
       Encode additional information in Path to track matching base and log 
files.
   Hence, this class tracks both base and log file statuses.

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * we need to encode additional information in split to track matching log 
file and base files.
+ * Hence, this weird looking class which tracks an log/base file split
+ */
+public class BaseFileWithLogsSplit extends FileSplit {
+  // a flag to mark this split is produced by incremental query or not.
+  private boolean belongToIncrementalSplit = false;
+  // the log files of this split.
+  private List<String> deltaLogPaths = new ArrayList<>();

Review comment:
       `the log files of this split.` => `the log file paths of this split.`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

