This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 83ea5016478 [HUDI-5517][FOLLOW-UP] Refine API names and ensure time
travel won't be affected by stateTransitionTime (#8762)
83ea5016478 is described below
commit 83ea5016478b79d5b7ac862aeef1c1eade6043ab
Author: Rex(Hui) An <[email protected]>
AuthorDate: Fri May 26 13:20:39 2023 +0800
[HUDI-5517][FOLLOW-UP] Refine API names and ensure time travel won't be
affected by stateTransitionTime (#8762)
---
.../hudi/common/table/timeline/HoodieDefaultTimeline.java | 4 ++--
.../org/apache/hudi/common/table/timeline/HoodieInstant.java | 10 +++++-----
.../org/apache/hudi/common/table/timeline/HoodieTimeline.java | 4 ++--
.../src/main/scala/org/apache/hudi/IncrementalRelation.scala | 2 +-
.../scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala | 4 ++--
.../apache/spark/sql/hudi/streaming/HoodieStreamSource.scala | 2 +-
.../functional/TestIncrementalReadByStateTransitionTime.scala | 2 +-
7 files changed, 14 insertions(+), 14 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 3af81ea9f9f..ed86dd26b5c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -198,7 +198,7 @@ public class HoodieDefaultTimeline implements
HoodieTimeline {
}
@Override
- public HoodieDefaultTimeline findInstantsInRangeByStateTransitionTs(String
startTs, String endTs) {
+ public HoodieDefaultTimeline findInstantsInRangeByStateTransitionTime(String
startTs, String endTs) {
return new HoodieDefaultTimeline(
getInstantsAsStream().filter(s ->
HoodieTimeline.isInRange(s.getStateTransitionTime(), startTs, endTs)),
details);
@@ -417,7 +417,7 @@ public class HoodieDefaultTimeline implements
HoodieTimeline {
}
@Override
- public Stream<HoodieInstant> getInstantsOrderedByStateTransitionTs() {
+ public Stream<HoodieInstant> getInstantsOrderedByStateTransitionTime() {
return
getInstantsAsStream().sorted(HoodieInstant.STATE_TRANSITION_COMPARATOR);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
index 3bc3ad02646..20dc5ced07a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
@@ -45,7 +45,8 @@ public class HoodieInstant implements Serializable,
Comparable<HoodieInstant> {
private static final String DELIMITER = ".";
- private static final String FILE_NAME_FORMAT_ERROR = "The fileName %s
doesn't match instant format";
+ private static final String FILE_NAME_FORMAT_ERROR =
+ "The provided file name %s does not conform to the required format";
/**
* A COMPACTION action eventually becomes COMMIT when completed. So, when
grouping instants
@@ -70,14 +71,13 @@ public class HoodieInstant implements Serializable,
Comparable<HoodieInstant> {
}
public static String extractTimestamp(String fileName) throws
IllegalArgumentException {
- Objects.requireNonNull(fileName);
-
Matcher matcher = NAME_FORMAT.matcher(fileName);
if (matcher.find()) {
return matcher.group(1);
}
- throw new IllegalArgumentException(String.format(FILE_NAME_FORMAT_ERROR,
fileName));
+ throw new IllegalArgumentException("Failed to retrieve timestamp from
name: "
+ + String.format(FILE_NAME_FORMAT_ERROR, fileName));
}
public static String getTimelineFileExtension(String fileName) {
@@ -136,7 +136,7 @@ public class HoodieInstant implements Serializable,
Comparable<HoodieInstant> {
stateTransitionTime =
HoodieInstantTimeGenerator.formatDate(new
Date(fileStatus.getModificationTime()));
} else {
- throw new IllegalArgumentException(String.format(FILE_NAME_FORMAT_ERROR,
fileName));
+ throw new IllegalArgumentException("Failed to construct HoodieInstant: "
+ String.format(FILE_NAME_FORMAT_ERROR, fileName));
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index ce3d880a5bd..b3c8d2a27a0 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -239,7 +239,7 @@ public interface HoodieTimeline extends Serializable {
* Create a new Timeline with instants after startTs and before or on endTs
* by state transition timestamp of actions.
*/
- HoodieTimeline findInstantsInRangeByStateTransitionTs(String startTs, String
endTs);
+ HoodieTimeline findInstantsInRangeByStateTransitionTime(String startTs,
String endTs);
/**
* Create a new Timeline with all the instants after startTs.
@@ -358,7 +358,7 @@ public interface HoodieTimeline extends Serializable {
/**
* Get the stream of instants in order by state transition timestamp of
actions.
*/
- Stream<HoodieInstant> getInstantsOrderedByStateTransitionTs();
+ Stream<HoodieInstant> getInstantsOrderedByStateTransitionTime();
/**
* @return true if the passed in instant is before the first completed
instant in the timeline
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 0d178abc5ba..5624120ae0d 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -88,7 +88,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
private val commitsTimelineToReturn = {
if (useStateTransitionTime) {
- commitTimeline.findInstantsInRangeByStateTransitionTs(
+ commitTimeline.findInstantsInRangeByStateTransitionTime(
optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key),
optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key(),
lastInstant.getStateTransitionTime))
} else {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 74c03e9887c..390520bebaa 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -62,7 +62,7 @@ case class MergeOnReadIncrementalRelation(override val
sqlContext: SQLContext,
if (fullTableScan) {
metaClient.getCommitsAndCompactionTimeline
} else if (useStateTransitionTime) {
-
metaClient.getCommitsAndCompactionTimeline.findInstantsInRangeByStateTransitionTs(startTimestamp,
endTimestamp)
+
metaClient.getCommitsAndCompactionTimeline.findInstantsInRangeByStateTransitionTime(startTimestamp,
endTimestamp)
} else {
metaClient.getCommitsAndCompactionTimeline.findInstantsInRange(startTimestamp,
endTimestamp)
}
@@ -165,7 +165,7 @@ trait HoodieIncrementalRelationTrait extends
HoodieBaseRelation {
// If endTimestamp commit is not archived, will filter instants
// before endTimestamp.
if (useStateTransitionTime) {
- super.timeline.findInstantsInRangeByStateTransitionTs(startTimestamp,
endTimestamp).getInstants.asScala.toList
+
super.timeline.findInstantsInRangeByStateTransitionTime(startTimestamp,
endTimestamp).getInstants.asScala.toList
} else {
super.timeline.findInstantsInRange(startTimestamp,
endTimestamp).getInstants.asScala.toList
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
index 2f77c4cd770..d39e2074095 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
@@ -107,7 +107,7 @@ class HoodieStreamSource(
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants()
match {
case activeInstants if !activeInstants.empty() =>
val timestamp = if (useStateTransitionTime) {
- activeInstants.getInstantsOrderedByStateTransitionTs
+ activeInstants.getInstantsOrderedByStateTransitionTime
.skip(activeInstants.countInstants() - 1)
.findFirst()
.get()
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadByStateTransitionTime.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadByStateTransitionTime.scala
index b1e81d9857d..c5aae60a759 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadByStateTransitionTime.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadByStateTransitionTime.scala
@@ -80,7 +80,7 @@ class TestIncrementalReadByStateTransitionTime extends
HoodieSparkClientTestBase
.setLoadActiveTimelineOnLoad(true)
.build()
- val firstInstant =
metaClient.getActiveTimeline.filterCompletedInstants().getInstantsOrderedByStateTransitionTs
+ val firstInstant =
metaClient.getActiveTimeline.filterCompletedInstants().getInstantsOrderedByStateTransitionTime
.findFirst().get()
val result1 = spark.read.format("org.apache.hudi")