This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 84d0c1859 [flink] Add supports of lookup join on append-only table 
(#1424)
84d0c1859 is described below

commit 84d0c1859ffd978d7ad7bd677a8e8bd21a722e62
Author: Aitozi <[email protected]>
AuthorDate: Tue Jun 27 16:26:19 2023 +0800

    [flink] Add supports of lookup join on append-only table (#1424)
---
 docs/content/how-to/lookup-joins.md                |   2 +-
 .../flink/lookup/FileStoreLookupFunction.java      |   6 --
 .../flink/lookup/ListDelimitedSerializer.java      |  93 +++++++++++++++++++
 .../apache/paimon/flink/lookup/LookupTable.java    |  13 ++-
 .../flink/lookup/NoPrimaryKeyLookupTable.java      |  85 +++++++++++++++++
 .../paimon/flink/lookup/RocksDBListState.java      |  84 +++++++++++++++++
 .../paimon/flink/lookup/RocksDBStateFactory.java   |  16 +++-
 .../org/apache/paimon/flink/LookupJoinITCase.java  |  34 +++++++
 .../paimon/flink/lookup/LookupTableTest.java       |  28 ++++++
 .../paimon/flink/lookup/RocksDBListStateTest.java  | 102 +++++++++++++++++++++
 10 files changed, 451 insertions(+), 12 deletions(-)

diff --git a/docs/content/how-to/lookup-joins.md 
b/docs/content/how-to/lookup-joins.md
index f7253e0db..878bf6eb6 100644
--- a/docs/content/how-to/lookup-joins.md
+++ b/docs/content/how-to/lookup-joins.md
@@ -28,7 +28,7 @@ under the License.
 
 [Lookup 
Joins](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/joins/)
 are a type of join in streaming queries. It is used to enrich a table with 
data that is queried from Paimon. The join requires one table to have a 
processing time attribute and the other table to be backed by a lookup source 
connector.
 
-Paimon supports lookup joins on tables with primary keys in Flink. The 
following example illustrates this feature.
+Paimon supports lookup joins on tables with primary keys and append-only 
tables in Flink. The following example illustrates this feature.
 
 First, let's create a Paimon table and update it in real-time.
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 68a1d225f..d6ef13ca8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -60,7 +60,6 @@ import java.util.stream.IntStream;
 
 import static org.apache.paimon.flink.RocksDBOptions.LOOKUP_CACHE_ROWS;
 import static 
org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** A lookup {@link TableFunction} for file store. */
 public class FileStoreLookupFunction implements Serializable, Closeable {
@@ -85,11 +84,6 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
 
     public FileStoreLookupFunction(
             Table table, int[] projection, int[] joinKeyIndex, @Nullable 
Predicate predicate) {
-        checkArgument(
-                table.primaryKeys().size() > 0,
-                String.format(
-                        "Currently only support primary key table, the lookup 
table is [%s].",
-                        table.name()));
         TableScanUtils.streamingReadingValidate(table);
 
         this.table = table;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/ListDelimitedSerializer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/ListDelimitedSerializer.java
new file mode 100644
index 000000000..56595cf14
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/ListDelimitedSerializer.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.io.DataInputDeserializer;
+import org.apache.paimon.io.DataOutputSerializer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * Encapsulates a logic of serialization and deserialization of a list with a 
delimiter. Used in the
+ * savepoint format.
+ */
+public final class ListDelimitedSerializer {
+
+    private static final byte DELIMITER = ',';
+
+    private final DataInputDeserializer dataInputView = new 
DataInputDeserializer();
+    private final DataOutputSerializer dataOutputView = new 
DataOutputSerializer(128);
+
+    public <T> List<T> deserializeList(byte[] valueBytes, Serializer<T> 
elementSerializer) {
+        if (valueBytes == null) {
+            return null;
+        }
+
+        dataInputView.setBuffer(valueBytes);
+
+        List<T> result = new ArrayList<>();
+        T next;
+        while ((next = deserializeNextElement(dataInputView, 
elementSerializer)) != null) {
+            result.add(next);
+        }
+        return result;
+    }
+
+    public <T> byte[] serializeList(List<T> valueList, Serializer<T> 
elementSerializer)
+            throws IOException {
+
+        dataOutputView.clear();
+        boolean first = true;
+
+        for (T value : valueList) {
+            checkNotNull(value, "You cannot add null to a value list.");
+
+            if (first) {
+                first = false;
+            } else {
+                dataOutputView.write(DELIMITER);
+            }
+            elementSerializer.serialize(value, dataOutputView);
+        }
+
+        return dataOutputView.getCopyOfBuffer();
+    }
+
+    /** Deserializes a single element from a serialized list. */
+    public static <T> T deserializeNextElement(
+            DataInputDeserializer in, Serializer<T> elementSerializer) {
+        try {
+            if (in.available() > 0) {
+                T element = elementSerializer.deserialize(in);
+                if (in.available() > 0) {
+                    in.readByte();
+                }
+                return element;
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Unexpected list element 
deserialization failure", e);
+        }
+        return null;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
index 403786ada..29f319ee1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
@@ -42,12 +42,17 @@ public interface LookupTable {
             Predicate<InternalRow> recordFilter,
             long lruCacheSize)
             throws IOException {
-        if (new HashSet<>(primaryKey).equals(new HashSet<>(joinKey))) {
-            return new PrimaryKeyLookupTable(
+        if (primaryKey.isEmpty()) {
+            return new NoPrimaryKeyLookupTable(
                     stateFactory, rowType, joinKey, recordFilter, 
lruCacheSize);
         } else {
-            return new SecondaryIndexLookupTable(
-                    stateFactory, rowType, primaryKey, joinKey, recordFilter, 
lruCacheSize);
+            if (new HashSet<>(primaryKey).equals(new HashSet<>(joinKey))) {
+                return new PrimaryKeyLookupTable(
+                        stateFactory, rowType, joinKey, recordFilter, 
lruCacheSize);
+            } else {
+                return new SecondaryIndexLookupTable(
+                        stateFactory, rowType, primaryKey, joinKey, 
recordFilter, lruCacheSize);
+            }
         }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
new file mode 100644
index 000000000..c413d8a3e
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
@@ -0,0 +1,85 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.KeyProjectedRow;
+import org.apache.paimon.utils.TypeUtils;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Predicate;
+
+/** A {@link LookupTable} for table without primary key. */
+public class NoPrimaryKeyLookupTable implements LookupTable {
+
+    private final RocksDBListState state;
+
+    private final Predicate<InternalRow> recordFilter;
+
+    private final KeyProjectedRow joinKeyRow;
+
+    public NoPrimaryKeyLookupTable(
+            RocksDBStateFactory stateFactory,
+            RowType rowType,
+            List<String> joinKeys,
+            Predicate<InternalRow> recordFilter,
+            long lruCacheSize)
+            throws IOException {
+        List<String> fieldNames = rowType.getFieldNames();
+        int[] joinKeyMapping = 
joinKeys.stream().mapToInt(fieldNames::indexOf).toArray();
+        this.joinKeyRow = new KeyProjectedRow(joinKeyMapping);
+        this.state =
+                stateFactory.listState(
+                        "join-key-index",
+                        InternalSerializers.create(TypeUtils.project(rowType, 
joinKeyMapping)),
+                        InternalSerializers.create(rowType),
+                        lruCacheSize);
+        this.recordFilter = recordFilter;
+    }
+
+    @Override
+    public List<InternalRow> get(InternalRow key) throws IOException {
+        return state.get(key);
+    }
+
+    @Override
+    public void refresh(Iterator<InternalRow> incremental) throws IOException {
+        while (incremental.hasNext()) {
+            InternalRow row = incremental.next();
+            joinKeyRow.replaceRow(row);
+            if (row.getRowKind() == RowKind.INSERT || row.getRowKind() == 
RowKind.UPDATE_AFTER) {
+                if (recordFilter.test(row)) {
+                    state.add(joinKeyRow, row);
+                }
+            } else {
+                throw new RuntimeException(
+                        String.format(
+                                "Received %s message. Only INSERT/UPDATE_AFTER 
values are expected here.",
+                                row.getRowKind()));
+            }
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java
new file mode 100644
index 000000000..e1e7a7efe
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.Serializer;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+/** RocksDB state for key -> List of value. */
+public class RocksDBListState extends RocksDBState<List<InternalRow>> {
+
+    private final ListDelimitedSerializer listSerializer = new 
ListDelimitedSerializer();
+
+    private static final List<InternalRow> EMPTY = Collections.emptyList();
+
+    public RocksDBListState(
+            RocksDB db,
+            ColumnFamilyHandle columnFamily,
+            Serializer<InternalRow> keySerializer,
+            Serializer<InternalRow> valueSerializer,
+            long lruCacheSize) {
+        super(db, columnFamily, keySerializer, valueSerializer, lruCacheSize);
+    }
+
+    public void add(InternalRow key, InternalRow value) throws IOException {
+        byte[] keyBytes = serializeKey(key);
+        byte[] valueBytes = serializeValue(value);
+        try {
+            db.merge(columnFamily, writeOptions, keyBytes, valueBytes);
+        } catch (RocksDBException e) {
+            throw new IOException(e);
+        }
+        cache.invalidate(wrap(keyBytes));
+    }
+
+    public List<InternalRow> get(InternalRow key) throws IOException {
+        byte[] keyBytes = serializeKey(key);
+        try {
+            return cache.get(
+                    wrap(keyBytes),
+                    () -> {
+                        byte[] valueBytes = db.get(columnFamily, keyBytes);
+                        List<InternalRow> rows =
+                                listSerializer.deserializeList(valueBytes, 
valueSerializer);
+                        if (rows == null) {
+                            return EMPTY;
+                        }
+                        return rows;
+                    });
+        } catch (ExecutionException e) {
+            throw new IOException(e);
+        }
+    }
+
+    private byte[] serializeValue(InternalRow value) throws IOException {
+        valueOutputView.clear();
+        valueSerializer.serialize(value, valueOutputView);
+        return valueOutputView.getCopyOfBuffer();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
index bfb41e35d..c32703af7 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
@@ -37,6 +37,8 @@ import java.nio.charset.StandardCharsets;
 /** Factory to create state. */
 public class RocksDBStateFactory implements Closeable {
 
+    public static final String MERGE_OPERATOR_NAME = "stringappendtest";
+
     private RocksDB db;
 
     private final ColumnFamilyOptions columnFamilyOptions;
@@ -51,7 +53,8 @@ public class RocksDBStateFactory implements Closeable {
                                 .setCreateIfMissing(true),
                         conf);
         this.columnFamilyOptions =
-                RocksDBOptions.createColumnOptions(new ColumnFamilyOptions(), 
conf);
+                RocksDBOptions.createColumnOptions(new ColumnFamilyOptions(), 
conf)
+                        .setMergeOperatorName(MERGE_OPERATOR_NAME);
 
         try {
             this.db = RocksDB.open(new Options(dbOptions, 
columnFamilyOptions), path);
@@ -80,6 +83,17 @@ public class RocksDBStateFactory implements Closeable {
                 db, createColumnFamily(name), keySerializer, valueSerializer, 
lruCacheSize);
     }
 
+    public RocksDBListState listState(
+            String name,
+            Serializer<InternalRow> keySerializer,
+            Serializer<InternalRow> valueSerializer,
+            long lruCacheSize)
+            throws IOException {
+
+        return new RocksDBListState(
+                db, createColumnFamily(name), keySerializer, valueSerializer, 
lruCacheSize);
+    }
+
     private ColumnFamilyHandle createColumnFamily(String name) throws 
IOException {
         try {
             return db.createColumnFamily(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index b67b2fa70..a078858ec 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -543,4 +543,38 @@ public class LookupJoinITCase extends CatalogITCaseBase {
                         Row.of(4, null, null, null));
         iterator.close();
     }
+
+    @Test
+    public void testLookupNonPkAppendTable() throws Exception {
+        sql(
+                "CREATE TABLE DIM_NO_PK (i INT, j INT, k1 INT, k2 INT) "
+                        + "PARTITIONED BY (`i`) WITH 
('continuous.discovery-interval'='1 ms')");
+        String query =
+                "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM_NO_PK for 
system_time as of T.proctime AS D ON T.i "
+                        + "= D.i";
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
+
+        sql("INSERT INTO T VALUES (1), (2), (3)");
+
+        List<Row> result = iterator.collect(3);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, null, null, null),
+                        Row.of(2, null, null, null),
+                        Row.of(3, null, null, null));
+
+        sql(
+                "INSERT INTO DIM_NO_PK VALUES (1, 11, 111, 1111), (1, 12, 112, 
1112), (1, 11, 111, 1111)");
+        Thread.sleep(2000); // wait refresh
+        sql("INSERT INTO T VALUES (1), (2), (4)");
+        result = iterator.collect(5);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111, 1111),
+                        Row.of(1, 11, 111, 1111),
+                        Row.of(1, 12, 112, 1112),
+                        Row.of(2, null, null, null),
+                        Row.of(4, null, null, null));
+        iterator.close();
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index 99b547b5e..9c7366be6 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -146,6 +147,33 @@ public class LookupTableTest {
         assertThat(table.get(row(33))).hasSize(0);
     }
 
+    @Test
+    public void testNoPrimaryKeyTable() throws IOException {
+        LookupTable table =
+                LookupTable.create(
+                        stateFactory,
+                        rowType,
+                        Collections.emptyList(),
+                        singletonList("f1"),
+                        r -> r.getInt(2) < 222,
+                        ThreadLocalRandom.current().nextInt(2) * 10);
+
+        table.refresh(singletonList(row(1, 11, 333)).iterator());
+        List<InternalRow> result = table.get(row(11));
+        assertThat(result).hasSize(0);
+
+        table.refresh(singletonList(row(1, 11, 111)).iterator());
+        result = table.get(row(11));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, 11, 111);
+
+        table.refresh(singletonList(row(1, 11, 111)).iterator());
+        result = table.get(row(11));
+        assertThat(result).hasSize(2);
+        assertRow(result.get(0), 1, 11, 111);
+        assertRow(result.get(1), 1, 11, 111);
+    }
+
     private static InternalRow row(Object... values) {
         return row(RowKind.INSERT, values);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java
new file mode 100644
index 000000000..dfd016650
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java
@@ -0,0 +1,102 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Test for {@link org.apache.paimon.flink.lookup.RocksDBListState}. */
+public class RocksDBListStateTest {
+
+    @TempDir Path tempDir;
+
+    @Test
+    void test() throws Exception {
+        RocksDBStateFactory factory = new 
RocksDBStateFactory(tempDir.toString(), new Options());
+
+        RowType keyType = RowType.of(DataTypes.STRING());
+        RowType valueType = RowType.of(DataTypes.STRING());
+        RocksDBListState listState =
+                factory.listState(
+                        "test",
+                        new InternalRowSerializer(keyType),
+                        new InternalRowSerializer(valueType),
+                        1);
+
+        GenericRow key = row("aaa");
+        listState.add(key, row("1"));
+        List<InternalRow> result = listState.get(key);
+        Assertions.assertEquals(Lists.newArrayList("1"), getString(result));
+        listState.add(key, row("2,3"));
+        Assertions.assertEquals(Lists.newArrayList("1", "2,3"), 
getString(listState.get(key)));
+        listState.add(key, row("1"));
+        Assertions.assertEquals(Lists.newArrayList("1", "2,3", "1"), 
getString(listState.get(key)));
+        Assertions.assertEquals(Lists.newArrayList("1", "2,3", "1"), 
getString(listState.get(key)));
+        Assertions.assertTrue(listState.get(row("bbb")).isEmpty());
+    }
+
+    public GenericRow row(String value) {
+        return GenericRow.of(bs(value));
+    }
+
+    public GenericRow row(String value, RowKind kind) {
+        GenericRow row = GenericRow.of(bs(value));
+        row.setRowKind(kind);
+        return row;
+    }
+
+    public BinaryString bs(String v) {
+        return BinaryString.fromString(v);
+    }
+
+    public BinaryRow write(String v) {
+        BinaryRow row = new BinaryRow(1);
+        BinaryRowWriter write = new BinaryRowWriter(row);
+        write.writeString(0, bs(v));
+        return row;
+    }
+
+    public List<String> getString(List<InternalRow> inputs) {
+        List<String> rows = new ArrayList<>();
+        for (InternalRow input : inputs) {
+            rows.add(input.getString(0).toString());
+        }
+        return rows;
+    }
+}

Reply via email to