danny0405 commented on code in PR #7627:
URL: https://github.com/apache/hudi/pull/7627#discussion_r1172399586


##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java:
##########
@@ -72,6 +72,14 @@ public class HoodieCommonConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation("Turn on compression for BITCASK disk map used by the 
External Spillable Map");
 
+  public static final ConfigProperty<Boolean> 
INCREMENTAL_FETCH_INSTANT_BY_STATE_TRANSITION_TIME = ConfigProperty
+      .key("hoodie.incremental.fetch.instant.by.state.transition.time")
+      .defaultValue(false)
+      .sinceVersion("0.14.0")
+      .withDocumentation("For incremental mode, whether to enable to pull 
commits in range by stateTransitionTime "
+          + "instead of commit timestamp. Please be aware that enabling this 
will result in"
+          + "`begin.instanttime` and `end.instanttime` using 
`stateTransitionTime` instead of the instant's commit time.");

Review Comment:
   ```java
   "For incremental mode, whether to enable pulling commits in range by 
state transition time (completion time) "
          + "instead of commit time (start time). Please be aware that 
enabling this will result in "
             + "`begin.instanttime` and `end.instanttime` using 
`stateTransitionTime` instead of the instant's commit time."
   ```



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithStateTransitionTime.scala:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.junit.jupiter.api.{AfterEach, Assertions, BeforeEach}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
+
+import scala.collection.JavaConversions.asScalaBuffer
+
+class TestIncrementalReadWithStateTransitionTime extends 
HoodieSparkClientTestBase  {
+
+  var spark: SparkSession = null
+
+  val commonOpts = Map(
+    "hoodie.insert.shuffle.parallelism" -> "4",
+    "hoodie.upsert.shuffle.parallelism" -> "4",
+    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+    HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1"
+  )
+
+  @BeforeEach
+  override def setUp(): Unit = {
+    setTableName("hoodie_test")
+    initPath()
+    initSparkContexts()
+    spark = sqlContext.sparkSession
+    initTestDataGenerator()
+    initFileSystem()
+  }
+
+  @AfterEach
+  override def tearDown(): Unit = {
+    cleanupSparkContexts()
+    cleanupTestDataGenerator()
+    cleanupFileSystem()
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testReadingWithStateTransitionTime(tableType: HoodieTableType): Unit = {
+    val records = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    inputDF.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
+      .option("hoodie.cleaner.commits.retained", "3")
+      .option("hoodie.keep.min.commits", "4")
+      .option("hoodie.keep.max.commits", "5")
+      .option(DataSourceWriteOptions.OPERATION.key(), 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val metaClient = HoodieTableMetaClient.builder()
+      .setConf(spark.sparkContext.hadoopConfiguration)
+      .setBasePath(basePath)
+      .setLoadActiveTimelineOnLoad(true)
+      .build()
+
+    val firstInstant = 
metaClient.getActiveTimeline.filterCompletedInstants().getInstantsOrderedByStateTransitionTs
+      .findFirst().get()
+
+    val result1 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key(), 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), "000")
+      
.option(DataSourceReadOptions.INCREMENTAL_FETCH_INSTANT_BY_STATE_TRANSITION_TIME.key(),
 "true")
+      .option(DataSourceReadOptions.END_INSTANTTIME.key(), 
firstInstant.getTimestamp)
+      .load(basePath)

Review Comment:
   We also need to test the out-of-order scenarios, where the code would miss 
data without this change.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java:
##########
@@ -226,6 +275,8 @@ public int compareTo(HoodieInstant o) {
 
   @Override
   public String toString() {
-    return "[" + ((isInflight() || isRequested()) ? "==>" : "") + timestamp + 
"__" + action + "__" + state + "]";
+    return "[" + ((isInflight() || isRequested()) ? "==>" : "")
+        + timestamp + "__" + action + "__" + state
+        + (StringUtils.isNullOrEmpty(stateTransitionTime) ? "" : ("__" + 
stateTransitionTime)) + "]";
   }

Review Comment:
   Guess the check `StringUtils.isNullOrEmpty` is unnecessary because 
`stateTransitionTime` is by default an empty string.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java:
##########
@@ -70,32 +100,38 @@ public enum State {
     NIL
   }
 
-  private State state = State.COMPLETED;
-  private String action;
-  private String timestamp;
+  private final State state;
+  private final String action;
+  private final String timestamp;
+  private final String stateTransitionTime;
 
   /**
    * Load the instant from the meta FileStatus.
    */
   public HoodieInstant(FileStatus fileStatus) {
     // First read the instant timestamp. [==>20170101193025<==].commit
     String fileName = fileStatus.getPath().getName();
-    String fileExtension = getTimelineFileExtension(fileName);
-    timestamp = fileName.replace(fileExtension, "");
-
-    // Next read the action for this marker
-    action = fileExtension.replaceFirst(".", "");
-    if (action.equals("inflight")) {
-      // This is to support backwards compatibility on how in-flight commit 
files were written
-      // General rule is inflight extension is .<action>.inflight, but for 
commit it is .inflight
-      action = "commit";
-      state = State.INFLIGHT;
-    } else if (action.contains(HoodieTimeline.INFLIGHT_EXTENSION)) {
-      state = State.INFLIGHT;
-      action = action.replace(HoodieTimeline.INFLIGHT_EXTENSION, "");
-    } else if (action.contains(HoodieTimeline.REQUESTED_EXTENSION)) {
-      state = State.REQUESTED;
-      action = action.replace(HoodieTimeline.REQUESTED_EXTENSION, "");
+    Matcher matcher = NAME_FORMAT.matcher(fileName);
+    if (matcher.find()) {
+      timestamp = matcher.group(1);
+      if (matcher.group(2).equals(HoodieTimeline.INFLIGHT_EXTENSION)) {
+        // This is to support backwards compatibility on how in-flight commit 
files were written
+        // General rule is inflight extension is .<action>.inflight, but for 
commit it is .inflight
+        action = "commit";
+        state = State.INFLIGHT;
+      } else {
+        action = matcher.group(2).replaceFirst(".", "");
+        if (matcher.groupCount() == 3 && matcher.group(3) != null) {

Review Comment:
   Can we replace all these hard-coded string constants with static members? 
e.g. `"."` and `""`.



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java:
##########
@@ -72,6 +72,14 @@ public class HoodieCommonConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation("Turn on compression for BITCASK disk map used by the 
External Spillable Map");
 
+  public static final ConfigProperty<Boolean> 
INCREMENTAL_FETCH_INSTANT_BY_STATE_TRANSITION_TIME = ConfigProperty
+      .key("hoodie.incremental.fetch.instant.by.state.transition.time")
+      .defaultValue(false)

Review Comment:
   `hoodie.incremental.fetch.instant.by.state.transition.time` does not seem 
like a good name; does
   `hoodie.datasource.read.by.state.transition.time` make sense to you?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java:
##########
@@ -191,7 +192,8 @@ private HoodieInstant readCommit(GenericRecord record, 
boolean loadDetails) {
         return null;
       });
     }
-    return new 
HoodieInstant(HoodieInstant.State.valueOf(record.get(ACTION_STATE).toString()), 
action, instantTime);
+    return new 
HoodieInstant(HoodieInstant.State.valueOf(record.get(ACTION_STATE).toString()), 
action,
+        instantTime, 
Option.ofNullable(record.get(STATE_TRANSITION_TIME)).orElse("").toString());
   }

Review Comment:
   Use a static member variable instead of a hard-coded string.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala:
##########
@@ -48,159 +48,176 @@ class TestStreamingSource extends StreamTest {
   }
 
   test("test cow stream source") {
-    withTempDir { inputDir =>
-      val tablePath = s"${inputDir.getCanonicalPath}/test_cow_stream"
-      HoodieTableMetaClient.withPropertyBuilder()

Review Comment:
   Can we just override the whole class and make the flag 
`DataSourceReadOptions.INCREMENTAL_FETCH_INSTANT_BY_STATE_TRANSITION_TIME` 
overridable?



##########
hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieInstant.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.timeline;
+
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.util.Option;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.testutils.Assertions.assertStreamEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieInstant extends HoodieCommonTestHarness {
+
+  @Test
+  public void testExtractTimestamp() {
+    String fileName = "20230104152218702.inflight";
+    assertEquals("20230104152218702", 
HoodieInstant.extractTimestamp(fileName));
+
+    fileName = "20230104152218702.commit.request";
+    assertEquals("20230104152218702", 
HoodieInstant.extractTimestamp(fileName));
+  }
+
+  @Test
+  public void testGetTimelineFileExtension() {
+    String fileName = "20230104152218702.inflight";
+    assertEquals(".inflight", 
HoodieInstant.getTimelineFileExtension(fileName));
+
+    fileName = "20230104152218702.commit.request";
+    assertEquals(".commit.request", 
HoodieInstant.getTimelineFileExtension(fileName));
+  }
+
+  @Test
+  public void testCreateHoodieInstantByFileStatus() throws IOException {
+    try {
+      initMetaClient();
+      HoodieInstant instantRequested =
+          new HoodieInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.COMMIT_ACTION, "001");
+      HoodieInstant instantCommitted =
+          new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, "001");
+      HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+      timeline.createNewInstant(instantRequested);
+      timeline.transitionRequestedToInflight(instantRequested, Option.empty());
+      timeline.saveAsComplete(
+          new HoodieInstant(true, instantRequested.getAction(), 
instantRequested.getTimestamp()),
+          Option.empty());
+      metaClient.reloadActiveTimeline();

Review Comment:
   Can we also add a test verifying that when the file is moved, the state 
transition time also changes?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to