Repository: flink Updated Branches: refs/heads/master 961df0d6c -> 7d837a3e8
[FLINK-8687] [sql-client] Make MaterializedCollectStreamResult#retrievePage to have resultLock This closes #5647. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d837a3e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d837a3e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d837a3e Branch: refs/heads/master Commit: 7d837a3e884eba9937fb4b14fd9c76e8895d5703 Parents: 961df0d Author: mingleiZhang <zml13856086...@163.com> Authored: Wed Mar 7 10:36:52 2018 +0800 Committer: Timo Walther <twal...@apache.org> Committed: Mon Mar 12 16:42:17 2018 +0100 ---------------------------------------------------------------------- .../gateway/local/MaterializedCollectStreamResult.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7d837a3e/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectStreamResult.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectStreamResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectStreamResult.java index bd7f08e..7935da6 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectStreamResult.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectStreamResult.java @@ -86,11 +86,13 @@ public class MaterializedCollectStreamResult extends CollectStreamResult impleme @Override public List<Row> retrievePage(int page) { - if (page <= 0 || page > pageCount) { - throw new SqlExecutionException("Invalid page '" + page + "'."); - } + synchronized (resultLock) { + if (page <= 0 || page > pageCount) { + throw new SqlExecutionException("Invalid page '" + page + "'."); + } - return snapshot.subList(pageSize * (page - 1), Math.min(snapshot.size(), pageSize * page)); + return snapshot.subList(pageSize * (page - 1), Math.min(snapshot.size(), pageSize * page)); + } } // --------------------------------------------------------------------------------------------