davidradl commented on code in PR #26396:
URL: https://github.com/apache/flink/pull/26396#discussion_r2028602861


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java:
##########
@@ -183,34 +184,53 @@ private void processMethod(RunnableWithException method) 
throws Exception {
         }
     }
 
+    @SuppressWarnings("unchecked")
     private void moveStateToFunction() throws IOException {
         Arrays.fill(stateCleared, false);
         for (int i = 0; i < stateHandles.length; i++) {
-            final RowData value = stateHandles[i].value();
-            stateToFunction[i] = value;
+            final State stateHandle = stateHandles[i];
+            if (!(stateHandle instanceof ValueState)) {
+                continue;
+            }
+            final ValueState<RowData> valueState = (ValueState<RowData>) 
stateHandle;
+            final RowData value = valueState.value();
+            valueStateToFunction[i] = value;
         }
     }
 
+    @SuppressWarnings("unchecked")
     private void moveStateFromFunction() throws IOException {
         for (int i = 0; i < stateHandles.length; i++) {
-            final RowData fromFunction = stateFromFunction[i];
-            if (fromFunction == null || isEmpty(fromFunction)) {
-                // Reduce state size
-                stateHandles[i].clear();
+            final State stateHandle = stateHandles[i];
+            if (stateHandle instanceof ValueState) {
+                moveValueStateFromFunction((ValueState<RowData>) stateHandle, 
i);
             } else {
-                final HashFunction hashCode = stateHashCode[i];
-                final RecordEqualiser equals = stateEquals[i];
-                final RowData toFunction = stateToFunction[i];
-                // Reduce state updates by checking if something has changed
-                if (toFunction == null
-                        || hashCode.hashCode(toFunction) != 
hashCode.hashCode(fromFunction)
-                        || !equals.equals(toFunction, fromFunction)) {
-                    stateHandles[i].update(fromFunction);
+                if (stateCleared[i]) {
+                    stateHandle.clear();
                 }
             }
         }
     }
 
+    private void moveValueStateFromFunction(ValueState<RowData> valueState, 
int pos)
+            throws IOException {
+        final RowData fromFunction = valueStateFromFunction[pos];
+        if (fromFunction == null || isEmpty(fromFunction)) {
+            // Reduce state size

Review Comment:
   nit: clear not reduce.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to