This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new f731fc6cd [client] Add Pojo API support for read and write operations (#1992)
f731fc6cd is described below

commit f731fc6cd5a06a30dd179ea6317e80a738bbcfed
Author: Giannis Polyzos <[email protected]>
AuthorDate: Tue Dec 23 18:00:18 2025 +0200

    [client] Add Pojo API support for read and write operations (#1992)
---
 .../fluss/client/converter/ConverterCommons.java   |  18 +
 .../fluss/client/converter/PojoToRowConverter.java |  23 +-
 .../apache/fluss/client/converter/PojoType.java    |  30 +-
 .../org/apache/fluss/client/lookup/Lookup.java     |  39 +-
 .../apache/fluss/client/lookup/LookupResult.java   |   4 +-
 .../org/apache/fluss/client/lookup/Lookuper.java   |  15 +-
 .../fluss/client/lookup/PrefixKeyLookuper.java     |   2 +-
 .../fluss/client/lookup/PrimaryKeyLookuper.java    |   2 +-
 .../apache/fluss/client/lookup/TableLookup.java    |   5 +
 .../TypedLookuper.java}                            |  18 +-
 .../fluss/client/lookup/TypedLookuperImpl.java     |  78 +++
 .../apache/fluss/client/table/scanner/Scan.java    |   8 +
 .../fluss/client/table/scanner/TableScan.java      |   8 +
 .../client/table/scanner/TypedScanRecord.java      |  88 ++++
 .../client/table/scanner/log/TypedLogScanner.java  |  90 ++++
 .../table/scanner/log/TypedLogScannerImpl.java     | 109 ++++
 .../client/table/scanner/log/TypedScanRecords.java | 120 +++++
 .../apache/fluss/client/table/writer/Append.java   |   5 +-
 .../fluss/client/table/writer/AppendWriter.java    |   6 +-
 .../fluss/client/table/writer/TableAppend.java     |   5 +
 .../fluss/client/table/writer/TableUpsert.java     |   5 +
 .../{AppendWriter.java => TypedAppendWriter.java}  |  14 +-
 .../client/table/writer/TypedAppendWriterImpl.java |  56 +++
 .../{UpsertWriter.java => TypedUpsertWriter.java}  |  21 +-
 .../client/table/writer/TypedUpsertWriterImpl.java | 131 +++++
 .../apache/fluss/client/table/writer/Upsert.java   |  22 +-
 .../fluss/client/table/writer/UpsertWriter.java    |  13 +-
 .../client/admin/ClientToServerITCaseBase.java     |   8 +
 .../fluss/client/table/FlussTypedClientITCase.java | 558 +++++++++++++++++++++
 .../fluss/client/write/RecordAccumulatorTest.java  |   2 +
 30 files changed, 1429 insertions(+), 74 deletions(-)
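
For reference, a minimal write-path sketch built on the interfaces added below (the Order POJO, the
table handle, and the table.newAppend() accessor are illustrative assumptions, not part of this
change; POJO fields must use wrapper types because PojoType rejects primitives):

    // Hypothetical POJO matching a log table schema (orderId BIGINT, item STRING, amount INT).
    public class Order {
        public Long orderId;
        public String item;
        public Integer amount;
    }

    TypedAppendWriter<Order> writer = table.newAppend().createTypedWriter(Order.class);
    Order order = new Order();
    order.orderId = 1L;
    order.item = "apple";
    order.amount = 3;
    writer.append(order).get(); // converted to InternalRow by PojoToRowConverter
    writer.flush();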

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java
index 5f4f1b6c1..aeb36419a 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java
@@ -83,6 +83,24 @@ final class ConverterCommons {
         }
     }
 
+    static void validatePojoMatchesProjection(PojoType<?> pojoType, RowType projection) {
+        Set<String> pojoNames = pojoType.getProperties().keySet();
+        List<String> fieldNames = projection.getFieldNames();
+        if (!pojoNames.containsAll(fieldNames)) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "POJO fields %s must contain all projection fields %s. "
+                                    + "For full-table writes, POJO fields must exactly match table schema fields.",
+                            pojoNames, fieldNames));
+        }
+        for (int i = 0; i < projection.getFieldCount(); i++) {
+            String name = fieldNames.get(i);
+            DataType dt = projection.getTypeAt(i);
+            PojoType.Property prop = pojoType.getProperty(name);
+            validateCompatibility(dt, prop);
+        }
+    }
+
     static void validateProjectionSubset(RowType projection, RowType tableSchema) {
         Set<String> tableNames = new HashSet<>(tableSchema.getFieldNames());
         for (String n : projection.getFieldNames()) {
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
index 4d75aa91a..0f4f6d8a4 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
@@ -30,6 +30,7 @@ import org.apache.fluss.types.RowType;
 import javax.annotation.Nullable;
 
 import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -54,7 +55,9 @@ public final class PojoToRowConverter<T> {
         this.tableSchema = tableSchema;
         this.projection = projection;
         this.projectionFieldNames = projection.getFieldNames();
-        ConverterCommons.validatePojoMatchesTable(pojoType, tableSchema);
+        // For writer path, allow POJO to be a superset of the projection. It must contain all
+        // projected fields.
+        ConverterCommons.validatePojoMatchesProjection(pojoType, projection);
         ConverterCommons.validateProjectionSubset(projection, tableSchema);
         this.fieldConverters = createFieldConverters();
     }
@@ -177,8 +180,22 @@ public final class PojoToRowConverter<T> {
                     String.format(
                             "Field %s is not a BigDecimal. Cannot convert to 
Decimal.", prop.name));
         }
-        return Decimal.fromBigDecimal(
-                (BigDecimal) v, decimalType.getPrecision(), decimalType.getScale());
+        final int precision = decimalType.getPrecision();
+        final int scale = decimalType.getScale();
+
+        // Scale with a deterministic rounding mode to avoid ArithmeticException when rounding is
+        // needed.
+        BigDecimal bd = (BigDecimal) v;
+        BigDecimal scaled = bd.setScale(scale, RoundingMode.HALF_UP);
+
+        if (scaled.precision() > precision) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Decimal value for field %s exceeds precision %d 
after scaling to %d: %s",
+                            prop.name, precision, scale, scaled));
+        }
+
+        return Decimal.fromBigDecimal(scaled, precision, scale);
     }
 
     /** Converts a LocalDate POJO property to number of days since epoch. */
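
To illustrate the new rounding behavior with plain BigDecimal arithmetic (the values are
hypothetical): for a DECIMAL(5, 2) column, a value that needs rounding is now scaled with HALF_UP
instead of throwing, while a value whose scaled precision exceeds the column's precision triggers
the new IllegalArgumentException.

    BigDecimal ok = new BigDecimal("12.345").setScale(2, RoundingMode.HALF_UP);       // 12.35, precision 4 <= 5
    BigDecimal tooBig = new BigDecimal("12345.678").setScale(2, RoundingMode.HALF_UP); // 12345.68, precision 7 > 5 -> rejected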
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoType.java 
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoType.java
index 7790b9143..de848f448 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoType.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoType.java
@@ -37,6 +37,7 @@ final class PojoType<T> {
     private final Class<T> pojoClass;
     private final Constructor<T> defaultConstructor;
     private final Map<String, Property> properties; // property name -> property
+    private static final Map<Class<?>, Class<?>> PRIMITIVE_TO_BOXED = createPrimitiveToBoxedMap();
 
     private PojoType(Class<T> pojoClass, Constructor<T> ctor, Map<String, Property> props) {
         this.pojoClass = pojoClass;
@@ -73,12 +74,15 @@ final class PojoType<T> {
         for (Map.Entry<String, Field> e : allFields.entrySet()) {
             String name = e.getKey();
             Field field = e.getValue();
+            // Enforce nullable fields: primitives are not allowed in POJO definitions.
             if (field.getType().isPrimitive()) {
                 throw new IllegalArgumentException(
                         String.format(
                                 "POJO class %s has primitive field '%s' of 
type %s. Primitive types are not allowed; all fields must be nullable (use 
wrapper types).",
                                 pojoClass.getName(), name, 
field.getType().getName()));
             }
+            // use boxed type as effective type
+            Class<?> effectiveType = boxIfPrimitive(field.getType());
             boolean publicField = Modifier.isPublic(field.getModifiers());
             Method getter = getters.get(name);
             Method setter = setters.get(name);
@@ -94,8 +98,7 @@ final class PojoType<T> {
             }
             props.put(
                     name,
-                    new Property(
-                            name, field.getType(), publicField ? field : null, getter, setter));
+                    new Property(name, effectiveType, publicField ? field : null, getter, setter));
         }
 
         return new PojoType<>(pojoClass, ctor, props);
@@ -235,6 +238,29 @@ final class PojoType<T> {
         return s.substring(0, 1).toLowerCase(Locale.ROOT) + s.substring(1);
     }
 
+    private static Map<Class<?>, Class<?>> createPrimitiveToBoxedMap() {
+        Map<Class<?>, Class<?>> map = new HashMap<>();
+        map.put(boolean.class, Boolean.class);
+        map.put(byte.class, Byte.class);
+        map.put(short.class, Short.class);
+        map.put(int.class, Integer.class);
+        map.put(long.class, Long.class);
+        map.put(float.class, Float.class);
+        map.put(double.class, Double.class);
+        map.put(char.class, Character.class);
+        // void shouldn't appear as a field type, but handle defensively
+        map.put(void.class, Void.class);
+        return map;
+    }
+
+    private static Class<?> boxIfPrimitive(Class<?> type) {
+        if (!type.isPrimitive()) {
+            return type;
+        }
+        Class<?> boxed = PRIMITIVE_TO_BOXED.get(type);
+        return boxed != null ? boxed : type;
+    }
+
     static final class Property {
         final String name;
         final Class<?> type;
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java
index 31faed2f7..e64685b9a 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java
@@ -24,31 +24,37 @@ import java.util.List;
 
 /**
  * Used to configure and create a {@link Lookuper} to lookup rows of a primary key table. The built
- * Lookuper can be a primary key lookuper that lookups by the primary key, or a prefix key lookup
- * that lookups by the prefix key of the primary key.
+ * lookuper can lookup by the full primary key, or by a prefix of the primary key when configured
+ * via {@link #lookupBy}.
  *
  * <p>{@link Lookup} objects are immutable and can be shared between threads. Refinement methods,
- * like {@link #lookupBy}, create new Lookup instances.
+ * like {@link #lookupBy}, create new {@code Lookup} instances.
  *
- * <p>Example1: Create a Primary Key Lookuper. Given a table with primary key column [k STRING].
+ * <p>Examples
+ *
+ * <p>Example 1: Primary Key Lookuper using an InternalRow key. Given a table with primary key
+ * column [k STRING]:
  *
  * <pre>{@code
  * Lookuper lookuper = table.newLookup().createLookuper();
  * CompletableFuture<LookupResult> resultFuture = lookuper.lookup(GenericRow.of("key1"));
- * resultFuture.get().getRows().forEach(row -> {
- *    System.out.println(row);
- * });
+ * resultFuture.get().getRowList().forEach(System.out::println);
  * }</pre>
  *
- * <p>Example2: Create a Prefix Key Lookuper. Given a table with primary key column [a INT, b
- * STRING, c BIGINT] and bucket key [a, b].
+ * <p>Example 2: Prefix Key Lookuper using an InternalRow key. Given a table with primary key
+ * columns [a INT, b STRING, c BIGINT] and bucket key [a, b]:
  *
  * <pre>{@code
  * Lookuper lookuper = table.newLookup().lookupBy("a", "b").createLookuper();
  * CompletableFuture<LookupResult> resultFuture = lookuper.lookup(GenericRow.of(1, "b1"));
- * resultFuture.get().getRows().forEach(row -> {
- *   System.out.println(row);
- * });
+ * resultFuture.get().getRowList().forEach(System.out::println);
+ * }</pre>
+ *
+ * <p>Example 3: Using a POJO key (conversion handled internally):
+ *
+ * <pre>{@code
+ * TypedLookuper<MyKeyPojo> lookuper = table.newLookup().createTypedLookuper(MyKeyPojo.class);
+ * LookupResult result = lookuper.lookup(new MyKeyPojo(...)).get();
  * }</pre>
  *
  * @since 0.6
@@ -96,4 +102,13 @@ public interface Lookup {
      * @return the lookuper
      */
     Lookuper createLookuper();
+
+    /**
+     * Creates a {@link TypedLookuper} instance to lookup rows of a primary key table using POJOs.
+     *
+     * @param pojoClass the class of the POJO
+     * @param <T> the type of the POJO
+     * @return the typed lookuper
+     */
+    <T> TypedLookuper<T> createTypedLookuper(Class<T> pojoClass);
 }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupResult.java 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupResult.java
index b4536640e..83497d37c 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupResult.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupResult.java
@@ -27,7 +27,9 @@ import java.util.List;
 import java.util.Objects;
 
 /**
- * The result of {@link Lookuper#lookup(InternalRow)}.
+ * The result of a lookup operation performed by a {@link Lookuper}. It carries zero, one, or many
+ * {@link org.apache.fluss.row.InternalRow} values depending on whether the underlying lookup is a
+ * primary-key lookup (at most one) or a prefix-key lookup (zero or more).
  *
  * @since 0.1
  */
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java
index 37157d9b1..9998a2612 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java
@@ -25,11 +25,16 @@ import javax.annotation.concurrent.NotThreadSafe;
 import java.util.concurrent.CompletableFuture;
 
 /**
- * The lookup-er is used to lookup row of a primary key table by primary key or prefix key. The
- * lookuper has retriable ability to handle transient errors during lookup operations which is
- * configured by {@link org.apache.fluss.config.ConfigOptions#CLIENT_LOOKUP_MAX_RETRIES}.
+ * A lookuper performs key-based lookups against a primary key table, using either the full primary
+ * key or a prefix of the primary key (when configured via {@code Lookup#lookupBy}).
  *
- * <p>Note: Lookuper instances are not thread-safe.
+ * <p>Usage examples:
+ *
+ * <pre>{@code
+ * // Row-based key (InternalRow)
+ * Lookuper lookuper = table.newLookup().createLookuper();
+ * LookupResult res = lookuper.lookup(keyRow).get();
+ * }</pre>
  *
  * @since 0.6
  */
@@ -44,7 +49,7 @@ public interface Lookuper {
      * {@code table.newLookup().createLookuper()}), or be the prefix key if the lookuper is a Prefix
      * Key Lookuper (created by {@code table.newLookup().lookupBy(prefixKeys).createLookuper()}).
      *
-     * @param lookupKey the lookup key.
+     * @param lookupKey the lookup key
      * @return the result of lookup.
      */
     CompletableFuture<LookupResult> lookup(InternalRow lookupKey);
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java
index a2d1029d2..a7b3e6545 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java
@@ -46,7 +46,7 @@ import static org.apache.fluss.client.utils.ClientUtils.getPartitionId;
  * of the primary key.
  */
 @NotThreadSafe
-class PrefixKeyLookuper extends AbstractLookuper {
+class PrefixKeyLookuper extends AbstractLookuper implements Lookuper {
 
     /** Extract bucket key from prefix lookup key row. */
     private final KeyEncoder bucketKeyEncoder;
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
index 6c3264763..d2cd94963 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
@@ -40,7 +40,7 @@ import static org.apache.fluss.utils.Preconditions.checkArgument;
 
 /** An implementation of {@link Lookuper} that lookups by primary key. */
 @NotThreadSafe
-class PrimaryKeyLookuper extends AbstractLookuper {
+class PrimaryKeyLookuper extends AbstractLookuper implements Lookuper {
 
     private final KeyEncoder primaryKeyEncoder;
 
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java
index 84c9bdb21..e0c92661f 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java
@@ -71,4 +71,9 @@ public class TableLookup implements Lookup {
                     tableInfo, schemaGetter, metadataUpdater, lookupClient, lookupColumnNames);
         }
     }
+
+    @Override
+    public <T> TypedLookuper<T> createTypedLookuper(Class<T> pojoClass) {
+        return new TypedLookuperImpl<>(createLookuper(), tableInfo, lookupColumnNames, pojoClass);
+    }
 }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriter.java
 b/fluss-client/src/main/java/org/apache/fluss/client/lookup/TypedLookuper.java
similarity index 68%
copy from 
fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriter.java
copy to 
fluss-client/src/main/java/org/apache/fluss/client/lookup/TypedLookuper.java
index 4edcc2471..2aa7624c8 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriter.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/TypedLookuper.java
@@ -15,26 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.fluss.client.table.writer;
+package org.apache.fluss.client.lookup;
 
 import org.apache.fluss.annotation.PublicEvolving;
-import org.apache.fluss.row.InternalRow;
 
 import java.util.concurrent.CompletableFuture;
 
 /**
- * The writer to write data to the log table.
+ * A typed lookuper performs key-based lookups against a primary key table using POJOs.
  *
- * @since 0.2
+ * @param <T> the type of the lookup key
+ * @since 0.6
  */
 @PublicEvolving
-public interface AppendWriter extends TableWriter {
+public interface TypedLookuper<T> {
 
     /**
-     * Append row into a Log Table.
+     * Looks up a certain row by the given lookup key.
      *
-     * @param row the row to append.
-     * @return A {@link CompletableFuture} that always returns append result when complete normally.
+     * @param lookupKey the lookup key
+     * @return the result of lookup.
      */
-    CompletableFuture<AppendResult> append(InternalRow row);
+    CompletableFuture<LookupResult> lookup(T lookupKey);
 }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/TypedLookuperImpl.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/TypedLookuperImpl.java
new file mode 100644
index 000000000..c88eb15d1
--- /dev/null
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/TypedLookuperImpl.java
@@ -0,0 +1,78 @@
+package org.apache.fluss.client.lookup;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.fluss.client.converter.PojoToRowConverter;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Decorator for {@link Lookuper} that enables generic key lookup via {@link
+ * TypedLookuper#lookup(Object)}. Converts POJO keys to {@link InternalRow} using existing
+ * converters based on table schema and active lookup columns, and directly delegates when the key
+ * is already an {@link InternalRow}.
+ */
+final class TypedLookuperImpl<K> implements TypedLookuper<K> {
+
+    private final Lookuper delegate;
+    private final TableInfo tableInfo;
+    @Nullable private final List<String> lookupColumnNames;
+    private final PojoToRowConverter<K> keyConv;
+
+    TypedLookuperImpl(
+            Lookuper delegate,
+            TableInfo tableInfo,
+            @Nullable List<String> lookupColumnNames,
+            Class<K> keyClass) {
+        this.delegate = delegate;
+        this.tableInfo = tableInfo;
+        this.lookupColumnNames = lookupColumnNames;
+        this.keyConv = createPojoToRowConverter(keyClass);
+    }
+
+    @Override
+    public CompletableFuture<LookupResult> lookup(K key) {
+        if (key == null) {
+            throw new IllegalArgumentException("key must not be null");
+        }
+        // Fast-path: already an InternalRow
+        if (key instanceof InternalRow) {
+            return delegate.lookup((InternalRow) key);
+        }
+
+        InternalRow keyRow = keyConv.toRow(key);
+        return delegate.lookup(keyRow);
+    }
+
+    private PojoToRowConverter<K> createPojoToRowConverter(Class<K> keyClass) {
+        RowType tableSchema = tableInfo.getRowType();
+        RowType keyProjection;
+        if (lookupColumnNames == null) {
+            keyProjection = tableSchema.project(tableInfo.getPrimaryKeys());
+        } else {
+            keyProjection = tableSchema.project(lookupColumnNames);
+        }
+        return PojoToRowConverter.of(keyClass, tableSchema, keyProjection);
+    }
+}
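
A lookup-path sketch using the typed lookuper added above (the OrderKey POJO and the table handle
are illustrative assumptions; a prefix lookup works the same way by calling lookupBy(...) before
createTypedLookuper(...)):

    public class OrderKey {
        public Long orderId; // wrapper type, matching the primary key column
    }

    TypedLookuper<OrderKey> lookuper = table.newLookup().createTypedLookuper(OrderKey.class);
    OrderKey key = new OrderKey();
    key.orderId = 1L;
    LookupResult result = lookuper.lookup(key).get();
    result.getRowList().forEach(System.out::println);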
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java
index 97e37847f..1c05bd391 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java
@@ -20,6 +20,7 @@ package org.apache.fluss.client.table.scanner;
 import org.apache.fluss.annotation.PublicEvolving;
 import org.apache.fluss.client.table.scanner.batch.BatchScanner;
 import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.TypedLogScanner;
 import org.apache.fluss.metadata.TableBucket;
 
 import javax.annotation.Nullable;
@@ -65,6 +66,13 @@ public interface Scan {
      */
     LogScanner createLogScanner();
 
+    /**
+     * Creates a {@link TypedLogScanner} to continuously read log data as POJOs of the given class.
+     *
+     * <p>Note: this API doesn't support being pre-configured with {@link #limit(int)}.
+     */
+    <T> TypedLogScanner<T> createTypedLogScanner(Class<T> pojoClass);
+
     /**
      * Creates a {@link BatchScanner} to read current data in the given table bucket for this scan.
      *
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
index 0ca1952cf..0d1d28a0e 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
@@ -25,6 +25,8 @@ import org.apache.fluss.client.table.scanner.batch.KvSnapshotBatchScanner;
 import org.apache.fluss.client.table.scanner.batch.LimitBatchScanner;
 import org.apache.fluss.client.table.scanner.log.LogScanner;
 import org.apache.fluss.client.table.scanner.log.LogScannerImpl;
+import org.apache.fluss.client.table.scanner.log.TypedLogScanner;
+import org.apache.fluss.client.table.scanner.log.TypedLogScannerImpl;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.metadata.SchemaGetter;
@@ -113,6 +115,12 @@ public class TableScan implements Scan {
                 schemaGetter);
     }
 
+    @Override
+    public <T> TypedLogScanner<T> createTypedLogScanner(Class<T> pojoClass) {
+        LogScanner base = createLogScanner();
+        return new TypedLogScannerImpl<>(base, pojoClass, tableInfo, projectedColumns);
+    }
+
     @Override
     public BatchScanner createBatchScanner(TableBucket tableBucket) {
         if (limit == null) {
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TypedScanRecord.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TypedScanRecord.java
new file mode 100644
index 000000000..5a2d3b8a3
--- /dev/null
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TypedScanRecord.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner;
+
+import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.row.InternalRow;
+
+import java.util.Objects;
+
+/**
+ * A record produced by a table scanner which contains a typed value.
+ *
+ * @param <T> The type of the value.
+ */
+@PublicEvolving
+public class TypedScanRecord<T> {
+
+    private final ScanRecord scanRecord;
+    private final T value;
+
+    public TypedScanRecord(ScanRecord scanRecord, T value) {
+        this.scanRecord = scanRecord;
+        this.value = value;
+    }
+
+    /** The position of this record in the corresponding fluss table bucket. */
+    public long logOffset() {
+        return scanRecord.logOffset();
+    }
+
+    /** The timestamp of this record. */
+    public long timestamp() {
+        return scanRecord.timestamp();
+    }
+
+    /** The change type of this record. */
+    public ChangeType getChangeType() {
+        return scanRecord.getChangeType();
+    }
+
+    /** Returns the record value. */
+    public T getValue() {
+        return value;
+    }
+
+    /** Returns the internal row of this record. */
+    public InternalRow getRow() {
+        return scanRecord.getRow();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        TypedScanRecord<?> that = (TypedScanRecord<?>) o;
+        return Objects.equals(scanRecord, that.scanRecord) && Objects.equals(value, that.value);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(scanRecord, value);
+    }
+
+    @Override
+    public String toString() {
+        return scanRecord.getChangeType().shortString() + value + "@" + scanRecord.logOffset();
+    }
+}
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedLogScanner.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedLogScanner.java
new file mode 100644
index 000000000..a700ab434
--- /dev/null
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedLogScanner.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner.log;
+
+import org.apache.fluss.annotation.PublicEvolving;
+
+import java.time.Duration;
+
+/**
+ * A typed scanner is used to scan log data of a specific table from Fluss as POJOs.
+ *
+ * @param <T> the type of the POJO
+ * @since 0.6
+ */
+@PublicEvolving
+public interface TypedLogScanner<T> extends AutoCloseable {
+
+    /**
+     * Poll log data from tablet server.
+     *
+     * @param timeout the timeout to poll.
+     * @return the result of poll.
+     */
+    TypedScanRecords<T> poll(Duration timeout);
+
+    /**
+     * Subscribe to the given table bucket from beginning dynamically. If the table bucket is
+     * already subscribed, the start offset will be updated.
+     *
+     * @param bucket the table bucket to subscribe.
+     */
+    void subscribeFromBeginning(int bucket);
+
+    /**
+     * Subscribe to the given partitioned table bucket from beginning dynamically. If the table
+     * bucket is already subscribed, the start offset will be updated.
+     *
+     * @param partitionId the partition id of the table partition to subscribe.
+     * @param bucket the table bucket to subscribe.
+     */
+    void subscribeFromBeginning(long partitionId, int bucket);
+
+    /**
+     * Subscribe to the given table bucket in given offset dynamically. If the table bucket is
+     * already subscribed, the offset will be updated.
+     *
+     * @param bucket the table bucket to subscribe.
+     * @param offset the offset to start from.
+     */
+    void subscribe(int bucket, long offset);
+
+    /**
+     * Subscribe to the given partitioned table bucket in given offset dynamically. If the table
+     * bucket is already subscribed, the offset will be updated.
+     *
+     * @param partitionId the partition id of the table partition to subscribe.
+     * @param bucket the table bucket to subscribe.
+     * @param offset the offset to start from.
+     */
+    void subscribe(long partitionId, int bucket, long offset);
+
+    /**
+     * Unsubscribe from the given bucket of given partition dynamically.
+     *
+     * @param partitionId the partition id of the table partition to unsubscribe.
+     * @param bucket the table bucket to unsubscribe.
+     */
+    void unsubscribe(long partitionId, int bucket);
+
+    /**
+     * Wake up the log scanner in case the fetcher thread in log scanner is blocking in {@link
+     * #poll(Duration timeout)}.
+     */
+    void wakeup();
+}
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedLogScannerImpl.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedLogScannerImpl.java
new file mode 100644
index 000000000..3b7ea32d4
--- /dev/null
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedLogScannerImpl.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner.log;
+
+import org.apache.fluss.client.converter.RowToPojoConverter;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.TypedScanRecord;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.RowType;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Adapter that converts {@link InternalRow} records from a {@link LogScanner} into POJOs of type T.
+ */
+public class TypedLogScannerImpl<T> implements TypedLogScanner<T> {
+
+    private final LogScanner delegate;
+    private final RowToPojoConverter<T> converter;
+
+    public TypedLogScannerImpl(
+            LogScanner delegate, Class<T> pojoClass, TableInfo tableInfo, int[] projectedColumns) {
+        this.delegate = delegate;
+        RowType tableSchema = tableInfo.getRowType();
+        RowType projection =
+                projectedColumns == null ? tableSchema : tableSchema.project(projectedColumns);
+        this.converter = RowToPojoConverter.of(pojoClass, tableSchema, projection);
+    }
+
+    @Override
+    public TypedScanRecords<T> poll(Duration timeout) {
+        ScanRecords records = delegate.poll(timeout);
+        if (records == null || records.isEmpty()) {
+            return TypedScanRecords.empty();
+        }
+        Map<TableBucket, List<TypedScanRecord<T>>> out = new HashMap<>();
+        for (TableBucket bucket : records.buckets()) {
+            List<ScanRecord> list = records.records(bucket);
+            List<TypedScanRecord<T>> converted = new ArrayList<>(list.size());
+            for (ScanRecord r : list) {
+                InternalRow row = r.getRow();
+                T pojo = converter.fromRow(row);
+                converted.add(new TypedScanRecord<>(r, pojo));
+            }
+            out.put(bucket, converted);
+        }
+        return new TypedScanRecords<>(out);
+    }
+
+    @Override
+    public void subscribeFromBeginning(int bucket) {
+        delegate.subscribeFromBeginning(bucket);
+    }
+
+    @Override
+    public void subscribeFromBeginning(long partitionId, int bucket) {
+        delegate.subscribeFromBeginning(partitionId, bucket);
+    }
+
+    @Override
+    public void subscribe(int bucket, long offset) {
+        delegate.subscribe(bucket, offset);
+    }
+
+    @Override
+    public void subscribe(long partitionId, int bucket, long offset) {
+        delegate.subscribe(partitionId, bucket, offset);
+    }
+
+    @Override
+    public void unsubscribe(long partitionId, int bucket) {
+        delegate.unsubscribe(partitionId, bucket);
+    }
+
+    @Override
+    public void wakeup() {
+        delegate.wakeup();
+    }
+
+    @Override
+    public void close() {
+        try {
+            delegate.close();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
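
A matching read-path sketch for the typed scanner (assuming the same illustrative Order POJO and
that the Scan handle comes from table.newScan(), which is outside this diff; the enclosing method
is assumed to declare throws Exception for the AutoCloseable scanner):

    try (TypedLogScanner<Order> scanner = table.newScan().createTypedLogScanner(Order.class)) {
        scanner.subscribeFromBeginning(0);
        TypedScanRecords<Order> records = scanner.poll(Duration.ofSeconds(1));
        for (TypedScanRecord<Order> record : records) {
            Order order = record.getValue(); // converted by RowToPojoConverter
            System.out.println(record.getChangeType() + " " + order.item + "@" + record.logOffset());
        }
    }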
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedScanRecords.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedScanRecords.java
new file mode 100644
index 000000000..f69347802
--- /dev/null
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedScanRecords.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner.log;
+
+import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.client.table.scanner.TypedScanRecord;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.utils.AbstractIterator;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A container that holds the list of {@link TypedScanRecord} per bucket for a particular table. There
+ * is one {@link TypedScanRecord} list for every bucket returned by a {@link
+ * TypedLogScanner#poll(java.time.Duration)} operation.
+ *
+ * @param <T> the type of the POJO
+ * @since 0.6
+ */
+@PublicEvolving
+public class TypedScanRecords<T> implements Iterable<TypedScanRecord<T>> {
+
+    public static <T> TypedScanRecords<T> empty() {
+        return new TypedScanRecords<>(Collections.emptyMap());
+    }
+
+    private final Map<TableBucket, List<TypedScanRecord<T>>> records;
+
+    public TypedScanRecords(Map<TableBucket, List<TypedScanRecord<T>>> records) {
+        this.records = records;
+    }
+
+    /**
+     * Get just the records for the given bucketId.
+     *
+     * @param scanBucket The bucket to get records for
+     */
+    public List<TypedScanRecord<T>> records(TableBucket scanBucket) {
+        List<TypedScanRecord<T>> recs = records.get(scanBucket);
+        if (recs == null) {
+            return Collections.emptyList();
+        }
+        return Collections.unmodifiableList(recs);
+    }
+
+    /**
+     * Get the bucket ids which have records contained in this record set.
+     *
+     * @return the set of partitions with data in this record set (may be empty if no data was
+     *     returned)
+     */
+    public Set<TableBucket> buckets() {
+        return Collections.unmodifiableSet(records.keySet());
+    }
+
+    /** The number of records for all buckets. */
+    public int count() {
+        int count = 0;
+        for (List<TypedScanRecord<T>> recs : records.values()) {
+            count += recs.size();
+        }
+        return count;
+    }
+
+    public boolean isEmpty() {
+        return records.isEmpty();
+    }
+
+    @Override
+    public Iterator<TypedScanRecord<T>> iterator() {
+        return new ConcatenatedIterable<>(records.values()).iterator();
+    }
+
+    private static class ConcatenatedIterable<T> implements Iterable<TypedScanRecord<T>> {
+
+        private final Iterable<? extends Iterable<TypedScanRecord<T>>> iterables;
+
+        public ConcatenatedIterable(Iterable<? extends Iterable<TypedScanRecord<T>>> iterables) {
+            this.iterables = iterables;
+        }
+
+        @Override
+        public Iterator<TypedScanRecord<T>> iterator() {
+            return new AbstractIterator<TypedScanRecord<T>>() {
+                final Iterator<? extends Iterable<TypedScanRecord<T>>> iters = iterables.iterator();
+                Iterator<TypedScanRecord<T>> current;
+
+                public TypedScanRecord<T> makeNext() {
+                    while (current == null || !current.hasNext()) {
+                        if (iters.hasNext()) {
+                            current = iters.next().iterator();
+                        } else {
+                            return allDone();
+                        }
+                    }
+                    return current.next();
+                }
+            };
+        }
+    }
+}
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Append.java 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Append.java
index 6a5b10523..06a0b694e 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Append.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Append.java
@@ -32,6 +32,9 @@ public interface Append {
     // TODO: Add more methods to configure the AppendWriter, such as apply static partitions,
     //  apply overwrites, etc.
 
-    /** Create a new {@link AppendWriter} to write data to a Log Table. */
+    /** Create a new {@link AppendWriter} to write data to a Log Table using InternalRow. */
     AppendWriter createWriter();
+
+    /** Create a new {@link TypedAppendWriter} to write POJOs directly. */
+    <T> TypedAppendWriter<T> createTypedWriter(Class<T> pojoClass);
 }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriter.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriter.java
index 4edcc2471..1b5cee055 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriter.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriter.java
@@ -31,10 +31,10 @@ import java.util.concurrent.CompletableFuture;
 public interface AppendWriter extends TableWriter {
 
     /**
-     * Append row into a Log Table.
+     * Append a record into a Log Table.
      *
-     * @param row the row to append.
+     * @param record the record to append.
      * @return A {@link CompletableFuture} that always returns append result when complete normally.
      */
-    CompletableFuture<AppendResult> append(InternalRow row);
+    CompletableFuture<AppendResult> append(InternalRow record);
 }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableAppend.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableAppend.java
index 50e416d56..84ccaf319 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableAppend.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableAppend.java
@@ -38,4 +38,9 @@ public class TableAppend implements Append {
     public AppendWriter createWriter() {
         return new AppendWriterImpl(tablePath, tableInfo, writerClient);
     }
+
+    @Override
+    public <T> TypedAppendWriter<T> createTypedWriter(Class<T> pojoClass) {
+        return new TypedAppendWriterImpl<>(createWriter(), pojoClass, tableInfo);
+    }
 }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java
index 7d90a9ccd..fe865eac4 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java
@@ -95,4 +95,9 @@ public class TableUpsert implements Upsert {
     public UpsertWriter createWriter() {
         return new UpsertWriterImpl(tablePath, tableInfo, targetColumns, writerClient);
     }
+
+    @Override
+    public <T> TypedUpsertWriter<T> createTypedWriter(Class<T> pojoClass) {
+        return new TypedUpsertWriterImpl<>(createWriter(), pojoClass, tableInfo, targetColumns);
+    }
 }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriter.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedAppendWriter.java
similarity index 77%
copy from 
fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriter.java
copy to 
fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedAppendWriter.java
index 4edcc2471..bff903913 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriter.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedAppendWriter.java
@@ -18,23 +18,23 @@
 package org.apache.fluss.client.table.writer;
 
 import org.apache.fluss.annotation.PublicEvolving;
-import org.apache.fluss.row.InternalRow;
 
 import java.util.concurrent.CompletableFuture;
 
 /**
- * The writer to write data to the log table.
+ * The typed writer to write data to the log table using POJOs.
  *
- * @since 0.2
+ * @param <T> the type of the record
+ * @since 0.6
  */
 @PublicEvolving
-public interface AppendWriter extends TableWriter {
+public interface TypedAppendWriter<T> extends TableWriter {
 
     /**
-     * Append row into a Log Table.
+     * Append a record into a Log Table.
      *
-     * @param row the row to append.
+     * @param record the record to append.
      * @return A {@link CompletableFuture} that always returns append result when complete normally.
      */
-    CompletableFuture<AppendResult> append(InternalRow row);
+    CompletableFuture<AppendResult> append(T record);
 }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedAppendWriterImpl.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedAppendWriterImpl.java
new file mode 100644
index 000000000..3b964594b
--- /dev/null
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedAppendWriterImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.writer;
+
+import org.apache.fluss.client.converter.PojoToRowConverter;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.RowType;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A typed {@link AppendWriter} that converts POJOs to {@link InternalRow} and delegates to the
+ * existing internal-row based writer implementation.
+ */
+class TypedAppendWriterImpl<T> implements TypedAppendWriter<T> {
+
+    private final AppendWriter delegate;
+    private final RowType tableSchema;
+    private final PojoToRowConverter<T> pojoToRowConverter;
+
+    TypedAppendWriterImpl(AppendWriter delegate, Class<T> pojoClass, TableInfo tableInfo) {
+        this.delegate = delegate;
+        this.tableSchema = tableInfo.getRowType();
+        this.pojoToRowConverter = PojoToRowConverter.of(pojoClass, tableSchema, tableSchema);
+    }
+
+    @Override
+    public void flush() {
+        delegate.flush();
+    }
+
+    @Override
+    public CompletableFuture<AppendResult> append(T record) {
+        if (record instanceof InternalRow) {
+            return delegate.append((InternalRow) record);
+        }
+        InternalRow row = pojoToRowConverter.toRow(record);
+        return delegate.append(row);
+    }
+}
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriter.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedUpsertWriter.java
similarity index 68%
copy from 
fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriter.java
copy to 
fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedUpsertWriter.java
index 6648eebfc..4d241f5e6 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriter.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedUpsertWriter.java
@@ -18,32 +18,31 @@
 package org.apache.fluss.client.table.writer;
 
 import org.apache.fluss.annotation.PublicEvolving;
-import org.apache.fluss.row.InternalRow;
 
 import java.util.concurrent.CompletableFuture;
 
 /**
- * The writer to write data to the primary key table.
+ * The typed writer to write data to the primary key table using POJOs.
  *
- * @since 0.2
+ * @param <T> the type of the record
+ * @since 0.6
  */
 @PublicEvolving
-public interface UpsertWriter extends TableWriter {
+public interface TypedUpsertWriter<T> extends TableWriter {
 
     /**
-     * Inserts row into Fluss table if they do not already exist, or updates them if they do exist.
+     * Inserts a record into the Fluss table if it does not already exist, or updates it if it does.
      *
-     * @param row the row to upsert.
+     * @param record the record to upsert.
      * @return A {@link CompletableFuture} that always returns upsert result when complete normally.
      */
-    CompletableFuture<UpsertResult> upsert(InternalRow row);
+    CompletableFuture<UpsertResult> upsert(T record);
 
     /**
-     * Delete certain row by the input row in Fluss table, the input row must contain the primary
-     * key.
+     * Delete a certain record from the Fluss table. The input must contain the primary key fields.
      *
-     * @param row the row to delete.
+     * @param record the record to delete.
      * @return A {@link CompletableFuture} that always delete result when complete normally.
      */
-    CompletableFuture<DeleteResult> delete(InternalRow row);
+    CompletableFuture<DeleteResult> delete(T record);
 }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedUpsertWriterImpl.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedUpsertWriterImpl.java
new file mode 100644
index 000000000..45af9f0ef
--- /dev/null
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedUpsertWriterImpl.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.writer;
+
+import org.apache.fluss.client.converter.PojoToRowConverter;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A typed {@link UpsertWriter} that converts POJOs to {@link InternalRow} and delegates to the
+ * existing internal-row based writer implementation.
+ */
+class TypedUpsertWriterImpl<T> implements TypedUpsertWriter<T> {
+
+    private final UpsertWriter delegate;
+    private final TableInfo tableInfo;
+    private final RowType tableSchema;
+    @Nullable private final int[] targetColumns;
+
+    private final RowType pkProjection;
+    @Nullable private final RowType targetProjection;
+
+    private final PojoToRowConverter<T> pojoToRowConverter;
+    private final PojoToRowConverter<T> pkConverter;
+    @Nullable private final PojoToRowConverter<T> targetConverter;
+
+    TypedUpsertWriterImpl(
+            UpsertWriter delegate, Class<T> pojoClass, TableInfo tableInfo, int[] targetColumns) {
+        this.delegate = delegate;
+        this.tableInfo = tableInfo;
+        this.tableSchema = tableInfo.getRowType();
+        this.targetColumns = targetColumns;
+
+        // Precompute projections
+        this.pkProjection = this.tableSchema.project(tableInfo.getPhysicalPrimaryKeys());
+        this.targetProjection =
+                (targetColumns == null) ? null : this.tableSchema.project(targetColumns);
+
+        // Initialize reusable converters
+        this.pojoToRowConverter = PojoToRowConverter.of(pojoClass, tableSchema, tableSchema);
+        this.pkConverter = PojoToRowConverter.of(pojoClass, tableSchema, pkProjection);
+        this.targetConverter =
+                (targetProjection == null)
+                        ? null
+                        : PojoToRowConverter.of(pojoClass, tableSchema, targetProjection);
+    }
+
+    @Override
+    public void flush() {
+        delegate.flush();
+    }
+
+    @Override
+    public CompletableFuture<UpsertResult> upsert(T record) {
+        if (record instanceof InternalRow) {
+            return delegate.upsert((InternalRow) record);
+        }
+        InternalRow row = convertPojo(record, /*forDelete=*/ false);
+        return delegate.upsert(row);
+    }
+
+    @Override
+    public CompletableFuture<DeleteResult> delete(T record) {
+        if (record instanceof InternalRow) {
+            return delegate.delete((InternalRow) record);
+        }
+        InternalRow pkOnly = convertPojo(record, /*forDelete=*/ true);
+        return delegate.delete(pkOnly);
+    }
+
+    private InternalRow convertPojo(T pojo, boolean forDelete) {
+        final RowType projection;
+        final PojoToRowConverter<T> converter;
+        if (forDelete) {
+            projection = pkProjection;
+            converter = pkConverter;
+        } else if (targetProjection != null && targetConverter != null) {
+            projection = targetProjection;
+            converter = targetConverter;
+        } else {
+            projection = tableSchema;
+            converter = pojoToRowConverter;
+        }
+
+        GenericRow projected = converter.toRow(pojo);
+        if (projection == tableSchema) {
+            return projected;
+        }
+        // expand projected row to full row if needed
+        GenericRow full = new GenericRow(tableSchema.getFieldCount());
+        if (forDelete) {
+            // set PK fields, others null
+            for (String pk : tableInfo.getPhysicalPrimaryKeys()) {
+                int projIndex = projection.getFieldIndex(pk);
+
+                // TODO: this can be optimized by pre-computing
+                // the index mapping in the constructor?
+                int fullIndex = tableSchema.getFieldIndex(pk);
+                full.setField(fullIndex, projected.getField(projIndex));
+            }
+        } else if (targetColumns != null) {
+            for (int i = 0; i < projection.getFieldCount(); i++) {
+                String name = projection.getFieldNames().get(i);
+                int fullIdx = tableSchema.getFieldIndex(name);
+                full.setField(fullIdx, projected.getField(i));
+            }
+        }
+        return full;
+    }
+}
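
An upsert/delete sketch against a primary key table (assuming table.newUpsert() as the entry point
and the illustrative Order POJO from above; partial updates compose with the existing
partialUpdate(...) API shown in the next file):

    TypedUpsertWriter<Order> upserter = table.newUpsert().createTypedWriter(Order.class);
    upserter.upsert(order).get(); // full-row upsert
    upserter.delete(order).get(); // only the primary key fields are encoded for the delete
    upserter.flush();

    // Partial update: only the primary key and 'amount' are written; other columns keep their
    // values for existing rows (or are null for new rows), per the partialUpdate contract.
    TypedUpsertWriter<Order> partial =
            table.newUpsert().partialUpdate("orderId", "amount").createTypedWriter(Order.class);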
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java
index d3c3608ee..0843437fe 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java
@@ -18,7 +18,6 @@
 package org.apache.fluss.client.table.writer;
 
 import org.apache.fluss.annotation.PublicEvolving;
-import org.apache.fluss.row.InternalRow;
 
 import javax.annotation.Nullable;
 
@@ -37,16 +36,14 @@ public interface Upsert {
     /**
      * Apply partial update columns and returns a new Upsert instance.
      *
-     * <p>For {@link UpsertWriter#upsert(InternalRow)} operation, only the specified columns will be
-     * updated and other columns will remain unchanged if the row exists or set to null if the row
-     * doesn't exist.
+     * <p>For upsert operations, only the specified columns will be updated and other columns will
+     * remain unchanged if the row exists or set to null if the row doesn't exist.
      *
-     * <p>For {@link UpsertWriter#delete(InternalRow)} operation, the entire row will not be
-     * removed, but only the specified columns except primary key will be set to null. The entire
-     * row will be removed when all columns except primary key are null after a {@link
-     * UpsertWriter#delete(InternalRow)} operation.
+     * <p>For delete operations, the entire row will not be removed immediately, but only the
+     * specified columns except primary key will be set to null. The entire row will be removed when
+     * all columns except primary key are null after a delete operation.
      *
-     * <p>Note: The specified columns must be a contains all columns of primary key, and all columns
+     * <p>Note: The specified columns must contain all columns of primary key, and all columns
      * except primary key should be nullable.
      *
      * @param targetColumns the column indexes to partial update
@@ -60,8 +57,11 @@ public interface Upsert {
     Upsert partialUpdate(String... targetColumnNames);
 
     /**
-     * Create a new {@link UpsertWriter} with the optional {@link #partialUpdate(String...)}
-     * information to upsert and delete data to a Primary Key Table.
+     * Create a new {@link UpsertWriter} using {@code InternalRow} with the optional {@link
+     * #partialUpdate(String...)} information to upsert and delete data to a Primary Key Table.
      */
     UpsertWriter createWriter();
+
+    /** Create a new typed {@link UpsertWriter} to write POJOs directly. */
+    <T> TypedUpsertWriter<T> createTypedWriter(Class<T> pojoClass);
 }
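
A rough sketch of how partialUpdate(...) combines with the new typed writer (illustrative only; the `Order` POJO with `id`/`status` fields is an assumption, where `id` is the primary key and the non-key columns are nullable):

    // Only the listed columns are written: for an existing row the remaining
    // columns keep their previous values, for a new row they become null.
    Upsert upsert = table.newUpsert().partialUpdate("id", "status");
    TypedUpsertWriter<Order> writer = upsert.createTypedWriter(Order.class);

    Order patch = new Order();
    patch.id = 42;
    patch.status = "SHIPPED";          // fields outside the target columns can stay null
    writer.upsert(patch).get();
    writer.flush();
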
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriter.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriter.java
index 6648eebfc..e4d751747 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriter.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriter.java
@@ -31,19 +31,18 @@ import java.util.concurrent.CompletableFuture;
 public interface UpsertWriter extends TableWriter {
 
     /**
-     * Inserts row into Fluss table if they do not already exist, or updates them if they do exist.
+     * Inserts a record into the Fluss table if it does not already exist, or updates it if it does.
      *
-     * @param row the row to upsert.
+     * @param record the record to upsert.
      * @return A {@link CompletableFuture} that always returns upsert result when complete normally.
      */
-    CompletableFuture<UpsertResult> upsert(InternalRow row);
+    CompletableFuture<UpsertResult> upsert(InternalRow record);
 
     /**
-     * Delete certain row by the input row in Fluss table, the input row must contain the primary
-     * key.
+     * Delete a certain record from the Fluss table. The input must contain the primary key fields.
      *
-     * @param row the row to delete.
+     * @param record the record to delete.
+     * @return A {@link CompletableFuture} that always returns the delete result when complete normally.
      */
-    CompletableFuture<DeleteResult> delete(InternalRow row);
+    CompletableFuture<DeleteResult> delete(InternalRow record);
 }
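
The row-based writer keeps working unchanged next to the typed one; roughly (a sketch assuming a primary-key table whose first column is an INT key):

    UpsertWriter rowWriter = table.newUpsert().createWriter();

    // GenericRow is positional: index 0 is the table's first column.
    GenericRow row = new GenericRow(table.getTableInfo().getRowType().getFieldCount());
    row.setField(0, 42);
    // ... set the remaining fields by position ...
    rowWriter.upsert(row).get();

    // A delete only needs the primary-key fields to be populated.
    rowWriter.delete(row).get();
    rowWriter.flush();
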
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
index a62c1b70d..8270ed788 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
@@ -26,6 +26,7 @@ import org.apache.fluss.client.table.Table;
 import org.apache.fluss.client.table.scanner.ScanRecord;
 import org.apache.fluss.client.table.scanner.log.LogScanner;
 import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.scanner.log.TypedLogScanner;
 import org.apache.fluss.client.table.writer.UpsertWriter;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
@@ -144,6 +145,13 @@ public abstract class ClientToServerITCaseBase {
         }
     }
 
+    protected static void subscribeFromBeginning(TypedLogScanner<?> logScanner, Table table) {
+        int bucketCount = table.getTableInfo().getNumBuckets();
+        for (int i = 0; i < bucketCount; i++) {
+            logScanner.subscribeFromBeginning(i);
+        }
+    }
+
     protected static void subscribeFromTimestamp(
             TablePath tablePath,
             @Nullable String partitionName,
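
The typed scanner follows the same subscribe-then-poll pattern as the helper added above; a condensed sketch (the `User` POJO is an assumption carried over from the earlier example):

    TypedLogScanner<User> scanner = table.newScan().createTypedLogScanner(User.class);
    int buckets = table.getTableInfo().getNumBuckets();
    for (int bucket = 0; bucket < buckets; bucket++) {
        scanner.subscribeFromBeginning(bucket);   // read every bucket from the earliest offset
    }

    TypedScanRecords<User> records = scanner.poll(Duration.ofSeconds(2));
    for (TypedScanRecord<User> record : records) {
        ChangeType change = record.getChangeType();   // APPEND_ONLY for log tables, INSERT/UPDATE_* for PK tables
        User value = record.getValue();               // the row already converted to the POJO
    }
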
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTypedClientITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTypedClientITCase.java
new file mode 100644
index 000000000..c7724799b
--- /dev/null
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTypedClientITCase.java
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table;
+
+import org.apache.fluss.client.admin.ClientToServerITCaseBase;
+import org.apache.fluss.client.converter.RowToPojoConverter;
+import org.apache.fluss.client.lookup.LookupResult;
+import org.apache.fluss.client.lookup.Lookuper;
+import org.apache.fluss.client.lookup.TypedLookuper;
+import org.apache.fluss.client.table.scanner.Scan;
+import org.apache.fluss.client.table.scanner.TypedScanRecord;
+import org.apache.fluss.client.table.scanner.log.TypedLogScanner;
+import org.apache.fluss.client.table.scanner.log.TypedScanRecords;
+import org.apache.fluss.client.table.writer.TypedAppendWriter;
+import org.apache.fluss.client.table.writer.TypedUpsertWriter;
+import org.apache.fluss.client.table.writer.Upsert;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** End-to-end tests for writing and scanning POJOs via client API. */
+public class FlussTypedClientITCase extends ClientToServerITCaseBase {
+
+    /** Test POJO containing all supported field types used by converters. */
+    public static class AllTypesPojo {
+        // primary key
+        public Integer a;
+        // all supported converter fields
+        public Boolean bool1;
+        public Byte tiny;
+        public Short small;
+        public Integer intv;
+        public Long big;
+        public Float flt;
+        public Double dbl;
+        public Character ch;
+        public String str;
+        public byte[] bin;
+        public byte[] bytes;
+        public BigDecimal dec;
+        public LocalDate dt;
+        public LocalTime tm;
+        public LocalDateTime tsNtz;
+        public Instant tsLtz;
+
+        public AllTypesPojo() {}
+
+        public AllTypesPojo(
+                Integer a,
+                Boolean bool1,
+                Byte tiny,
+                Short small,
+                Integer intv,
+                Long big,
+                Float flt,
+                Double dbl,
+                Character ch,
+                String str,
+                byte[] bin,
+                byte[] bytes,
+                BigDecimal dec,
+                LocalDate dt,
+                LocalTime tm,
+                LocalDateTime tsNtz,
+                Instant tsLtz) {
+            this.a = a;
+            this.bool1 = bool1;
+            this.tiny = tiny;
+            this.small = small;
+            this.intv = intv;
+            this.big = big;
+            this.flt = flt;
+            this.dbl = dbl;
+            this.ch = ch;
+            this.str = str;
+            this.bin = bin;
+            this.bytes = bytes;
+            this.dec = dec;
+            this.dt = dt;
+            this.tm = tm;
+            this.tsNtz = tsNtz;
+            this.tsLtz = tsLtz;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            AllTypesPojo that = (AllTypesPojo) o;
+            return Objects.equals(a, that.a)
+                    && Objects.equals(bool1, that.bool1)
+                    && Objects.equals(tiny, that.tiny)
+                    && Objects.equals(small, that.small)
+                    && Objects.equals(intv, that.intv)
+                    && Objects.equals(big, that.big)
+                    && Objects.equals(flt, that.flt)
+                    && Objects.equals(dbl, that.dbl)
+                    && Objects.equals(ch, that.ch)
+                    && Objects.equals(str, that.str)
+                    && Arrays.equals(bin, that.bin)
+                    && Arrays.equals(bytes, that.bytes)
+                    && Objects.equals(dec, that.dec)
+                    && Objects.equals(dt, that.dt)
+                    && Objects.equals(tm, that.tm)
+                    && Objects.equals(tsNtz, that.tsNtz)
+                    && Objects.equals(tsLtz, that.tsLtz);
+        }
+
+        @Override
+        public int hashCode() {
+            int result =
+                    Objects.hash(
+                            a, bool1, tiny, small, intv, big, flt, dbl, ch, str, dec, dt, tm, tsNtz,
+                            tsLtz);
+            result = 31 * result + Arrays.hashCode(bin);
+            result = 31 * result + Arrays.hashCode(bytes);
+            return result;
+        }
+
+        @Override
+        public String toString() {
+            return "AllTypesPojo{"
+                    + "a="
+                    + a
+                    + ", bool1="
+                    + bool1
+                    + ", tiny="
+                    + tiny
+                    + ", small="
+                    + small
+                    + ", intv="
+                    + intv
+                    + ", big="
+                    + big
+                    + ", flt="
+                    + flt
+                    + ", dbl="
+                    + dbl
+                    + ", ch="
+                    + ch
+                    + ", str='"
+                    + str
+                    + '\''
+                    + ", bin="
+                    + Arrays.toString(bin)
+                    + ", bytes="
+                    + Arrays.toString(bytes)
+                    + ", dec="
+                    + dec
+                    + ", dt="
+                    + dt
+                    + ", tm="
+                    + tm
+                    + ", tsNtz="
+                    + tsNtz
+                    + ", tsLtz="
+                    + tsLtz
+                    + '}';
+        }
+    }
+
+    /** Minimal POJO representing the primary key for {@link AllTypesPojo}. */
+    public static class PLookupKey {
+        public Integer a;
+
+        public PLookupKey() {}
+
+        public PLookupKey(Integer a) {
+            this.a = a;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            PLookupKey that = (PLookupKey) o;
+            return Objects.equals(a, that.a);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(a);
+        }
+
+        @Override
+        public String toString() {
+            return "PLookupKey{" + "a=" + a + '}';
+        }
+    }
+
+    private static Schema allTypesLogSchema() {
+        return Schema.newBuilder()
+                .column("a", DataTypes.INT())
+                .column("bool1", DataTypes.BOOLEAN())
+                .column("tiny", DataTypes.TINYINT())
+                .column("small", DataTypes.SMALLINT())
+                .column("intv", DataTypes.INT())
+                .column("big", DataTypes.BIGINT())
+                .column("flt", DataTypes.FLOAT())
+                .column("dbl", DataTypes.DOUBLE())
+                .column("ch", DataTypes.CHAR(1))
+                .column("str", DataTypes.STRING())
+                .column("bin", DataTypes.BINARY(3))
+                .column("bytes", DataTypes.BYTES())
+                .column("dec", DataTypes.DECIMAL(10, 2))
+                .column("dt", DataTypes.DATE())
+                .column("tm", DataTypes.TIME())
+                .column("tsNtz", DataTypes.TIMESTAMP(3))
+                .column("tsLtz", DataTypes.TIMESTAMP_LTZ(3))
+                .build();
+    }
+
+    private static Schema allTypesPkSchema() {
+        // Same columns as log schema but with PK on 'a'
+        return Schema.newBuilder()
+                .column("a", DataTypes.INT())
+                .column("bool1", DataTypes.BOOLEAN())
+                .column("tiny", DataTypes.TINYINT())
+                .column("small", DataTypes.SMALLINT())
+                .column("intv", DataTypes.INT())
+                .column("big", DataTypes.BIGINT())
+                .column("flt", DataTypes.FLOAT())
+                .column("dbl", DataTypes.DOUBLE())
+                .column("ch", DataTypes.CHAR(1))
+                .column("str", DataTypes.STRING())
+                .column("bin", DataTypes.BINARY(3))
+                .column("bytes", DataTypes.BYTES())
+                .column("dec", DataTypes.DECIMAL(10, 2))
+                .column("dt", DataTypes.DATE())
+                .column("tm", DataTypes.TIME())
+                .column("tsNtz", DataTypes.TIMESTAMP(3))
+                .column("tsLtz", DataTypes.TIMESTAMP_LTZ(3))
+                .primaryKey("a")
+                .build();
+    }
+
+    private static AllTypesPojo newAllTypesPojo(int i) {
+        Integer a = i;
+        Boolean bool1 = (i % 2) == 0;
+        Byte tiny = (byte) (i - 5);
+        Short small = (short) (100 + i);
+        Integer intv = 1000 + i;
+        Long big = 100000L + i;
+        Float flt = 1.5f + i;
+        Double dbl = 2.5 + i;
+        Character ch = (char) ('a' + (i % 26));
+        String str = "s" + i;
+        byte[] bin = new byte[] {(byte) i, (byte) (i + 1), (byte) (i + 2)};
+        byte[] bytes = new byte[] {(byte) (10 + i), (byte) (20 + i)};
+        BigDecimal dec = new BigDecimal("12345." + (10 + i)).setScale(2, RoundingMode.HALF_UP);
+        LocalDate dt = LocalDate.of(2024, 1, 1).plusDays(i);
+        LocalTime tm = LocalTime.of(12, (i * 7) % 60, (i * 11) % 60);
+        LocalDateTime tsNtz = LocalDateTime.of(2024, 1, 1, 0, 0).plusSeconds(i).withNano(0);
+        Instant tsLtz = Instant.ofEpochMilli(1700000000000L + (i * 1000L));
+        return new AllTypesPojo(
+                a, bool1, tiny, small, intv, big, flt, dbl, ch, str, bin, bytes, dec, dt, tm, tsNtz,
+                tsLtz);
+    }
+
+    @Test
+    void testTypedAppendWriteAndScan() throws Exception {
+        // Build all-types log table schema
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("bool1", DataTypes.BOOLEAN())
+                        .column("tiny", DataTypes.TINYINT())
+                        .column("small", DataTypes.SMALLINT())
+                        .column("intv", DataTypes.INT())
+                        .column("big", DataTypes.BIGINT())
+                        .column("flt", DataTypes.FLOAT())
+                        .column("dbl", DataTypes.DOUBLE())
+                        .column("ch", DataTypes.CHAR(1))
+                        .column("str", DataTypes.STRING())
+                        .column("bin", DataTypes.BINARY(3))
+                        .column("bytes", DataTypes.BYTES())
+                        .column("dec", DataTypes.DECIMAL(10, 2))
+                        .column("dt", DataTypes.DATE())
+                        .column("tm", DataTypes.TIME())
+                        .column("tsNtz", DataTypes.TIMESTAMP(3))
+                        .column("tsLtz", DataTypes.TIMESTAMP_LTZ(3))
+                        .build();
+        TablePath path = TablePath.of("pojo_db", "all_types_log");
+        TableDescriptor td = TableDescriptor.builder().schema(schema).distributedBy(2).build();
+        createTable(path, td, true);
+
+        try (Table table = conn.getTable(path)) {
+            // write
+            TypedAppendWriter<AllTypesPojo> writer =
+                    table.newAppend().createTypedWriter(AllTypesPojo.class);
+            List<AllTypesPojo> expected = new ArrayList<>();
+            for (int i = 0; i < 5; i++) {
+                AllTypesPojo u = newAllTypesPojo(i);
+                expected.add(u);
+                writer.append(u);
+            }
+            writer.flush();
+
+            // read
+            Scan scan = table.newScan();
+            TypedLogScanner<AllTypesPojo> scanner = scan.createTypedLogScanner(AllTypesPojo.class);
+            subscribeFromBeginning(scanner, table);
+
+            List<AllTypesPojo> actual = new ArrayList<>();
+            while (actual.size() < expected.size()) {
+                TypedScanRecords<AllTypesPojo> recs = scanner.poll(Duration.ofSeconds(2));
+                for (TypedScanRecord<AllTypesPojo> r : recs) {
+                    assertThat(r.getChangeType()).isEqualTo(ChangeType.APPEND_ONLY);
+                    actual.add(r.getValue());
+                }
+            }
+            assertThat(actual)
+                    .usingRecursiveFieldByFieldElementComparator()
+                    .containsExactlyInAnyOrderElementsOf(expected);
+        }
+    }
+
+    @Test
+    void testTypedUpsertWriteAndScan() throws Exception {
+        // Build all-types PK table schema (PK on 'a')
+        Schema schema = allTypesPkSchema();
+        TablePath path = TablePath.of("pojo_db", "all_types_pk");
+        TableDescriptor td = TableDescriptor.builder().schema(schema).distributedBy(2, "a").build();
+        createTable(path, td, true);
+
+        try (Table table = conn.getTable(path)) {
+            Upsert upsert = table.newUpsert();
+            TypedUpsertWriter<AllTypesPojo> writer = upsert.createTypedWriter(AllTypesPojo.class);
+
+            AllTypesPojo p1 = newAllTypesPojo(1);
+            AllTypesPojo p2 = newAllTypesPojo(2);
+            writer.upsert(p1).get();
+            writer.upsert(p2).get();
+
+            // update key 1: change a couple of fields
+            AllTypesPojo p1Updated = newAllTypesPojo(1);
+            p1Updated.str = "a1";
+            p1Updated.dec = new java.math.BigDecimal("42.42");
+            writer.upsert(p1Updated).get();
+            writer.flush();
+
+            // scan as POJOs and verify change types and values
+            TypedLogScanner<AllTypesPojo> scanner =
+                    table.newScan().createTypedLogScanner(AllTypesPojo.class);
+            subscribeFromBeginning(scanner, table);
+
+            List<ChangeType> changes = new ArrayList<>();
+            List<AllTypesPojo> values = new ArrayList<>();
+            while (values.size() < 4) { // INSERT 1, INSERT 2, UPDATE_BEFORE 1, UPDATE_AFTER 1
+                TypedScanRecords<AllTypesPojo> recs = scanner.poll(Duration.ofSeconds(2));
+                for (TypedScanRecord<AllTypesPojo> r : recs) {
+                    changes.add(r.getChangeType());
+                    values.add(r.getValue());
+                }
+            }
+
+            assertThat(changes)
+                    .containsExactlyInAnyOrder(
+                            ChangeType.INSERT,
+                            ChangeType.INSERT,
+                            ChangeType.UPDATE_BEFORE,
+                            ChangeType.UPDATE_AFTER);
+            // ensure the last update_after reflects new value
+            int lastIdx = changes.lastIndexOf(ChangeType.UPDATE_AFTER);
+            assertThat(values.get(lastIdx)).isEqualTo(p1Updated);
+        }
+    }
+
+    @Test
+    void testTypedLookups() throws Exception {
+        Schema schema = allTypesPkSchema();
+        TablePath path = TablePath.of("pojo_db", "lookup_pk");
+        TableDescriptor td = TableDescriptor.builder().schema(schema).distributedBy(2, "a").build();
+        createTable(path, td, true);
+
+        try (Table table = conn.getTable(path)) {
+            TypedUpsertWriter<AllTypesPojo> writer =
+                    table.newUpsert().createTypedWriter(AllTypesPojo.class);
+            writer.upsert(newAllTypesPojo(1)).get();
+            writer.upsert(newAllTypesPojo(2)).get();
+            writer.flush();
+
+            // primary key lookup using Lookuper API with POJO key
+            TypedLookuper<PLookupKey> lookuper =
+                    table.newLookup().createTypedLookuper(PLookupKey.class);
+            RowType tableSchema = table.getTableInfo().getRowType();
+            RowToPojoConverter<AllTypesPojo> rowConv =
+                    RowToPojoConverter.of(AllTypesPojo.class, tableSchema, tableSchema);
+
+            LookupResult lr = lookuper.lookup(new PLookupKey(1)).get();
+            AllTypesPojo one = rowConv.fromRow(lr.getSingletonRow());
+            assertThat(one).isEqualTo(newAllTypesPojo(1));
+        }
+    }
+
+    @Test
+    void testInternalRowLookup() throws Exception {
+        Schema schema = allTypesPkSchema();
+        TablePath path = TablePath.of("pojo_db", "lookup_internalrow");
+        TableDescriptor td = TableDescriptor.builder().schema(schema).distributedBy(2, "a").build();
+        createTable(path, td, true);
+
+        try (Table table = conn.getTable(path)) {
+            // write a couple of rows via POJO writer
+            TypedUpsertWriter<AllTypesPojo> writer =
+                    table.newUpsert().createTypedWriter(AllTypesPojo.class);
+            writer.upsert(newAllTypesPojo(101)).get();
+            writer.upsert(newAllTypesPojo(202)).get();
+            writer.flush();
+
+            // now perform lookup using the raw InternalRow path to ensure it's still supported
+            Lookuper lookuper = table.newLookup().createLookuper();
+            RowType tableSchema = table.getTableInfo().getRowType();
+            RowType keyProjection = tableSchema.project(table.getTableInfo().getPrimaryKeys());
+
+            // Build the key row directly using GenericRow to avoid any POJO conversion
+            GenericRow keyRow = new GenericRow(keyProjection.getFieldCount());
+            keyRow.setField(0, 101); // primary key field 'a'
+
+            LookupResult lr = lookuper.lookup(keyRow).get();
+            RowToPojoConverter<AllTypesPojo> rowConv =
+                    RowToPojoConverter.of(AllTypesPojo.class, tableSchema, tableSchema);
+            AllTypesPojo pojo = rowConv.fromRow(lr.getSingletonRow());
+            assertThat(pojo).isEqualTo(newAllTypesPojo(101));
+        }
+    }
+
+    @Test
+    void testTypedProjections() throws Exception {
+        TablePath path = TablePath.of("pojo_db", "proj_log");
+        TableDescriptor td =
+                TableDescriptor.builder().schema(allTypesLogSchema()).distributedBy(1).build();
+        createTable(path, td, true);
+
+        try (Table table = conn.getTable(path)) {
+            TypedAppendWriter<AllTypesPojo> writer =
+                    table.newAppend().createTypedWriter(AllTypesPojo.class);
+            writer.append(newAllTypesPojo(10)).get();
+            writer.append(newAllTypesPojo(11)).get();
+            writer.flush();
+
+            // Project only a subset of fields
+            TypedLogScanner<AllTypesPojo> scanner =
+                    table.newScan()
+                            .project(Arrays.asList("a", "str"))
+                            .createTypedLogScanner(AllTypesPojo.class);
+            subscribeFromBeginning(scanner, table);
+            TypedScanRecords<AllTypesPojo> recs = scanner.poll(Duration.ofSeconds(2));
+            int i = 10;
+            for (TypedScanRecord<AllTypesPojo> r : recs) {
+                AllTypesPojo u = r.getValue();
+                AllTypesPojo expectedPojo = new AllTypesPojo();
+                expectedPojo.a = i;
+                expectedPojo.str = "s" + i;
+                assertThat(u).isEqualTo(expectedPojo);
+                i++;
+            }
+        }
+    }
+
+    @Test
+    void testTypedPartialUpdates() throws Exception {
+        // Use full PK schema and update a subset of fields
+        Schema schema = allTypesPkSchema();
+        TablePath path = TablePath.of("pojo_db", "pk_partial");
+        TableDescriptor td = TableDescriptor.builder().schema(schema).distributedBy(1, "a").build();
+        createTable(path, td, true);
+
+        try (Table table = conn.getTable(path)) {
+            // 1. initial full row
+            TypedUpsertWriter<AllTypesPojo> fullWriter =
+                    table.newUpsert().createTypedWriter(AllTypesPojo.class);
+            fullWriter.upsert(newAllTypesPojo(1)).get();
+            fullWriter.flush();
+
+            // 2. partial update: only PK + subset fields
+            Upsert upsert = table.newUpsert().partialUpdate("a", "str", "dec");
+            TypedUpsertWriter<AllTypesPojo> writer = upsert.createTypedWriter(AllTypesPojo.class);
+
+            AllTypesPojo patch = new AllTypesPojo();
+            patch.a = 1;
+            patch.str = "second";
+            patch.dec = new BigDecimal("99.99");
+            writer.upsert(patch).get();
+            writer.flush();
+
+            // verify via lookup and scan using Lookuper + POJO key
+            TypedLookuper<PLookupKey> lookuper =
+                    table.newLookup().createTypedLookuper(PLookupKey.class);
+            RowType tableSchema = table.getTableInfo().getRowType();
+            RowToPojoConverter<AllTypesPojo> rowConv =
+                    RowToPojoConverter.of(AllTypesPojo.class, tableSchema, tableSchema);
+            AllTypesPojo lookedUp =
+                    rowConv.fromRow(lookuper.lookup(new PLookupKey(1)).get().getSingletonRow());
+            AllTypesPojo expected = newAllTypesPojo(1);
+            expected.str = "second";
+            expected.dec = new BigDecimal("99.99");
+            assertThat(lookedUp).isEqualTo(expected);
+
+            TypedLogScanner<AllTypesPojo> scanner =
+                    table.newScan().createTypedLogScanner(AllTypesPojo.class);
+            subscribeFromBeginning(scanner, table);
+            boolean sawUpdateAfter = false;
+            while (!sawUpdateAfter) {
+                TypedScanRecords<AllTypesPojo> recs = scanner.poll(Duration.ofSeconds(2));
+                for (TypedScanRecord<AllTypesPojo> r : recs) {
+                    if (r.getChangeType() == ChangeType.UPDATE_AFTER) {
+                        assertThat(r.getValue()).isEqualTo(expected);
+                        sawUpdateAfter = true;
+                    }
+                }
+            }
+        }
+    }
+}
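
For completeness, the typed lookup flow exercised in the test above boils down to roughly the following (a sketch; `UserKey` is a hypothetical key-only POJO whose single `id` field matches the primary key, and `User` is the full-row POJO assumed earlier):

    TypedLookuper<UserKey> lookuper = table.newLookup().createTypedLookuper(UserKey.class);
    LookupResult result = lookuper.lookup(new UserKey(1)).get();

    // Convert the looked-up row back into the full POJO.
    RowType rowType = table.getTableInfo().getRowType();
    RowToPojoConverter<User> converter = RowToPojoConverter.of(User.class, rowType, rowType);
    User user = converter.fromRow(result.getSingletonRow());
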
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
index 24f264c92..efebb3278 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
@@ -562,6 +562,8 @@ class RecordAccumulatorTest {
 
         Map<TablePath, Long> tableIdByPath = new HashMap<>();
         tableIdByPath.put(DATA1_TABLE_PATH, DATA1_TABLE_ID);
+        Map<TablePath, TableInfo> tableInfoByPath = new HashMap<>();
+        tableInfoByPath.put(DATA1_TABLE_PATH, DATA1_TABLE_INFO);
         return new Cluster(
                 aliveTabletServersById,
                 new ServerNode(0, "localhost", 89, ServerType.COORDINATOR),
