[ https://issues.apache.org/jira/browse/FLINK-8686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16596185#comment-16596185 ]
ASF GitHub Bot commented on FLINK-8686:
---------------------------------------
asfgit closed pull request #6621: [FLINK-8686] [sql-client] Limit result size for prototyping modes
URL: https://github.com/apache/flink/pull/6621
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index f16f1a5561a..8c4ba83c6dc 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -106,7 +106,7 @@ Alice, 1
Greg, 1
{% endhighlight %}
-Both result modes can be useful during the prototyping of SQL queries.
+Both result modes can be useful during the prototyping of SQL queries. In both modes, results are stored in the Java heap memory of the SQL Client. In order to keep the CLI interface responsive, the changelog mode only shows the latest 1000 changes. The table mode allows for navigating through bigger results that are only limited by the available main memory and the configured [maximum number of rows](sqlClient.html#configuration) (`max-table-result-rows`).
<span class="label label-danger">Attention</span> Queries that are executed in a batch environment can only be retrieved using the `table` result mode.
@@ -167,6 +167,7 @@ Every environment file is a regular [YAML file](http://yaml.org/). An example of
tables:
  - name: MyTableSource
    type: source
+    update-mode: append
    connector:
      type: filesystem
      path: "/path/to/something.csv"
@@ -206,6 +207,8 @@ functions:
execution:
  type: streaming                   # required: execution mode either 'batch' or 'streaming'
  result-mode: table                # required: either 'table' or 'changelog'
+  max-table-result-rows: 1000000   # optional: maximum number of maintained rows in
+                                   #   'table' mode (1000000 by default, a value smaller than 1 means unlimited)
  time-characteristic: event-time   # optional: 'processing-time' or 'event-time' (default)
  parallelism: 1                    # optional: Flink's parallelism (1 by default)
  periodic-watermarks-interval: 200 # optional: interval for periodic watermarks (200 ms by default)
@@ -213,7 +216,7 @@ execution:
  min-idle-state-retention: 0       # optional: table program's minimum idle state time
  max-idle-state-retention: 0       # optional: table program's maximum idle state time
  restart-strategy:                 # optional: restart strategy
-    type: fallback                          # "fallback" to global restart strategy by default
+    type: fallback                  # "fallback" to global restart strategy by default
# Deployment properties allow for describing the cluster to which table programs are submitted.
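The semantics of the new `max-table-result-rows` option boil down to one normalization rule: a non-positive value disables the limit. A minimal sketch of that rule (the class and method names are hypothetical, not from the PR; the behavior mirrors the MaterializedCollectStreamResult constructor further below):

final class MaxRowsSemantics {

	// a value smaller than 1 means unlimited, mirroring the constructor
	// of MaterializedCollectStreamResult in this PR
	static int normalize(int maxTableResultRows) {
		return maxTableResultRows <= 0 ? Integer.MAX_VALUE : maxTableResultRows;
	}

	public static void main(String[] args) {
		System.out.println(normalize(1_000_000)); // default -> 1000000
		System.out.println(normalize(0));         // unlimited -> 2147483647
	}
}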
diff --git a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
index 302651a78aa..97e89fd6459 100644
--- a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
@@ -76,7 +76,9 @@ execution:
  # interval in ms for emitting periodic watermarks
  periodic-watermarks-interval: 200
  # 'changelog' or 'table' presentation of results
-  result-mode: changelog
+  result-mode: table
+  # maximum number of maintained rows in 'table' presentation of results
+  max-table-result-rows: 1000000
  # parallelism of the program
  parallelism: 1
  # maximum parallelism
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java
index cf17933d977..d70af807a30 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java
@@ -34,6 +34,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import static org.apache.flink.table.client.cli.CliUtils.TIME_FORMATTER;
@@ -50,6 +51,7 @@
*/
public class CliChangelogResultView extends CliResultView<CliChangelogResultView.ResultChangelogOperation> {
+	private static final int DEFAULT_MAX_ROW_COUNT = 1000;
	private static final int DEFAULT_REFRESH_INTERVAL = 0; // as fast as possible
private static final int DEFAULT_REFRESH_INTERVAL_PLAIN = 3; // every 1s
private static final int MIN_REFRESH_INTERVAL = 0; // every 100ms
@@ -66,7 +68,8 @@ public CliChangelogResultView(CliClient client, ResultDescriptor resultDescripto
refreshInterval = DEFAULT_REFRESH_INTERVAL;
}
previousResults = null;
- results = new ArrayList<>();
+		// rows are always appended at the tail and deleted from the head of the list
+ results = new LinkedList<>();
}
	// --------------------------------------------------------------------------------------------
@@ -133,6 +136,13 @@ protected void refresh() {
}
// update results
+
+				// formatting and printing of rows is expensive in the current implementation,
+				// therefore we limit the maximum number of lines shown in changelog mode to
+				// keep the CLI responsive
+				if (results.size() >= DEFAULT_MAX_ROW_COUNT) {
+					results.remove(0);
+				}
results.add(changeRow);
scrolling++;
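The hunk above turns the changelog view's buffer into a simple bounded FIFO: once DEFAULT_MAX_ROW_COUNT rows are buffered, the oldest change is evicted before the new one is appended, and the switch to LinkedList makes both the tail append and the head removal O(1). A self-contained sketch of the same pattern (class and field names are illustrative, not from the PR):

import java.util.LinkedList;

class BoundedChangelog {

	// same default as DEFAULT_MAX_ROW_COUNT above
	private static final int MAX_ROWS = 1000;

	private final LinkedList<String[]> rows = new LinkedList<>();

	void add(String[] changeRow) {
		// evict the oldest change before appending the new one
		if (rows.size() >= MAX_ROWS) {
			rows.removeFirst();
		}
		rows.add(changeRow);
	}
}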
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
index df42edd63b3..78f3a565554 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
@@ -28,7 +28,6 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.stream.IntStream;
import static org.apache.flink.table.client.cli.CliUtils.normalizeColumn;
@@ -182,9 +181,8 @@ protected void init() {
protected List<AttributedString> computeMainLines() {
final List<AttributedString> lines = new ArrayList<>();
- IntStream.range(0, results.size()).forEach(lineIdx -> {
- final String[] line = results.get(lineIdx);
-
+ int lineIdx = 0;
+ for (String[] line : results) {
			final AttributedStringBuilder row = new AttributedStringBuilder();
// highlight selected row
@@ -192,7 +190,7 @@ protected void init() {
row.style(AttributedStyle.DEFAULT.inverse());
}
- IntStream.range(0, line.length).forEach(colIdx -> {
+ for (int colIdx = 0; colIdx < line.length; colIdx++) {
final String col = line[colIdx];
				final int columnWidth = computeColumnWidth(colIdx);
@@ -208,9 +206,11 @@ protected void init() {
} else {
normalizeColumn(row, col, columnWidth);
}
- });
+ }
lines.add(row.toAttributedString());
- });
+
+ lineIdx++;
+ }
return lines;
}
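A likely reason for dropping the IntStream/results.get(lineIdx) pattern (an inference from the LinkedList change above, not stated in the PR): positional get on a linked list is O(n), so the indexed loop would make computeMainLines quadratic in the number of rows, while iterating with for-each stays linear. A hypothetical micro-example of the difference:

import java.util.LinkedList;
import java.util.List;

class LinkedListTraversal {

	// quadratic on a LinkedList: get(i) walks the list from the head each time
	static long countCellsIndexed(List<String[]> rows) {
		long cells = 0;
		for (int i = 0; i < rows.size(); i++) {
			cells += rows.get(i).length;
		}
		return cells;
	}

	// linear on any List: the iterator advances one node per step
	static long countCellsForEach(List<String[]> rows) {
		long cells = 0;
		for (String[] row : rows) {
			cells += row.length;
		}
		return cells;
	}

	public static void main(String[] args) {
		List<String[]> rows = new LinkedList<>();
		rows.add(new String[] {"a", "b"});
		rows.add(new String[] {"c"});
		System.out.println(countCellsIndexed(rows) + " " + countCellsForEach(rows)); // 3 3
	}
}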
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
index b7c28938121..0518dfce74d 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
@@ -92,6 +92,10 @@ public int getMaxParallelism() {
		return Integer.parseInt(properties.getOrDefault(PropertyStrings.EXECUTION_MAX_PARALLELISM, Integer.toString(128)));
	}
+	public int getMaxTableResultRows() {
+		return Integer.parseInt(properties.getOrDefault(PropertyStrings.EXECUTION_MAX_TABLE_RESULT_ROWS, Integer.toString(1_000_000)));
+	}
+
	public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
final String restartStrategy = properties.getOrDefault(
PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE,
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
index 2a6b001d5f4..2067c5a971b 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
@@ -57,6 +57,8 @@ private PropertyStrings() {
public static final String EXECUTION_RESULT_MODE_VALUE_TABLE = "table";
+	public static final String EXECUTION_MAX_TABLE_RESULT_ROWS = "max-table-result-rows";
+
	public static final String EXECUTION_RESTART_STRATEGY_TYPE = "restart-strategy.type";
	public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FALLBACK = "fallback";
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
index a54160f24b8..93fdd483d44 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
@@ -71,7 +71,12 @@ public ResultStore(Configuration flinkConfig) {
		if (env.getExecution().isChangelogMode()) {
			return new ChangelogCollectStreamResult<>(outputType, config, gatewayAddress, gatewayPort);
		} else {
-			return new MaterializedCollectStreamResult<>(outputType, config, gatewayAddress, gatewayPort);
+			return new MaterializedCollectStreamResult<>(
+				outputType,
+				config,
+				gatewayAddress,
+				gatewayPort,
+				env.getExecution().getMaxTableResultRows());
		}
} else {
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
index 45c4f7553c6..0beabe94d2c 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.client.gateway.local.result;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -38,6 +39,31 @@
*/
public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> implements MaterializedResult<C> {
+ /** Maximum initial capacity of the materialized table. */
+	public static final int MATERIALIZED_TABLE_MAX_INITIAL_CAPACITY = 1_000_000;
+
+ /** Maximum overcommitment of the materialized table. */
+ public static final int MATERIALIZED_TABLE_MAX_OVERCOMMIT = 1_000_000;
+
+ /** Factor for the initial capacity of the materialized table. */
+ public static final double MATERIALIZED_TABLE_CAPACITY_FACTOR = 0.05;
+
+ /** Factor for cleaning up deleted rows in the materialized table. */
+ public static final double MATERIALIZED_TABLE_OVERCOMMIT_FACTOR = 0.01;
+
+ /**
+	 * Maximum number of materialized rows to be stored. After the count is reached, oldest
+	 * rows are dropped.
+ */
+ private final int maxRowCount;
+
+ /** Threshold for cleaning up deleted rows in the materialized table. */
+ private final int overcommitThreshold;
+
+ /**
+	 * Materialized table that is continuously updated by inserts and deletes. Deletes at
+	 * the beginning are lazily cleaned up when the threshold is reached.
+ */
private final List<Row> materializedTable;
/**
@@ -47,26 +73,65 @@
*/
private final Map<Row, Integer> rowPositionCache;
+ /** Current snapshot of the materialized table. */
private final List<Row> snapshot;
+	/** Counter for deleted rows to be deleted at the beginning of the materialized table. */
+ private int validRowPosition;
+
+ /** Page count of the snapshot (always >= 1). */
private int pageCount;
+ /** Page size of the snapshot (always >= 1). */
private int pageSize;
+	/** Indicator that this is the last snapshot possible (EOS afterwards). */
private boolean isLastSnapshot;
-	public MaterializedCollectStreamResult(TypeInformation<Row> outputType, ExecutionConfig config,
- InetAddress gatewayAddress, int gatewayPort) {
+ @VisibleForTesting
+ public MaterializedCollectStreamResult(
+ TypeInformation<Row> outputType,
+ ExecutionConfig config,
+ InetAddress gatewayAddress,
+ int gatewayPort,
+ int maxRowCount,
+ int overcommitThreshold) {
super(outputType, config, gatewayAddress, gatewayPort);
+ if (maxRowCount <= 0) {
+ this.maxRowCount = Integer.MAX_VALUE;
+ } else {
+ this.maxRowCount = maxRowCount;
+ }
+
+ this.overcommitThreshold = overcommitThreshold;
+
// prepare for materialization
- materializedTable = new ArrayList<>();
- rowPositionCache = new HashMap<>();
+		final int initialCapacity = computeMaterializedTableCapacity(maxRowCount); // avoid frequent resizing
+ materializedTable = new ArrayList<>(initialCapacity);
+ rowPositionCache = new HashMap<>(initialCapacity);
snapshot = new ArrayList<>();
+ validRowPosition = 0;
isLastSnapshot = false;
pageCount = 0;
}
+ public MaterializedCollectStreamResult(
+ TypeInformation<Row> outputType,
+ ExecutionConfig config,
+ InetAddress gatewayAddress,
+ int gatewayPort,
+ int maxRowCount) {
+
+ this(
+ outputType,
+ config,
+ gatewayAddress,
+ gatewayPort,
+ maxRowCount,
+ computeMaterializedTableOvercommit(maxRowCount));
+ }
+
@Override
public boolean isMaterialized() {
return true;
@@ -74,6 +139,10 @@ public boolean isMaterialized() {
@Override
public TypedResult<Integer> snapshot(int pageSize) {
+ if (pageSize < 1) {
+			throw new SqlExecutionException("Page size must be greater than 0.");
+ }
+
synchronized (resultLock) {
			// retrieval thread is dead and there are no results anymore
// or program failed
@@ -87,7 +156,9 @@ else if (!isRetrieving()) {
this.pageSize = pageSize;
snapshot.clear();
- snapshot.addAll(materializedTable);
+			for (int i = validRowPosition; i < materializedTable.size(); i++) {
+ snapshot.add(materializedTable.get(i));
+ }
// at least one page
			pageCount = Math.max(1, (int) Math.ceil(((double) snapshot.size() / pageSize)));
@@ -112,31 +183,82 @@ else if (!isRetrieving()) {
@Override
protected void processRecord(Tuple2<Boolean, Row> change) {
synchronized (resultLock) {
- final Row row = change.f1;
// insert
if (change.f0) {
- materializedTable.add(row);
-				rowPositionCache.put(row, materializedTable.size() - 1);
+ processInsert(change.f1);
}
// delete
else {
-				// delete the newest record first to minimize per-page changes
-				final Integer cachedPos = rowPositionCache.get(row);
-				final int startSearchPos;
-				if (cachedPos != null) {
-					startSearchPos = Math.min(cachedPos, materializedTable.size() - 1);
-				} else {
-					startSearchPos = materializedTable.size() - 1;
-				}
-
-				for (int i = startSearchPos; i >= 0; i--) {
-					if (materializedTable.get(i).equals(row)) {
- materializedTable.remove(i);
- rowPositionCache.remove(row);
- break;
- }
- }
+ processDelete(change.f1);
}
}
}
+
+ @VisibleForTesting
+ protected List<Row> getMaterializedTable() {
+ return materializedTable;
+ }
+
+	// --------------------------------------------------------------------------------------------
+
+ private void processInsert(Row row) {
+ // limit the materialized table
+		if (materializedTable.size() - validRowPosition >= maxRowCount) {
+ cleanUp();
+ }
+ materializedTable.add(row);
+ rowPositionCache.put(row, materializedTable.size() - 1);
+ }
+
+ private void processDelete(Row row) {
+ // delete the newest record first to minimize per-page changes
+ final Integer cachedPos = rowPositionCache.get(row);
+ final int startSearchPos;
+ if (cachedPos != null) {
+			startSearchPos = Math.min(cachedPos, materializedTable.size() - 1);
+ } else {
+ startSearchPos = materializedTable.size() - 1;
+ }
+
+ for (int i = startSearchPos; i >= validRowPosition; i--) {
+ if (materializedTable.get(i).equals(row)) {
+ materializedTable.remove(i);
+ rowPositionCache.remove(row);
+ break;
+ }
+ }
+ }
+
+ private void cleanUp() {
+ // invalidate row
+ final Row deleteRow = materializedTable.get(validRowPosition);
+ if (rowPositionCache.get(deleteRow) == validRowPosition) {
+ // this row has no duplicates in the materialized table,
+ // it can be removed from the cache
+ rowPositionCache.remove(deleteRow);
+ }
+ materializedTable.set(validRowPosition, null);
+
+ validRowPosition++;
+
+ // perform clean up in batches
+ if (validRowPosition >= overcommitThreshold) {
+ materializedTable.subList(0, validRowPosition).clear();
+ // adjust all cached indexes
+			rowPositionCache.replaceAll((k, v) -> v - validRowPosition);
+ validRowPosition = 0;
+ }
+ }
+
+ private static int computeMaterializedTableCapacity(int maxRowCount) {
+ return Math.min(
+ MATERIALIZED_TABLE_MAX_INITIAL_CAPACITY,
+			Math.max(1, (int) (maxRowCount * MATERIALIZED_TABLE_CAPACITY_FACTOR)));
+ }
+
+ private static int computeMaterializedTableOvercommit(int maxRowCount) {
+ return Math.min(
+ MATERIALIZED_TABLE_MAX_OVERCOMMIT,
+			(int) (maxRowCount * MATERIALIZED_TABLE_OVERCOMMIT_FACTOR));
+ }
}
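The clean-up strategy above amortizes evictions: a dropped head row is only nulled out while validRowPosition advances, and the expensive O(n) compaction of the ArrayList runs in batches once overcommitThreshold is reached, rebasing the cached row positions in one pass. With the defaults, the overcommitment is 1,000,000 * 0.01 = 10,000 rows and the initial capacity is capped at 1,000,000 * 0.05 = 50,000 entries. A condensed sketch of the pattern (the class LazyHeadEviction is illustrative and simplified: unlike cleanUp(), it does not check for duplicate rows before removing a cache entry):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LazyHeadEviction {

	private final List<String> table = new ArrayList<>();
	private final Map<String, Integer> positions = new HashMap<>();
	private final int maxRows;
	private final int overcommit;
	private int validRowPosition = 0;

	LazyHeadEviction(int maxRows, int overcommit) {
		this.maxRows = maxRows;
		this.overcommit = overcommit;
	}

	void insert(String row) {
		if (table.size() - validRowPosition >= maxRows) {
			// invalidate the oldest valid row without shifting the list
			positions.remove(table.get(validRowPosition));
			table.set(validRowPosition, null);
			validRowPosition++;
			if (validRowPosition >= overcommit) {
				// batch compaction: one O(n) shift instead of one per eviction
				table.subList(0, validRowPosition).clear();
				positions.replaceAll((k, v) -> v - validRowPosition);
				validRowPosition = 0;
			}
		}
		table.add(row);
		positions.put(row, table.size() - 1);
	}
}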
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 76648d08e1a..495f5e0d314 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -198,6 +198,7 @@ public void testGetSessionProperties() throws Exception {
		expectedProperties.put("execution.max-idle-state-retention", "0");
		expectedProperties.put("execution.min-idle-state-retention", "0");
		expectedProperties.put("execution.result-mode", "table");
+		expectedProperties.put("execution.max-table-result-rows", "100");
		expectedProperties.put("execution.restart-strategy.type", "failure-rate");
		expectedProperties.put("execution.restart-strategy.max-failures-per-interval", "10");
		expectedProperties.put("execution.restart-strategy.failure-rate-interval", "99000");
@@ -264,38 +265,47 @@ public void testStreamQueryExecutionChangelog() throws Exception {
public void testStreamQueryExecutionTable() throws Exception {
		final URL url = getClass().getClassLoader().getResource("test-data.csv");
Objects.requireNonNull(url);
+
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_0", url.getPath());
replaceVars.put("$VAR_1", "/");
replaceVars.put("$VAR_2", "streaming");
replaceVars.put("$VAR_3", "table");
replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+ replaceVars.put("$VAR_MAX_ROWS", "100");
-		final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
-		final SessionContext session = new SessionContext("test-session", new Environment());
+		final String query = "SELECT scalarUDF(IntegerField1), StringField1 FROM TableNumber1";
-		try {
-			// start job and retrieval
-			final ResultDescriptor desc = executor.executeQuery(
-				session,
-				"SELECT scalarUDF(IntegerField1), StringField1 FROM TableNumber1");
+ final List<String> expectedResults = new ArrayList<>();
+ expectedResults.add("47,Hello World");
+ expectedResults.add("27,Hello World");
+ expectedResults.add("37,Hello World");
+ expectedResults.add("37,Hello World");
+ expectedResults.add("47,Hello World");
+ expectedResults.add("57,Hello World!!!!");
+
+ executeStreamQueryTable(replaceVars, query, expectedResults);
+ }
- assertTrue(desc.isMaterialized());
+ @Test(timeout = 30_000L)
+ public void testStreamQueryExecutionLimitedTable() throws Exception {
+		final URL url = getClass().getClassLoader().getResource("test-data.csv");
+		Objects.requireNonNull(url);
-		final List<String> actualResults = retrieveTableResult(executor, session, desc.getResultId());
+ final Map<String, String> replaceVars = new HashMap<>();
+ replaceVars.put("$VAR_0", url.getPath());
+ replaceVars.put("$VAR_1", "/");
+ replaceVars.put("$VAR_2", "streaming");
+ replaceVars.put("$VAR_3", "table");
+ replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+ replaceVars.put("$VAR_MAX_ROWS", "1");
- final List<String> expectedResults = new ArrayList<>();
- expectedResults.add("47,Hello World");
- expectedResults.add("27,Hello World");
- expectedResults.add("37,Hello World");
- expectedResults.add("37,Hello World");
- expectedResults.add("47,Hello World");
- expectedResults.add("57,Hello World!!!!");
+		final String query = "SELECT COUNT(*), StringField1 FROM TableNumber1 GROUP BY StringField1";
-		TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
- } finally {
- executor.stop(session);
- }
+ final List<String> expectedResults = new ArrayList<>();
+ expectedResults.add("1,Hello World!!!!");
+
+ executeStreamQueryTable(replaceVars, query, expectedResults);
}
@Test(timeout = 30_000L)
@@ -375,6 +385,28 @@ public void testStreamQueryExecutionSink() throws Exception {
}
}
+ private void executeStreamQueryTable(
+ Map<String, String> replaceVars,
+ String query,
+ List<String> expectedResults) throws Exception {
+
+		final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
+		final SessionContext session = new SessionContext("test-session", new Environment());
+
+ try {
+ // start job and retrieval
+			final ResultDescriptor desc = executor.executeQuery(session, query);
+
+ assertTrue(desc.isMaterialized());
+
+			final List<String> actualResults = retrieveTableResult(executor, session, desc.getResultId());
+
+			TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
+ } finally {
+ executor.stop(session);
+ }
+ }
+
private void verifySinkResult(String path) throws IOException {
final List<String> actualResults = new ArrayList<>();
TestBaseUtils.readAllResultLines(actualResults, path);
@@ -392,6 +424,7 @@ private void verifySinkResult(String path) throws IOException {
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_2", "batch");
replaceVars.put("$VAR_UPDATE_MODE", "");
+ replaceVars.put("$VAR_MAX_ROWS", "100");
return new LocalExecutor(
EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, replaceVars),
Collections.emptyList(),
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
index c7e41ffe111..ba8c9245cc9 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
@@ -29,7 +29,9 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import static org.junit.Assert.assertEquals;
@@ -42,13 +44,14 @@
public void testSnapshot() throws UnknownHostException {
		final TypeInformation<Row> type = Types.ROW(Types.STRING, Types.LONG);
- TestMaterializedCollectStreamResult result = null;
+ TestMaterializedCollectStreamResult<?> result = null;
try {
- result = new TestMaterializedCollectStreamResult(
+ result = new TestMaterializedCollectStreamResult<>(
type,
new ExecutionConfig(),
InetAddress.getLocalHost(),
- 0);
+ 0,
+ Integer.MAX_VALUE);
result.isRetrieving = true;
@@ -85,11 +88,59 @@ public void testSnapshot() throws UnknownHostException {
}
}
+ @Test
+ public void testLimitedSnapshot() throws UnknownHostException {
+		final TypeInformation<Row> type = Types.ROW(Types.STRING, Types.LONG);
+
+ TestMaterializedCollectStreamResult<?> result = null;
+ try {
+ result = new TestMaterializedCollectStreamResult<>(
+ type,
+ new ExecutionConfig(),
+ InetAddress.getLocalHost(),
+ 0,
+ 2, // limit the materialized table to 2 rows
+ 3); // with 3 rows overcommitment
+
+ result.isRetrieving = true;
+
+ result.processRecord(Tuple2.of(true, Row.of("D", 1)));
+ result.processRecord(Tuple2.of(true, Row.of("A", 1)));
+ result.processRecord(Tuple2.of(true, Row.of("B", 1)));
+ result.processRecord(Tuple2.of(true, Row.of("A", 1)));
+
+ assertEquals(
+				Arrays.asList(null, null, Row.of("B", 1), Row.of("A", 1)), // two over-committed rows
+				result.getMaterializedTable());
+
+			assertEquals(TypedResult.payload(2), result.snapshot(1));
+
+			assertEquals(Collections.singletonList(Row.of("B", 1)), result.retrievePage(1));
+			assertEquals(Collections.singletonList(Row.of("A", 1)), result.retrievePage(2));
+
+ result.processRecord(Tuple2.of(true, Row.of("C", 1)));
+
+ assertEquals(
+				Arrays.asList(Row.of("A", 1), Row.of("C", 1)), // limit clean up has taken place
+ result.getMaterializedTable());
+
+ result.processRecord(Tuple2.of(false, Row.of("A", 1)));
+
+ assertEquals(
+				Collections.singletonList(Row.of("C", 1)), // regular clean up has taken place
+ result.getMaterializedTable());
+ } finally {
+ if (result != null) {
+ result.close();
+ }
+ }
+ }
+
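For tracing the assertions above against processInsert()/cleanUp() (a walk-through, not part of the PR): with maxRowCount = 2 and overcommitThreshold = 3, the fourth insert has evicted two head rows, but validRowPosition (2) is still below the threshold, so the nulled slots remain and the table is [null, null, B, A]. The fifth insert ("C") evicts a third row, validRowPosition reaches 3, and the batched compaction shrinks the table to [A, C]. The final delete of "A" then takes the regular processDelete() path, leaving [C].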
	// --------------------------------------------------------------------------------------------
	// Helper classes
	// --------------------------------------------------------------------------------------------
-	private static class TestMaterializedCollectStreamResult extends MaterializedCollectStreamResult {
+	private static class TestMaterializedCollectStreamResult<T> extends MaterializedCollectStreamResult<T> {
public boolean isRetrieving;
@@ -97,18 +148,42 @@ public TestMaterializedCollectStreamResult(
TypeInformation<Row> outputType,
ExecutionConfig config,
InetAddress gatewayAddress,
- int gatewayPort) {
+ int gatewayPort,
+ int maxRowCount,
+ int overcommitThreshold) {
super(
outputType,
config,
gatewayAddress,
- gatewayPort);
+ gatewayPort,
+ maxRowCount,
+ overcommitThreshold);
+ }
+
+ public TestMaterializedCollectStreamResult(
+ TypeInformation<Row> outputType,
+ ExecutionConfig config,
+ InetAddress gatewayAddress,
+ int gatewayPort,
+ int maxRowCount) {
+
+ super(
+ outputType,
+ config,
+ gatewayAddress,
+ gatewayPort,
+ maxRowCount);
}
@Override
protected boolean isRetrieving() {
return isRetrieving;
}
+
+ @Override
+ public List<Row> getMaterializedTable() {
+ return super.getMaterializedTable();
+ }
}
}
diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
index cd5257e611c..e9c6d5bf32d 100644
--- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
@@ -123,6 +123,7 @@ execution:
min-idle-state-retention: 0
max-idle-state-retention: 0
result-mode: "$VAR_3"
+ max-table-result-rows: "$VAR_MAX_ROWS"
restart-strategy:
type: failure-rate
max-failures-per-interval: 10
> Improve basic embedded SQL client
> ----------------------------------
>
> Key: FLINK-8686
> URL: https://issues.apache.org/jira/browse/FLINK-8686
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Major
> Labels: pull-request-available
>
> This issue describes follow-up issues that should be fixed in order to make
> the SQL client more stable:
> - -Add more tests for executor-
> - Configure JVM heap size
> - Limit changelog and table buffers
> - -"The input is invalid please check it again." => add allowed range-
> - Load dependencies recursively
> - Clean up results in result store
> - -Improve error message for unsupported batch queries-
> - -Add more logging instead of swallowing exceptions-
> - -List properties in error message about missing TS factory sorted by name-
> - Add command to show loaded TS factories and their required properties
> - Add command to reload configuration from files (no need to restart client)
> - Improve error message in case of invalid json-schema (right now:
> {{java.lang.IllegalArgumentException: No type could be found in node:<root>}})
> - -Add switch to show full stacktraces of exceptions- solved by logging
> - Give an error message when setting unknown parameters:
> {{result-mode=changelog}} does not give an error but should be
> {{execution.result-mode=changelog}}