voonhous commented on code in PR #12539:
URL: https://github.com/apache/hudi/pull/12539#discussion_r1942327967


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java:
##########
@@ -156,27 +164,11 @@ public void initializeState(FunctionInitializationContext context) throws Except
       ValidationUtils.checkArgument(retrievedStates.size() <= 2,
           getClass().getSimpleName() + " retrieved invalid state.");
 
-      if (retrievedStates.size() == 1 && issuedInstant != null) {
-        // this is the case where we have both legacy and new state.
-        // the two should be mutually exclusive for the operator, thus we throw the exception.
-
-        throw new IllegalArgumentException(
-            "The " + getClass().getSimpleName() + " has already restored from a previous Flink version.");
-
-      } else if (retrievedStates.size() == 1) {
-        // for forward compatibility
-        this.issuedInstant = retrievedStates.get(0);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("{} retrieved an issued instant of time {} for table {} with path {}.",
-              getClass().getSimpleName(), issuedInstant, conf.get(FlinkOptions.TABLE_NAME), path);
-        }
-      } else if (retrievedStates.size() == 2) {
-        this.issuedInstant = retrievedStates.get(0);
-        this.issuedOffset = retrievedStates.get(1);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("{} retrieved an issued instant of time [{}, {}] for table {} with path {}.",
-              getClass().getSimpleName(), issuedInstant, issuedOffset, conf.get(FlinkOptions.TABLE_NAME), path);
-        }
+      this.issuedInstant = retrievedStates.get(0);
+      this.issuedOffset = retrievedStates.get(1);

Review Comment:
   Apologies for the misunderstanding; I do not quite understand what you mean. I was under the impression that this would be a backward-compatibility improvement when migrating from a Hudi **0.x** table to a Hudi **1.0** table.
   
   From what I understand:
   | version | attribute | available in state? |
   |---|---|---|
   | 0.x | issuedInstant | true |
   | 0.x | issuedOffset | false |
   | 1.x | issuedInstant | true |
   | 1.x | issuedOffset | true |
   
   Hence, I was under the impression that we needed a way to add an `issuedOffset` when upgrading from `0.x`, since `issuedOffset` will be **null** in the state backend during the first read after the upgrade.
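
   To make the concern concrete, here is a minimal sketch of a restore step that tolerates both layouts in the table above. It assumes a 0.x checkpoint restores a one-element list and a 1.x checkpoint restores a two-element list; `RestoredState` and `from` are hypothetical names for illustration, not Hudi code. Note that an unconditional `retrievedStates.get(1)` on a one-element `List` throws `IndexOutOfBoundsException`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper (not part of Hudi): models only the state-restore branch.
final class RestoredState {
  final String issuedInstant;
  final String issuedOffset; // stays null when the checkpoint predates issuedOffset

  private RestoredState(String issuedInstant, String issuedOffset) {
    this.issuedInstant = issuedInstant;
    this.issuedOffset = issuedOffset;
  }

  // Assumption: a 0.x checkpoint yields [issuedInstant],
  // a 1.x checkpoint yields [issuedInstant, issuedOffset].
  static RestoredState from(List<String> retrievedStates) {
    if (retrievedStates.size() > 2) {
      throw new IllegalArgumentException("retrieved invalid state.");
    }
    String instant = retrievedStates.isEmpty() ? null : retrievedStates.get(0);
    // Read the second element only when it exists.
    String offset = retrievedStates.size() == 2 ? retrievedStates.get(1) : null;
    return new RestoredState(instant, offset);
  }

  public static void main(String[] args) {
    RestoredState legacy = RestoredState.from(Collections.singletonList("20250101000000"));
    RestoredState current = RestoredState.from(Arrays.asList("20250101000000", "20250101000500"));
    System.out.println("legacy offset: " + legacy.issuedOffset);
    System.out.println("current offset: " + current.issuedOffset);
  }
}
```

   With this shape, the first read after an upgrade sees a null `issuedOffset` and can fall back to `issuedInstant`, while later checkpoints carry both values.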


