danny0405 commented on code in PR #12539:
URL: https://github.com/apache/hudi/pull/12539#discussion_r1942254279


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java:
##########
@@ -156,27 +164,11 @@ public void initializeState(FunctionInitializationContext context) throws Except
       ValidationUtils.checkArgument(retrievedStates.size() <= 2,
           getClass().getSimpleName() + " retrieved invalid state.");
 
-      if (retrievedStates.size() == 1 && issuedInstant != null) {
-        // this is the case where we have both legacy and new state.
-        // the two should be mutually exclusive for the operator, thus we throw the exception.
-
-        throw new IllegalArgumentException(
-            "The " + getClass().getSimpleName() + " has already restored from a previous Flink version.");
-
-      } else if (retrievedStates.size() == 1) {
-        // for forward compatibility
-        this.issuedInstant = retrievedStates.get(0);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("{} retrieved an issued instant of time {} for table {} with path {}.",
-              getClass().getSimpleName(), issuedInstant, conf.get(FlinkOptions.TABLE_NAME), path);
-        }
-      } else if (retrievedStates.size() == 2) {
-        this.issuedInstant = retrievedStates.get(0);
-        this.issuedOffset = retrievedStates.get(1);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("{} retrieved an issued instant of time [{}, {}] for table {} with path {}.",
-              getClass().getSimpleName(), issuedInstant, issuedOffset, conf.get(FlinkOptions.TABLE_NAME), path);
-        }
+      this.issuedInstant = retrievedStates.get(0);
+      this.issuedOffset = retrievedStates.get(1);

Review Comment:
   I expected this to be a simple change: check the `issuedOffset` against the 
timeline on the first read, and if the `issuedOffset` is an instant time, 
just switch to its completion time or modification time.
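
   A minimal sketch of that idea, not the actual Hudi code: the class name, 
the `resolveIssuedOffset` helper, and the `Map` standing in for the timeline 
lookup (instant time to completion time) are all hypothetical. It assumes an 
offset that matches a known instant time is a legacy value and everything 
else is already a completion time.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: none of these names exist in Hudi.
final class IssuedOffsetMigrationSketch {

  /**
   * Resolves the offset restored from state, intended to be called once on the first read.
   *
   * @param issuedOffset            the offset restored from state, may be null
   * @param completionTimeByInstant assumed stand-in for a timeline lookup,
   *                                mapping instant time to completion time
   * @return the completion time if the offset is a known instant time,
   *         otherwise the offset unchanged
   */
  static String resolveIssuedOffset(String issuedOffset,
                                    Map<String, String> completionTimeByInstant) {
    if (issuedOffset == null) {
      // Nothing was restored, so there is nothing to migrate.
      return null;
    }
    String completionTime = completionTimeByInstant.get(issuedOffset);
    // A hit means the restored value was an instant time (legacy state).
    return completionTime != null ? completionTime : issuedOffset;
  }

  public static void main(String[] args) {
    Map<String, String> timeline = new TreeMap<>();
    timeline.put("20250101120000000", "20250101120005000");

    // Legacy state: the offset is an instant time, so it is switched.
    System.out.println(resolveIssuedOffset("20250101120000000", timeline));
    // New state: the offset is not an instant time, so it is kept.
    System.out.println(resolveIssuedOffset("20250101120005000", timeline));
  }
}
```

   The sketch ignores the case where a completion time happens to equal 
another commit's instant time; a real check would need to rule that out.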


