davidradl commented on code in PR #26396:
URL: https://github.com/apache/flink/pull/26396#discussion_r2028613707
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java:
##########
@@ -183,34 +184,53 @@ private void processMethod(RunnableWithException method)
throws Exception {
}
}
+ @SuppressWarnings("unchecked")
private void moveStateToFunction() throws IOException {
Arrays.fill(stateCleared, false);
for (int i = 0; i < stateHandles.length; i++) {
- final RowData value = stateHandles[i].value();
- stateToFunction[i] = value;
+ final State stateHandle = stateHandles[i];
+ if (!(stateHandle instanceof ValueState)) {
+ continue;
+ }
+ final ValueState<RowData> valueState = (ValueState<RowData>)
stateHandle;
+ final RowData value = valueState.value();
+ valueStateToFunction[i] = value;
}
}
+ @SuppressWarnings("unchecked")
private void moveStateFromFunction() throws IOException {
for (int i = 0; i < stateHandles.length; i++) {
- final RowData fromFunction = stateFromFunction[i];
- if (fromFunction == null || isEmpty(fromFunction)) {
- // Reduce state size
- stateHandles[i].clear();
+ final State stateHandle = stateHandles[i];
+ if (stateHandle instanceof ValueState) {
+ moveValueStateFromFunction((ValueState<RowData>) stateHandle,
i);
} else {
- final HashFunction hashCode = stateHashCode[i];
- final RecordEqualiser equals = stateEquals[i];
- final RowData toFunction = stateToFunction[i];
- // Reduce state updates by checking if something has changed
- if (toFunction == null
- || hashCode.hashCode(toFunction) !=
hashCode.hashCode(fromFunction)
- || !equals.equals(toFunction, fromFunction)) {
- stateHandles[i].update(fromFunction);
+ if (stateCleared[i]) {
+ stateHandle.clear();
}
}
}
}
+ private void moveValueStateFromFunction(ValueState<RowData> valueState,
int pos)
+ throws IOException {
+ final RowData fromFunction = valueStateFromFunction[pos];
+ if (fromFunction == null || isEmpty(fromFunction)) {
+ // Reduce state size
+ valueState.clear();
+ } else {
+ final HashFunction hashCode = stateHashCode[pos];
+ final RecordEqualiser equals = stateEquals[pos];
Review Comment:
is it possible to use the equals() method on the functions to assert whether
they are equal? If we are not in control as we cannot rely on the equals as
they are user written, then I can see the need for a class like the Record
Equaliser. I am curious on the use of `RecordEqualiser`, equaliser implies to
me we are looking to make the functions equal - where as we are likely just
looking to compare - maybe a RecordComparator might be a better name.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]