cshuo commented on code in PR #13592:
URL: https://github.com/apache/hudi/pull/13592#discussion_r2224245216
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java:
##########
@@ -177,6 +194,20 @@ public void initializeState(FunctionInitializationContext
context) throws Except
LOG.debug("{} retrieved an issued instant of time [{}, {}] for table
{} with path {}.",
getClass().getSimpleName(), issuedInstant, issuedOffset,
conf.get(FlinkOptions.TABLE_NAME), path);
}
+ } else if (retrievedStates.size() == 3) {
+ this.issuedInstant = retrievedStates.get(0);
+ this.issuedOffset = retrievedStates.get(1);
+ this.totalSplits = Integer.parseInt(retrievedStates.get(2));
Review Comment:
Is it necessary to snapshot `totalSplits`? Putting it in `instantState` is
somewhat hacky, since that state is meant to store the instant time.
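If the value does need to survive a restore, one option is to keep it in a
state of its own. Below is a minimal, dependency-free sketch of that idea, not
the PR's code. The class name `SeparateStateSketch` and the
`totalSplitsState` field are hypothetical, and plain `java.util.List`
instances stand in for what would be two separate Flink `ListState` handles.
It also assumes `instantState` holds only the issued instant and offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: plain lists stand in for two separate ListState handles.
class SeparateStateSketch {
  final List<String> instantState = new ArrayList<>();      // instant time and offset only
  final List<Integer> totalSplitsState = new ArrayList<>(); // hypothetical dedicated state

  String issuedInstant;
  String issuedOffset;
  int totalSplits = -1;

  // Plays the role of snapshotState: each value goes to the state that matches its meaning.
  void snapshot() {
    instantState.clear();
    if (issuedInstant != null) {
      instantState.add(issuedInstant);
      if (issuedOffset != null) {
        instantState.add(issuedOffset);
      }
    }
    totalSplitsState.clear();
    totalSplitsState.add(totalSplits);
  }

  // Plays the role of initializeState: no size() == 3 branch, no Integer.parseInt.
  void restore(List<String> retrievedInstants, List<Integer> retrievedSplits) {
    issuedInstant = retrievedInstants.size() > 0 ? retrievedInstants.get(0) : null;
    issuedOffset = retrievedInstants.size() > 1 ? retrievedInstants.get(1) : null;
    // Fall back to the field's default when the snapshot predates the new state.
    totalSplits = retrievedSplits.isEmpty() ? -1 : retrievedSplits.get(0);
  }
}
```

With this shape, the meaning of `instantState` stays unchanged, and a snapshot
taken before the new state existed restores with `totalSplits` left at `-1`.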
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java:
##########
@@ -97,8 +102,14 @@ public class StreamReadMonitoringFunction
private String issuedOffset;
+ private int totalSplits = -1;
Review Comment:
Please add a Javadoc comment for this field.