This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a40361  [FLINK-22899][table] ValuesUpsertSinkFunction needs to use global upsert
3a40361 is described below

commit 3a4036129865fcddb809335b3a55a949a75dc8ca
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Jun 17 10:28:32 2021 +0800

    [FLINK-22899][table] ValuesUpsertSinkFunction needs to use global upsert
    
    This closes #16091
---
 .../factories/TestValuesRuntimeFunctions.java      | 65 ++++------------------
 1 file changed, 10 insertions(+), 55 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
index e743c6a..de98081 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
@@ -356,19 +356,18 @@ final class TestValuesRuntimeFunctions {
         }
     }
 
+    /**
+     * NOTE: This class should use a global map to store upsert values. Just like other external
+     * databases.
+     */
     static class KeyedUpsertingSinkFunction extends AbstractExactlyOnceSink {
         private static final long serialVersionUID = 1L;
         private final DataStructureConverter converter;
         private final int[] keyIndices;
         private final int expectedSize;
 
-        // we store key and value as adjacent elements in the ListState
-        private transient ListState<String> upsertResultState;
         // [key, value] map result
         private transient Map<String, String> localUpsertResult;
-
-        // received count state
-        private transient ListState<Integer> receivedNumState;
         private transient int receivedNum;
 
         protected KeyedUpsertingSinkFunction(
@@ -385,61 +384,17 @@ final class TestValuesRuntimeFunctions {
         @Override
         public void initializeState(FunctionInitializationContext context) throws Exception {
             super.initializeState(context);
-            this.upsertResultState =
-                    context.getOperatorStateStore()
-                            .getListState(
-                                    new ListStateDescriptor<>("sink-upsert-results", Types.STRING));
-            this.localUpsertResult = new HashMap<>();
-            this.receivedNumState =
-                    context.getOperatorStateStore()
-                            .getListState(
-                                    new ListStateDescriptor<>("sink-received-num", Types.INT));
-
-            if (context.isRestored()) {
-                String key = null;
-                String value;
-                for (String entry : upsertResultState.get()) {
-                    if (key == null) {
-                        key = entry;
-                    } else {
-                        value = entry;
-                        localUpsertResult.put(key, value);
-                        // reset
-                        key = null;
-                    }
-                }
-                if (key != null) {
-                    throw new RuntimeException("The upsertResultState is corrupt.");
-                }
-                for (int num : receivedNumState.get()) {
-                    // should only be single element
-                    this.receivedNum = num;
-                }
-            }
 
-            int taskId = getRuntimeContext().getIndexOfThisSubtask();
             synchronized (LOCK) {
-                globalUpsertResult
-                        .computeIfAbsent(tableName, k -> new HashMap<>())
-                        .put(taskId, localUpsertResult);
+                // always store in a single map, global upsert
+                this.localUpsertResult =
+                        globalUpsertResult
+                                .computeIfAbsent(tableName, k -> new HashMap<>())
+                                .computeIfAbsent(0, k -> new HashMap<>());
             }
         }
 
         @Override
-        public void snapshotState(FunctionSnapshotContext context) throws Exception {
-            super.snapshotState(context);
-            upsertResultState.clear();
-            synchronized (LOCK) {
-                for (Map.Entry<String, String> entry : localUpsertResult.entrySet()) {
-                    upsertResultState.add(entry.getKey());
-                    upsertResultState.add(entry.getValue());
-                }
-            }
-            receivedNumState.update(Collections.singletonList(receivedNum));
-        }
-
-        @SuppressWarnings("rawtypes")
-        @Override
         public void invoke(RowData value, Context context) throws Exception {
             RowKind kind = value.getRowKind();
 
@@ -448,7 +403,7 @@ final class TestValuesRuntimeFunctions {
 
             synchronized (LOCK) {
                 if (RowUtils.USE_LEGACY_TO_STRING) {
-                    localRawResult.add(kind.shortString() + "(" + row.toString() + ")");
+                    localRawResult.add(kind.shortString() + "(" + row + ")");
                 } else {
                     localRawResult.add(row.toString());
                 }

Reply via email to