This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-0.9
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/release-0.9 by this push:
     new 5e04dcc47 [client] Implement Lookup with insert-if-not-exists on the client side (#2573)
5e04dcc47 is described below

commit 5e04dcc47234e9036cbff4225343d7302d1ed866
Author: xx789 <[email protected]>
AuthorDate: Sun Feb 8 15:10:15 2026 +0800

    [client] Implement Lookup with insert-if-not-exists on the client side (#2573)
---
 .../org/apache/fluss/client/lookup/Lookup.java     | 10 +++
 .../apache/fluss/client/lookup/LookupClient.java   | 31 ++++++--
 .../apache/fluss/client/lookup/LookupQuery.java    |  9 ++-
 .../apache/fluss/client/lookup/LookupSender.java   | 24 +++++-
 .../org/apache/fluss/client/lookup/LookupType.java |  1 +
 .../fluss/client/lookup/PrimaryKeyLookuper.java    |  7 +-
 .../apache/fluss/client/lookup/TableLookup.java    | 75 +++++++++++++++++-
 .../fluss/client/utils/ClientRpcMessageUtils.java  | 11 ++-
 .../fluss/client/lookup/LookupSenderTest.java      |  8 +-
 .../fluss/client/table/FlussTableITCase.java       | 91 ++++++++++++++++++++++
 10 files changed, 248 insertions(+), 19 deletions(-)

diff --git a/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java b/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java
index e64685b9a..50f65815a 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java
@@ -94,6 +94,16 @@ public interface Lookup {
         return lookupBy(Arrays.asList(lookupColumnNames));
     }
 
+    /**
+     * Enables insert-if-not-exists behavior for the lookup operation. When enabled, if a lookup
+     * does not find a matching row, a new row will be inserted with the lookup key values. This
+     * feature cannot be used on tables that contain non-nullable columns other than the primary key
+     * or auto-increment columns.
+     *
+     * @return a new Lookup instance with insert-if-not-exists enabled
+     */
+    Lookup enableInsertIfNotExists();
+
     /**
      * Creates a {@link Lookuper} instance to lookup rows of a primary key 
table by the specified
      * lookup columns. By default, the lookup columns are the primary key 
columns, but can be
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java
index bcea2302c..7e2798294 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java
@@ -44,9 +44,10 @@ import java.util.concurrent.TimeUnit;
  * that is responsible for turning these lookup operations into network 
requests and transmitting
  * them to the cluster.
  *
- * <p>The {@link #lookup(TablePath, TableBucket, byte[])} method is 
asynchronous, when called, it
- * adds the lookup operation to a queue of pending lookup operations and 
immediately returns. This
- * allows the lookup operations to batch together individual lookup operations 
for efficiency.
+ * <p>The {@link #lookup(TablePath, TableBucket, byte[], boolean)} method is 
asynchronous, when
+ * called, it adds the lookup operation to a queue of pending lookup 
operations and immediately
+ * returns. This allows the lookup operations to batch together individual 
lookup operations for
+ * efficiency.
  */
 @ThreadSafe
 @Internal
@@ -64,15 +65,30 @@ public class LookupClient {
     public LookupClient(Configuration conf, MetadataUpdater metadataUpdater) {
         this.lookupQueue = new LookupQueue(conf);
         this.lookupSenderThreadPool = createThreadPool();
+        short acks = configureAcks(conf);
         this.lookupSender =
                 new LookupSender(
                         metadataUpdater,
                         lookupQueue,
                         
conf.getInt(ConfigOptions.CLIENT_LOOKUP_MAX_INFLIGHT_SIZE),
-                        conf.getInt(ConfigOptions.CLIENT_LOOKUP_MAX_RETRIES));
+                        conf.getInt(ConfigOptions.CLIENT_LOOKUP_MAX_RETRIES),
+                        acks,
+                        (int) 
conf.get(ConfigOptions.CLIENT_REQUEST_TIMEOUT).toMillis());
         lookupSenderThreadPool.submit(lookupSender);
     }
 
+    private short configureAcks(Configuration conf) {
+        String acks = conf.get(ConfigOptions.CLIENT_WRITER_ACKS);
+        short ack;
+        if (acks.equals("all")) {
+            ack = -1;
+        } else {
+            ack = Short.parseShort(acks);
+        }
+
+        return ack;
+    }
+
     private ExecutorService createThreadPool() {
         // according to benchmark, increase the thread pool size improve not 
so much
         // performance, so we always use 1 thread for simplicity.
@@ -80,8 +96,11 @@ public class LookupClient {
     }
 
     public CompletableFuture<byte[]> lookup(
-            TablePath tablePath, TableBucket tableBucket, byte[] keyBytes) {
-        LookupQuery lookup = new LookupQuery(tablePath, tableBucket, keyBytes);
+            TablePath tablePath,
+            TableBucket tableBucket,
+            byte[] keyBytes,
+            boolean insertIfNotExists) {
+        LookupQuery lookup = new LookupQuery(tablePath, tableBucket, keyBytes, 
insertIfNotExists);
         lookupQueue.appendLookup(lookup);
         return lookup.future();
     }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupQuery.java 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupQuery.java
index cc6e70e34..07b4d0ea4 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupQuery.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupQuery.java
@@ -31,15 +31,22 @@ import java.util.concurrent.CompletableFuture;
 public class LookupQuery extends AbstractLookupQuery<byte[]> {
 
     private final CompletableFuture<byte[]> future;
+    private final boolean insertIfNotExists;
 
     LookupQuery(TablePath tablePath, TableBucket tableBucket, byte[] key) {
+        this(tablePath, tableBucket, key, false);
+    }
+
+    LookupQuery(
+            TablePath tablePath, TableBucket tableBucket, byte[] key, boolean 
insertIfNotExists) {
         super(tablePath, tableBucket, key);
         this.future = new CompletableFuture<>();
+        this.insertIfNotExists = insertIfNotExists;
     }
 
     @Override
     public LookupType lookupType() {
-        return LookupType.LOOKUP;
+        return insertIfNotExists ? LookupType.LOOKUP_WITH_INSERT_IF_NOT_EXISTS 
: LookupType.LOOKUP;
     }
 
     @Override
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
index fba5ced53..e0d83784b 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
@@ -77,16 +77,24 @@ class LookupSender implements Runnable {
 
     private final int maxRetries;
 
+    private final int maxRequestTimeoutMs;
+
+    private final short acks;
+
     LookupSender(
             MetadataUpdater metadataUpdater,
             LookupQueue lookupQueue,
             int maxFlightRequests,
-            int maxRetries) {
+            int maxRetries,
+            short acks,
+            int maxRequestTimeoutMs) {
         this.metadataUpdater = metadataUpdater;
         this.lookupQueue = lookupQueue;
         this.maxInFlightReuqestsSemaphore = new Semaphore(maxFlightRequests);
         this.maxRetries = maxRetries;
         this.running = true;
+        this.acks = acks;
+        this.maxRequestTimeoutMs = maxRequestTimeoutMs;
     }
 
     @Override
@@ -178,7 +186,9 @@ class LookupSender implements Runnable {
     void sendLookups(
             int destination, LookupType lookupType, 
List<AbstractLookupQuery<?>> lookupBatches) {
         if (lookupType == LookupType.LOOKUP) {
-            sendLookupRequest(destination, lookupBatches);
+            sendLookupRequest(destination, lookupBatches, false);
+        } else if (lookupType == LookupType.LOOKUP_WITH_INSERT_IF_NOT_EXISTS) {
+            sendLookupRequest(destination, lookupBatches, true);
         } else if (lookupType == LookupType.PREFIX_LOOKUP) {
             sendPrefixLookupRequest(destination, lookupBatches);
         } else {
@@ -186,7 +196,8 @@ class LookupSender implements Runnable {
         }
     }
 
-    private void sendLookupRequest(int destination, 
List<AbstractLookupQuery<?>> lookups) {
+    private void sendLookupRequest(
+            int destination, List<AbstractLookupQuery<?>> lookups, boolean 
insertIfNotExists) {
         // table id -> (bucket -> lookups)
         Map<Long, Map<TableBucket, LookupBatch>> lookupByTableId = new 
HashMap<>();
         for (AbstractLookupQuery<?> abstractLookupQuery : lookups) {
@@ -218,7 +229,12 @@ class LookupSender implements Runnable {
                         sendLookupRequestAndHandleResponse(
                                 destination,
                                 gateway,
-                                makeLookupRequest(tableId, 
lookupsByBucket.values()),
+                                makeLookupRequest(
+                                        tableId,
+                                        lookupsByBucket.values(),
+                                        insertIfNotExists,
+                                        acks,
+                                        maxRequestTimeoutMs),
                                 tableId,
                                 lookupsByBucket));
     }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupType.java 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupType.java
index d4bb8c6ee..082fed961 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupType.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupType.java
@@ -23,5 +23,6 @@ import org.apache.fluss.annotation.Internal;
 @Internal
 public enum LookupType {
     LOOKUP,
+    LOOKUP_WITH_INSERT_IF_NOT_EXISTS,
     PREFIX_LOOKUP;
 }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
index 7d504baf8..26a03c47c 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
@@ -52,6 +52,7 @@ class PrimaryKeyLookuper extends AbstractLookuper implements 
Lookuper {
 
     private final BucketingFunction bucketingFunction;
     private final int numBuckets;
+    private final boolean insertIfNotExists;
 
     /** a getter to extract partition from lookup key row, null when it's not 
a partitioned. */
     private @Nullable final PartitionGetter partitionGetter;
@@ -60,13 +61,15 @@ class PrimaryKeyLookuper extends AbstractLookuper 
implements Lookuper {
             TableInfo tableInfo,
             SchemaGetter schemaGetter,
             MetadataUpdater metadataUpdater,
-            LookupClient lookupClient) {
+            LookupClient lookupClient,
+            boolean insertIfNotExists) {
         super(tableInfo, metadataUpdater, lookupClient, schemaGetter);
         checkArgument(
                 tableInfo.hasPrimaryKey(),
                 "Log table %s doesn't support lookup",
                 tableInfo.getTablePath());
         this.numBuckets = tableInfo.getNumBuckets();
+        this.insertIfNotExists = insertIfNotExists;
 
         // the row type of the input lookup row
         RowType lookupRowType = 
tableInfo.getRowType().project(tableInfo.getPrimaryKeys());
@@ -118,7 +121,7 @@ class PrimaryKeyLookuper extends AbstractLookuper 
implements Lookuper {
         TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), 
partitionId, bucketId);
         CompletableFuture<LookupResult> lookupFuture = new 
CompletableFuture<>();
         lookupClient
-                .lookup(tableInfo.getTablePath(), tableBucket, pkBytes)
+                .lookup(tableInfo.getTablePath(), tableBucket, pkBytes, 
insertIfNotExists)
                 .whenComplete(
                         (result, error) -> {
                             if (error != null) {
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java
index e0c92661f..aa9fee749 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java
@@ -18,12 +18,15 @@
 package org.apache.fluss.client.lookup;
 
 import org.apache.fluss.client.metadata.MetadataUpdater;
+import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.SchemaGetter;
 import org.apache.fluss.metadata.TableInfo;
 
 import javax.annotation.Nullable;
 
+import java.util.HashSet;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /** API for configuring and creating {@link Lookuper}. */
 public class TableLookup implements Lookup {
@@ -35,12 +38,14 @@ public class TableLookup implements Lookup {
 
     @Nullable private final List<String> lookupColumnNames;
 
+    private final boolean insertIfNotExists;
+
     public TableLookup(
             TableInfo tableInfo,
             SchemaGetter schemaGetter,
             MetadataUpdater metadataUpdater,
             LookupClient lookupClient) {
-        this(tableInfo, schemaGetter, metadataUpdater, lookupClient, null);
+        this(tableInfo, schemaGetter, metadataUpdater, lookupClient, null, 
false);
     }
 
     private TableLookup(
@@ -48,30 +53,92 @@ public class TableLookup implements Lookup {
             SchemaGetter schemaGetter,
             MetadataUpdater metadataUpdater,
             LookupClient lookupClient,
-            @Nullable List<String> lookupColumnNames) {
+            @Nullable List<String> lookupColumnNames,
+            boolean insertIfNotExists) {
         this.tableInfo = tableInfo;
         this.schemaGetter = schemaGetter;
         this.metadataUpdater = metadataUpdater;
         this.lookupClient = lookupClient;
         this.lookupColumnNames = lookupColumnNames;
+        this.insertIfNotExists = insertIfNotExists;
+    }
+
+    @Override
+    public Lookup enableInsertIfNotExists() {
+        return new TableLookup(
+                tableInfo, schemaGetter, metadataUpdater, lookupClient, 
lookupColumnNames, true);
     }
 
     @Override
     public Lookup lookupBy(List<String> lookupColumnNames) {
         return new TableLookup(
-                tableInfo, schemaGetter, metadataUpdater, lookupClient, 
lookupColumnNames);
+                tableInfo,
+                schemaGetter,
+                metadataUpdater,
+                lookupClient,
+                lookupColumnNames,
+                insertIfNotExists);
     }
 
     @Override
     public Lookuper createLookuper() {
         if (lookupColumnNames == null) {
-            return new PrimaryKeyLookuper(tableInfo, schemaGetter, 
metadataUpdater, lookupClient);
+            if (insertIfNotExists) {
+                // get all non-nullable columns that are not primary key 
columns and auto increment
+                // columns, if there is any, throw exception, as we cannot 
fill values for those
+                // columns when doing insert if not exists.
+                List<String> notNullColumnNames =
+                        tableInfo.getSchema().getColumns().stream()
+                                .filter(column -> 
!column.getDataType().isNullable())
+                                .map(Schema.Column::getName)
+                                .filter(name -> 
!tableInfo.getPrimaryKeys().contains(name))
+                                .filter(
+                                        name ->
+                                                !tableInfo
+                                                        .getSchema()
+                                                        
.getAutoIncrementColumnNames()
+                                                        .contains(name))
+                                .collect(Collectors.toList());
+                if (!notNullColumnNames.isEmpty()) {
+                    throw new IllegalArgumentException(
+                            "Lookup with insertIfNotExists enabled cannot be created for table '"
+                                    + tableInfo.getTablePath()
+                                    + "', because it contains non-nullable columns that are not primary key columns or auto increment columns: "
+                                    + notNullColumnNames
+                                    + ". ");
+                }
+            }
+            return new PrimaryKeyLookuper(
+                    tableInfo, schemaGetter, metadataUpdater, lookupClient, 
insertIfNotExists);
         } else {
+            // throw exception if the insertIfNotExists is enabled for prefix 
key lookup, as
+            // currently we only support insertIfNotExists for primary key 
lookup.
+            if (insertIfNotExists) {
+                throw new IllegalArgumentException(
+                        "insertIfNotExists cannot be enabled for prefix key 
lookup, as currently we only support insertIfNotExists for primary key 
lookup.");
+            }
+            if (lookupKeysArePrimaryKeys()) {
+                throw new IllegalArgumentException(
+                        "Can not perform prefix lookup on table '"
+                                + tableInfo.getTablePath()
+                                + "', because the lookup columns "
+                                + lookupColumnNames
+                                + " equals the physical primary keys "
+                                + tableInfo.getPrimaryKeys()
+                                + ". Please use primary key lookup (Lookuper 
without lookupBy) instead.");
+            }
             return new PrefixKeyLookuper(
                     tableInfo, schemaGetter, metadataUpdater, lookupClient, 
lookupColumnNames);
         }
     }
 
+    private boolean lookupKeysArePrimaryKeys() {
+        List<String> primaryKeys = tableInfo.getPrimaryKeys();
+        return lookupColumnNames != null
+                && lookupColumnNames.size() == primaryKeys.size()
+                && new HashSet<>(lookupColumnNames).containsAll(primaryKeys);
+    }
+
     @Override
     public <T> TypedLookuper<T> createTypedLookuper(Class<T> pojoClass) {
         return new TypedLookuperImpl<>(createLookuper(), tableInfo, 
lookupColumnNames, pojoClass);
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
index 75cafb491..1748e0e43 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
@@ -186,8 +186,17 @@ public class ClientRpcMessageUtils {
     }
 
     public static LookupRequest makeLookupRequest(
-            long tableId, Collection<LookupBatch> lookupBatches) {
+            long tableId,
+            Collection<LookupBatch> lookupBatches,
+            boolean insertIfNotExists,
+            short acks,
+            int timeoutMs) {
         LookupRequest request = new LookupRequest().setTableId(tableId);
+        if (insertIfNotExists) {
+            request.setInsertIfNotExists(true);
+            request.setAcks(acks);
+            request.setTimeoutMs(timeoutMs);
+        }
         lookupBatches.forEach(
                 (batch) -> {
                     TableBucket tb = batch.tableBucket();
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
index 557a049a4..e5fb1b2b4 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
@@ -95,7 +95,13 @@ public class LookupSenderTest {
         lookupQueue = new LookupQueue(conf);
 
         lookupSender =
-                new LookupSender(metadataUpdater, lookupQueue, 
MAX_INFLIGHT_REQUESTS, MAX_RETRIES);
+                new LookupSender(
+                        metadataUpdater,
+                        lookupQueue,
+                        MAX_INFLIGHT_REQUESTS,
+                        MAX_RETRIES,
+                        (short) -1,
+                        1000);
 
         senderThread = new Thread(lookupSender);
         senderThread.start();
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
index c6bf09520..1fbd78ff7 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
@@ -476,6 +476,12 @@ class FlussTableITCase extends ClientToServerITCaseBase {
                 .hasMessageContaining(
                         "Can not perform prefix lookup on table 
'test_db_1.test_invalid_prefix_lookup_2', "
                                 + "because the lookup columns [b, a] must 
contain all bucket keys [a, b] in order.");
+
+        assertThatThrownBy(() -> table2.newLookup().lookupBy("a", "b", 
"c").createLookuper())
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(
+                        "Can not perform prefix lookup on table 
'test_db_1.test_invalid_prefix_lookup_2', "
+                                + "because the lookup columns [a, b, c] equals 
the physical primary keys [a, b, c]. Please use primary key lookup (Lookuper 
without lookupBy) instead.");
     }
 
     @Test
@@ -633,6 +639,91 @@ class FlussTableITCase extends ClientToServerITCaseBase {
         verifyRecords(expectedRecords, autoIncTable, schema);
     }
 
+    @Test
+    void testLookupWithInsertIfNotExists() throws Exception {
+        TablePath tablePath = TablePath.of("test_db_1", 
"test_invalid_insert_lookup_table");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .column("c", DataTypes.INT())
+                        .column("d", new StringType(false))
+                        .primaryKey("b", "c")
+                        .enableAutoIncrement("a")
+                        .build();
+        TableDescriptor tableDescriptor = 
TableDescriptor.builder().schema(schema).build();
+        createTable(tablePath, tableDescriptor, false);
+        Table invalidTable = conn.getTable(tablePath);
+
+        assertThatThrownBy(
+                        () -> 
invalidTable.newLookup().enableInsertIfNotExists().createLookuper())
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(
+                        "Lookup with insertIfNotExists enabled cannot be 
created for table 'test_db_1.test_invalid_insert_lookup_table', "
+                                + "because it contains non-nullable columns 
that are not primary key columns or auto increment columns: [d].");
+
+        tablePath = TablePath.of("test_db_1", "test_insert_lookup_table");
+        schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .column("c", DataTypes.INT())
+                        .column("d", new StringType(true))
+                        .primaryKey("b", "c")
+                        .enableAutoIncrement("a")
+                        .build();
+        tableDescriptor = 
TableDescriptor.builder().schema(schema).distributedBy(1, "b").build();
+        createTable(tablePath, tableDescriptor, true);
+        Table table = conn.getTable(tablePath);
+
+        // verify invalid prefix lookup with insert if not exists enabled.
+        assertThatThrownBy(
+                        () ->
+                                table.newLookup()
+                                        .lookupBy("b")
+                                        .enableInsertIfNotExists()
+                                        .createLookuper())
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(
+                        "insertIfNotExists cannot be enabled for prefix key 
lookup, as currently we only support insertIfNotExists for primary key lookup");
+
+        // swap the order of lookupBy and enableInsertIfNotExists,
+        // and verify the exception is still thrown.
+        assertThatThrownBy(
+                        () ->
+                                table.newLookup()
+                                        .enableInsertIfNotExists()
+                                        .lookupBy("b")
+                                        .createLookuper())
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(
+                        "insertIfNotExists cannot be enabled for prefix key 
lookup, as currently we only support insertIfNotExists for primary key lookup");
+
+        RowType rowType = schema.getRowType();
+
+        Lookuper lookuper = table.newLookup().createLookuper();
+        // make sure the key does not exist
+        assertThat(lookupRow(lookuper, row(100, 100))).isNull();
+
+        Lookuper insertLookuper = 
table.newLookup().enableInsertIfNotExists().createLookuper();
+        assertRowValueEquals(
+                rowType,
+                insertLookuper.lookup(row(100, 100)).get().getSingletonRow(),
+                new Object[] {1, 100, 100, null});
+
+        // lookup the same key again
+        assertRowValueEquals(
+                rowType,
+                insertLookuper.lookup(row(100, 100)).get().getSingletonRow(),
+                new Object[] {1, 100, 100, null});
+
+        // test another key
+        assertRowValueEquals(
+                rowType,
+                insertLookuper.lookup(row(200, 200)).get().getSingletonRow(),
+                new Object[] {2, 200, 200, null});
+    }
+
     private void partialUpdateRecords(String[] targetColumns, Object[][] 
records, Table table) {
         UpsertWriter upsertWriter = 
table.newUpsert().partialUpdate(targetColumns).createWriter();
         for (Object[] record : records) {

Reply via email to