This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new fae3e70  [FLINK-22203][table-runtime-blink] Fix ConcurrentModificationException for testing values sink functions (#15978)
fae3e70 is described below

commit fae3e70532056465be3a2376a47ba20b5f80b13b
Author: Jark Wu <[email protected]>
AuthorDate: Fri May 21 12:27:44 2021 +0800

    [FLINK-22203][table-runtime-blink] Fix ConcurrentModificationException for testing values sink functions (#15978)
---
 .../factories/TestValuesRuntimeFunctions.java      | 116 ++++++++++++---------
 1 file changed, 68 insertions(+), 48 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
index 7a4c847..e743c6a 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
@@ -73,6 +73,8 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 /** Runtime function implementations for {@link TestValuesTableFactory}. */
 final class TestValuesRuntimeFunctions {
 
+    static final Object LOCK = TestValuesTableFactory.class;
+
     // [table_name, [task_id, List[value]]]
     private static final Map<String, Map<Integer, List<String>>> 
globalRawResult = new HashMap<>();
     // [table_name, [task_id, Map[key, value]]]
@@ -95,7 +97,7 @@ final class TestValuesRuntimeFunctions {
     }
 
     static List<Watermark> getWatermarks(String tableName) {
-        synchronized (TestValuesTableFactory.class) {
+        synchronized (LOCK) {
             if (watermarkHistory.containsKey(tableName)) {
                 return new ArrayList<>(watermarkHistory.get(tableName));
             } else {
@@ -106,7 +108,7 @@ final class TestValuesRuntimeFunctions {
 
     static List<String> getResults(String tableName) {
         List<String> result = new ArrayList<>();
-        synchronized (TestValuesTableFactory.class) {
+        synchronized (LOCK) {
             if (globalUpsertResult.containsKey(tableName)) {
                 globalUpsertResult
                         .get(tableName)
@@ -124,7 +126,7 @@ final class TestValuesRuntimeFunctions {
     }
 
     static void clearResults() {
-        synchronized (TestValuesTableFactory.class) {
+        synchronized (LOCK) {
             globalRawResult.clear();
             globalUpsertResult.clear();
             globalRetractResult.clear();
@@ -255,7 +257,11 @@ final class TestValuesRuntimeFunctions {
                 ctx.emitWatermark(
                         new org.apache.flink.streaming.api.watermark.Watermark(
                                 watermark.getTimestamp()));
-                watermarkHistory.computeIfAbsent(tableName, k -> new 
LinkedList<>()).add(watermark);
+                synchronized (LOCK) {
+                    watermarkHistory
+                            .computeIfAbsent(tableName, k -> new 
LinkedList<>())
+                            .add(watermark);
+                }
             }
 
             @Override
@@ -296,7 +302,7 @@ final class TestValuesRuntimeFunctions {
                 }
             }
             int taskId = getRuntimeContext().getIndexOfThisSubtask();
-            synchronized (TestValuesTableFactory.class) {
+            synchronized (LOCK) {
                 globalRawResult
                         .computeIfAbsent(tableName, k -> new HashMap<>())
                         .put(taskId, localRawResult);
@@ -306,7 +312,9 @@ final class TestValuesRuntimeFunctions {
         @Override
         public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
             rawResultState.clear();
-            rawResultState.addAll(localRawResult);
+            synchronized (LOCK) {
+                rawResultState.addAll(localRawResult);
+            }
         }
     }
 
@@ -338,7 +346,9 @@ final class TestValuesRuntimeFunctions {
                         return;
                     }
                 }
-                localRawResult.add(kind.shortString() + "(" + row.toString() + 
")");
+                synchronized (LOCK) {
+                    localRawResult.add(kind.shortString() + "(" + 
row.toString() + ")");
+                }
             } else {
                 throw new RuntimeException(
                         "AppendingSinkFunction received " + value.getRowKind() 
+ " messages.");
@@ -408,7 +418,7 @@ final class TestValuesRuntimeFunctions {
             }
 
             int taskId = getRuntimeContext().getIndexOfThisSubtask();
-            synchronized (TestValuesTableFactory.class) {
+            synchronized (LOCK) {
                 globalUpsertResult
                         .computeIfAbsent(tableName, k -> new HashMap<>())
                         .put(taskId, localUpsertResult);
@@ -419,9 +429,11 @@ final class TestValuesRuntimeFunctions {
         public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
             super.snapshotState(context);
             upsertResultState.clear();
-            for (Map.Entry<String, String> entry : 
localUpsertResult.entrySet()) {
-                upsertResultState.add(entry.getKey());
-                upsertResultState.add(entry.getValue());
+            synchronized (LOCK) {
+                for (Map.Entry<String, String> entry : 
localUpsertResult.entrySet()) {
+                    upsertResultState.add(entry.getKey());
+                    upsertResultState.add(entry.getValue());
+                }
             }
             receivedNumState.update(Collections.singletonList(receivedNum));
         }
@@ -434,30 +446,32 @@ final class TestValuesRuntimeFunctions {
             Row row = (Row) converter.toExternal(value);
             assert row != null;
 
-            if (RowUtils.USE_LEGACY_TO_STRING) {
-                localRawResult.add(kind.shortString() + "(" + row.toString() + 
")");
-            } else {
-                localRawResult.add(row.toString());
-            }
-
-            row.setKind(RowKind.INSERT);
-            Row key = Row.project(row, keyIndices);
+            synchronized (LOCK) {
+                if (RowUtils.USE_LEGACY_TO_STRING) {
+                    localRawResult.add(kind.shortString() + "(" + 
row.toString() + ")");
+                } else {
+                    localRawResult.add(row.toString());
+                }
 
-            if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
-                localUpsertResult.put(key.toString(), row.toString());
-            } else {
-                String oldValue = localUpsertResult.remove(key.toString());
-                if (oldValue == null) {
-                    throw new RuntimeException(
-                            "Tried to delete a value that wasn't inserted 
first. "
-                                    + "This is probably an incorrectly 
implemented test.");
+                row.setKind(RowKind.INSERT);
+                Row key = Row.project(row, keyIndices);
+
+                if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
+                    localUpsertResult.put(key.toString(), row.toString());
+                } else {
+                    String oldValue = localUpsertResult.remove(key.toString());
+                    if (oldValue == null) {
+                        throw new RuntimeException(
+                                "Tried to delete a value that wasn't inserted 
first. "
+                                        + "This is probably an incorrectly 
implemented test.");
+                    }
+                }
+                receivedNum++;
+                if (expectedSize != -1 && receivedNum == expectedSize) {
+                    // some sources are infinite (e.g. kafka),
+                    // we throw a SuccessException to indicate job is finished.
+                    throw new SuccessException();
                 }
-            }
-            receivedNum++;
-            if (expectedSize != -1 && receivedNum == expectedSize) {
-                // some sources are infinite (e.g. kafka),
-                // we throw a SuccessException to indicate job is finished.
-                throw new SuccessException();
             }
         }
     }
@@ -492,7 +506,7 @@ final class TestValuesRuntimeFunctions {
             }
 
             int taskId = getRuntimeContext().getIndexOfThisSubtask();
-            synchronized (TestValuesTableFactory.class) {
+            synchronized (LOCK) {
                 globalRetractResult
                         .computeIfAbsent(tableName, k -> new HashMap<>())
                         .put(taskId, localRetractResult);
@@ -503,7 +517,9 @@ final class TestValuesRuntimeFunctions {
         public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
             super.snapshotState(context);
             retractResultState.clear();
-            retractResultState.addAll(localRetractResult);
+            synchronized (LOCK) {
+                retractResultState.addAll(localRetractResult);
+            }
         }
 
         @SuppressWarnings("rawtypes")
@@ -512,17 +528,19 @@ final class TestValuesRuntimeFunctions {
             RowKind kind = value.getRowKind();
             Row row = (Row) converter.toExternal(value);
             assert row != null;
-            localRawResult.add(kind.shortString() + "(" + row.toString() + 
")");
-            if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
-                row.setKind(RowKind.INSERT);
-                localRetractResult.add(row.toString());
-            } else {
-                row.setKind(RowKind.INSERT);
-                boolean contains = localRetractResult.remove(row.toString());
-                if (!contains) {
-                    throw new RuntimeException(
-                            "Tried to retract a value that wasn't inserted 
first. "
-                                    + "This is probably an incorrectly 
implemented test.");
+            synchronized (LOCK) {
+                localRawResult.add(kind.shortString() + "(" + row.toString() + 
")");
+                if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
+                    row.setKind(RowKind.INSERT);
+                    localRetractResult.add(row.toString());
+                } else {
+                    row.setKind(RowKind.INSERT);
+                    boolean contains = 
localRetractResult.remove(row.toString());
+                    if (!contains) {
+                        throw new RuntimeException(
+                                "Tried to retract a value that wasn't inserted 
first. "
+                                        + "This is probably an incorrectly 
implemented test.");
+                    }
                 }
             }
         }
@@ -549,7 +567,7 @@ final class TestValuesRuntimeFunctions {
         @Override
         public void open(int taskNumber, int numTasks) throws IOException {
             this.localRawResult = new ArrayList<>();
-            synchronized (TestValuesTableFactory.class) {
+            synchronized (LOCK) {
                 globalRawResult
                         .computeIfAbsent(tableName, k -> new HashMap<>())
                         .put(taskNumber, localRawResult);
@@ -562,7 +580,9 @@ final class TestValuesRuntimeFunctions {
             if (value.getRowKind() == RowKind.INSERT) {
                 Row row = (Row) converter.toExternal(value);
                 assert row != null;
-                localRawResult.add(kind.shortString() + "(" + row.toString() + 
")");
+                synchronized (LOCK) {
+                    localRawResult.add(kind.shortString() + "(" + 
row.toString() + ")");
+                }
             } else {
                 throw new RuntimeException(
                         "AppendingOutputFormat received " + value.getRowKind() 
+ " messages.");

Reply via email to