wuchong commented on code in PR #1992:
URL: https://github.com/apache/fluss/pull/1992#discussion_r2637879234


##########
fluss-client/src/main/java/org/apache/fluss/client/lookup/TypedLookuper.java:
##########
@@ -0,0 +1,73 @@
+package org.apache.fluss.client.lookup;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.fluss.client.converter.PojoToRowConverter;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Decorator for {@link Lookuper} that enables generic key lookup via {@link
+ * Lookuper#lookup(Object)}. Converts POJO keys to {@link InternalRow} using existing converters
+ * based on table schema and active lookup columns, and directly delegates when the key is already
+ * an {@link InternalRow}.
+ */
+final class TypedLookuper<K> implements Lookuper<K> {
+
+    private final Lookuper<InternalRow> delegate;
+    private final TableInfo tableInfo;
+    @Nullable private final List<String> lookupColumnNames;
+
+    TypedLookuper(
+            Lookuper<InternalRow> delegate,
+            TableInfo tableInfo,
+            @Nullable List<String> lookupColumnNames) {
+        this.delegate = delegate;
+        this.tableInfo = tableInfo;
+        this.lookupColumnNames = lookupColumnNames;
+    }
+
+    @Override
+    public CompletableFuture<LookupResult> lookup(K key) {
+        if (key == null) {
+            throw new IllegalArgumentException("key must not be null");
+        }
+        // Fast-path: already an InternalRow
+        if (key instanceof InternalRow) {
+            return delegate.lookup((InternalRow) key);
+        }
+        RowType tableSchema = tableInfo.getRowType();
+        RowType keyProjection;
+        if (lookupColumnNames == null) {
+            keyProjection = tableSchema.project(tableInfo.getPrimaryKeys());
+        } else {
+            keyProjection = tableSchema.project(lookupColumnNames);
+        }
+        @SuppressWarnings("unchecked")
+        Class<K> keyClass = (Class<K>) key.getClass();
+        PojoToRowConverter<K> keyConv = PojoToRowConverter.of(keyClass, tableSchema, keyProjection);

Review Comment:
   These are very heavy operations and should not be invoked on the hot path. The 
`PojoToRowConverter` should be initialized once in the constructor, so that each 
`lookup` call only performs the conversion. This can improve performance considerably.
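
A minimal sketch of the constructor-initialization idea, using stand-in types rather than the real Fluss classes. `buildConverter` is a hypothetical placeholder for an expensive factory such as `PojoToRowConverter.of(...)`, and rows and results are modelled as `Object[]` and `String` purely for illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class CachedConverterSketch {

    /** Counts how often the (stand-in) expensive converter is built. */
    static final AtomicInteger CONVERTERS_BUILT = new AtomicInteger();

    /** Hypothetical placeholder for an expensive converter factory. */
    static <K> Function<K, Object[]> buildConverter(Class<K> keyClass) {
        CONVERTERS_BUILT.incrementAndGet();
        return key -> new Object[] {key};
    }

    /** Stand-in lookuper: the converter is created once, in the constructor. */
    static final class TypedLookuperSketch<K> {
        private final Function<K, Object[]> keyConverter;
        private final Function<Object[], CompletableFuture<String>> delegate;

        TypedLookuperSketch(
                Class<K> keyClass, Function<Object[], CompletableFuture<String>> delegate) {
            this.keyConverter = buildConverter(keyClass); // heavy work happens here, once
            this.delegate = delegate;
        }

        CompletableFuture<String> lookup(K key) {
            if (key == null) {
                throw new IllegalArgumentException("key must not be null");
            }
            // Hot path: only the conversion and the delegate call remain.
            return delegate.apply(keyConverter.apply(key));
        }
    }

    /** Runs three lookups and reports how many converters were built. */
    static int convertersBuiltForThreeLookups() {
        CONVERTERS_BUILT.set(0);
        TypedLookuperSketch<Integer> lookuper =
                new TypedLookuperSketch<>(
                        Integer.class,
                        row -> CompletableFuture.completedFuture("row-" + row[0]));
        for (int i = 0; i < 3; i++) {
            lookuper.lookup(i).join();
        }
        return CONVERTERS_BUILT.get();
    }

    public static void main(String[] args) {
        System.out.println(convertersBuiltForThreeLookups());
    }
}
```

Note that building the converter in the constructor requires the key class to be known at construction time, rather than derived from `key.getClass()` on each call.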



##########
fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java:
##########
@@ -93,7 +99,11 @@ default Lookup lookupBy(String... lookupColumnNames) {
     * lookup columns. By default, the lookup columns are the primary key columns, but can be
      * changed with ({@link #lookupBy(List)}) method.
      *
+     * <p>The returned lookuper accepts generic keys of type {@code K}. If a key is a POJO, the
+     * client implementation will convert it to an {@code InternalRow} based on the table schema and
+     * the active lookup columns.
+     *
      * @return the lookuper
      */
-    Lookuper createLookuper();
+    <K> Lookuper<K> createLookuper();

Review Comment:
   This is a big change for Fluss users, as `Lookuper`, `Appender`, and 
`Upserter` are fundamental data access interfaces. To maintain backward 
compatibility, we should **keep these existing interfaces unchanged**.
   
   Instead, we can introduce a new generic interface: `TypedLookuper<T>`. This 
interface adds a type-safe lookup method:
   
   ```java
   CompletableFuture<LookupResult> lookup(T lookupKey);
   ```
   
   We can then provide a new factory method in `Lookup` to create it:
   
   ```java
   <T> TypedLookuper<T> createTypedLookuper(Class<T> pojoClass);
   ```
   
   This approach preserves existing APIs while enabling type-safe lookups for 
users who need them.
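
A rough sketch of how the two APIs could coexist, under stated assumptions: `Lookuper`, `TypedLookuper`, and `Lookup` below are simplified stand-ins declared locally, not the real Fluss interfaces; `InMemoryLookup` and `UserKey` are hypothetical; and the POJO key is mapped to a single `id` column by reflection only to keep the example self-contained.

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

class TypedLookupApiSketch {

    /** Stand-in for the existing row-based interface; its signature stays untouched. */
    interface Lookuper {
        CompletableFuture<String> lookup(Object[] keyRow);
    }

    /** New, separate interface for POJO keys. */
    interface TypedLookuper<T> {
        CompletableFuture<String> lookup(T lookupKey);
    }

    /** Stand-in for Lookup: the old factory method stays, a new one sits next to it. */
    interface Lookup {
        Lookuper createLookuper();

        <T> TypedLookuper<T> createTypedLookuper(Class<T> pojoClass);
    }

    /** Hypothetical in-memory table keyed by a single "id" column. */
    static final class InMemoryLookup implements Lookup {
        private final Map<Object, String> rowsById = new HashMap<>();

        InMemoryLookup put(Object id, String row) {
            rowsById.put(id, row);
            return this;
        }

        @Override
        public Lookuper createLookuper() {
            return keyRow -> CompletableFuture.completedFuture(rowsById.get(keyRow[0]));
        }

        @Override
        public <T> TypedLookuper<T> createTypedLookuper(Class<T> pojoClass) {
            Field idField = resolveIdField(pojoClass); // resolved once, not per lookup
            Lookuper rowLookuper = createLookuper();
            return key -> {
                try {
                    return rowLookuper.lookup(new Object[] {idField.get(key)});
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                }
            };
        }

        private static Field resolveIdField(Class<?> pojoClass) {
            try {
                Field field = pojoClass.getDeclaredField("id");
                field.setAccessible(true);
                return field;
            } catch (NoSuchFieldException e) {
                throw new IllegalArgumentException("POJO has no 'id' field", e);
            }
        }
    }

    /** Hypothetical POJO key. */
    static final class UserKey {
        final int id;

        UserKey(int id) {
            this.id = id;
        }
    }

    /** Looks up the same row through the old and the new API. */
    static String demo() {
        Lookup lookup = new InMemoryLookup().put(7, "alice");
        String viaRow = lookup.createLookuper().lookup(new Object[] {7}).join();
        String viaPojo =
                lookup.createTypedLookuper(UserKey.class).lookup(new UserKey(7)).join();
        return viaRow + "," + viaPojo;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Existing callers of `createLookuper()` compile unchanged, while POJO users opt in through the new factory method.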



##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java:
##########
@@ -63,7 +64,14 @@ public interface Scan {
      *
     * <p>Note: this API doesn't support pre-configured with {@link #limit(int)}.
      */
-    LogScanner createLogScanner();
+    LogScanner<InternalRow> createLogScanner();
+
+    /**
+     * Creates a typed LogScanner to continuously read log data as POJOs of the given class.
+     *
+     * <p>Note: this API doesn't support pre-configured with {@link #limit(int)}.
+     */
+    <T> LogScanner<T> createLogScanner(Class<T> pojoClass);

Review Comment:
   Same as with `TypedLookuper`: we should keep `LogScanner` unchanged and introduce separate typed classes:
   
   ```java
   public interface TypedLogScanner<T> {
   
       @Override
       TypedScanRecords<T> poll(Duration timeout);
   }
   
   
   public class TypedScanRecords<T> extends ScanRecords {
       ...
       @Override
       public Iterator<TypedScanRecord<T>> iterator() {
           ...
       }
   }
   
   public class TypedScanRecord<T> extends ScanRecord {
       private final T pojo;
   }
   ```
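
One possible shape for such a wrapper, sketched with stand-in types. This is a simplification of the outline above, not the proposed class hierarchy: polled records are plain `List`s instead of `TypedScanRecords`/`TypedScanRecord` subclasses, rows are `Object[]`, and `typed(...)` and `Event` are hypothetical names.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class TypedLogScannerSketch {

    /** Stand-in for the existing row-based scanner; left unchanged. */
    interface LogScanner {
        List<Object[]> poll(Duration timeout);
    }

    /** Separate typed scanner that yields POJOs. */
    interface TypedLogScanner<T> {
        List<T> poll(Duration timeout);
    }

    /** Wraps a row scanner and converts every polled row with the given function. */
    static <T> TypedLogScanner<T> typed(LogScanner rowScanner, Function<Object[], T> rowToPojo) {
        return timeout -> {
            List<T> pojos = new ArrayList<>();
            for (Object[] row : rowScanner.poll(timeout)) {
                pojos.add(rowToPojo.apply(row));
            }
            return pojos;
        };
    }

    /** Hypothetical POJO for a log record. */
    static final class Event {
        final long offset;
        final String payload;

        Event(long offset, String payload) {
            this.offset = offset;
            this.payload = payload;
        }
    }

    /** Polls two fake rows through the typed wrapper and formats them. */
    static String demo() {
        LogScanner rows =
                timeout -> {
                    List<Object[]> batch = new ArrayList<>();
                    batch.add(new Object[] {0L, "a"});
                    batch.add(new Object[] {1L, "b"});
                    return batch;
                };
        TypedLogScanner<Event> events =
                typed(rows, r -> new Event((Long) r[0], (String) r[1]));
        StringBuilder sb = new StringBuilder();
        for (Event e : events.poll(Duration.ofMillis(10))) {
            sb.append(e.offset).append(':').append(e.payload).append(';');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```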
   
   
   



##########
fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java:
##########
@@ -60,8 +57,11 @@ public interface Upsert {
     Upsert partialUpdate(String... targetColumnNames);
 
     /**
-     * Create a new {@link UpsertWriter} with the optional {@link #partialUpdate(String...)}
-     * information to upsert and delete data to a Primary Key Table.
+     * Create a new {@link UpsertWriter} using {@code InternalRow} with the optional {@link
+     * #partialUpdate(String...)} information to upsert and delete data to a Primary Key Table.
      */
-    UpsertWriter createWriter();
+    UpsertWriter<?> createWriter();
+
+    /** Create a new typed {@link UpsertWriter} to write POJOs directly. */
+    <T> UpsertWriter<T> createWriter(Class<T> pojoClass);

Review Comment:
   Ditto: keep `UpsertWriter` unchanged and introduce a separate typed writer interface instead.
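
For the writer side, the same pattern could look roughly like the sketch below. All types are stand-ins declared locally: `TypedUpsertWriter` is a hypothetical name, rows are `Object[]`, and the futures return `Void` instead of the real result types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class TypedWriterSketch {

    /** Stand-in for the existing row-based writer; left unchanged. */
    interface UpsertWriter {
        CompletableFuture<Void> upsert(Object[] row);

        CompletableFuture<Void> delete(Object[] row);
    }

    /** Hypothetical typed counterpart that accepts POJOs. */
    interface TypedUpsertWriter<T> {
        CompletableFuture<Void> upsert(T value);

        CompletableFuture<Void> delete(T value);
    }

    /** Wraps a row writer; each POJO is converted and forwarded to the delegate. */
    static <T> TypedUpsertWriter<T> typed(
            UpsertWriter rowWriter, Function<T, Object[]> pojoToRow) {
        return new TypedUpsertWriter<T>() {
            @Override
            public CompletableFuture<Void> upsert(T value) {
                return rowWriter.upsert(pojoToRow.apply(value));
            }

            @Override
            public CompletableFuture<Void> delete(T value) {
                return rowWriter.delete(pojoToRow.apply(value));
            }
        };
    }

    /** Hypothetical POJO. */
    static final class User {
        final int id;
        final String name;

        User(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    /** Records what reaches the row writer when POJOs are written. */
    static String demo() {
        List<String> log = new ArrayList<>();
        UpsertWriter rowWriter =
                new UpsertWriter() {
                    @Override
                    public CompletableFuture<Void> upsert(Object[] row) {
                        log.add("upsert:" + Arrays.toString(row));
                        return CompletableFuture.completedFuture(null);
                    }

                    @Override
                    public CompletableFuture<Void> delete(Object[] row) {
                        log.add("delete:" + Arrays.toString(row));
                        return CompletableFuture.completedFuture(null);
                    }
                };
        TypedUpsertWriter<User> writer = typed(rowWriter, u -> new Object[] {u.id, u.name});
        writer.upsert(new User(1, "alice")).join();
        writer.delete(new User(1, "alice")).join();
        return String.join(";", log);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```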



##########
fluss-client/src/main/java/org/apache/fluss/client/table/writer/Append.java:
##########
@@ -32,6 +32,9 @@ public interface Append {
    // TODO: Add more methods to configure the AppendWriter, such as apply static partitions,
     //  apply overwrites, etc.
 
-    /** Create a new {@link AppendWriter} to write data to a Log Table. */
-    AppendWriter createWriter();
+    /** Create a new {@link AppendWriter} to write data to a Log Table using InternalRow. */
+    AppendWriter<?> createWriter();
+
+    /** Create a new typed {@link AppendWriter} to write POJOs directly. */
+    <T> AppendWriter<T> createWriter(Class<T> pojoClass);

Review Comment:
   Ditto: keep `AppendWriter` unchanged and introduce a separate typed writer interface instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.