Repository: flink Updated Branches: refs/heads/release-1.5 ea53eec0f -> 3a3caac9f
[FLINK-8687] [sql-client] Make MaterializedCollectStreamResult#retrievePage to have resultLock This closes #5647. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3a3caac9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3a3caac9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3a3caac9 Branch: refs/heads/release-1.5 Commit: 3a3caac9ff3b27fe9ad5b9868eba8e0ec44fdb9c Parents: ea53eec Author: mingleiZhang <zml13856086...@163.com> Authored: Wed Mar 7 10:36:52 2018 +0800 Committer: Timo Walther <twal...@apache.org> Committed: Mon Mar 12 16:44:15 2018 +0100 ---------------------------------------------------------------------- .../gateway/local/MaterializedCollectStreamResult.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3a3caac9/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectStreamResult.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectStreamResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectStreamResult.java index bd7f08e..7935da6 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectStreamResult.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectStreamResult.java @@ -86,11 +86,13 @@ public class MaterializedCollectStreamResult extends CollectStreamResult impleme @Override public List<Row> retrievePage(int page) { - if (page <= 0 || page > pageCount) { - throw new SqlExecutionException("Invalid page '" + page + "'."); - } + synchronized (resultLock) { + if (page <= 0 || page > pageCount) { + throw new SqlExecutionException("Invalid page '" + page + "'."); + } - return snapshot.subList(pageSize * (page - 1), Math.min(snapshot.size(), pageSize * page)); + return snapshot.subList(pageSize * (page - 1), Math.min(snapshot.size(), pageSize * page)); + } } // --------------------------------------------------------------------------------------------