[ https://issues.apache.org/jira/browse/FLINK-8686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16595148#comment-16595148 ]
ASF GitHub Bot commented on FLINK-8686:
---------------------------------------
fhueske commented on a change in pull request #6621: [FLINK-8686] [sql-client] Limit result size for prototyping modes
URL: https://github.com/apache/flink/pull/6621#discussion_r213359097
##########
File path: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
##########
@@ -112,31 +155,64 @@ else if (!isRetrieving()) {
 	@Override
 	protected void processRecord(Tuple2<Boolean, Row> change) {
 		synchronized (resultLock) {
-			final Row row = change.f1;
 			// insert
 			if (change.f0) {
-				materializedTable.add(row);
-				rowPositionCache.put(row, materializedTable.size() - 1);
+				processInsert(change.f1);
 			}
 			// delete
 			else {
-				// delete the newest record first to minimize per-page changes
-				final Integer cachedPos = rowPositionCache.get(row);
-				final int startSearchPos;
-				if (cachedPos != null) {
-					startSearchPos = Math.min(cachedPos, materializedTable.size() - 1);
-				} else {
-					startSearchPos = materializedTable.size() - 1;
-				}
-
-				for (int i = startSearchPos; i >= 0; i--) {
-					if (materializedTable.get(i).equals(row)) {
-						materializedTable.remove(i);
-						rowPositionCache.remove(row);
-						break;
-					}
-				}
+				processDelete(change.f1);
 			}
 		}
 	}
+
+	@VisibleForTesting
+	protected List<Row> getMaterializedTable() {
+		return materializedTable;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private void processInsert(Row row) {
+		// limit the materialized table
+		if (materializedTable.size() - validRowPosition >= maxRowCount) {
+			cleanUp();
+		}
+		materializedTable.add(row);
+		rowPositionCache.put(row, materializedTable.size() - 1);
+	}
+
+	private void processDelete(Row row) {
+		// delete the newest record first to minimize per-page changes
+		final Integer cachedPos = rowPositionCache.get(row);
+		final int startSearchPos;
+		if (cachedPos != null) {
+			startSearchPos = Math.min(cachedPos, materializedTable.size() - 1);
+		} else {
+			startSearchPos = materializedTable.size() - 1;
+		}
+
+		for (int i = startSearchPos; i >= validRowPosition; i--) {
+			if (materializedTable.get(i).equals(row)) {
+				materializedTable.remove(i);
+				rowPositionCache.remove(row);
+				break;
+			}
+		}
+	}
+
+	private void cleanUp() {
+		// invalidate row
+		final Row deleteRow = materializedTable.get(0);
+		rowPositionCache.remove(deleteRow);
+		materializedTable.set(0, null);
+
+		validRowPosition++;
+
+		// perform clean up in batches
+		if (validRowPosition >= overcommitThreshold) {
+			materializedTable.subList(0, validRowPosition).clear();
Review comment:
We could update all indexes by making a single pass over the cache and subtracting `overcommitThreshold` from every cached index.
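The lazy-invalidation scheme in the diff, combined with the index adjustment proposed in the review comment, can be sketched outside Flink roughly as follows. This is a simplified, hypothetical model, not Flink code: rows are plain `String`s instead of Flink `Row`s, and the class `BoundedMaterializedTable` with its `insert`/`delete`/`snapshot`/`physicalSize` methods is invented for illustration; only the field names mirror the diff. In this sketch each eviction nulls out the slot at `validRowPosition` (the oldest row still valid), compaction happens in batches via `subList(0, n).clear()` once `overcommitThreshold` slots are invalid, and all cached positions are then shifted in one pass with `Map.replaceAll`. Removing a cache entry on eviction only when it still points at the evicted slot is an additional assumption of this sketch, so that a newer duplicate keeps its position hint.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a size-limited materialized table (not Flink code).
// Rows are Strings; field names only mirror the ones used in the diff.
class BoundedMaterializedTable {
    private final int maxRowCount;
    private final int overcommitThreshold;
    private final List<String> materializedTable = new ArrayList<>();
    // row -> position of its most recent insertion (an upper bound after deletes)
    private final Map<String, Integer> rowPositionCache = new HashMap<>();
    // slots [0, validRowPosition) are invalidated (null) but not yet physically removed
    private int validRowPosition = 0;

    BoundedMaterializedTable(int maxRowCount, int overcommitThreshold) {
        this.maxRowCount = maxRowCount;
        this.overcommitThreshold = overcommitThreshold;
    }

    void insert(String row) {
        // evict the oldest valid row once maxRowCount valid rows are stored
        if (materializedTable.size() - validRowPosition >= maxRowCount) {
            cleanUp();
        }
        materializedTable.add(row);
        rowPositionCache.put(row, materializedTable.size() - 1);
    }

    void delete(String row) {
        // search newest-first from the cached position, never into invalidated slots
        final Integer cachedPos = rowPositionCache.get(row);
        final int start = cachedPos != null
                ? Math.min(cachedPos, materializedTable.size() - 1)
                : materializedTable.size() - 1;
        for (int i = start; i >= validRowPosition; i--) {
            if (row.equals(materializedTable.get(i))) {
                materializedTable.remove(i);
                rowPositionCache.remove(row);
                return;
            }
        }
    }

    private void cleanUp() {
        // invalidate the oldest valid slot in O(1) instead of shifting the whole list
        final String evicted = materializedTable.get(validRowPosition);
        final Integer cachedPos = rowPositionCache.get(evicted);
        if (cachedPos != null && cachedPos == validRowPosition) {
            // no newer copy of this row relies on this cache entry
            rowPositionCache.remove(evicted);
        }
        materializedTable.set(validRowPosition, null);
        validRowPosition++;

        // compact in batches, then shift every cached index in a single pass
        if (validRowPosition >= overcommitThreshold) {
            final int removed = validRowPosition;
            materializedTable.subList(0, removed).clear();
            rowPositionCache.replaceAll((r, pos) -> pos - removed);
            validRowPosition = 0;
        }
    }

    // valid rows, oldest first
    List<String> snapshot() {
        return new ArrayList<>(materializedTable.subList(validRowPosition, materializedTable.size()));
    }

    // includes invalidated slots that are still awaiting compaction
    int physicalSize() {
        return materializedTable.size();
    }
}
```

With `maxRowCount = 3` and `overcommitThreshold = 2`, inserting `a`..`d` leaves the valid rows `b, c, d` while the backing list still holds four slots (one of them `null`); inserting `e` triggers the batched compaction, after which the cached positions of `c` and `d` have been shifted down by two, so a subsequent delete of `d` starts its search at the correct index.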
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
> Improve basic embedded SQL client
> ----------------------------------
>
> Key: FLINK-8686
> URL: https://issues.apache.org/jira/browse/FLINK-8686
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Major
> Labels: pull-request-available
>
> This issue describes follow-up issues that should be fixed in order to make
> the SQL client more stable:
> - -Add more tests for executor-
> - Configure JVM heap size
> - Limit changelog and table buffers
> - -"The input is invalid please check it again." => add allowed range-
> - Load dependencies recursively
> - Clean up results in result store
> - -Improve error message for unsupported batch queries-
> - -Add more logging instead of swallowing exceptions-
> - -List properties in error message about missing TS factory sorted by name-
> - Add command to show loaded TS factories and their required properties
> - Add command to reload configuration from files (no need to restart client)
> - Improve error message in case of invalid json-schema (right now:
> {{java.lang.IllegalArgumentException: No type could be found in node:<root>}})
> - -Add switch to show full stacktraces of exceptions- solved by logging
> - Give an error message when setting unknown parameters: e.g.,
> {{result-mode=changelog}} does not give an error, but the correct key is
> {{execution.result-mode=changelog}}
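The last item asks for an error when an unknown parameter is set. One way to approach it, sketched here under the assumption that the client knows the complete set of valid keys up front, is to reject any key outside that set and, when a known key ends with `"." + key`, suggest it as the likely intended fully qualified key. The class `PropertyKeyValidator` and its message format are hypothetical; only the key `execution.result-mode` comes from the issue text.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical validator; the set of known keys is supplied by the caller and is illustrative.
class PropertyKeyValidator {
    private final Set<String> knownKeys;

    PropertyKeyValidator(String... knownKeys) {
        // LinkedHashSet keeps suggestion order deterministic
        this.knownKeys = new LinkedHashSet<>(Arrays.asList(knownKeys));
    }

    // Throws if the key is unknown, suggesting a fully qualified key when one matches its suffix.
    void validate(String key) {
        if (knownKeys.contains(key)) {
            return;
        }
        for (String known : knownKeys) {
            if (known.endsWith("." + key)) {
                throw new IllegalArgumentException(
                        "Unknown property '" + key + "'. Did you mean '" + known + "'?");
            }
        }
        throw new IllegalArgumentException("Unknown property '" + key + "'.");
    }
}
```

For example, `new PropertyKeyValidator("execution.result-mode").validate("result-mode")` fails with a message pointing to `execution.result-mode` instead of silently accepting the misspelled key.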
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)