boneanxs commented on code in PR #7627:
URL: https://github.com/apache/hudi/pull/7627#discussion_r1168236489
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala:
##########
@@ -205,6 +218,9 @@ class IncrementalRelation(val sqlContext: SQLContext,
val endInstantArchived =
commitTimeline.isBeforeTimelineStarts(endInstantTime)
val scanDf = if (fallbackToFullTableScan && (startInstantArchived || endInstantArchived)) {
+ if (useStateTransitionTime) {
+ throw new HoodieException("Cannot use stateTransitionTime while enables full table scan")
Review Comment:
If `useStateTransitionTime` is enabled, users provide state transition
times for `startInstantTime` and `endInstantTime`. In the full table scan
fallback, those values would be compared with `_hoodie_commit_time`, so the
result would not be accurate. So it is better to error out here. Or should we
force `useStateTransitionTime` back to false instead?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java:
##########
@@ -46,14 +55,35 @@ public class HoodieInstant implements Serializable, Comparable<HoodieInstant> {
public static final Comparator<HoodieInstant> COMPARATOR =
Comparator.comparing(HoodieInstant::getTimestamp)
.thenComparing(ACTION_COMPARATOR).thenComparing(HoodieInstant::getState);
+ public static final Comparator<HoodieInstant> STATE_TRANSITION_COMPARATOR =
+ Comparator.comparing(HoodieInstant::getStateTransitionTime)
Review Comment:
We need to use `STATE_TRANSITION_COMPARATOR` in
`Timeline.getInstantsOrderedByStateTransitionTs`, which should sort by
`stateTransitionTime` first, so we cannot use `COMPARATOR` directly here.
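
To illustrate the ordering difference, here is a minimal sketch. It is not the code in this PR: `Instant` is a hypothetical stand-in for `HoodieInstant` reduced to two fields, and falling back to `COMPARATOR` for ties is an assumption, since the diff above is cut off after the first key.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class StateTransitionOrderSketch {
    // Hypothetical stand-in for HoodieInstant; only the two fields that matter here.
    static final class Instant {
        final String timestamp;
        final String stateTransitionTime;

        Instant(String timestamp, String stateTransitionTime) {
            this.timestamp = timestamp;
            this.stateTransitionTime = stateTransitionTime;
        }

        String getTimestamp() { return timestamp; }
        String getStateTransitionTime() { return stateTransitionTime; }
    }

    // Same shape as the existing COMPARATOR: commit timestamp is the first key.
    static final Comparator<Instant> COMPARATOR =
        Comparator.comparing(Instant::getTimestamp);

    // State transition time is the first key; ties fall back to COMPARATOR (assumption).
    static final Comparator<Instant> STATE_TRANSITION_COMPARATOR =
        Comparator.comparing(Instant::getStateTransitionTime).thenComparing(COMPARATOR);

    // Sorts a copy of the input and returns the commit timestamps in sorted order.
    static List<String> orderedTimestamps(List<Instant> instants, Comparator<Instant> order) {
        List<Instant> sorted = new ArrayList<>(instants);
        sorted.sort(order);
        List<String> result = new ArrayList<>();
        for (Instant instant : sorted) {
            result.add(instant.getTimestamp());
        }
        return result;
    }
}
```

If an instant with the earlier commit timestamp completes later, the two comparators return the instants in opposite orders, which is why `COMPARATOR` cannot be reused as-is.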
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala:
##########
@@ -82,9 +87,17 @@ class IncrementalRelation(val sqlContext: SQLContext,
private val lastInstant = commitTimeline.lastInstant().get()
- private val commitsTimelineToReturn = commitTimeline.findInstantsInRange(
- optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key),
- optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key(), lastInstant.getTimestamp))
+ private val commitsTimelineToReturn = {
+ if (useStateTransitionTime) {
Review Comment:
If I understand correctly, we only use the begin and end instant times (which
are state transition times if `useStateTransitionTime` is enabled) to filter
instants, and we still compare the instants' commit times with
`_hoodie_commit_time`.
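
A minimal sketch of that two-step behaviour as I understand it, not the actual `IncrementalRelation` code. `Instant` and `commitTimesInRange` are hypothetical names, and the range is assumed to be exclusive at the begin and inclusive at the end.

```java
import java.util.ArrayList;
import java.util.List;

class IncrementalRangeSketch {
    // Hypothetical stand-in for HoodieInstant; only the two fields that matter here.
    static final class Instant {
        final String timestamp;
        final String stateTransitionTime;

        Instant(String timestamp, String stateTransitionTime) {
            this.timestamp = timestamp;
            this.stateTransitionTime = stateTransitionTime;
        }
    }

    // Step 1: select instants whose state transition time is in (begin, end].
    // Step 2: return their commit timestamps, which are the values a reader
    // would then match against _hoodie_commit_time.
    static List<String> commitTimesInRange(List<Instant> instants, String begin, String end) {
        List<String> commitTimes = new ArrayList<>();
        for (Instant instant : instants) {
            String t = instant.stateTransitionTime;
            if (t.compareTo(begin) > 0 && t.compareTo(end) <= 0) {
                commitTimes.add(instant.timestamp);
            }
        }
        return commitTimes;
    }
}
```

The begin and end values are never compared with `_hoodie_commit_time` themselves; only the commit timestamps of the selected instants are.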
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java:
##########
@@ -70,32 +100,38 @@ public enum State {
NIL
}
- private State state = State.COMPLETED;
- private String action;
- private String timestamp;
+ private final State state;
+ private final String action;
+ private final String timestamp;
+ private final String stateTransitionTime;
/**
* Load the instant from the meta FileStatus.
*/
public HoodieInstant(FileStatus fileStatus) {
// First read the instant timestamp. [==>20170101193025<==].commit
String fileName = fileStatus.getPath().getName();
- String fileExtension = getTimelineFileExtension(fileName);
- timestamp = fileName.replace(fileExtension, "");
-
- // Next read the action for this marker
- action = fileExtension.replaceFirst(".", "");
- if (action.equals("inflight")) {
- // This is to support backwards compatibility on how in-flight commit files were written
- // General rule is inflight extension is .<action>.inflight, but for commit it is .inflight
- action = "commit";
- state = State.INFLIGHT;
- } else if (action.contains(HoodieTimeline.INFLIGHT_EXTENSION)) {
- state = State.INFLIGHT;
- action = action.replace(HoodieTimeline.INFLIGHT_EXTENSION, "");
- } else if (action.contains(HoodieTimeline.REQUESTED_EXTENSION)) {
- state = State.REQUESTED;
- action = action.replace(HoodieTimeline.REQUESTED_EXTENSION, "");
+ Matcher matcher = NAME_FORMAT.matcher(fileName);
+ if (matcher.find()) {
+ timestamp = matcher.group(1);
+ if (matcher.group(2).equals(HoodieTimeline.INFLIGHT_EXTENSION)) {
+ // This is to support backwards compatibility on how in-flight commit files were written
+ // General rule is inflight extension is .<action>.inflight, but for commit it is .inflight
+ action = "commit";
+ state = State.INFLIGHT;
+ } else {
+ action = matcher.group(2).replaceFirst(".", "");
+ if (matcher.groupCount() == 3 && matcher.group(3) != null) {
+ state = State.valueOf(matcher.group(3).replaceFirst(".", "").toUpperCase());
+ } else {
+ // Like 20230104152218702.commit
+ state = State.COMPLETED;
+ }
+ }
+ stateTransitionTime =
Review Comment:
Thank you, @vinothchandar, for bringing up this important question. We
apologize for not considering this aspect earlier. If the timeline of the
newly migrated table is still based on `modificationTime`, its instants could
end up in a random order. That makes it hard for downstream pipelines to
resume from a newly specified time when data from the old location has not
been read yet.
So to migrate a table (let's call it A), I think we can follow these steps:
1. Stop ingesting new data into table A.
2. Wait for all downstream jobs to finish their incremental reads from table A.
3. Migrate table A to the new location.
4. Delete the checkpoint information for all downstream jobs, set
`hoodie.datasource.streaming.startOffset` to the time when the migration
completed, and then restart the jobs.