This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 16138f9e9941230d1644ea5122654785e955c668
Author: Ingo Bürk <[email protected]>
AuthorDate: Fri Jun 25 13:27:56 2021 +0200

    [hotfix][table-planner] Expose returning the only raw results if possible
---
 .../planner/factories/TestValuesRuntimeFunctions.java   | 17 ++++++++++++++++-
 .../table/planner/factories/TestValuesTableFactory.java | 10 ++++++++++
 2 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
index de98081..7a4a4c0 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
@@ -88,7 +88,7 @@ final class TestValuesRuntimeFunctions {
 
     static List<String> getRawResults(String tableName) {
         List<String> result = new ArrayList<>();
-        synchronized (TestValuesTableFactory.class) {
+        synchronized (LOCK) {
             if (globalRawResult.containsKey(tableName)) {
                 globalRawResult.get(tableName).values().forEach(result::addAll);
             }
@@ -96,6 +96,21 @@ final class TestValuesRuntimeFunctions {
         return result;
     }
 
+    /** Returns raw results if there was only one table with results, throws otherwise. */
+    static List<String> getOnlyRawResults() {
+        List<String> result = new ArrayList<>();
+        synchronized (LOCK) {
+            if (globalRawResult.size() != 1) {
+                throw new IllegalStateException(
+                        "Expected results for only one table to be present, but found "
+                                + globalRawResult.size());
+            }
+
+            globalRawResult.values().iterator().next().values().forEach(result::addAll);
+        }
+        return result;
+    }
+
     static List<Watermark> getWatermarks(String tableName) {
         synchronized (LOCK) {
             if (watermarkHistory.containsKey(tableName)) {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 926f057..808abec 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -176,6 +176,16 @@ public final class TestValuesTableFactory
     }
 
     /**
+     * Returns received row results if there has been exactly one sink, and throws an error
+     * otherwise.
+     *
+     * <p>The raw results are encoded with {@link RowKind}.
+     */
+    public static List<String> getOnlyRawResults() {
+        return TestValuesRuntimeFunctions.getOnlyRawResults();
+    }
+
+    /**
      * Returns materialized (final) results of the registered table sink.
      *
      * @param tableName the table name of the registered table sink.
