This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 84d0c1859 [flink] Add supports of lookup join on append-only table
(#1424)
84d0c1859 is described below
commit 84d0c1859ffd978d7ad7bd677a8e8bd21a722e62
Author: Aitozi <[email protected]>
AuthorDate: Tue Jun 27 16:26:19 2023 +0800
[flink] Add supports of lookup join on append-only table (#1424)
---
docs/content/how-to/lookup-joins.md | 2 +-
.../flink/lookup/FileStoreLookupFunction.java | 6 --
.../flink/lookup/ListDelimitedSerializer.java | 93 +++++++++++++++++++
.../apache/paimon/flink/lookup/LookupTable.java | 13 ++-
.../flink/lookup/NoPrimaryKeyLookupTable.java | 85 +++++++++++++++++
.../paimon/flink/lookup/RocksDBListState.java | 84 +++++++++++++++++
.../paimon/flink/lookup/RocksDBStateFactory.java | 16 +++-
.../org/apache/paimon/flink/LookupJoinITCase.java | 34 +++++++
.../paimon/flink/lookup/LookupTableTest.java | 28 ++++++
.../paimon/flink/lookup/RocksDBListStateTest.java | 102 +++++++++++++++++++++
10 files changed, 451 insertions(+), 12 deletions(-)
diff --git a/docs/content/how-to/lookup-joins.md
b/docs/content/how-to/lookup-joins.md
index f7253e0db..878bf6eb6 100644
--- a/docs/content/how-to/lookup-joins.md
+++ b/docs/content/how-to/lookup-joins.md
@@ -28,7 +28,7 @@ under the License.
[Lookup
Joins](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/joins/)
are a type of join in streaming queries. It is used to enrich a table with
data that is queried from Paimon. The join requires one table to have a
processing time attribute and the other table to be backed by a lookup source
connector.
-Paimon supports lookup joins on tables with primary keys in Flink. The
following example illustrates this feature.
+Paimon supports lookup joins on tables with primary keys and append-only
tables in Flink. The following example illustrates this feature.
First, let's create a Paimon table and update it in real-time.
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 68a1d225f..d6ef13ca8 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -60,7 +60,6 @@ import java.util.stream.IntStream;
import static org.apache.paimon.flink.RocksDBOptions.LOOKUP_CACHE_ROWS;
import static
org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
/** A lookup {@link TableFunction} for file store. */
public class FileStoreLookupFunction implements Serializable, Closeable {
@@ -85,11 +84,6 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
public FileStoreLookupFunction(
Table table, int[] projection, int[] joinKeyIndex, @Nullable
Predicate predicate) {
- checkArgument(
- table.primaryKeys().size() > 0,
- String.format(
- "Currently only support primary key table, the lookup
table is [%s].",
- table.name()));
TableScanUtils.streamingReadingValidate(table);
this.table = table;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/ListDelimitedSerializer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/ListDelimitedSerializer.java
new file mode 100644
index 000000000..56595cf14
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/ListDelimitedSerializer.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.io.DataInputDeserializer;
+import org.apache.paimon.io.DataOutputSerializer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * Serializes a list of elements into one byte array, and reads such an array back, with a single
+ * {@code ','} byte written between consecutive elements.
+ *
+ * <p>Every instance keeps one reusable input view and one reusable output view that are reset on
+ * each call. NOTE(review): because of that shared state an instance presumably must not be used
+ * by several threads at once - confirm against callers.
+ *
+ * <p>NOTE(review): the delimiter presumably has to match the separator that the RocksDB merge
+ * operator configured in {@code RocksDBStateFactory} puts between merged operands - confirm.
+ */
+public final class ListDelimitedSerializer {
+
+    private static final byte DELIMITER = ',';
+
+    private final DataInputDeserializer dataInputView = new DataInputDeserializer();
+    private final DataOutputSerializer dataOutputView = new DataOutputSerializer(128);
+
+    /**
+     * Reads all elements stored in {@code valueBytes}.
+     *
+     * @param valueBytes the serialized list, may be {@code null}
+     * @param elementSerializer serializer used to decode each element
+     * @return the decoded elements in stored order, or {@code null} if {@code valueBytes} is
+     *     {@code null}
+     */
+    public <T> List<T> deserializeList(byte[] valueBytes, Serializer<T> elementSerializer) {
+        if (valueBytes == null) {
+            return null;
+        }
+
+        dataInputView.setBuffer(valueBytes);
+
+        List<T> elements = new ArrayList<>();
+        // Reading stops as soon as the helper reports that no further element is available.
+        for (T element = deserializeNextElement(dataInputView, elementSerializer);
+                element != null;
+                element = deserializeNextElement(dataInputView, elementSerializer)) {
+            elements.add(element);
+        }
+        return elements;
+    }
+
+    /**
+     * Writes all elements of {@code valueList}, separated by the delimiter byte.
+     *
+     * @param valueList the elements to encode; must not contain {@code null}
+     * @param elementSerializer serializer used to encode each element
+     * @return a copy of the bytes written for the whole list
+     * @throws IOException if the element serializer fails
+     */
+    public <T> byte[] serializeList(List<T> valueList, Serializer<T> elementSerializer)
+            throws IOException {
+        dataOutputView.clear();
+
+        boolean needsDelimiter = false;
+        for (T value : valueList) {
+            checkNotNull(value, "You cannot add null to a value list.");
+
+            // The delimiter goes in front of every element except the first one.
+            if (needsDelimiter) {
+                dataOutputView.write(DELIMITER);
+            }
+            needsDelimiter = true;
+            elementSerializer.serialize(value, dataOutputView);
+        }
+
+        return dataOutputView.getCopyOfBuffer();
+    }
+
+    /**
+     * Reads one element from {@code in} and, if more bytes follow, consumes the single delimiter
+     * byte after it.
+     *
+     * @return the element, or {@code null} when {@code in} has no bytes left
+     * @throws RuntimeException wrapping any {@link IOException} raised while reading
+     */
+    public static <T> T deserializeNextElement(
+            DataInputDeserializer in, Serializer<T> elementSerializer) {
+        try {
+            if (in.available() <= 0) {
+                return null;
+            }
+            T element = elementSerializer.deserialize(in);
+            if (in.available() > 0) {
+                // Skip the delimiter that precedes the next element.
+                in.readByte();
+            }
+            return element;
+        } catch (IOException e) {
+            throw new RuntimeException("Unexpected list element deserialization failure", e);
+        }
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
index 403786ada..29f319ee1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
@@ -42,12 +42,17 @@ public interface LookupTable {
Predicate<InternalRow> recordFilter,
long lruCacheSize)
throws IOException {
- if (new HashSet<>(primaryKey).equals(new HashSet<>(joinKey))) {
- return new PrimaryKeyLookupTable(
+ if (primaryKey.isEmpty()) {
+ return new NoPrimaryKeyLookupTable(
stateFactory, rowType, joinKey, recordFilter,
lruCacheSize);
} else {
- return new SecondaryIndexLookupTable(
- stateFactory, rowType, primaryKey, joinKey, recordFilter,
lruCacheSize);
+ if (new HashSet<>(primaryKey).equals(new HashSet<>(joinKey))) {
+ return new PrimaryKeyLookupTable(
+ stateFactory, rowType, joinKey, recordFilter,
lruCacheSize);
+ } else {
+ return new SecondaryIndexLookupTable(
+ stateFactory, rowType, primaryKey, joinKey,
recordFilter, lruCacheSize);
+ }
}
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
new file mode 100644
index 000000000..c413d8a3e
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.KeyProjectedRow;
+import org.apache.paimon.utils.TypeUtils;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Predicate;
+
+/**
+ * {@link LookupTable} implementation for tables that declare no primary key.
+ *
+ * <p>Rows are kept in a {@link RocksDBListState} keyed by the projected join-key fields, so one
+ * join key can map to several rows, including identical ones.
+ */
+public class NoPrimaryKeyLookupTable implements LookupTable {
+
+    private final RocksDBListState state;
+
+    private final Predicate<InternalRow> recordFilter;
+
+    private final KeyProjectedRow joinKeyRow;
+
+    /**
+     * @param stateFactory factory used to create the backing list state
+     * @param rowType type of the rows stored in this table
+     * @param joinKeys names of the fields that form the lookup key
+     * @param recordFilter only rows accepted by this predicate are stored
+     * @param lruCacheSize cache size handed to the list state
+     * @throws IOException if the list state cannot be created
+     */
+    public NoPrimaryKeyLookupTable(
+            RocksDBStateFactory stateFactory,
+            RowType rowType,
+            List<String> joinKeys,
+            Predicate<InternalRow> recordFilter,
+            long lruCacheSize)
+            throws IOException {
+        this.recordFilter = recordFilter;
+
+        // Resolve every join key name to its position inside the row type.
+        List<String> allFields = rowType.getFieldNames();
+        int[] joinKeyPositions = new int[joinKeys.size()];
+        for (int i = 0; i < joinKeyPositions.length; i++) {
+            joinKeyPositions[i] = allFields.indexOf(joinKeys.get(i));
+        }
+
+        this.joinKeyRow = new KeyProjectedRow(joinKeyPositions);
+        this.state =
+                stateFactory.listState(
+                        "join-key-index",
+                        InternalSerializers.create(TypeUtils.project(rowType, joinKeyPositions)),
+                        InternalSerializers.create(rowType),
+                        lruCacheSize);
+    }
+
+    /** Returns all rows stored under the given join key. */
+    @Override
+    public List<InternalRow> get(InternalRow key) throws IOException {
+        return state.get(key);
+    }
+
+    /**
+     * Appends the incoming rows to the state under their projected join key.
+     *
+     * @throws RuntimeException if a row is neither {@code INSERT} nor {@code UPDATE_AFTER}
+     */
+    @Override
+    public void refresh(Iterator<InternalRow> incremental) throws IOException {
+        while (incremental.hasNext()) {
+            InternalRow incoming = incremental.next();
+            joinKeyRow.replaceRow(incoming);
+
+            RowKind kind = incoming.getRowKind();
+            if (kind != RowKind.INSERT && kind != RowKind.UPDATE_AFTER) {
+                throw new RuntimeException(
+                        String.format(
+                                "Received %s message. Only INSERT/UPDATE_AFTER values are expected here.",
+                                kind));
+            }
+
+            if (recordFilter.test(incoming)) {
+                state.add(joinKeyRow, incoming);
+            }
+        }
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java
new file mode 100644
index 000000000..e1e7a7efe
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.Serializer;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * RocksDB-backed state that maps a key to a list of values.
+ *
+ * <p>Values are appended with a RocksDB {@code merge} call and decoded on read with a
+ * {@link ListDelimitedSerializer}. Reads go through the cache inherited from the parent class.
+ */
+public class RocksDBListState extends RocksDBState<List<InternalRow>> {
+
+    private static final List<InternalRow> EMPTY = Collections.emptyList();
+
+    private final ListDelimitedSerializer listSerializer = new ListDelimitedSerializer();
+
+    public RocksDBListState(
+            RocksDB db,
+            ColumnFamilyHandle columnFamily,
+            Serializer<InternalRow> keySerializer,
+            Serializer<InternalRow> valueSerializer,
+            long lruCacheSize) {
+        super(db, columnFamily, keySerializer, valueSerializer, lruCacheSize);
+    }
+
+    /**
+     * Appends {@code value} to the list stored under {@code key} and drops the cached list for
+     * that key.
+     *
+     * @throws IOException if serialization or the RocksDB merge fails
+     */
+    public void add(InternalRow key, InternalRow value) throws IOException {
+        byte[] serializedKey = serializeKey(key);
+        byte[] serializedValue = serializeValue(value);
+        try {
+            db.merge(columnFamily, writeOptions, serializedKey, serializedValue);
+        } catch (RocksDBException e) {
+            throw new IOException(e);
+        }
+        // The cached list no longer reflects what is stored, so forget it.
+        cache.invalidate(wrap(serializedKey));
+    }
+
+    /**
+     * Returns the list stored under {@code key}, loading it from RocksDB on a cache miss.
+     *
+     * @return the stored rows, or an empty list if nothing is stored for the key
+     * @throws IOException if key serialization or loading fails
+     */
+    public List<InternalRow> get(InternalRow key) throws IOException {
+        byte[] serializedKey = serializeKey(key);
+        try {
+            return cache.get(wrap(serializedKey), () -> readFromDb(serializedKey));
+        } catch (ExecutionException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /** Reads and decodes the list stored for the given serialized key. */
+    private List<InternalRow> readFromDb(byte[] serializedKey) throws RocksDBException {
+        byte[] stored = db.get(columnFamily, serializedKey);
+        List<InternalRow> rows = listSerializer.deserializeList(stored, valueSerializer);
+        return rows == null ? EMPTY : rows;
+    }
+
+    /** Serializes a single value row into a fresh byte array. */
+    private byte[] serializeValue(InternalRow value) throws IOException {
+        valueOutputView.clear();
+        valueSerializer.serialize(value, valueOutputView);
+        return valueOutputView.getCopyOfBuffer();
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
index bfb41e35d..c32703af7 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
@@ -37,6 +37,8 @@ import java.nio.charset.StandardCharsets;
/** Factory to create state. */
public class RocksDBStateFactory implements Closeable {
+ public static final String MERGE_OPERATOR_NAME = "stringappendtest";
+
private RocksDB db;
private final ColumnFamilyOptions columnFamilyOptions;
@@ -51,7 +53,8 @@ public class RocksDBStateFactory implements Closeable {
.setCreateIfMissing(true),
conf);
this.columnFamilyOptions =
- RocksDBOptions.createColumnOptions(new ColumnFamilyOptions(),
conf);
+ RocksDBOptions.createColumnOptions(new ColumnFamilyOptions(),
conf)
+ .setMergeOperatorName(MERGE_OPERATOR_NAME);
try {
this.db = RocksDB.open(new Options(dbOptions,
columnFamilyOptions), path);
@@ -80,6 +83,17 @@ public class RocksDBStateFactory implements Closeable {
db, createColumnFamily(name), keySerializer, valueSerializer,
lruCacheSize);
}
+ /**
+ * Creates a {@link RocksDBListState} on top of a column family created under the given name.
+ *
+ * @param name name of the column family to create for this state
+ * @param keySerializer serializer for the state keys
+ * @param valueSerializer serializer for the individual list elements
+ * @param lruCacheSize cache size passed on to the state
+ * @throws IOException if the column family cannot be created
+ */
+ public RocksDBListState listState(
+ String name,
+ Serializer<InternalRow> keySerializer,
+ Serializer<InternalRow> valueSerializer,
+ long lruCacheSize)
+ throws IOException {
+
+ return new RocksDBListState(
+ db, createColumnFamily(name), keySerializer, valueSerializer,
lruCacheSize);
+ }
+
private ColumnFamilyHandle createColumnFamily(String name) throws
IOException {
try {
return db.createColumnFamily(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index b67b2fa70..a078858ec 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -543,4 +543,38 @@ public class LookupJoinITCase extends CatalogITCaseBase {
Row.of(4, null, null, null));
iterator.close();
}
+
+ /**
+ * Lookup join against a dimension table that is created without a primary key: an empty
+ * dimension table yields null-padded rows, and duplicate dimension rows each produce their own
+ * join result.
+ */
+ @Test
+ public void testLookupNonPkAppendTable() throws Exception {
+ // Dimension table declared without a PRIMARY KEY clause.
+ sql(
+ "CREATE TABLE DIM_NO_PK (i INT, j INT, k1 INT, k2 INT) "
+ + "PARTITIONED BY (`i`) WITH
('continuous.discovery-interval'='1 ms')");
+ String query =
+ "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM_NO_PK for
system_time as of T.proctime AS D ON T.i "
+ + "= D.i";
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ // Phase 1: nothing has been written to DIM_NO_PK yet, so every probe row is null-padded.
+ sql("INSERT INTO T VALUES (1), (2), (3)");
+
+ List<Row> result = iterator.collect(3);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of(1, null, null, null),
+ Row.of(2, null, null, null),
+ Row.of(3, null, null, null));
+
+ // Phase 2: three dimension rows share i = 1, two of them are identical.
+ sql(
+ "INSERT INTO DIM_NO_PK VALUES (1, 11, 111, 1111), (1, 12, 112,
1112), (1, 11, 111, 1111)");
+ Thread.sleep(2000); // wait for the lookup side to pick up the newly written rows
+ sql("INSERT INTO T VALUES (1), (2), (4)");
+ result = iterator.collect(5);
+ // The probe row 1 matches all three dimension rows (the duplicate appears twice);
+ // 2 and 4 have no match.
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of(1, 11, 111, 1111),
+ Row.of(1, 11, 111, 1111),
+ Row.of(1, 12, 112, 1112),
+ Row.of(2, null, null, null),
+ Row.of(4, null, null, null));
+ iterator.close();
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index 99b547b5e..9c7366be6 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.nio.file.Path;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
@@ -146,6 +147,33 @@ public class LookupTableTest {
assertThat(table.get(row(33))).hasSize(0);
}
+ /**
+ * Lookup table created with an empty primary-key list: filtered rows are not stored, and
+ * identical rows added twice are both returned.
+ */
+ @Test
+ public void testNoPrimaryKeyTable() throws IOException {
+ // Empty primary key list, join key "f1", filter on the field at index 2,
+ // and a cache size that is randomly 0 or 10.
+ LookupTable table =
+ LookupTable.create(
+ stateFactory,
+ rowType,
+ Collections.emptyList(),
+ singletonList("f1"),
+ r -> r.getInt(2) < 222,
+ ThreadLocalRandom.current().nextInt(2) * 10);
+
+ // 333 does not pass the filter (< 222), so nothing is stored for key 11.
+ table.refresh(singletonList(row(1, 11, 333)).iterator());
+ List<InternalRow> result = table.get(row(11));
+ assertThat(result).hasSize(0);
+
+ // 111 passes the filter: one row is now returned for key 11.
+ table.refresh(singletonList(row(1, 11, 111)).iterator());
+ result = table.get(row(11));
+ assertThat(result).hasSize(1);
+ assertRow(result.get(0), 1, 11, 111);
+
+ // Adding the very same row again appends a second entry instead of replacing the first.
+ table.refresh(singletonList(row(1, 11, 111)).iterator());
+ result = table.get(row(11));
+ assertThat(result).hasSize(2);
+ assertRow(result.get(0), 1, 11, 111);
+ assertRow(result.get(1), 1, 11, 111);
+ }
+
private static InternalRow row(Object... values) {
return row(RowKind.INSERT, values);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java
new file mode 100644
index 000000000..dfd016650
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Test for {@link org.apache.paimon.flink.lookup.RocksDBListState}. */
+public class RocksDBListStateTest {
+
+    @TempDir Path tempDir;
+
+    /**
+     * Values added under one key come back in insertion order, duplicates are kept, a value that
+     * itself contains a comma stays one element, and an unknown key yields an empty list.
+     */
+    @Test
+    void test() throws Exception {
+        // RocksDBStateFactory is Closeable: close it when the test ends so that whatever it
+        // holds open under tempDir is released before JUnit deletes the directory.
+        try (RocksDBStateFactory factory =
+                new RocksDBStateFactory(tempDir.toString(), new Options())) {
+
+            RowType keyType = RowType.of(DataTypes.STRING());
+            RowType valueType = RowType.of(DataTypes.STRING());
+            RocksDBListState listState =
+                    factory.listState(
+                            "test",
+                            new InternalRowSerializer(keyType),
+                            new InternalRowSerializer(valueType),
+                            1);
+
+            GenericRow key = row("aaa");
+            listState.add(key, row("1"));
+            List<InternalRow> result = listState.get(key);
+            Assertions.assertEquals(Lists.newArrayList("1"), getString(result));
+
+            // "2,3" contains the list delimiter character but must remain a single element.
+            listState.add(key, row("2,3"));
+            Assertions.assertEquals(
+                    Lists.newArrayList("1", "2,3"), getString(listState.get(key)));
+
+            listState.add(key, row("1"));
+            Assertions.assertEquals(
+                    Lists.newArrayList("1", "2,3", "1"), getString(listState.get(key)));
+            // Second read of the same key must return the same content.
+            Assertions.assertEquals(
+                    Lists.newArrayList("1", "2,3", "1"), getString(listState.get(key)));
+
+            Assertions.assertTrue(listState.get(row("bbb")).isEmpty());
+        }
+    }
+
+    /** Builds a one-field row holding {@code value} as a string. */
+    public GenericRow row(String value) {
+        return GenericRow.of(bs(value));
+    }
+
+    /** Builds a one-field string row with the given row kind. */
+    public GenericRow row(String value, RowKind kind) {
+        GenericRow row = GenericRow.of(bs(value));
+        row.setRowKind(kind);
+        return row;
+    }
+
+    /** Converts a Java string into a {@link BinaryString}. */
+    public BinaryString bs(String v) {
+        return BinaryString.fromString(v);
+    }
+
+    /** Writes {@code v} as field 0 through a {@link BinaryRowWriter} over a one-field row. */
+    public BinaryRow write(String v) {
+        BinaryRow row = new BinaryRow(1);
+        BinaryRowWriter write = new BinaryRowWriter(row);
+        write.writeString(0, bs(v));
+        return row;
+    }
+
+    /** Extracts field 0 of every row as a Java string, preserving order. */
+    public List<String> getString(List<InternalRow> inputs) {
+        List<String> rows = new ArrayList<>();
+        for (InternalRow input : inputs) {
+            rows.add(input.getString(0).toString());
+        }
+        return rows;
+    }
+}