This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-0.9
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/release-0.9 by this push:
new 5e04dcc47 [client] Implement Lookup with insert-if-not-exists on the
client side (#2573)
5e04dcc47 is described below
commit 5e04dcc47234e9036cbff4225343d7302d1ed866
Author: xx789 <[email protected]>
AuthorDate: Sun Feb 8 15:10:15 2026 +0800
[client] Implement Lookup with insert-if-not-exists on the client side
(#2573)
---
.../org/apache/fluss/client/lookup/Lookup.java | 10 +++
.../apache/fluss/client/lookup/LookupClient.java | 31 ++++++--
.../apache/fluss/client/lookup/LookupQuery.java | 9 ++-
.../apache/fluss/client/lookup/LookupSender.java | 24 +++++-
.../org/apache/fluss/client/lookup/LookupType.java | 1 +
.../fluss/client/lookup/PrimaryKeyLookuper.java | 7 +-
.../apache/fluss/client/lookup/TableLookup.java | 75 +++++++++++++++++-
.../fluss/client/utils/ClientRpcMessageUtils.java | 11 ++-
.../fluss/client/lookup/LookupSenderTest.java | 8 +-
.../fluss/client/table/FlussTableITCase.java | 91 ++++++++++++++++++++++
10 files changed, 248 insertions(+), 19 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java
index e64685b9a..50f65815a 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java
@@ -94,6 +94,16 @@ public interface Lookup {
return lookupBy(Arrays.asList(lookupColumnNames));
}
+ /**
+ * Enables insert-if-not-exists behavior for the lookup operation. When
enabled, if a lookup
+ * does not find a matching row, a new row will be inserted with the
lookup key values. This
+ * feature cannot be used on tables that contain non-nullable columns
other than the primary key
+ * or auto-increment columns.
+ *
+ * @return a new Lookup instance with insert-if-not-exists enabled
+ */
+ Lookup enableInsertIfNotExists();
+
/**
* Creates a {@link Lookuper} instance to lookup rows of a primary key
table by the specified
* lookup columns. By default, the lookup columns are the primary key
columns, but can be
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java
index bcea2302c..7e2798294 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java
@@ -44,9 +44,10 @@ import java.util.concurrent.TimeUnit;
* that is responsible for turning these lookup operations into network
requests and transmitting
* them to the cluster.
*
- * <p>The {@link #lookup(TablePath, TableBucket, byte[])} method is
asynchronous, when called, it
- * adds the lookup operation to a queue of pending lookup operations and
immediately returns. This
- * allows the lookup operations to batch together individual lookup operations
for efficiency.
+ * <p>The {@link #lookup(TablePath, TableBucket, byte[], boolean)} method is
asynchronous, when
+ * called, it adds the lookup operation to a queue of pending lookup
operations and immediately
+ * returns. This allows the lookup operations to batch together individual
lookup operations for
+ * efficiency.
*/
@ThreadSafe
@Internal
@@ -64,15 +65,30 @@ public class LookupClient {
public LookupClient(Configuration conf, MetadataUpdater metadataUpdater) {
this.lookupQueue = new LookupQueue(conf);
this.lookupSenderThreadPool = createThreadPool();
+ short acks = configureAcks(conf);
this.lookupSender =
new LookupSender(
metadataUpdater,
lookupQueue,
conf.getInt(ConfigOptions.CLIENT_LOOKUP_MAX_INFLIGHT_SIZE),
- conf.getInt(ConfigOptions.CLIENT_LOOKUP_MAX_RETRIES));
+ conf.getInt(ConfigOptions.CLIENT_LOOKUP_MAX_RETRIES),
+ acks,
+ (int)
conf.get(ConfigOptions.CLIENT_REQUEST_TIMEOUT).toMillis());
lookupSenderThreadPool.submit(lookupSender);
}
+ private short configureAcks(Configuration conf) {
+ String acks = conf.get(ConfigOptions.CLIENT_WRITER_ACKS);
+ short ack;
+ if (acks.equals("all")) {
+ ack = -1;
+ } else {
+ ack = Short.parseShort(acks);
+ }
+
+ return ack;
+ }
+
private ExecutorService createThreadPool() {
// according to benchmark, increase the thread pool size improve not
so much
// performance, so we always use 1 thread for simplicity.
@@ -80,8 +96,11 @@ public class LookupClient {
}
public CompletableFuture<byte[]> lookup(
- TablePath tablePath, TableBucket tableBucket, byte[] keyBytes) {
- LookupQuery lookup = new LookupQuery(tablePath, tableBucket, keyBytes);
+ TablePath tablePath,
+ TableBucket tableBucket,
+ byte[] keyBytes,
+ boolean insertIfNotExists) {
+ LookupQuery lookup = new LookupQuery(tablePath, tableBucket, keyBytes,
insertIfNotExists);
lookupQueue.appendLookup(lookup);
return lookup.future();
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupQuery.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupQuery.java
index cc6e70e34..07b4d0ea4 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupQuery.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupQuery.java
@@ -31,15 +31,22 @@ import java.util.concurrent.CompletableFuture;
public class LookupQuery extends AbstractLookupQuery<byte[]> {
private final CompletableFuture<byte[]> future;
+ private final boolean insertIfNotExists;
LookupQuery(TablePath tablePath, TableBucket tableBucket, byte[] key) {
+ this(tablePath, tableBucket, key, false);
+ }
+
+ LookupQuery(
+ TablePath tablePath, TableBucket tableBucket, byte[] key, boolean
insertIfNotExists) {
super(tablePath, tableBucket, key);
this.future = new CompletableFuture<>();
+ this.insertIfNotExists = insertIfNotExists;
}
@Override
public LookupType lookupType() {
- return LookupType.LOOKUP;
+ return insertIfNotExists ? LookupType.LOOKUP_WITH_INSERT_IF_NOT_EXISTS
: LookupType.LOOKUP;
}
@Override
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
index fba5ced53..e0d83784b 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
@@ -77,16 +77,24 @@ class LookupSender implements Runnable {
private final int maxRetries;
+ private final int maxRequestTimeoutMs;
+
+ private final short acks;
+
LookupSender(
MetadataUpdater metadataUpdater,
LookupQueue lookupQueue,
int maxFlightRequests,
- int maxRetries) {
+ int maxRetries,
+ short acks,
+ int maxRequestTimeoutMs) {
this.metadataUpdater = metadataUpdater;
this.lookupQueue = lookupQueue;
this.maxInFlightReuqestsSemaphore = new Semaphore(maxFlightRequests);
this.maxRetries = maxRetries;
this.running = true;
+ this.acks = acks;
+ this.maxRequestTimeoutMs = maxRequestTimeoutMs;
}
@Override
@@ -178,7 +186,9 @@ class LookupSender implements Runnable {
void sendLookups(
int destination, LookupType lookupType,
List<AbstractLookupQuery<?>> lookupBatches) {
if (lookupType == LookupType.LOOKUP) {
- sendLookupRequest(destination, lookupBatches);
+ sendLookupRequest(destination, lookupBatches, false);
+ } else if (lookupType == LookupType.LOOKUP_WITH_INSERT_IF_NOT_EXISTS) {
+ sendLookupRequest(destination, lookupBatches, true);
} else if (lookupType == LookupType.PREFIX_LOOKUP) {
sendPrefixLookupRequest(destination, lookupBatches);
} else {
@@ -186,7 +196,8 @@ class LookupSender implements Runnable {
}
}
- private void sendLookupRequest(int destination,
List<AbstractLookupQuery<?>> lookups) {
+ private void sendLookupRequest(
+ int destination, List<AbstractLookupQuery<?>> lookups, boolean
insertIfNotExists) {
// table id -> (bucket -> lookups)
Map<Long, Map<TableBucket, LookupBatch>> lookupByTableId = new
HashMap<>();
for (AbstractLookupQuery<?> abstractLookupQuery : lookups) {
@@ -218,7 +229,12 @@ class LookupSender implements Runnable {
sendLookupRequestAndHandleResponse(
destination,
gateway,
- makeLookupRequest(tableId,
lookupsByBucket.values()),
+ makeLookupRequest(
+ tableId,
+ lookupsByBucket.values(),
+ insertIfNotExists,
+ acks,
+ maxRequestTimeoutMs),
tableId,
lookupsByBucket));
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupType.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupType.java
index d4bb8c6ee..082fed961 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupType.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupType.java
@@ -23,5 +23,6 @@ import org.apache.fluss.annotation.Internal;
@Internal
public enum LookupType {
LOOKUP,
+ LOOKUP_WITH_INSERT_IF_NOT_EXISTS,
PREFIX_LOOKUP;
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
index 7d504baf8..26a03c47c 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
@@ -52,6 +52,7 @@ class PrimaryKeyLookuper extends AbstractLookuper implements
Lookuper {
private final BucketingFunction bucketingFunction;
private final int numBuckets;
+ private final boolean insertIfNotExists;
/** a getter to extract partition from lookup key row, null when it's not
a partitioned. */
private @Nullable final PartitionGetter partitionGetter;
@@ -60,13 +61,15 @@ class PrimaryKeyLookuper extends AbstractLookuper
implements Lookuper {
TableInfo tableInfo,
SchemaGetter schemaGetter,
MetadataUpdater metadataUpdater,
- LookupClient lookupClient) {
+ LookupClient lookupClient,
+ boolean insertIfNotExists) {
super(tableInfo, metadataUpdater, lookupClient, schemaGetter);
checkArgument(
tableInfo.hasPrimaryKey(),
"Log table %s doesn't support lookup",
tableInfo.getTablePath());
this.numBuckets = tableInfo.getNumBuckets();
+ this.insertIfNotExists = insertIfNotExists;
// the row type of the input lookup row
RowType lookupRowType =
tableInfo.getRowType().project(tableInfo.getPrimaryKeys());
@@ -118,7 +121,7 @@ class PrimaryKeyLookuper extends AbstractLookuper
implements Lookuper {
TableBucket tableBucket = new TableBucket(tableInfo.getTableId(),
partitionId, bucketId);
CompletableFuture<LookupResult> lookupFuture = new
CompletableFuture<>();
lookupClient
- .lookup(tableInfo.getTablePath(), tableBucket, pkBytes)
+ .lookup(tableInfo.getTablePath(), tableBucket, pkBytes,
insertIfNotExists)
.whenComplete(
(result, error) -> {
if (error != null) {
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java
index e0c92661f..aa9fee749 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java
@@ -18,12 +18,15 @@
package org.apache.fluss.client.lookup;
import org.apache.fluss.client.metadata.MetadataUpdater;
+import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.SchemaGetter;
import org.apache.fluss.metadata.TableInfo;
import javax.annotation.Nullable;
+import java.util.HashSet;
import java.util.List;
+import java.util.stream.Collectors;
/** API for configuring and creating {@link Lookuper}. */
public class TableLookup implements Lookup {
@@ -35,12 +38,14 @@ public class TableLookup implements Lookup {
@Nullable private final List<String> lookupColumnNames;
+ private final boolean insertIfNotExists;
+
public TableLookup(
TableInfo tableInfo,
SchemaGetter schemaGetter,
MetadataUpdater metadataUpdater,
LookupClient lookupClient) {
- this(tableInfo, schemaGetter, metadataUpdater, lookupClient, null);
+ this(tableInfo, schemaGetter, metadataUpdater, lookupClient, null,
false);
}
private TableLookup(
@@ -48,30 +53,92 @@ public class TableLookup implements Lookup {
SchemaGetter schemaGetter,
MetadataUpdater metadataUpdater,
LookupClient lookupClient,
- @Nullable List<String> lookupColumnNames) {
+ @Nullable List<String> lookupColumnNames,
+ boolean insertIfNotExists) {
this.tableInfo = tableInfo;
this.schemaGetter = schemaGetter;
this.metadataUpdater = metadataUpdater;
this.lookupClient = lookupClient;
this.lookupColumnNames = lookupColumnNames;
+ this.insertIfNotExists = insertIfNotExists;
+ }
+
+ @Override
+ public Lookup enableInsertIfNotExists() {
+ return new TableLookup(
+ tableInfo, schemaGetter, metadataUpdater, lookupClient,
lookupColumnNames, true);
}
@Override
public Lookup lookupBy(List<String> lookupColumnNames) {
return new TableLookup(
- tableInfo, schemaGetter, metadataUpdater, lookupClient,
lookupColumnNames);
+ tableInfo,
+ schemaGetter,
+ metadataUpdater,
+ lookupClient,
+ lookupColumnNames,
+ insertIfNotExists);
}
@Override
public Lookuper createLookuper() {
if (lookupColumnNames == null) {
- return new PrimaryKeyLookuper(tableInfo, schemaGetter,
metadataUpdater, lookupClient);
+ if (insertIfNotExists) {
+ // get all non-nullable columns that are not primary key
columns and auto increment
+ // columns, if there is any, throw exception, as we cannot
fill values for those
+ // columns when doing insert if not exists.
+ List<String> notNullColumnNames =
+ tableInfo.getSchema().getColumns().stream()
+ .filter(column ->
!column.getDataType().isNullable())
+ .map(Schema.Column::getName)
+ .filter(name ->
!tableInfo.getPrimaryKeys().contains(name))
+ .filter(
+ name ->
+ !tableInfo
+ .getSchema()
+
.getAutoIncrementColumnNames()
+ .contains(name))
+ .collect(Collectors.toList());
+ if (!notNullColumnNames.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Lookup with insertIfNotExists enabled cannot be
created for table '"
+ + tableInfo.getTablePath()
+ + "', because it contains non-nullable
columns that are not primary key columns or auto increment columns: "
+ + notNullColumnNames
+ + ". ");
+ }
+ }
+ return new PrimaryKeyLookuper(
+ tableInfo, schemaGetter, metadataUpdater, lookupClient,
insertIfNotExists);
} else {
+ // throw exception if the insertIfNotExists is enabled for prefix
key lookup, as
+ // currently we only support insertIfNotExists for primary key
lookup.
+ if (insertIfNotExists) {
+ throw new IllegalArgumentException(
+ "insertIfNotExists cannot be enabled for prefix key
lookup, as currently we only support insertIfNotExists for primary key
lookup.");
+ }
+ if (lookupKeysArePrimaryKeys()) {
+ throw new IllegalArgumentException(
+ "Can not perform prefix lookup on table '"
+ + tableInfo.getTablePath()
+ + "', because the lookup columns "
+ + lookupColumnNames
+ + " equals the physical primary keys "
+ + tableInfo.getPrimaryKeys()
+ + ". Please use primary key lookup (Lookuper
without lookupBy) instead.");
+ }
return new PrefixKeyLookuper(
tableInfo, schemaGetter, metadataUpdater, lookupClient,
lookupColumnNames);
}
}
+ private boolean lookupKeysArePrimaryKeys() {
+ List<String> primaryKeys = tableInfo.getPrimaryKeys();
+ return lookupColumnNames != null
+ && lookupColumnNames.size() == primaryKeys.size()
+ && new HashSet<>(lookupColumnNames).containsAll(primaryKeys);
+ }
+
@Override
public <T> TypedLookuper<T> createTypedLookuper(Class<T> pojoClass) {
return new TypedLookuperImpl<>(createLookuper(), tableInfo,
lookupColumnNames, pojoClass);
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
index 75cafb491..1748e0e43 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
@@ -186,8 +186,17 @@ public class ClientRpcMessageUtils {
}
public static LookupRequest makeLookupRequest(
- long tableId, Collection<LookupBatch> lookupBatches) {
+ long tableId,
+ Collection<LookupBatch> lookupBatches,
+ boolean insertIfNotExists,
+ short acks,
+ int timeoutMs) {
LookupRequest request = new LookupRequest().setTableId(tableId);
+ if (insertIfNotExists) {
+ request.setInsertIfNotExists(true);
+ request.setAcks(acks);
+ request.setTimeoutMs(timeoutMs);
+ }
lookupBatches.forEach(
(batch) -> {
TableBucket tb = batch.tableBucket();
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
index 557a049a4..e5fb1b2b4 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
@@ -95,7 +95,13 @@ public class LookupSenderTest {
lookupQueue = new LookupQueue(conf);
lookupSender =
- new LookupSender(metadataUpdater, lookupQueue,
MAX_INFLIGHT_REQUESTS, MAX_RETRIES);
+ new LookupSender(
+ metadataUpdater,
+ lookupQueue,
+ MAX_INFLIGHT_REQUESTS,
+ MAX_RETRIES,
+ (short) -1,
+ 1000);
senderThread = new Thread(lookupSender);
senderThread.start();
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
index c6bf09520..1fbd78ff7 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
@@ -476,6 +476,12 @@ class FlussTableITCase extends ClientToServerITCaseBase {
.hasMessageContaining(
"Can not perform prefix lookup on table
'test_db_1.test_invalid_prefix_lookup_2', "
+ "because the lookup columns [b, a] must
contain all bucket keys [a, b] in order.");
+
+ assertThatThrownBy(() -> table2.newLookup().lookupBy("a", "b",
"c").createLookuper())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(
+ "Can not perform prefix lookup on table
'test_db_1.test_invalid_prefix_lookup_2', "
+ + "because the lookup columns [a, b, c] equals
the physical primary keys [a, b, c]. Please use primary key lookup (Lookuper
without lookupBy) instead.");
}
@Test
@@ -633,6 +639,91 @@ class FlussTableITCase extends ClientToServerITCaseBase {
verifyRecords(expectedRecords, autoIncTable, schema);
}
+ @Test
+ void testLookupWithInsertIfNotExists() throws Exception {
+ TablePath tablePath = TablePath.of("test_db_1",
"test_invalid_insert_lookup_table");
+ Schema schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("b", DataTypes.INT())
+ .column("c", DataTypes.INT())
+ .column("d", new StringType(false))
+ .primaryKey("b", "c")
+ .enableAutoIncrement("a")
+ .build();
+ TableDescriptor tableDescriptor =
TableDescriptor.builder().schema(schema).build();
+ createTable(tablePath, tableDescriptor, false);
+ Table invalidTable = conn.getTable(tablePath);
+
+ assertThatThrownBy(
+ () ->
invalidTable.newLookup().enableInsertIfNotExists().createLookuper())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(
+ "Lookup with insertIfNotExists enabled cannot be
created for table 'test_db_1.test_invalid_insert_lookup_table', "
+ + "because it contains non-nullable columns
that are not primary key columns or auto increment columns: [d].");
+
+ tablePath = TablePath.of("test_db_1", "test_insert_lookup_table");
+ schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("b", DataTypes.INT())
+ .column("c", DataTypes.INT())
+ .column("d", new StringType(true))
+ .primaryKey("b", "c")
+ .enableAutoIncrement("a")
+ .build();
+ tableDescriptor =
TableDescriptor.builder().schema(schema).distributedBy(1, "b").build();
+ createTable(tablePath, tableDescriptor, true);
+ Table table = conn.getTable(tablePath);
+
+ // verify invalid prefix lookup with insert if not exists enabled.
+ assertThatThrownBy(
+ () ->
+ table.newLookup()
+ .lookupBy("b")
+ .enableInsertIfNotExists()
+ .createLookuper())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(
+ "insertIfNotExists cannot be enabled for prefix key
lookup, as currently we only support insertIfNotExists for primary key lookup");
+
+ // swap the order of lookupBy and enableInsertIfNotExists,
+ // and verify the exception is still thrown.
+ assertThatThrownBy(
+ () ->
+ table.newLookup()
+ .enableInsertIfNotExists()
+ .lookupBy("b")
+ .createLookuper())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(
+ "insertIfNotExists cannot be enabled for prefix key
lookup, as currently we only support insertIfNotExists for primary key lookup");
+
+ RowType rowType = schema.getRowType();
+
+ Lookuper lookuper = table.newLookup().createLookuper();
+ // make sure the key does not exist
+ assertThat(lookupRow(lookuper, row(100, 100))).isNull();
+
+ Lookuper insertLookuper =
table.newLookup().enableInsertIfNotExists().createLookuper();
+ assertRowValueEquals(
+ rowType,
+ insertLookuper.lookup(row(100, 100)).get().getSingletonRow(),
+ new Object[] {1, 100, 100, null});
+
+ // lookup the same key again
+ assertRowValueEquals(
+ rowType,
+ insertLookuper.lookup(row(100, 100)).get().getSingletonRow(),
+ new Object[] {1, 100, 100, null});
+
+ // test another key
+ assertRowValueEquals(
+ rowType,
+ insertLookuper.lookup(row(200, 200)).get().getSingletonRow(),
+ new Object[] {2, 200, 200, null});
+ }
+
private void partialUpdateRecords(String[] targetColumns, Object[][]
records, Table table) {
UpsertWriter upsertWriter =
table.newUpsert().partialUpdate(targetColumns).createWriter();
for (Object[] record : records) {