gustavodemorais commented on code in PR #27764:
URL: https://github.com/apache/flink/pull/27764#discussion_r2996969662


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/MultiJoinStateViews.java:
##########
@@ -68,7 +68,8 @@ public static MultiJoinStateView create(
                     RowType
                             joinKeyType, /* joinKeyType is null for inputId = 
0, see {@link InputSideHasUniqueKey}*/
             RowType recordType,
-            long retentionTime) {
+            long retentionTime,
+            boolean prohibitReuseRowData) {

Review Comment:
   nit: Rename `prohibitReuseRowData` to `requiresDeepCopy` for consistency 
with `AttributeBasedJoinKeyExtractor` from #27508. Apply across both inner 
classes too.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperator.java:
##########
@@ -835,7 +835,8 @@ private void initializeStateHandlers() {
                             inputSpecs.get(i),
                             joinKeyType,
                             inputTypes.get(i),
-                            stateRetentionTime[i]);
+                            stateRetentionTime[i],
+                            prohibitReuseRow);

Review Comment:
   nit: Rename to `requiresDeepCopy` to match #27508 naming.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/MultiJoinStateViews.java:
##########
@@ -231,7 +239,7 @@ private RowData getStateKey(RowData joinKey, RowData 
uniqueKey) {
                 GenericRowData compositeKey = new GenericRowData(2);
                 compositeKey.setField(0, joinKey);
                 compositeKey.setField(1, uniqueKey);
-                return compositeKey;
+                return stateKeySerializer.toBinaryRow(compositeKey, 
prohibitReuseRowData);

Review Comment:
   Two things:
   1. Add a comment explaining why, e.g.: `// Serialize to BinaryRowData to 
prevent object reuse corruption on heap state backend`
   2. Skip conversion entirely for RocksDB - it serializes state keys through 
TypeSerializer anyway. Same pattern as `AttributeBasedJoinKeyExtractor`:
   ```java
   if (!requiresDeepCopy) {
       return compositeKey;
   }
   return stateKeySerializer.toBinaryRow(compositeKey, true);
   ```
   Apply to `InputSideHasNoUniqueKey.getStateKey()` as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to