xushiyan commented on code in PR #7627:
URL: https://github.com/apache/hudi/pull/7627#discussion_r1197622388


##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java:
##########
@@ -46,14 +59,36 @@ public class HoodieInstant implements Serializable, 
Comparable<HoodieInstant> {
   public static final Comparator<HoodieInstant> COMPARATOR = 
Comparator.comparing(HoodieInstant::getTimestamp)
       .thenComparing(ACTION_COMPARATOR).thenComparing(HoodieInstant::getState);
 
+  public static final Comparator<HoodieInstant> STATE_TRANSITION_COMPARATOR =
+      Comparator.comparing(HoodieInstant::getStateTransitionTime)
+          .thenComparing(COMPARATOR);
+
+  public static final String EMPTY_FILE_EXTENSION = "";
+
   public static String getComparableAction(String action) {
     return COMPARABLE_ACTIONS.getOrDefault(action, action);
   }
 
+  public static String extractTimestamp(String fileName) throws 
IllegalArgumentException {
+    Objects.requireNonNull(fileName);

Review Comment:
   Arguments are non-null by convention, so this check is redundant. We only 
annotate an argument if it's nullable.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala:
##########
@@ -51,45 +53,46 @@ class TestStreamingSource extends StreamTest {
     withTempDir { inputDir =>
       val tablePath = s"${inputDir.getCanonicalPath}/test_cow_stream"
       HoodieTableMetaClient.withPropertyBuilder()
-          .setTableType(COPY_ON_WRITE)
-          .setTableName(getTableName(tablePath))
-          
.setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
+        .setTableType(COPY_ON_WRITE)
+        .setTableName(getTableName(tablePath))
+        
.setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
         .setPreCombineField("ts")
-          .initTable(spark.sessionState.newHadoopConf(), tablePath)
+        .initTable(spark.sessionState.newHadoopConf(), tablePath)
 
       addData(tablePath, Seq(("1", "a1", "10", "000")))
       val df = spark.readStream
         .format("org.apache.hudi")
+        .option(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key(), 
useTransitionTime)

Review Comment:
   This should be parameterized; otherwise, the default commit instant time is 
not covered by tests.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java:
##########
@@ -70,32 +105,38 @@ public enum State {
     NIL
   }
 
-  private State state = State.COMPLETED;
-  private String action;
-  private String timestamp;
+  private final State state;
+  private final String action;
+  private final String timestamp;
+  private final String stateTransitionTime;
 
   /**
    * Load the instant from the meta FileStatus.
    */
   public HoodieInstant(FileStatus fileStatus) {
     // First read the instant timestamp. [==>20170101193025<==].commit
     String fileName = fileStatus.getPath().getName();
-    String fileExtension = getTimelineFileExtension(fileName);
-    timestamp = fileName.replace(fileExtension, "");
-
-    // Next read the action for this marker
-    action = fileExtension.replaceFirst(".", "");
-    if (action.equals("inflight")) {
-      // This is to support backwards compatibility on how in-flight commit 
files were written
-      // General rule is inflight extension is .<action>.inflight, but for 
commit it is .inflight
-      action = "commit";
-      state = State.INFLIGHT;
-    } else if (action.contains(HoodieTimeline.INFLIGHT_EXTENSION)) {
-      state = State.INFLIGHT;
-      action = action.replace(HoodieTimeline.INFLIGHT_EXTENSION, "");
-    } else if (action.contains(HoodieTimeline.REQUESTED_EXTENSION)) {
-      state = State.REQUESTED;
-      action = action.replace(HoodieTimeline.REQUESTED_EXTENSION, "");
+    Matcher matcher = NAME_FORMAT.matcher(fileName);
+    if (matcher.find()) {
+      timestamp = matcher.group(1);
+      if (matcher.group(2).equals(HoodieTimeline.INFLIGHT_EXTENSION)) {
+        // This is to support backwards compatibility on how in-flight commit 
files were written
+        // General rule is inflight extension is .<action>.inflight, but for 
commit it is .inflight
+        action = HoodieTimeline.COMMIT_ACTION;
+        state = State.INFLIGHT;
+      } else {
+        action = matcher.group(2).replaceFirst(DELIMITER, 
StringUtils.EMPTY_STRING);
+        if (matcher.groupCount() == 3 && matcher.group(3) != null) {
+          state = State.valueOf(matcher.group(3).replaceFirst(DELIMITER, 
StringUtils.EMPTY_STRING).toUpperCase());
+        } else {
+          // Like 20230104152218702.commit
+          state = State.COMPLETED;
+        }
+      }
+      stateTransitionTime =
+          HoodieInstantTimeGenerator.formatDate(new 
Date(fileStatus.getModificationTime()));
+    } else {
+      throw new IllegalArgumentException(String.format(FILE_NAME_FORMAT_ERROR, 
fileName));

Review Comment:
   Same here. This should throw a Hudi-specific exception.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java:
##########
@@ -46,14 +59,36 @@ public class HoodieInstant implements Serializable, 
Comparable<HoodieInstant> {
   public static final Comparator<HoodieInstant> COMPARATOR = 
Comparator.comparing(HoodieInstant::getTimestamp)
       .thenComparing(ACTION_COMPARATOR).thenComparing(HoodieInstant::getState);
 
+  public static final Comparator<HoodieInstant> STATE_TRANSITION_COMPARATOR =
+      Comparator.comparing(HoodieInstant::getStateTransitionTime)
+          .thenComparing(COMPARATOR);
+
+  public static final String EMPTY_FILE_EXTENSION = "";
+
   public static String getComparableAction(String action) {
     return COMPARABLE_ACTIONS.getOrDefault(action, action);
   }
 
+  public static String extractTimestamp(String fileName) throws 
IllegalArgumentException {
+    Objects.requireNonNull(fileName);
+
+    Matcher matcher = NAME_FORMAT.matcher(fileName);
+    if (matcher.find()) {
+      return matcher.group(1);
+    }
+
+    throw new IllegalArgumentException(String.format(FILE_NAME_FORMAT_ERROR, 
fileName));

Review Comment:
   This should use a Hudi-specific exception to give more context.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java:
##########
@@ -197,6 +197,13 @@ public HoodieDefaultTimeline findInstantsInRange(String 
startTs, String endTs) {
         getInstantsAsStream().filter(s -> 
HoodieTimeline.isInRange(s.getTimestamp(), startTs, endTs)), details);
   }
 
+  @Override
+  public HoodieDefaultTimeline findInstantsInRangeByStateTransitionTs(String 
startTs, String endTs) {

Review Comment:
   We should just call it `findInstantsInRangeByStateTransitionTime`. Let's 
avoid having acronyms in the API.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to