This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new fae3e70 [FLINK-22203][table-runtime-blink] Fix
ConcurrentModificationException for testing values sink functions (#15978)
fae3e70 is described below
commit fae3e70532056465be3a2376a47ba20b5f80b13b
Author: Jark Wu <[email protected]>
AuthorDate: Fri May 21 12:27:44 2021 +0800
[FLINK-22203][table-runtime-blink] Fix ConcurrentModificationException for
testing values sink functions (#15978)
---
.../factories/TestValuesRuntimeFunctions.java | 116 ++++++++++++---------
1 file changed, 68 insertions(+), 48 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
index 7a4c847..e743c6a 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
@@ -73,6 +73,8 @@ import static org.apache.flink.util.Preconditions.checkArgument;
/** Runtime function implementations for {@link TestValuesTableFactory}. */
final class TestValuesRuntimeFunctions {
+ static final Object LOCK = TestValuesTableFactory.class;
+
// [table_name, [task_id, List[value]]]
private static final Map<String, Map<Integer, List<String>>>
globalRawResult = new HashMap<>();
// [table_name, [task_id, Map[key, value]]]
@@ -95,7 +97,7 @@ final class TestValuesRuntimeFunctions {
}
static List<Watermark> getWatermarks(String tableName) {
- synchronized (TestValuesTableFactory.class) {
+ synchronized (LOCK) {
if (watermarkHistory.containsKey(tableName)) {
return new ArrayList<>(watermarkHistory.get(tableName));
} else {
@@ -106,7 +108,7 @@ final class TestValuesRuntimeFunctions {
static List<String> getResults(String tableName) {
List<String> result = new ArrayList<>();
- synchronized (TestValuesTableFactory.class) {
+ synchronized (LOCK) {
if (globalUpsertResult.containsKey(tableName)) {
globalUpsertResult
.get(tableName)
@@ -124,7 +126,7 @@ final class TestValuesRuntimeFunctions {
}
static void clearResults() {
- synchronized (TestValuesTableFactory.class) {
+ synchronized (LOCK) {
globalRawResult.clear();
globalUpsertResult.clear();
globalRetractResult.clear();
@@ -255,7 +257,11 @@ final class TestValuesRuntimeFunctions {
ctx.emitWatermark(
new org.apache.flink.streaming.api.watermark.Watermark(
watermark.getTimestamp()));
- watermarkHistory.computeIfAbsent(tableName, k -> new
LinkedList<>()).add(watermark);
+ synchronized (LOCK) {
+ watermarkHistory
+ .computeIfAbsent(tableName, k -> new
LinkedList<>())
+ .add(watermark);
+ }
}
@Override
@@ -296,7 +302,7 @@ final class TestValuesRuntimeFunctions {
}
}
int taskId = getRuntimeContext().getIndexOfThisSubtask();
- synchronized (TestValuesTableFactory.class) {
+ synchronized (LOCK) {
globalRawResult
.computeIfAbsent(tableName, k -> new HashMap<>())
.put(taskId, localRawResult);
@@ -306,7 +312,9 @@ final class TestValuesRuntimeFunctions {
@Override
public void snapshotState(FunctionSnapshotContext context) throws
Exception {
rawResultState.clear();
- rawResultState.addAll(localRawResult);
+ synchronized (LOCK) {
+ rawResultState.addAll(localRawResult);
+ }
}
}
@@ -338,7 +346,9 @@ final class TestValuesRuntimeFunctions {
return;
}
}
- localRawResult.add(kind.shortString() + "(" + row.toString() +
")");
+ synchronized (LOCK) {
+ localRawResult.add(kind.shortString() + "(" +
row.toString() + ")");
+ }
} else {
throw new RuntimeException(
"AppendingSinkFunction received " + value.getRowKind()
+ " messages.");
@@ -408,7 +418,7 @@ final class TestValuesRuntimeFunctions {
}
int taskId = getRuntimeContext().getIndexOfThisSubtask();
- synchronized (TestValuesTableFactory.class) {
+ synchronized (LOCK) {
globalUpsertResult
.computeIfAbsent(tableName, k -> new HashMap<>())
.put(taskId, localUpsertResult);
@@ -419,9 +429,11 @@ final class TestValuesRuntimeFunctions {
public void snapshotState(FunctionSnapshotContext context) throws
Exception {
super.snapshotState(context);
upsertResultState.clear();
- for (Map.Entry<String, String> entry :
localUpsertResult.entrySet()) {
- upsertResultState.add(entry.getKey());
- upsertResultState.add(entry.getValue());
+ synchronized (LOCK) {
+ for (Map.Entry<String, String> entry :
localUpsertResult.entrySet()) {
+ upsertResultState.add(entry.getKey());
+ upsertResultState.add(entry.getValue());
+ }
}
receivedNumState.update(Collections.singletonList(receivedNum));
}
@@ -434,30 +446,32 @@ final class TestValuesRuntimeFunctions {
Row row = (Row) converter.toExternal(value);
assert row != null;
- if (RowUtils.USE_LEGACY_TO_STRING) {
- localRawResult.add(kind.shortString() + "(" + row.toString() +
")");
- } else {
- localRawResult.add(row.toString());
- }
-
- row.setKind(RowKind.INSERT);
- Row key = Row.project(row, keyIndices);
+ synchronized (LOCK) {
+ if (RowUtils.USE_LEGACY_TO_STRING) {
+ localRawResult.add(kind.shortString() + "(" +
row.toString() + ")");
+ } else {
+ localRawResult.add(row.toString());
+ }
- if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
- localUpsertResult.put(key.toString(), row.toString());
- } else {
- String oldValue = localUpsertResult.remove(key.toString());
- if (oldValue == null) {
- throw new RuntimeException(
- "Tried to delete a value that wasn't inserted first. "
- + "This is probably an incorrectly implemented test.");
+ row.setKind(RowKind.INSERT);
+ Row key = Row.project(row, keyIndices);
+
+ if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
+ localUpsertResult.put(key.toString(), row.toString());
+ } else {
+ String oldValue = localUpsertResult.remove(key.toString());
+ if (oldValue == null) {
+ throw new RuntimeException(
+ "Tried to delete a value that wasn't inserted first. "
+ + "This is probably an incorrectly implemented test.");
+ }
+ }
+ receivedNum++;
+ if (expectedSize != -1 && receivedNum == expectedSize) {
+ // some sources are infinite (e.g. kafka),
+ // we throw a SuccessException to indicate job is finished.
+ throw new SuccessException();
}
- }
- receivedNum++;
- if (expectedSize != -1 && receivedNum == expectedSize) {
- // some sources are infinite (e.g. kafka),
- // we throw a SuccessException to indicate job is finished.
- throw new SuccessException();
}
}
}
@@ -492,7 +506,7 @@ final class TestValuesRuntimeFunctions {
}
int taskId = getRuntimeContext().getIndexOfThisSubtask();
- synchronized (TestValuesTableFactory.class) {
+ synchronized (LOCK) {
globalRetractResult
.computeIfAbsent(tableName, k -> new HashMap<>())
.put(taskId, localRetractResult);
@@ -503,7 +517,9 @@ final class TestValuesRuntimeFunctions {
public void snapshotState(FunctionSnapshotContext context) throws
Exception {
super.snapshotState(context);
retractResultState.clear();
- retractResultState.addAll(localRetractResult);
+ synchronized (LOCK) {
+ retractResultState.addAll(localRetractResult);
+ }
}
@SuppressWarnings("rawtypes")
@@ -512,17 +528,19 @@ final class TestValuesRuntimeFunctions {
RowKind kind = value.getRowKind();
Row row = (Row) converter.toExternal(value);
assert row != null;
- localRawResult.add(kind.shortString() + "(" + row.toString() +
")");
- if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
- row.setKind(RowKind.INSERT);
- localRetractResult.add(row.toString());
- } else {
- row.setKind(RowKind.INSERT);
- boolean contains = localRetractResult.remove(row.toString());
- if (!contains) {
- throw new RuntimeException(
- "Tried to retract a value that wasn't inserted first. "
- + "This is probably an incorrectly implemented test.");
+ synchronized (LOCK) {
+ localRawResult.add(kind.shortString() + "(" + row.toString() +
")");
+ if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
+ row.setKind(RowKind.INSERT);
+ localRetractResult.add(row.toString());
+ } else {
+ row.setKind(RowKind.INSERT);
+ boolean contains =
localRetractResult.remove(row.toString());
+ if (!contains) {
+ throw new RuntimeException(
+ "Tried to retract a value that wasn't inserted first. "
+ + "This is probably an incorrectly implemented test.");
+ }
}
}
}
@@ -549,7 +567,7 @@ final class TestValuesRuntimeFunctions {
@Override
public void open(int taskNumber, int numTasks) throws IOException {
this.localRawResult = new ArrayList<>();
- synchronized (TestValuesTableFactory.class) {
+ synchronized (LOCK) {
globalRawResult
.computeIfAbsent(tableName, k -> new HashMap<>())
.put(taskNumber, localRawResult);
@@ -562,7 +580,9 @@ final class TestValuesRuntimeFunctions {
if (value.getRowKind() == RowKind.INSERT) {
Row row = (Row) converter.toExternal(value);
assert row != null;
- localRawResult.add(kind.shortString() + "(" + row.toString() +
")");
+ synchronized (LOCK) {
+ localRawResult.add(kind.shortString() + "(" +
row.toString() + ")");
+ }
} else {
throw new RuntimeException(
"AppendingOutputFormat received " + value.getRowKind()
+ " messages.");