platinumhamburg commented on code in PR #2573:
URL: https://github.com/apache/fluss/pull/2573#discussion_r2767618981


##########
fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java:
##########
@@ -633,6 +633,85 @@ void testPutAutoIncrementColumnAndLookup() throws 
Exception {
         verifyRecords(expectedRecords, autoIncTable, schema);
     }
 
+    @Test
+    void testLookupWithInsertIfNotExists() throws Exception {
+        TablePath tablePath = TablePath.of("test_db_1", 
"test_invalid_insert_lookup_table");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .withComment("a is first column")
+                        .column("b", DataTypes.INT())
+                        .withComment("b is second column")
+                        .primaryKey("b")
+                        .column("c", new StringType(false))
+                        .enableAutoIncrement("a")
+                        .build();
+        TableDescriptor tableDescriptor = 
TableDescriptor.builder().schema(schema).build();
+        createTable(tablePath, tableDescriptor, false);
+        Table invalidTable = conn.getTable(tablePath);
+
+        assertThatThrownBy(
+                        () -> 
invalidTable.newLookup().enableInsertIfNotExists().createLookuper())
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(
+                        "insertIfNotExists cannot be enabled for tables with 
non-nullable columns besides primary key and auto increment columns.");
+
+        tablePath = TablePath.of("test_db_1", "test_insert_lookup_table");
+        schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .withComment("a is first column")
+                        .column("b", DataTypes.INT())
+                        .withComment("b is second column")
+                        .column("c", new StringType(true))
+                        .primaryKey("b")
+                        .enableAutoIncrement("a")
+                        .build();
+        tableDescriptor = 
TableDescriptor.builder().schema(schema).distributedBy(1, "b").build();
+        createTable(tablePath, tableDescriptor, true);
+        Table table = conn.getTable(tablePath);
+        RowType rowType = schema.getRowType();
+
+        assertThatThrownBy(
+                        () ->
+                                table.newLookup()
+                                        .lookupBy("b")
+                                        .enableInsertIfNotExists()
+                                        .createLookuper())
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("insertIfNotExists can not be used with 
prefix lookup");
+        assertThatThrownBy(

Review Comment:
   This is a duplicate assertion. Moreover, the test schema contains only one 
primary key column, 'b', so creating a lookuper with lookupBy("b") should not 
result in a PrefixKeyLookuper. This issue was not introduced by this PR; 
rather, the test has exposed an incompleteness in the semantic implementation 
of TableLookup.



##########
fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java:
##########
@@ -49,23 +51,71 @@ private TableLookup(
             MetadataUpdater metadataUpdater,
             LookupClient lookupClient,
             @Nullable List<String> lookupColumnNames) {
+        this(tableInfo, schemaGetter, metadataUpdater, lookupClient, 
lookupColumnNames, false);
+    }
+
+    private TableLookup(
+            TableInfo tableInfo,
+            SchemaGetter schemaGetter,
+            MetadataUpdater metadataUpdater,
+            LookupClient lookupClient,
+            boolean insertIfNotExists) {
+        this(tableInfo, schemaGetter, metadataUpdater, lookupClient, null, 
insertIfNotExists);
+    }
+
+    private TableLookup(
+            TableInfo tableInfo,
+            SchemaGetter schemaGetter,
+            MetadataUpdater metadataUpdater,
+            LookupClient lookupClient,
+            @Nullable List<String> lookupColumnNames,
+            boolean insertIfNotExists) {
         this.tableInfo = tableInfo;
         this.schemaGetter = schemaGetter;
         this.metadataUpdater = metadataUpdater;
         this.lookupClient = lookupClient;
         this.lookupColumnNames = lookupColumnNames;
+        this.insertIfNotExists = insertIfNotExists;
+    }
+
+    @Override
+    public Lookup enableInsertIfNotExists() {
+        if (lookupColumnNames != null) {
+            throw new IllegalArgumentException(
+                    "insertIfNotExists can not be used with prefix lookup");
+        }
+
+        if (tableInfo.getSchema().getColumns().stream()
+                .filter(column -> !column.getDataType().isNullable())
+                .filter(column -> 
!tableInfo.getPrimaryKeys().contains(column.getName()))
+                .anyMatch(
+                        column ->
+                                !tableInfo
+                                        .getSchema()
+                                        .getAutoIncrementColumnNames()
+                                        .contains(column.getName()))) {
+            throw new IllegalArgumentException(
+                    "insertIfNotExists cannot be enabled for tables with 
non-nullable columns besides primary key and auto increment columns.");
+        }
+
+        return new TableLookup(tableInfo, schemaGetter, metadataUpdater, 
lookupClient, true);
     }
 
     @Override
     public Lookup lookupBy(List<String> lookupColumnNames) {
+        if (insertIfNotExists) {

Review Comment:
   Ditto, this check can be uniformly moved to createLookuper().
   
   



##########
fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java:
##########
@@ -49,23 +51,71 @@ private TableLookup(
             MetadataUpdater metadataUpdater,
             LookupClient lookupClient,
             @Nullable List<String> lookupColumnNames) {
+        this(tableInfo, schemaGetter, metadataUpdater, lookupClient, 
lookupColumnNames, false);
+    }
+
+    private TableLookup(
+            TableInfo tableInfo,
+            SchemaGetter schemaGetter,
+            MetadataUpdater metadataUpdater,
+            LookupClient lookupClient,
+            boolean insertIfNotExists) {
+        this(tableInfo, schemaGetter, metadataUpdater, lookupClient, null, 
insertIfNotExists);
+    }
+
+    private TableLookup(
+            TableInfo tableInfo,
+            SchemaGetter schemaGetter,
+            MetadataUpdater metadataUpdater,
+            LookupClient lookupClient,
+            @Nullable List<String> lookupColumnNames,
+            boolean insertIfNotExists) {
         this.tableInfo = tableInfo;
         this.schemaGetter = schemaGetter;
         this.metadataUpdater = metadataUpdater;
         this.lookupClient = lookupClient;
         this.lookupColumnNames = lookupColumnNames;
+        this.insertIfNotExists = insertIfNotExists;
+    }
+
+    @Override
+    public Lookup enableInsertIfNotExists() {
+        if (lookupColumnNames != null) {

Review Comment:
   This check can be uniformly moved to createLookuper().
   
   



##########
fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupType.java:
##########
@@ -23,5 +23,6 @@
 @Internal
 public enum LookupType {
     LOOKUP,
+    LOOKUP_WITH_INSERT_IF_NOT_EXISTS,

Review Comment:
   I think there's no need to extend the enum definition here.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to