This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d78bb60  [FLINK-10192] [sql-client] Fix SQL Client table visualization mode
d78bb60 is described below

commit d78bb60715044077eb4267ad4b171616e94d90e3
Author: Timo Walther <twal...@apache.org>
AuthorDate: Fri Aug 24 12:13:45 2018 +0200

    [FLINK-10192] [sql-client] Fix SQL Client table visualization mode
    
    Fixes the wrong materialization for the debugging visualization
    in table mode. Reworks the caching mechanism in MaterializedCollectStreamResult.
    
    This closes #6617.
---
 .../flink/table/client/gateway/TypedResult.java    |  19 ++++
 .../result/MaterializedCollectStreamResult.java    |  51 +++++----
 .../MaterializedCollectStreamResultTest.java       | 114 +++++++++++++++++++++
 3 files changed, 162 insertions(+), 22 deletions(-)

diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java
index ee4e8d3..6ef8ef3 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.client.gateway;
 
+import java.util.Objects;
+
 /**
  * Result with an attached type (actual payload, EOS, etc.).
  *
@@ -55,6 +57,23 @@ public class TypedResult<P> {
                return "TypedResult<" + type + ">";
        }
 
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               TypedResult<?> that = (TypedResult<?>) o;
+               return type == that.type && Objects.equals(payload, 
that.payload);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(type, payload);
+       }
+
        // 
--------------------------------------------------------------------------------------------
 
        public static <T> TypedResult<T> empty() {
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
index 7321bd0..45c4f75 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
@@ -39,10 +39,20 @@ import java.util.Map;
 public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> 
implements MaterializedResult<C> {
 
        private final List<Row> materializedTable;
-       private final Map<Row, List<Integer>> rowPositions; // positions of 
rows in table for faster access
+
+       /**
+        * Caches the last row position for faster access. The position might 
not be exact (if rows
+        * with smaller position are deleted) nor complete (for deletes of 
duplicates). However, the
+        * cache narrows the search in the materialized table.
+        */
+       private final Map<Row, Integer> rowPositionCache;
+
        private final List<Row> snapshot;
+
        private int pageCount;
+
        private int pageSize;
+
        private boolean isLastSnapshot;
 
        public MaterializedCollectStreamResult(TypeInformation<Row> outputType, 
ExecutionConfig config,
@@ -51,7 +61,7 @@ public class MaterializedCollectStreamResult<C> extends 
CollectStreamResult<C> i
 
                // prepare for materialization
                materializedTable = new ArrayList<>();
-               rowPositions = new HashMap<>();
+               rowPositionCache = new HashMap<>();
                snapshot = new ArrayList<>();
                isLastSnapshot = false;
                pageCount = 0;
@@ -101,32 +111,29 @@ public class MaterializedCollectStreamResult<C> extends 
CollectStreamResult<C> i
 
        @Override
        protected void processRecord(Tuple2<Boolean, Row> change) {
-               // we track the position of rows for faster access and in order 
to return consistent
-               // snapshots where new rows are appended at the end
                synchronized (resultLock) {
-                       final List<Integer> positions = 
rowPositions.get(change.f1);
-
+                       final Row row = change.f1;
                        // insert
                        if (change.f0) {
-                               materializedTable.add(change.f1);
-                               if (positions == null) {
-                                       // new row
-                                       final ArrayList<Integer> pos = new 
ArrayList<>(1);
-                                       pos.add(materializedTable.size() - 1);
-                                       rowPositions.put(change.f1, pos);
-                               } else {
-                                       // row exists already, only add position
-                                       positions.add(materializedTable.size() 
- 1);
-                               }
+                               materializedTable.add(row);
+                               rowPositionCache.put(row, 
materializedTable.size() - 1);
                        }
                        // delete
                        else {
-                               if (positions != null) {
-                                       // delete row position and row itself
-                                       final int pos = 
positions.remove(positions.size() - 1);
-                                       materializedTable.remove(pos);
-                                       if (positions.isEmpty()) {
-                                               rowPositions.remove(change.f1);
+                               // delete the newest record first to minimize 
per-page changes
+                               final Integer cachedPos = 
rowPositionCache.get(row);
+                               final int startSearchPos;
+                               if (cachedPos != null) {
+                                       startSearchPos = Math.min(cachedPos, 
materializedTable.size() - 1);
+                               } else {
+                                       startSearchPos = 
materializedTable.size() - 1;
+                               }
+
+                               for (int i = startSearchPos; i >= 0; i--) {
+                                       if 
(materializedTable.get(i).equals(row)) {
+                                               materializedTable.remove(i);
+                                               rowPositionCache.remove(row);
+                                               break;
                                        }
                                }
                        }
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
new file mode 100644
index 0000000..c7e41ff
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.local.result;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link MaterializedCollectStreamResult}.
+ */
+public class MaterializedCollectStreamResultTest {
+
+       @Test
+       public void testSnapshot() throws UnknownHostException {
+               final TypeInformation<Row> type = Types.ROW(Types.STRING, 
Types.LONG);
+
+               TestMaterializedCollectStreamResult result = null;
+               try {
+                       result = new TestMaterializedCollectStreamResult(
+                               type,
+                               new ExecutionConfig(),
+                               InetAddress.getLocalHost(),
+                               0);
+
+                       result.isRetrieving = true;
+
+                       result.processRecord(Tuple2.of(true, Row.of("A", 1)));
+                       result.processRecord(Tuple2.of(true, Row.of("B", 1)));
+                       result.processRecord(Tuple2.of(true, Row.of("A", 1)));
+                       result.processRecord(Tuple2.of(true, Row.of("C", 2)));
+
+                       assertEquals(TypedResult.payload(4), 
result.snapshot(1));
+
+                       assertEquals(Collections.singletonList(Row.of("A", 1)), 
result.retrievePage(1));
+                       assertEquals(Collections.singletonList(Row.of("B", 1)), 
result.retrievePage(2));
+                       assertEquals(Collections.singletonList(Row.of("A", 1)), 
result.retrievePage(3));
+                       assertEquals(Collections.singletonList(Row.of("C", 2)), 
result.retrievePage(4));
+
+                       result.processRecord(Tuple2.of(false, Row.of("A", 1)));
+
+                       assertEquals(TypedResult.payload(3), 
result.snapshot(1));
+
+                       assertEquals(Collections.singletonList(Row.of("A", 1)), 
result.retrievePage(1));
+                       assertEquals(Collections.singletonList(Row.of("B", 1)), 
result.retrievePage(2));
+                       assertEquals(Collections.singletonList(Row.of("C", 2)), 
result.retrievePage(3));
+
+                       result.processRecord(Tuple2.of(false, Row.of("C", 2)));
+                       result.processRecord(Tuple2.of(false, Row.of("A", 1)));
+
+                       assertEquals(TypedResult.payload(1), 
result.snapshot(1));
+
+                       assertEquals(Collections.singletonList(Row.of("B", 1)), 
result.retrievePage(1));
+               } finally {
+                       if (result != null) {
+                               result.close();
+                       }
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Helper classes
+       // 
--------------------------------------------------------------------------------------------
+
+       private static class TestMaterializedCollectStreamResult extends 
MaterializedCollectStreamResult {
+
+               public boolean isRetrieving;
+
+               public TestMaterializedCollectStreamResult(
+                               TypeInformation<Row> outputType,
+                               ExecutionConfig config,
+                               InetAddress gatewayAddress,
+                               int gatewayPort) {
+
+                       super(
+                               outputType,
+                               config,
+                               gatewayAddress,
+                               gatewayPort);
+               }
+
+               @Override
+               protected boolean isRetrieving() {
+                       return isRetrieving;
+               }
+       }
+}

Reply via email to