twalthr commented on code in PR #26396:
URL: https://github.com/apache/flink/pull/26396#discussion_r2028865942
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/dataview/DataViewUtils.java:
##########
@@ -51,13 +48,40 @@
/**
* Utilities to deal with {@link DataView}s.
*
- * <p>A {@link DataView} is either represented as a regular {@link StructuredType} or as a {@link
- * RawType} that serializes to {@code null} when backed by a state backend. In the latter case, a
- * {@link DataViewSpec} contains all information necessary to store and retrieve data from state.
+ * <p>For aggregating functions: A {@link DataView} is a field that is either represented as a
+ * regular {@link StructuredType} or as a {@link RawType} that serializes to {@code null} when
+ * backed by a state backend. In the latter case, a {@link DataViewSpec} contains all information
+ * necessary to store and retrieve data from state.
+ *
+ * <p>For process table functions: A {@link DataView} is a top-level instance that is always backed
+ * by a state backend.
*/
@Internal
public final class DataViewUtils {
+ /** Returns whether the given {@link LogicalType} qualifies as a {@link DataView}. */
+ public static boolean isDataView(LogicalType viewType, Class<? extends DataView> viewClass) {
+ final boolean isDataView =
+ viewType.is(STRUCTURED_TYPE)
+ && ((StructuredType) viewType)
+ .getImplementationClass()
+ .map(viewClass::isAssignableFrom)
+ .orElse(false);
+ if (!isDataView) {
Review Comment:
I'm following the [early return principle](https://dev.to/shameel/-clean-coding-tip-early-return-principle-47kj) here.
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java:
##########
@@ -183,34 +184,53 @@ private void processMethod(RunnableWithException method) throws Exception {
}
}
+ @SuppressWarnings("unchecked")
private void moveStateToFunction() throws IOException {
Arrays.fill(stateCleared, false);
for (int i = 0; i < stateHandles.length; i++) {
- final RowData value = stateHandles[i].value();
- stateToFunction[i] = value;
+ final State stateHandle = stateHandles[i];
+ if (!(stateHandle instanceof ValueState)) {
Review Comment:
I'm following the [early return principle](https://dev.to/shameel/-clean-coding-tip-early-return-principle-47kj) here.
I think in the end this is a matter of taste. Feel free to code it
differently in your PRs.
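
To make the style concrete, here is a minimal sketch of a guard clause inside a loop. The `State`, `ValueState`, and `MapState` types below are hypothetical stand-ins for illustration only, not Flink's state interfaces, and `collectValues` is an invented helper.

```java
import java.util.ArrayList;
import java.util.List;

final class EarlyReturnSketch {
    // Hypothetical stand-ins for illustration only; not Flink's state interfaces.
    interface State {}

    static final class ValueState implements State {
        final String value;

        ValueState(String value) {
            this.value = value;
        }
    }

    static final class MapState implements State {}

    // Collects the values of all value-state handles and skips every other kind.
    static List<String> collectValues(State[] handles) {
        final List<String> values = new ArrayList<>();
        for (State handle : handles) {
            if (!(handle instanceof ValueState)) {
                continue; // guard clause: leave the uninteresting case early
            }
            // The main path stays at the loop's top indentation level.
            values.add(((ValueState) handle).value);
        }
        return values;
    }
}
```

The equivalent `if (handle instanceof ValueState) { ... }` form behaves the same; the guard clause only keeps the main path less nested.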
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java:
##########
@@ -183,34 +184,53 @@ private void processMethod(RunnableWithException method) throws Exception {
}
}
+ @SuppressWarnings("unchecked")
private void moveStateToFunction() throws IOException {
Arrays.fill(stateCleared, false);
for (int i = 0; i < stateHandles.length; i++) {
- final RowData value = stateHandles[i].value();
- stateToFunction[i] = value;
+ final State stateHandle = stateHandles[i];
+ if (!(stateHandle instanceof ValueState)) {
+ continue;
+ }
+ final ValueState<RowData> valueState = (ValueState<RowData>) stateHandle;
+ final RowData value = valueState.value();
+ valueStateToFunction[i] = value;
}
}
+ @SuppressWarnings("unchecked")
private void moveStateFromFunction() throws IOException {
for (int i = 0; i < stateHandles.length; i++) {
- final RowData fromFunction = stateFromFunction[i];
- if (fromFunction == null || isEmpty(fromFunction)) {
- // Reduce state size
- stateHandles[i].clear();
+ final State stateHandle = stateHandles[i];
+ if (stateHandle instanceof ValueState) {
+ moveValueStateFromFunction((ValueState<RowData>) stateHandle, i);
} else {
- final HashFunction hashCode = stateHashCode[i];
- final RecordEqualiser equals = stateEquals[i];
- final RowData toFunction = stateToFunction[i];
- // Reduce state updates by checking if something has changed
- if (toFunction == null
- || hashCode.hashCode(toFunction) != hashCode.hashCode(fromFunction)
- || !equals.equals(toFunction, fromFunction)) {
- stateHandles[i].update(fromFunction);
+ if (stateCleared[i]) {
+ stateHandle.clear();
}
}
}
}
+ private void moveValueStateFromFunction(ValueState<RowData> valueState, int pos)
+ throws IOException {
+ final RowData fromFunction = valueStateFromFunction[pos];
+ if (fromFunction == null || isEmpty(fromFunction)) {
+ // Reduce state size
+ valueState.clear();
+ } else {
+ final HashFunction hashCode = stateHashCode[pos];
+ final RecordEqualiser equals = stateEquals[pos];
Review Comment:
We are comparing internal data structures here, i.e. `RowData`, which might
be backed by memory segments. SQL semantics also need to be considered when
comparing nested data. This is why `RecordEqualiser` and `HashFunction` are
code-generated instances.
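
A minimal sketch of the "compare hash first, then equality" check used to skip unnecessary state updates. The `Hasher` and `Equaliser` interfaces are hypothetical stand-ins for the generated `HashFunction` and `RecordEqualiser`, and a `byte[]` stands in for a record whose default `equals` does not compare content; none of this is Flink's actual API.

```java
import java.util.Arrays;

final class ChangeDetectionSketch {
    // Hypothetical stand-ins for the generated hash and equality instances.
    interface Hasher<T> {
        int hash(T value);
    }

    interface Equaliser<T> {
        boolean sameRecord(T left, T right);
    }

    // Returns true if the state should be written: there was no previous value,
    // the cheap hash check already differs, or the full comparison differs.
    static <T> boolean needsUpdate(
            T before, T after, Hasher<T> hasher, Equaliser<T> equaliser) {
        return before == null
                || hasher.hash(before) != hasher.hash(after)
                || !equaliser.sameRecord(before, after);
    }

    // Demo on byte arrays, where Object#equals only compares references and a
    // content-aware comparison must be supplied explicitly.
    static boolean demo(byte[] before, byte[] after) {
        return needsUpdate(before, after, Arrays::hashCode, Arrays::equals);
    }
}
```

The hash comparison acts as an inexpensive pre-check; the full comparison only runs when both hashes match.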
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessTableOperator.java:
##########
@@ -331,12 +361,24 @@ private void setStateDescriptors() {
this.stateDescriptors = stateDescriptors;
}
- @SuppressWarnings("unchecked")
private void setStateHandles() {
final KeyedStateStore keyedStateStore = getKeyedStateStore();
- final ValueState<RowData>[] stateHandles = new ValueState[stateDescriptors.length];
+ final State[] stateHandles = new State[stateDescriptors.length];
for (int i = 0; i < stateInfos.size(); i++) {
- stateHandles[i] = keyedStateStore.getState(stateDescriptors[i]);
Review Comment:
The sizes of `stateInfos`, `stateDescriptors`, and `stateHandles` are equal, but I
agree that this comment is valid.
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java:
##########
@@ -86,21 +89,33 @@ public EV get(EK key) throws Exception {
@Override
public void put(EK key, EV value) throws Exception {
+ if (key == null) {
Review Comment:
Null values are supported, as documented in the JavaDocs of `MapView`.