vinothchandar commented on code in PR #12342:
URL: https://github.com/apache/hudi/pull/12342#discussion_r1864651906


##########
hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/Checkpoint.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Class for representing checkpoint
+ */
+public abstract class Checkpoint implements Serializable {

Review Comment:
   rename to `AbstractCheckpoint` ?.. if you intend to use this as an 
interface, pull stuff into an interface?
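A rough sketch of what that split could look like, with the behavior contract in an interface and the shared mutable state in an abstract base. All names here are hypothetical stand-ins, not the actual Hudi classes:

```java
// Hypothetical sketch: contract in an interface, shared state in an
// AbstractCheckpoint-style base class, per the review suggestion.
import java.io.Serializable;

interface CheckpointLike extends Serializable {
  String getCheckpointKey();

  int getVersion();
}

abstract class AbstractCheckpointSketch implements CheckpointLike {
  // Shared mutable state lives in the base class.
  protected String checkpointKey;

  @Override
  public String getCheckpointKey() {
    return checkpointKey;
  }
}

class CheckpointV2Sketch extends AbstractCheckpointSketch {
  CheckpointV2Sketch(String key) {
    this.checkpointKey = key;
  }

  @Override
  public int getVersion() {
    return 2;
  }
}
```

Callers then program against `CheckpointLike`, while concrete versions only add what differs.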



##########
hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointV2.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hudi.common.table.checkpoint.CheckpointV1.STREAMER_CHECKPOINT_KEY_V1;
+import static org.apache.hudi.common.table.checkpoint.CheckpointV1.STREAMER_CHECKPOINT_RESET_KEY_V1;
+
+public class CheckpointV2 extends Checkpoint {
+  // TODO(yihua): decouple the keys for Hudi Streamer
+  public static final String STREAMER_CHECKPOINT_KEY_V2 = "streamer.checkpoint.key.v2";

Review Comment:
   do we need the `v2` given we already change `deltastreamer` -> `streamer` ?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala:
##########
@@ -111,7 +112,7 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: SQLContext,
     val optionalFilters = filters
     val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters)
 
-    new HoodieMergeOnReadRDD(
+    new HoodieMergeOnReadRDDV2(

Review Comment:
   specifically stuff like this, where the checkpoint change affects something we use for snapshot queries, is a side effect to try and avoid



##########
hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointV2.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hudi.common.table.checkpoint.CheckpointV1.STREAMER_CHECKPOINT_KEY_V1;
+import static org.apache.hudi.common.table.checkpoint.CheckpointV1.STREAMER_CHECKPOINT_RESET_KEY_V1;
+
+public class CheckpointV2 extends Checkpoint {
+  // TODO(yihua): decouple the keys for Hudi Streamer
+  public static final String STREAMER_CHECKPOINT_KEY_V2 = "streamer.checkpoint.key.v2";

Review Comment:
   not a big deal. just thinking we may not need a translation like this after a while. why leak `v2` to the world?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hudi.common.table.checkpoint.CheckpointV1.STREAMER_CHECKPOINT_KEY_V1;
+import static org.apache.hudi.common.table.checkpoint.CheckpointV1.STREAMER_CHECKPOINT_RESET_KEY_V1;
+import static org.apache.hudi.common.table.checkpoint.CheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
+import static org.apache.hudi.common.table.checkpoint.CheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
+
+public class CheckpointUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(CheckpointUtils.class);
+
+  public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) {
+    if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2))
+        || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V2))) {
+      return new CheckpointV2(commitMetadata);
+    }
+    if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V1))
+        || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V1))) {
+      return new CheckpointV1(commitMetadata);
+    }
+    throw new HoodieException("Checkpoint is not found in the commit metadata: " + commitMetadata.getExtraMetadata());
+  }
+
+  public static boolean targetCheckpointV2(int writeTableVersion) {

Review Comment:
   rename: doesTargetUseCheckpointV2() 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hudi.common.table.checkpoint.CheckpointV1.STREAMER_CHECKPOINT_KEY_V1;
+import static org.apache.hudi.common.table.checkpoint.CheckpointV1.STREAMER_CHECKPOINT_RESET_KEY_V1;
+import static org.apache.hudi.common.table.checkpoint.CheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
+import static org.apache.hudi.common.table.checkpoint.CheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
+
+public class CheckpointUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(CheckpointUtils.class);
+
+  public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) {
+    if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2))
+        || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V2))) {
+      return new CheckpointV2(commitMetadata);
+    }
+    if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V1))
+        || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V1))) {
+      return new CheckpointV1(commitMetadata);
+    }
+    throw new HoodieException("Checkpoint is not found in the commit metadata: " + commitMetadata.getExtraMetadata());
+  }
+
+  public static boolean targetCheckpointV2(int writeTableVersion) {
+    return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode();
+  }
+
+  // TODO(yihua): for checkpoint translation, handle cases where the checkpoint is not exactly the
+  // instant or completion time
+  public static CheckpointV2 convertToCheckpointV2ForCommitTime(
+      Checkpoint checkpoint, HoodieTableMetaClient metaClient) {
+    if (checkpoint instanceof CheckpointV2) {
+      return (CheckpointV2) checkpoint;
+    }
+    if (checkpoint instanceof CheckpointV1) {
+      // V1 -> V2 translation
+      // TODO(yihua): handle USE_TRANSITION_TIME in V1
+      // TODO(yihua): handle different ordering between requested and completion time
+      // TODO(yihua): handle timeline history / archived timeline
+      String instantTime = checkpoint.getCheckpointKey();
+      String completionTime = metaClient.getActiveTimeline()

Review Comment:
   isn't there an easier method to get the HoodieInstant?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hudi.common.table.checkpoint.CheckpointV1.STREAMER_CHECKPOINT_KEY_V1;
+import static org.apache.hudi.common.table.checkpoint.CheckpointV1.STREAMER_CHECKPOINT_RESET_KEY_V1;
+import static org.apache.hudi.common.table.checkpoint.CheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
+import static org.apache.hudi.common.table.checkpoint.CheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
+
+public class CheckpointUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(CheckpointUtils.class);
+
+  public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) {
+    if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2))
+        || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V2))) {
+      return new CheckpointV2(commitMetadata);
+    }
+    if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V1))
+        || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V1))) {
+      return new CheckpointV1(commitMetadata);
+    }
+    throw new HoodieException("Checkpoint is not found in the commit metadata: " + commitMetadata.getExtraMetadata());
+  }
+
+  public static boolean targetCheckpointV2(int writeTableVersion) {
+    return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode();
+  }
+
+  // TODO(yihua): for checkpoint translation, handle cases where the checkpoint is not exactly the
+  // instant or completion time
+  public static CheckpointV2 convertToCheckpointV2ForCommitTime(
+      Checkpoint checkpoint, HoodieTableMetaClient metaClient) {
+    if (checkpoint instanceof CheckpointV2) {
+      return (CheckpointV2) checkpoint;
+    }
+    if (checkpoint instanceof CheckpointV1) {

Review Comment:
   do we want to do a `Checkpoint#getVersion()` instead of a class check to determine version?
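To illustrate the suggestion, a hedged sketch of dispatching on an explicit `getVersion()` instead of chained `instanceof` checks. The class names are stand-ins, not the real Hudi types:

```java
// Minimal stand-in hierarchy with an explicit version number per subclass.
abstract class VersionedCkp {
  protected final String key;

  VersionedCkp(String key) { this.key = key; }

  abstract int getVersion();

  String getKey() { return key; }
}

class VersionedCkpV1 extends VersionedCkp {
  VersionedCkpV1(String key) { super(key); }

  @Override int getVersion() { return 1; }
}

class VersionedCkpV2 extends VersionedCkp {
  VersionedCkpV2(String key) { super(key); }

  @Override int getVersion() { return 2; }
}

class CkpConverter {
  // Dispatch on the declared version rather than the runtime class.
  static VersionedCkpV2 toV2(VersionedCkp checkpoint) {
    switch (checkpoint.getVersion()) {
      case 2:
        return (VersionedCkpV2) checkpoint;  // already V2, nothing to translate
      case 1:
        return new VersionedCkpV2(checkpoint.getKey());  // stub V1 -> V2 translation
      default:
        throw new UnsupportedOperationException(
            "Unsupported checkpoint version: " + checkpoint.getVersion());
    }
  }
}
```

One advantage: the default branch can report the unknown version number directly, and new versions slot in without more `instanceof` arms.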



##########
hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointV1.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class CheckpointV1 extends Checkpoint {
+  // TODO(yihua): decouple the keys for Hudi Streamer
+  public static final String STREAMER_CHECKPOINT_KEY_V1 = "deltastreamer.checkpoint.key";
+  public static final String STREAMER_CHECKPOINT_RESET_KEY_V1 = "deltastreamer.checkpoint.reset_key";
+
+  public CheckpointV1(String key) {

Review Comment:
   assert for nulls ?
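One way to act on this, sketched with `Objects.requireNonNull` so a null key fails fast in the constructor. The class name is illustrative, not the actual `CheckpointV1`:

```java
import java.util.Objects;

// Sketch: reject null checkpoint keys at construction time with a clear message.
class NonNullCheckpoint {
  private final String checkpointKey;

  NonNullCheckpoint(String key) {
    this.checkpointKey = Objects.requireNonNull(key, "checkpoint key must not be null");
  }

  String getCheckpointKey() {
    return checkpointKey;
  }
}
```

This surfaces the bad input at the call site that created the checkpoint, rather than much later when the key is read.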



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -120,6 +120,11 @@ object DataSourceReadOptions {
       + "completion_time <= END_COMMIT are fetched out. "
      + "Point in time type queries make more sense with begin and end completion times specified.")
 
+  val INCREMENTAL_READ_VERSION: ConfigProperty[String] = ConfigProperty

Review Comment:
   `INCREMENTAL_READ_TABLE_VERSION`



##########
hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hudi.common.table.checkpoint.CheckpointV1.STREAMER_CHECKPOINT_KEY_V1;
+import static org.apache.hudi.common.table.checkpoint.CheckpointV1.STREAMER_CHECKPOINT_RESET_KEY_V1;
+import static org.apache.hudi.common.table.checkpoint.CheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
+import static org.apache.hudi.common.table.checkpoint.CheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
+
+public class CheckpointUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(CheckpointUtils.class);
+
+  public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) {
+    if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2))
+        || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V2))) {
+      return new CheckpointV2(commitMetadata);
+    }
+    if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V1))
+        || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V1))) {
+      return new CheckpointV1(commitMetadata);
+    }
+    throw new HoodieException("Checkpoint is not found in the commit metadata: " + commitMetadata.getExtraMetadata());
+  }
+
+  public static boolean targetCheckpointV2(int writeTableVersion) {
+    return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode();
+  }
+
+  // TODO(yihua): for checkpoint translation, handle cases where the checkpoint is not exactly the
+  // instant or completion time
+  public static CheckpointV2 convertToCheckpointV2ForCommitTime(
+      Checkpoint checkpoint, HoodieTableMetaClient metaClient) {
+    if (checkpoint instanceof CheckpointV2) {
+      return (CheckpointV2) checkpoint;
+    }
+    if (checkpoint instanceof CheckpointV1) {
+      // V1 -> V2 translation
+      // TODO(yihua): handle USE_TRANSITION_TIME in V1
+      // TODO(yihua): handle different ordering between requested and completion time
+      // TODO(yihua): handle timeline history / archived timeline
+      String instantTime = checkpoint.getCheckpointKey();
+      String completionTime = metaClient.getActiveTimeline()
+          .getInstantsAsStream()
+          .filter(s -> instantTime.equals(s.requestedTime()))
+          .map(HoodieInstant::getCompletionTime)
+          .findFirst().orElse(null);
+      if (completionTime == null) {

Review Comment:
   use `Option` vs null
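A sketch of what the Option-style version of this lookup could look like. `java.util.Optional` is used here as a stand-in (Hudi code would typically use its own `Option` type), and the timeline is modeled as (requested, completion) pairs purely for illustration:

```java
import java.util.List;
import java.util.Optional;

// Sketch: keep the lookup result wrapped until the caller decides what a
// missing completion time means, instead of letting null escape.
class CompletionTimeLookup {
  // Each entry: { requestedTime, completionTime }
  static String completionTimeFor(String instantTime, List<String[]> instants) {
    Optional<String> completionTime = instants.stream()
        .filter(entry -> instantTime.equals(entry[0]))
        .map(entry -> entry[1])
        .findFirst();
    // Either a value comes out, or an exception with context; no null flows on.
    return completionTime.orElseThrow(() ->
        new UnsupportedOperationException("Unable to find completion time for " + instantTime));
  }
}
```

The `orElse(null)` + null-check pair in the diff collapses into a single `orElseThrow`, which also keeps the "not found" decision next to the stream pipeline.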



##########
hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointV1.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class CheckpointV1 extends Checkpoint {
+  // TODO(yihua): decouple the keys for Hudi Streamer
+  public static final String STREAMER_CHECKPOINT_KEY_V1 = "deltastreamer.checkpoint.key";

Review Comment:
   not a fan of having streamer configs here in `hudi-common`.. Should we just 
have Checkpoint abstract class here.. and move others close to their own 
packages..



##########
hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hudi.common.table.checkpoint.CheckpointV1.STREAMER_CHECKPOINT_KEY_V1;
+import static org.apache.hudi.common.table.checkpoint.CheckpointV1.STREAMER_CHECKPOINT_RESET_KEY_V1;
+import static org.apache.hudi.common.table.checkpoint.CheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
+import static org.apache.hudi.common.table.checkpoint.CheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
+
+public class CheckpointUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(CheckpointUtils.class);
+
+  public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) {
+    if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2))
+        || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V2))) {
+      return new CheckpointV2(commitMetadata);
+    }
+    if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V1))
+        || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V1))) {
+      return new CheckpointV1(commitMetadata);
+    }
+    throw new HoodieException("Checkpoint is not found in the commit metadata: " + commitMetadata.getExtraMetadata());
+  }
+
+  public static boolean targetCheckpointV2(int writeTableVersion) {
+    return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode();
+  }
+
+  // TODO(yihua): for checkpoint translation, handle cases where the checkpoint is not exactly the
+  // instant or completion time
+  public static CheckpointV2 convertToCheckpointV2ForCommitTime(
+      Checkpoint checkpoint, HoodieTableMetaClient metaClient) {
+    if (checkpoint instanceof CheckpointV2) {
+      return (CheckpointV2) checkpoint;
+    }
+    if (checkpoint instanceof CheckpointV1) {
+      // V1 -> V2 translation
+      // TODO(yihua): handle USE_TRANSITION_TIME in V1
+      // TODO(yihua): handle different ordering between requested and completion time
+      // TODO(yihua): handle timeline history / archived timeline
+      String instantTime = checkpoint.getCheckpointKey();
+      String completionTime = metaClient.getActiveTimeline()
+          .getInstantsAsStream()
+          .filter(s -> instantTime.equals(s.requestedTime()))
+          .map(HoodieInstant::getCompletionTime)
+          .findFirst().orElse(null);
+      if (completionTime == null) {
+        throw new UnsupportedOperationException("Unable to find completion time for " + instantTime);
+      }
+      return new CheckpointV2(completionTime);
+    }
+    throw new UnsupportedOperationException("Unsupported checkpoint type: " + checkpoint.getClass());

Review Comment:
   can change to checkpoint.getVersion()



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -301,11 +302,29 @@ object DefaultSource {
              resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
            }
          case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
-            if (useNewParquetFileFormat) {
-              new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
-                sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
+            if (SparkConfigUtils.containsConfigProperty(parameters, INCREMENTAL_READ_VERSION)) {
+              val writeTableVersion = Integer.parseInt(parameters(INCREMENTAL_READ_VERSION.key))
+              if (writeTableVersion >= 8) {

Review Comment:
   use HoodieTableVersion.EIGHT. (here and everywhere)
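To make the point concrete, a sketch of comparing against the enum's version code rather than the literal `8`. The enum below is a minimal stand-in for Hudi's `HoodieTableVersion`, not the real definition:

```java
// Stand-in for HoodieTableVersion: named versions carry their numeric code.
enum TableVersionSketch {
  SIX(6), EIGHT(8);

  private final int versionCode;

  TableVersionSketch(int versionCode) { this.versionCode = versionCode; }

  int versionCode() { return versionCode; }

  // Mirrors the targetCheckpointV2 check without a magic number, so the
  // threshold reads as "table version eight" at every call site.
  static boolean targetsCheckpointV2(int writeTableVersion) {
    return writeTableVersion >= EIGHT.versionCode();
  }
}
```

If the threshold ever moves, only the enum reference changes, and grep finds every usage of `EIGHT` instead of every `8`.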



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -120,6 +120,11 @@ object DataSourceReadOptions {
       + "completion_time <= END_COMMIT are fetched out. "
      + "Point in time type queries make more sense with begin and end completion times specified.")
 
+  val INCREMENTAL_READ_VERSION: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.read.incremental.version")

Review Comment:
   `hoodie.datasource.read.incremental.version` -> 
`hoodie.datasource.read.incr.table.version` consistent with other cfgs



##########
hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointV2.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.hudi.common.table.checkpoint.CheckpointV1.STREAMER_CHECKPOINT_KEY_V1;
+import static 
org.apache.hudi.common.table.checkpoint.CheckpointV1.STREAMER_CHECKPOINT_RESET_KEY_V1;
+
+public class CheckpointV2 extends Checkpoint {
+  // TODO(yihua): decouple the keys for Hudi Streamer
+  public static final String STREAMER_CHECKPOINT_KEY_V2 = 
"streamer.checkpoint.key.v2";
+  public static final String STREAMER_CHECKPOINT_RESET_KEY_V2 = 
"streamer.checkpoint.reset.key.v2";
+
+  public CheckpointV2(String key) {
+    this.checkpointKey = key;
+  }
+
+  public CheckpointV2(Checkpoint checkpoint) {
+    this.checkpointKey = checkpoint.getCheckpointKey();
+    this.checkpointResetKey = checkpoint.getCheckpointResetKey();
+    this.checkpointIgnoreKey = checkpoint.getCheckpointIgnoreKey();
+  }
+
+  public CheckpointV2(HoodieCommitMetadata commitMetadata) {
+    this.checkpointKey = 
commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2);
+    this.checkpointResetKey = 
commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V2);
+    this.checkpointIgnoreKey = 
commitMetadata.getMetadata(CHECKPOINT_IGNORE_KEY);
+  }
+
+  public void addV1Props() {
+    this.extraProps.put(STREAMER_CHECKPOINT_KEY_V1, checkpointKey);
+    this.extraProps.put(STREAMER_CHECKPOINT_RESET_KEY_V1, checkpointResetKey);
+  }
+
+  @Override
+  public Map<String, String> getCheckpointCommitMetadata(String 
overrideResetKey,
+                                                         String 
overrideIgnoreKey) {
+    Map<String, String> checkpointCommitMetadata = new HashMap<>();

Review Comment:
   share code between this and the other method in CheckpointV1 



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -301,11 +302,29 @@ object DefaultSource {
              resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
            }
          case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
-            if (useNewParquetFileFormat) {
-              new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
-                sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
+            if (SparkConfigUtils.containsConfigProperty(parameters, INCREMENTAL_READ_VERSION)) {
+              val writeTableVersion = Integer.parseInt(parameters(INCREMENTAL_READ_VERSION.key))
+              if (writeTableVersion >= 8) {
+                if (useNewParquetFileFormat) {
+                  new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
+                    sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
+                } else {
+                  new IncrementalRelationV2(sqlContext, parameters, userSchema, metaClient, RangeType.CLOSED_CLOSED)
+                }
+              } else {
+                new IncrementalRelationV1(sqlContext, parameters, userSchema, metaClient)
+              }
             } else {
-              new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)
+              if (metaClient.getTableConfig.getTableVersion.versionCode() >= 8) {

Review Comment:
   Can we simplify all this to just a single if-else switch? Looks like we just 
need a couple more boolean flags based on whether the config is set or not.
   
   ```
   val tableVersion = if (SparkConfigUtils.containsConfigProperty(parameters, INCREMENTAL_READ_VERSION)) {
     Integer.parseInt(parameters(INCREMENTAL_READ_VERSION.key))
   } else {
     metaClient.getTableConfig.getTableVersion.versionCode()
   }
   ```



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -301,11 +302,29 @@ object DefaultSource {
               resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, 
metaClient, parameters)
             }
           case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
-            if (useNewParquetFileFormat) {
-              new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
-                sqlContext, metaClient, parameters, userSchema, 
isBootstrappedTable).build()
+            if (SparkConfigUtils.containsConfigProperty(parameters, 
INCREMENTAL_READ_VERSION)) {
+              val writeTableVersion = 
Integer.parseInt(parameters(INCREMENTAL_READ_VERSION.key))
+              if (writeTableVersion >= 8) {
+                if (useNewParquetFileFormat) {
+                  new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
+                    sqlContext, metaClient, parameters, userSchema, 
isBootstrappedTable).build()
+                } else {
+                  new IncrementalRelationV2(sqlContext, parameters, 
userSchema, metaClient, RangeType.CLOSED_CLOSED)
+                }
+              } else {
+                new IncrementalRelationV1(sqlContext, parameters, userSchema, 
metaClient)
+              }
             } else {
-              new IncrementalRelation(sqlContext, parameters, userSchema, 
metaClient)
+              if (metaClient.getTableConfig.getTableVersion.versionCode() >= 
8) {
+                if (useNewParquetFileFormat) {
+                  new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
+                    sqlContext, metaClient, parameters, userSchema, 
isBootstrappedTable).build()
+                } else {
+                  new IncrementalRelationV2(sqlContext, parameters, 
userSchema, metaClient, RangeType.CLOSED_CLOSED)

Review Comment:
   Do we really need completely different relations based on checkpoint 
version? I feel just passing in the checkpoint version to use is sufficient. 
I worry there are too many V1/V2 classes to maintain here.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java:
##########
@@ -254,22 +257,23 @@ private Dataset<Row> fullFetch(long sourceLimit) {
     return validatePropsAndGetDataFrameReader(sparkSession, 
props).option(Config.RDBMS_TABLE_PROP, query).load();
   }
 
-  private String checkpoint(Dataset<Row> rowDataset, boolean isIncremental, 
Option<String> lastCkptStr) {
+  private Checkpoint checkpoint(Dataset<Row> rowDataset, boolean 
isIncremental, Option<Checkpoint> lastCheckpoint) {
     try {
       if (isIncremental) {
         Column incrementalColumn = rowDataset.col(getStringWithAltKeys(props, 
JdbcSourceConfig.INCREMENTAL_COLUMN));
         final String max = 
rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first().getString(0);
         LOG.info(String.format("Checkpointing column %s with value: %s ", 
incrementalColumn, max));
         if (max != null) {
-          return max;
+          return new CheckpointV2(max);

Review Comment:
   Can we introduce a factory, `Checkpoint.create(..)`, that will create the 
current/latest by default? Then we avoid use of the version-specific 
`CheckpointV2` and `CheckpointV1` classes in regular code. There should be a 
few places where we handle `CheckpointV1`, and that's it.
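The suggested factory could look like the following standalone sketch: regular code calls `Checkpoint.create(..)` and gets the latest format, and only the few version-aware call sites pass an explicit table version. All names and the version-8 cutoff placement are assumptions based on this discussion, not the merged API:

```java
// Sketch of a version-hiding factory on the abstract base class.
abstract class Checkpoint {
  protected final String checkpointKey;

  protected Checkpoint(String checkpointKey) {
    this.checkpointKey = checkpointKey;
  }

  public String getCheckpointKey() {
    return checkpointKey;
  }

  // Default: create the current/latest checkpoint format (V2).
  public static Checkpoint create(String key) {
    return new CheckpointV2(key);
  }

  // For the few places that must still materialize V1 checkpoints
  // (pre-1.0 tables, i.e. table version below 8 in this sketch).
  public static Checkpoint create(String key, int writeTableVersion) {
    return writeTableVersion >= 8 ? new CheckpointV2(key) : new CheckpointV1(key);
  }
}

class CheckpointV1 extends Checkpoint {
  CheckpointV1(String key) { super(key); }
}

class CheckpointV2 extends Checkpoint {
  CheckpointV2(String key) { super(key); }
}
```

A call site like the JDBC source then becomes `return Checkpoint.create(max);`, with no `CheckpointV2` reference in regular code.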



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV1.scala:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import 
org.apache.hudi.DataSourceReadOptions.INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME
+import org.apache.hudi.HoodieBaseRelation.isSchemaEvolutionEnabledOnRead
+import org.apache.hudi.HoodieSparkConfUtils.getHollowCommitHandling
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.client.utils.SparkInternalSchemaConverter
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieFileFormat, 
HoodieRecord, HoodieReplaceCommitMetadata}
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import 
org.apache.hudi.common.table.timeline.TimelineUtils.{handleHollowCommitIfNeeded,
 HollowCommitHandling}
+import 
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME
+import org.apache.hudi.common.util.{HoodieTimer, InternalSchemaCache}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.{HoodieException, 
HoodieIncrementalPathNotFoundException}
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.hudi.internal.schema.utils.SerDeHelper
+import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath}
+import org.apache.hudi.table.HoodieSparkTable
+
+import org.apache.avro.Schema
+import org.apache.hadoop.fs.GlobPattern
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SQLContext}
+import 
org.apache.spark.sql.execution.datasources.parquet.LegacyHoodieParquetFileFormat
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+ * Relation, that implements the Hoodie incremental view.
+ *
+ * Implemented for Copy_on_write storage.
+ * TODO: rebase w/ HoodieBaseRelation HUDI-5362
+ *
+ */
+// TODO(yihua): revisit all #requestedTime calls for USE_TRANSITION_TIME mode
+class IncrementalRelationV1(val sqlContext: SQLContext,

Review Comment:
   is this fresh code or copied over from 0.x?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java:
##########
@@ -48,6 +58,7 @@ public enum SourceType {
   protected transient JavaSparkContext sparkContext;
   protected transient SparkSession sparkSession;
   protected transient Option<SourceProfileSupplier> sourceProfileSupplier;
+  protected int writeTableVersion;

Review Comment:
   use `HoodieTableVersion`
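`HoodieTableVersion` is a real Hudi class; the minimal stand-in enum below only illustrates the shape of the change — a typed field instead of a bare `int`, so version comparisons read as intent rather than magic numbers:

```java
// Illustrative stand-in for HoodieTableVersion (the real class has more
// members); shows why `writeTableVersion.greaterThanOrEquals(EIGHT)` beats
// `writeTableVersion >= 8` scattered through the code.
enum TableVersion {
  SIX(6), EIGHT(8);

  private final int code;

  TableVersion(int code) {
    this.code = code;
  }

  int versionCode() {
    return code;
  }

  boolean greaterThanOrEquals(TableVersion other) {
    return code >= other.code;
  }

  static TableVersion fromCode(int code) {
    for (TableVersion v : values()) {
      if (v.code == code) {
        return v;
      }
    }
    throw new IllegalArgumentException("Unknown table version code: " + code);
  }
}
```

The `Source` field would then be declared as the typed version and resolved once from the config value, with invalid codes failing fast at construction time.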



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.HoodieSparkConfUtils.getHollowCommitHandling
+import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.table.timeline.TimelineUtils.{concatTimeline, 
getCommitMetadata, handleHollowCommitIfNeeded, HollowCommitHandling}
+import 
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.exception.HoodieException
+import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listAffectedFilesForCommits
+import org.apache.hudi.metadata.HoodieTableMetadataUtil.getWritePartitionPaths
+import org.apache.hudi.storage.StoragePathInfo
+
+import org.apache.hadoop.fs.GlobPattern
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable
+
+/**
+ * @Experimental
+ */
+case class MergeOnReadIncrementalRelationV1(override val sqlContext: 
SQLContext,

Review Comment:
   Same question: is this just the 0.x code copied over?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java:
##########
@@ -283,7 +312,107 @@ public Pair<Option<Dataset<Row>>, String> 
fetchNextBatch(Option<String> lastCkpt
       metricsOption.ifPresent(metrics -> 
metrics.updateStreamerSourceParallelism(sourceProfile.getSourcePartitions()));
       return coalesceOrRepartition(sourceWithMetaColumnsDropped, 
sourceProfile.getSourcePartitions());
     }).orElse(sourceWithMetaColumnsDropped);
-    return Pair.of(Option.of(src), endCompletionTime);
+    return Pair.of(Option.of(src), new CheckpointV2(endCompletionTime));
+  }
+
+  private Pair<Option<Dataset<Row>>, Checkpoint> 
fetchNextBatchBasedOnRequestedTime(

Review Comment:
   lots of code copied over seems like between these two methods?
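One way to deduplicate the two fetch methods: keep only the timeline query difference (completion time vs requested time) in each method, and push the shared tail — dropping meta columns, profile-driven repartitioning — into one helper. The sketch below is generic over `T` so it stays free of Spark dependencies; in the real code `T` would be `Dataset<Row>`, and all names are assumptions:

```java
import java.util.function.UnaryOperator;

// Sketch: shared post-processing for both fetchNextBatch variants.
final class BatchFinalizer<T> {
  private final UnaryOperator<T> dropMetaColumns;
  private final UnaryOperator<T> coalesceOrRepartition;

  BatchFinalizer(UnaryOperator<T> dropMetaColumns,
                 UnaryOperator<T> coalesceOrRepartition) {
    this.dropMetaColumns = dropMetaColumns;
    this.coalesceOrRepartition = coalesceOrRepartition;
  }

  // Applied to the raw batch produced by either query path, so the
  // duplicated plumbing lives in exactly one place.
  T finalizeBatch(T rawBatch) {
    return coalesceOrRepartition.apply(dropMetaColumns.apply(rawBatch));
  }
}
```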



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java:
##########
@@ -107,25 +112,30 @@ public static QueryInfo 
generateQueryInfo(JavaSparkContext jssc, String srcBaseP
         
.setConf(HadoopFSUtils.getStorageConfWithCopy(jssc.hadoopConfiguration()))
         .setBasePath(srcBasePath).setLoadActiveTimelineOnLoad(true).build();
 
+    // TODO(yihua): handle transition time in CheckpointV1

Review Comment:
   What does this entail?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroSource.java:
##########
@@ -32,4 +33,9 @@ public AvroSource(TypedProperties props, JavaSparkContext 
sparkContext, SparkSes
       SchemaProvider schemaProvider) {
     super(props, sparkContext, sparkSession, schemaProvider, SourceType.AVRO);
   }
+
+  @Override
+  protected final InputBatch<JavaRDD<GenericRecord>> 
fetchNewData(Option<String> lastCkptStr, long sourceLimit) {

Review Comment:
   This is an abstract class, so why the need to implement this here, vs 
forcing subclasses to deal with it?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java:
##########
@@ -69,20 +80,63 @@ protected Source(TypedProperties props, JavaSparkContext 
sparkContext, SparkSess
     this.overriddenSchemaProvider = streamContext.getSchemaProvider();
     this.sourceType = sourceType;
     this.sourceProfileSupplier = streamContext.getSourceProfileSupplier();
+    this.writeTableVersion = ConfigUtils.getIntWithAltKeys(props, 
WRITE_TABLE_VERSION);
   }
 
+  @Deprecated
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
   protected abstract InputBatch<T> fetchNewData(Option<String> lastCkptStr, 
long sourceLimit);
 
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  protected InputBatch<T> fetchNewDataFromCheckpoint(Option<Checkpoint> 
lastCheckpoint, long sourceLimit) {

Review Comment:
   rename to simply `readFromCheckpoint()`  ?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java:
##########
@@ -69,20 +80,63 @@ protected Source(TypedProperties props, JavaSparkContext 
sparkContext, SparkSess
     this.overriddenSchemaProvider = streamContext.getSchemaProvider();
     this.sourceType = sourceType;
     this.sourceProfileSupplier = streamContext.getSourceProfileSupplier();
+    this.writeTableVersion = ConfigUtils.getIntWithAltKeys(props, 
WRITE_TABLE_VERSION);
   }
 
+  @Deprecated
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
   protected abstract InputBatch<T> fetchNewData(Option<String> lastCkptStr, 
long sourceLimit);
 
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  protected InputBatch<T> fetchNewDataFromCheckpoint(Option<Checkpoint> 
lastCheckpoint, long sourceLimit) {
+    LOG.info("In Hudi 1.0+, the checkpoint based on Hudi timeline is changed. "

Review Comment:
   WARN



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java:
##########
@@ -116,7 +117,7 @@ public class HoodieStreamer implements Serializable {
   private static final String SENSITIVE_VALUES_MASKED = 
"SENSITIVE_INFO_MASKED";
 
   public static final String CHECKPOINT_KEY = 
HoodieWriteConfig.STREAMER_CHECKPOINT_KEY;
-  public static final String CHECKPOINT_RESET_KEY = 
"deltastreamer.checkpoint.reset_key";
+  public static final String CHECKPOINT_RESET_KEY = 
STREAMER_CHECKPOINT_RESET_KEY_V1;

Review Comment:
   Shouldn't this refer to the variable in HoodieWriteConfig (as the comment 
says), to keep things consistent if needed?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java:
##########
@@ -69,20 +80,63 @@ protected Source(TypedProperties props, JavaSparkContext 
sparkContext, SparkSess
     this.overriddenSchemaProvider = streamContext.getSchemaProvider();
     this.sourceType = sourceType;
     this.sourceProfileSupplier = streamContext.getSourceProfileSupplier();
+    this.writeTableVersion = ConfigUtils.getIntWithAltKeys(props, 
WRITE_TABLE_VERSION);
   }
 
+  @Deprecated
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
   protected abstract InputBatch<T> fetchNewData(Option<String> lastCkptStr, 
long sourceLimit);
 
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  protected InputBatch<T> fetchNewDataFromCheckpoint(Option<Checkpoint> 
lastCheckpoint, long sourceLimit) {
+    LOG.info("In Hudi 1.0+, the checkpoint based on Hudi timeline is changed. "
+        + "If your Source implementation relies on request time as the 
checkpoint, "
+        + "you may consider migrating to completion time-based checkpoint by 
overriding "
+        + "Source#translateCheckpoint and Source#fetchNewDataFromCheckpoint");
+    return fetchNewData(
+        lastCheckpoint.isPresent()
+            ? Option.of(lastCheckpoint.get().getCheckpointKey()) : 
Option.empty(),
+        sourceLimit);
+  }
+
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  protected Option<Checkpoint> translateCheckpoint(Option<Checkpoint> 
lastCheckpoint) {

Review Comment:
   UT this?
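A unit test here would pin down the contract: the base `Source`'s default `translateCheckpoint` passes the checkpoint through unchanged, while an overriding source (e.g. one upgrading V1 request-time checkpoints) is exercised separately. In the self-contained sketch below, `java.util.Optional` stands in for Hudi's `Option`, and the stub names are assumptions:

```java
import java.util.Optional;

// Stub mirroring the default translateCheckpoint behavior under test.
class SourceStub {
  protected Optional<String> translateCheckpoint(Optional<String> lastCheckpoint) {
    return lastCheckpoint; // default: identity, no version translation
  }
}

// Stub of a source that overrides translation, e.g. to upgrade V1
// request-time checkpoints; the "v2:" prefix is purely illustrative.
class TranslatingSourceStub extends SourceStub {
  @Override
  protected Optional<String> translateCheckpoint(Optional<String> lastCheckpoint) {
    return lastCheckpoint.map(k -> "v2:" + k);
  }
}
```

The actual test would additionally cover the empty-checkpoint case, so a missing checkpoint never gets fabricated during translation.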



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -781,91 +765,6 @@ private JavaRDD<GenericRecord> 
getTransformedRDD(Dataset<Row> rowDataset, boolea
         Option.ofNullable(readerSchema)).toJavaRDD();
   }
 
-  /**
-   * Process previous commit metadata and checkpoint configs set by user to 
determine the checkpoint to resume from.
-   *
-   * @param commitsTimelineOpt commits timeline of interest, including .commit 
and .deltacommit.
-   * @return the checkpoint to resume from if applicable.
-   * @throws IOException
-   */
-  @VisibleForTesting
-  Option<String> getCheckpointToResume(Option<HoodieTimeline> 
commitsTimelineOpt) throws IOException {

Review Comment:
   Is this all largely just moved over to the Utils class?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
