Repository: flink
Updated Branches:
  refs/heads/release-1.5 ea53eec0f -> 3a3caac9f


[FLINK-8687] [sql-client] Make MaterializedCollectStreamResult#retrievePage acquire resultLock

This closes #5647.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3a3caac9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3a3caac9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3a3caac9

Branch: refs/heads/release-1.5
Commit: 3a3caac9ff3b27fe9ad5b9868eba8e0ec44fdb9c
Parents: ea53eec
Author: mingleiZhang <zml13856086...@163.com>
Authored: Wed Mar 7 10:36:52 2018 +0800
Committer: Timo Walther <twal...@apache.org>
Committed: Mon Mar 12 16:44:15 2018 +0100

----------------------------------------------------------------------
 .../gateway/local/MaterializedCollectStreamResult.java    | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3a3caac9/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectStreamResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectStreamResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectStreamResult.java
index bd7f08e..7935da6 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectStreamResult.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectStreamResult.java
@@ -86,11 +86,13 @@ public class MaterializedCollectStreamResult extends CollectStreamResult impleme
 
        @Override
        public List<Row> retrievePage(int page) {
-               if (page <= 0 || page > pageCount) {
-                       throw new SqlExecutionException("Invalid page '" + page + "'.");
-               }
+               synchronized (resultLock) {
+                       if (page <= 0 || page > pageCount) {
+                               throw new SqlExecutionException("Invalid page '" + page + "'.");
+                       }
 
-               return snapshot.subList(pageSize * (page - 1), Math.min(snapshot.size(), pageSize * page));
+                       return snapshot.subList(pageSize * (page - 1), Math.min(snapshot.size(), pageSize * page));
+               }
        }
 
        // --------------------------------------------------------------------------------------------

Reply via email to