xushiyan commented on code in PR #7627:
URL: https://github.com/apache/hudi/pull/7627#discussion_r1197622388
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java:
##########
@@ -46,14 +59,36 @@ public class HoodieInstant implements Serializable,
Comparable<HoodieInstant> {
public static final Comparator<HoodieInstant> COMPARATOR =
Comparator.comparing(HoodieInstant::getTimestamp)
.thenComparing(ACTION_COMPARATOR).thenComparing(HoodieInstant::getState);
+ public static final Comparator<HoodieInstant> STATE_TRANSITION_COMPARATOR =
+ Comparator.comparing(HoodieInstant::getStateTransitionTime)
+ .thenComparing(COMPARATOR);
+
+ public static final String EMPTY_FILE_EXTENSION = "";
+
public static String getComparableAction(String action) {
return COMPARABLE_ACTIONS.getOrDefault(action, action);
}
+ public static String extractTimestamp(String fileName) throws
IllegalArgumentException {
+ Objects.requireNonNull(fileName);
Review Comment:
The argument should be non-null by convention, so this check is redundant.
We only annotate an argument when it is nullable.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala:
##########
@@ -51,45 +53,46 @@ class TestStreamingSource extends StreamTest {
withTempDir { inputDir =>
val tablePath = s"${inputDir.getCanonicalPath}/test_cow_stream"
HoodieTableMetaClient.withPropertyBuilder()
- .setTableType(COPY_ON_WRITE)
- .setTableName(getTableName(tablePath))
-
.setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
+ .setTableType(COPY_ON_WRITE)
+ .setTableName(getTableName(tablePath))
+
.setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
.setPreCombineField("ts")
- .initTable(spark.sessionState.newHadoopConf(), tablePath)
+ .initTable(spark.sessionState.newHadoopConf(), tablePath)
addData(tablePath, Seq(("1", "a1", "10", "000")))
val df = spark.readStream
.format("org.apache.hudi")
+ .option(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key(),
useTransitionTime)
Review Comment:
This should be parameterized; otherwise, the default commit instant time is
not covered by tests.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java:
##########
@@ -70,32 +105,38 @@ public enum State {
NIL
}
- private State state = State.COMPLETED;
- private String action;
- private String timestamp;
+ private final State state;
+ private final String action;
+ private final String timestamp;
+ private final String stateTransitionTime;
/**
* Load the instant from the meta FileStatus.
*/
public HoodieInstant(FileStatus fileStatus) {
// First read the instant timestamp. [==>20170101193025<==].commit
String fileName = fileStatus.getPath().getName();
- String fileExtension = getTimelineFileExtension(fileName);
- timestamp = fileName.replace(fileExtension, "");
-
- // Next read the action for this marker
- action = fileExtension.replaceFirst(".", "");
- if (action.equals("inflight")) {
- // This is to support backwards compatibility on how in-flight commit
files were written
- // General rule is inflight extension is .<action>.inflight, but for
commit it is .inflight
- action = "commit";
- state = State.INFLIGHT;
- } else if (action.contains(HoodieTimeline.INFLIGHT_EXTENSION)) {
- state = State.INFLIGHT;
- action = action.replace(HoodieTimeline.INFLIGHT_EXTENSION, "");
- } else if (action.contains(HoodieTimeline.REQUESTED_EXTENSION)) {
- state = State.REQUESTED;
- action = action.replace(HoodieTimeline.REQUESTED_EXTENSION, "");
+ Matcher matcher = NAME_FORMAT.matcher(fileName);
+ if (matcher.find()) {
+ timestamp = matcher.group(1);
+ if (matcher.group(2).equals(HoodieTimeline.INFLIGHT_EXTENSION)) {
+ // This is to support backwards compatibility on how in-flight commit
files were written
+ // General rule is inflight extension is .<action>.inflight, but for
commit it is .inflight
+ action = HoodieTimeline.COMMIT_ACTION;
+ state = State.INFLIGHT;
+ } else {
+ action = matcher.group(2).replaceFirst(DELIMITER,
StringUtils.EMPTY_STRING);
+ if (matcher.groupCount() == 3 && matcher.group(3) != null) {
+ state = State.valueOf(matcher.group(3).replaceFirst(DELIMITER,
StringUtils.EMPTY_STRING).toUpperCase());
+ } else {
+ // Like 20230104152218702.commit
+ state = State.COMPLETED;
+ }
+ }
+ stateTransitionTime =
+ HoodieInstantTimeGenerator.formatDate(new
Date(fileStatus.getModificationTime()));
+ } else {
+ throw new IllegalArgumentException(String.format(FILE_NAME_FORMAT_ERROR,
fileName));
Review Comment:
Same here: this should throw a Hudi-specific exception.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java:
##########
@@ -46,14 +59,36 @@ public class HoodieInstant implements Serializable,
Comparable<HoodieInstant> {
public static final Comparator<HoodieInstant> COMPARATOR =
Comparator.comparing(HoodieInstant::getTimestamp)
.thenComparing(ACTION_COMPARATOR).thenComparing(HoodieInstant::getState);
+ public static final Comparator<HoodieInstant> STATE_TRANSITION_COMPARATOR =
+ Comparator.comparing(HoodieInstant::getStateTransitionTime)
+ .thenComparing(COMPARATOR);
+
+ public static final String EMPTY_FILE_EXTENSION = "";
+
public static String getComparableAction(String action) {
return COMPARABLE_ACTIONS.getOrDefault(action, action);
}
+ public static String extractTimestamp(String fileName) throws
IllegalArgumentException {
+ Objects.requireNonNull(fileName);
+
+ Matcher matcher = NAME_FORMAT.matcher(fileName);
+ if (matcher.find()) {
+ return matcher.group(1);
+ }
+
+ throw new IllegalArgumentException(String.format(FILE_NAME_FORMAT_ERROR,
fileName));
Review Comment:
This should use a Hudi-specific exception to give more context.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java:
##########
@@ -197,6 +197,13 @@ public HoodieDefaultTimeline findInstantsInRange(String
startTs, String endTs) {
getInstantsAsStream().filter(s ->
HoodieTimeline.isInRange(s.getTimestamp(), startTs, endTs)), details);
}
+ @Override
+ public HoodieDefaultTimeline findInstantsInRangeByStateTransitionTs(String
startTs, String endTs) {
Review Comment:
We should just call it `findInstantsInRangeByStateTransitionTime`. Let's avoid
having an acronym in the API.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]