This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new d78bb60 [FLINK-10192] [sql-client] Fix SQL Client table visualization mode d78bb60 is described below commit d78bb60715044077eb4267ad4b171616e94d90e3 Author: Timo Walther <twal...@apache.org> AuthorDate: Fri Aug 24 12:13:45 2018 +0200 [FLINK-10192] [sql-client] Fix SQL Client table visualization mode Fixes the wrong materialization for the debugging visualization in table mode. Reworks the caching mechanism in MaterializedCollectStreamResult. This closes #6617. --- .../flink/table/client/gateway/TypedResult.java | 19 ++++ .../result/MaterializedCollectStreamResult.java | 51 +++++---- .../MaterializedCollectStreamResultTest.java | 114 +++++++++++++++++++++ 3 files changed, 162 insertions(+), 22 deletions(-) diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java index ee4e8d3..6ef8ef3 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java @@ -18,6 +18,8 @@ package org.apache.flink.table.client.gateway; +import java.util.Objects; + /** * Result with an attached type (actual payload, EOS, etc.). 
* @@ -55,6 +57,23 @@ public class TypedResult<P> { return "TypedResult<" + type + ">"; } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TypedResult<?> that = (TypedResult<?>) o; + return type == that.type && Objects.equals(payload, that.payload); + } + + @Override + public int hashCode() { + return Objects.hash(type, payload); + } + // -------------------------------------------------------------------------------------------- public static <T> TypedResult<T> empty() { diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java index 7321bd0..45c4f75 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java @@ -39,10 +39,20 @@ import java.util.Map; public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> implements MaterializedResult<C> { private final List<Row> materializedTable; - private final Map<Row, List<Integer>> rowPositions; // positions of rows in table for faster access + + /** + * Caches the last row position for faster access. The position might not be exact (if rows + * with smaller position are deleted) nor complete (for deletes of duplicates). However, the + * cache narrows the search in the materialized table. 
+ */ + private final Map<Row, Integer> rowPositionCache; + private final List<Row> snapshot; + private int pageCount; + private int pageSize; + private boolean isLastSnapshot; public MaterializedCollectStreamResult(TypeInformation<Row> outputType, ExecutionConfig config, @@ -51,7 +61,7 @@ public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> i // prepare for materialization materializedTable = new ArrayList<>(); - rowPositions = new HashMap<>(); + rowPositionCache = new HashMap<>(); snapshot = new ArrayList<>(); isLastSnapshot = false; pageCount = 0; @@ -101,32 +111,29 @@ public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> i @Override protected void processRecord(Tuple2<Boolean, Row> change) { - // we track the position of rows for faster access and in order to return consistent - // snapshots where new rows are appended at the end synchronized (resultLock) { - final List<Integer> positions = rowPositions.get(change.f1); - + final Row row = change.f1; // insert if (change.f0) { - materializedTable.add(change.f1); - if (positions == null) { - // new row - final ArrayList<Integer> pos = new ArrayList<>(1); - pos.add(materializedTable.size() - 1); - rowPositions.put(change.f1, pos); - } else { - // row exists already, only add position - positions.add(materializedTable.size() - 1); - } + materializedTable.add(row); + rowPositionCache.put(row, materializedTable.size() - 1); } // delete else { - if (positions != null) { - // delete row position and row itself - final int pos = positions.remove(positions.size() - 1); - materializedTable.remove(pos); - if (positions.isEmpty()) { - rowPositions.remove(change.f1); + // delete the newest record first to minimize per-page changes + final Integer cachedPos = rowPositionCache.get(row); + final int startSearchPos; + if (cachedPos != null) { + startSearchPos = Math.min(cachedPos, materializedTable.size() - 1); + } else { + startSearchPos = materializedTable.size() - 1; + } + + 
for (int i = startSearchPos; i >= 0; i--) { + if (materializedTable.get(i).equals(row)) { + materializedTable.remove(i); + rowPositionCache.remove(row); + break; } } } diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java new file mode 100644 index 0000000..c7e41ff --- /dev/null +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.client.gateway.local.result; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.client.gateway.TypedResult; +import org.apache.flink.types.Row; + +import org.junit.Test; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link MaterializedCollectStreamResult}. + */ +public class MaterializedCollectStreamResultTest { + + @Test + public void testSnapshot() throws UnknownHostException { + final TypeInformation<Row> type = Types.ROW(Types.STRING, Types.LONG); + + TestMaterializedCollectStreamResult result = null; + try { + result = new TestMaterializedCollectStreamResult( + type, + new ExecutionConfig(), + InetAddress.getLocalHost(), + 0); + + result.isRetrieving = true; + + result.processRecord(Tuple2.of(true, Row.of("A", 1))); + result.processRecord(Tuple2.of(true, Row.of("B", 1))); + result.processRecord(Tuple2.of(true, Row.of("A", 1))); + result.processRecord(Tuple2.of(true, Row.of("C", 2))); + + assertEquals(TypedResult.payload(4), result.snapshot(1)); + + assertEquals(Collections.singletonList(Row.of("A", 1)), result.retrievePage(1)); + assertEquals(Collections.singletonList(Row.of("B", 1)), result.retrievePage(2)); + assertEquals(Collections.singletonList(Row.of("A", 1)), result.retrievePage(3)); + assertEquals(Collections.singletonList(Row.of("C", 2)), result.retrievePage(4)); + + result.processRecord(Tuple2.of(false, Row.of("A", 1))); + + assertEquals(TypedResult.payload(3), result.snapshot(1)); + + assertEquals(Collections.singletonList(Row.of("A", 1)), result.retrievePage(1)); + assertEquals(Collections.singletonList(Row.of("B", 1)), result.retrievePage(2)); + 
assertEquals(Collections.singletonList(Row.of("C", 2)), result.retrievePage(3)); + + result.processRecord(Tuple2.of(false, Row.of("C", 2))); + result.processRecord(Tuple2.of(false, Row.of("A", 1))); + + assertEquals(TypedResult.payload(1), result.snapshot(1)); + + assertEquals(Collections.singletonList(Row.of("B", 1)), result.retrievePage(1)); + } finally { + if (result != null) { + result.close(); + } + } + } + + // -------------------------------------------------------------------------------------------- + // Helper classes + // -------------------------------------------------------------------------------------------- + + private static class TestMaterializedCollectStreamResult extends MaterializedCollectStreamResult { + + public boolean isRetrieving; + + public TestMaterializedCollectStreamResult( + TypeInformation<Row> outputType, + ExecutionConfig config, + InetAddress gatewayAddress, + int gatewayPort) { + + super( + outputType, + config, + gatewayAddress, + gatewayPort); + } + + @Override + protected boolean isRetrieving() { + return isRetrieving; + } + } +}