This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new f731fc6cd [client] Add Pojo API support for read and write operations
(#1992)
f731fc6cd is described below
commit f731fc6cd5a06a30dd179ea6317e80a738bbcfed
Author: Giannis Polyzos <[email protected]>
AuthorDate: Tue Dec 23 18:00:18 2025 +0200
[client] Add Pojo API support for read and write operations (#1992)
---
.../fluss/client/converter/ConverterCommons.java | 18 +
.../fluss/client/converter/PojoToRowConverter.java | 23 +-
.../apache/fluss/client/converter/PojoType.java | 30 +-
.../org/apache/fluss/client/lookup/Lookup.java | 39 +-
.../apache/fluss/client/lookup/LookupResult.java | 4 +-
.../org/apache/fluss/client/lookup/Lookuper.java | 15 +-
.../fluss/client/lookup/PrefixKeyLookuper.java | 2 +-
.../fluss/client/lookup/PrimaryKeyLookuper.java | 2 +-
.../apache/fluss/client/lookup/TableLookup.java | 5 +
.../TypedLookuper.java} | 18 +-
.../fluss/client/lookup/TypedLookuperImpl.java | 78 +++
.../apache/fluss/client/table/scanner/Scan.java | 8 +
.../fluss/client/table/scanner/TableScan.java | 8 +
.../client/table/scanner/TypedScanRecord.java | 88 ++++
.../client/table/scanner/log/TypedLogScanner.java | 90 ++++
.../table/scanner/log/TypedLogScannerImpl.java | 109 ++++
.../client/table/scanner/log/TypedScanRecords.java | 120 +++++
.../apache/fluss/client/table/writer/Append.java | 5 +-
.../fluss/client/table/writer/AppendWriter.java | 6 +-
.../fluss/client/table/writer/TableAppend.java | 5 +
.../fluss/client/table/writer/TableUpsert.java | 5 +
.../{AppendWriter.java => TypedAppendWriter.java} | 14 +-
.../client/table/writer/TypedAppendWriterImpl.java | 56 +++
.../{UpsertWriter.java => TypedUpsertWriter.java} | 21 +-
.../client/table/writer/TypedUpsertWriterImpl.java | 131 +++++
.../apache/fluss/client/table/writer/Upsert.java | 22 +-
.../fluss/client/table/writer/UpsertWriter.java | 13 +-
.../client/admin/ClientToServerITCaseBase.java | 8 +
.../fluss/client/table/FlussTypedClientITCase.java | 558 +++++++++++++++++++++
.../fluss/client/write/RecordAccumulatorTest.java | 2 +
30 files changed, 1429 insertions(+), 74 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java
b/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java
index 5f4f1b6c1..aeb36419a 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java
@@ -83,6 +83,24 @@ final class ConverterCommons {
}
}
+ static void validatePojoMatchesProjection(PojoType<?> pojoType, RowType
projection) {
+ Set<String> pojoNames = pojoType.getProperties().keySet();
+ List<String> fieldNames = projection.getFieldNames();
+ if (!pojoNames.containsAll(fieldNames)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "POJO fields %s must contain all projection fields
%s. "
+ + "For full-table writes, POJO fields must
exactly match table schema fields.",
+ pojoNames, fieldNames));
+ }
+ for (int i = 0; i < projection.getFieldCount(); i++) {
+ String name = fieldNames.get(i);
+ DataType dt = projection.getTypeAt(i);
+ PojoType.Property prop = pojoType.getProperty(name);
+ validateCompatibility(dt, prop);
+ }
+ }
+
static void validateProjectionSubset(RowType projection, RowType
tableSchema) {
Set<String> tableNames = new HashSet<>(tableSchema.getFieldNames());
for (String n : projection.getFieldNames()) {
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
index 4d75aa91a..0f4f6d8a4 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java
@@ -30,6 +30,7 @@ import org.apache.fluss.types.RowType;
import javax.annotation.Nullable;
import java.math.BigDecimal;
+import java.math.RoundingMode;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -54,7 +55,9 @@ public final class PojoToRowConverter<T> {
this.tableSchema = tableSchema;
this.projection = projection;
this.projectionFieldNames = projection.getFieldNames();
- ConverterCommons.validatePojoMatchesTable(pojoType, tableSchema);
+ // For writer path, allow POJO to be a superset of the projection. It
must contain all
+ // projected fields.
+ ConverterCommons.validatePojoMatchesProjection(pojoType, projection);
ConverterCommons.validateProjectionSubset(projection, tableSchema);
this.fieldConverters = createFieldConverters();
}
@@ -177,8 +180,22 @@ public final class PojoToRowConverter<T> {
String.format(
"Field %s is not a BigDecimal. Cannot convert to
Decimal.", prop.name));
}
- return Decimal.fromBigDecimal(
- (BigDecimal) v, decimalType.getPrecision(),
decimalType.getScale());
+ final int precision = decimalType.getPrecision();
+ final int scale = decimalType.getScale();
+
+ // Scale with a deterministic rounding mode to avoid
ArithmeticException when rounding is
+ // needed.
+ BigDecimal bd = (BigDecimal) v;
+ BigDecimal scaled = bd.setScale(scale, RoundingMode.HALF_UP);
+
+ if (scaled.precision() > precision) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Decimal value for field %s exceeds precision %d
after scaling to %d: %s",
+ prop.name, precision, scale, scaled));
+ }
+
+ return Decimal.fromBigDecimal(scaled, precision, scale);
}
/** Converts a LocalDate POJO property to number of days since epoch. */
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoType.java
b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoType.java
index 7790b9143..de848f448 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoType.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoType.java
@@ -37,6 +37,7 @@ final class PojoType<T> {
private final Class<T> pojoClass;
private final Constructor<T> defaultConstructor;
private final Map<String, Property> properties; // property name ->
property
+ private static final Map<Class<?>, Class<?>> PRIMITIVE_TO_BOXED =
createPrimitiveToBoxedMap();
private PojoType(Class<T> pojoClass, Constructor<T> ctor, Map<String,
Property> props) {
this.pojoClass = pojoClass;
@@ -73,12 +74,15 @@ final class PojoType<T> {
for (Map.Entry<String, Field> e : allFields.entrySet()) {
String name = e.getKey();
Field field = e.getValue();
+ // Enforce nullable fields: primitives are not allowed in POJO
definitions.
if (field.getType().isPrimitive()) {
throw new IllegalArgumentException(
String.format(
"POJO class %s has primitive field '%s' of
type %s. Primitive types are not allowed; all fields must be nullable (use
wrapper types).",
pojoClass.getName(), name,
field.getType().getName()));
}
+ // use boxed type as effective type
+ Class<?> effectiveType = boxIfPrimitive(field.getType());
boolean publicField = Modifier.isPublic(field.getModifiers());
Method getter = getters.get(name);
Method setter = setters.get(name);
@@ -94,8 +98,7 @@ final class PojoType<T> {
}
props.put(
name,
- new Property(
- name, field.getType(), publicField ? field : null,
getter, setter));
+ new Property(name, effectiveType, publicField ? field :
null, getter, setter));
}
return new PojoType<>(pojoClass, ctor, props);
@@ -235,6 +238,29 @@ final class PojoType<T> {
return s.substring(0, 1).toLowerCase(Locale.ROOT) + s.substring(1);
}
+ private static Map<Class<?>, Class<?>> createPrimitiveToBoxedMap() {
+ Map<Class<?>, Class<?>> map = new HashMap<>();
+ map.put(boolean.class, Boolean.class);
+ map.put(byte.class, Byte.class);
+ map.put(short.class, Short.class);
+ map.put(int.class, Integer.class);
+ map.put(long.class, Long.class);
+ map.put(float.class, Float.class);
+ map.put(double.class, Double.class);
+ map.put(char.class, Character.class);
+ // void shouldn't appear as a field type, but handle defensively
+ map.put(void.class, Void.class);
+ return map;
+ }
+
+ private static Class<?> boxIfPrimitive(Class<?> type) {
+ if (!type.isPrimitive()) {
+ return type;
+ }
+ Class<?> boxed = PRIMITIVE_TO_BOXED.get(type);
+ return boxed != null ? boxed : type;
+ }
+
static final class Property {
final String name;
final Class<?> type;
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java
index 31faed2f7..e64685b9a 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java
@@ -24,31 +24,37 @@ import java.util.List;
/**
* Used to configure and create a {@link Lookuper} to lookup rows of a primary
key table. The built
- * Lookuper can be a primary key lookuper that lookups by the primary key, or
a prefix key lookup
- * that lookups by the prefix key of the primary key.
+ * lookuper can lookup by the full primary key, or by a prefix of the primary
key when configured
+ * via {@link #lookupBy}.
*
* <p>{@link Lookup} objects are immutable and can be shared between threads.
Refinement methods,
- * like {@link #lookupBy}, create new Lookup instances.
+ * like {@link #lookupBy}, create new {@code Lookup} instances.
*
- * <p>Example1: Create a Primary Key Lookuper. Given a table with primary key
column [k STRING].
+ * <p>Examples
+ *
+ * <p>Example 1: Primary Key Lookuper using an InternalRow key. Given a table
with primary key
+ * column [k STRING]:
*
* <pre>{@code
* Lookuper lookuper = table.newLookup().createLookuper();
* CompletableFuture<LookupResult> resultFuture =
lookuper.lookup(GenericRow.of("key1"));
- * resultFuture.get().getRows().forEach(row -> {
- * System.out.println(row);
- * });
+ * resultFuture.get().getRowList().forEach(System.out::println);
* }</pre>
*
- * <p>Example2: Create a Prefix Key Lookuper. Given a table with primary key
column [a INT, b
- * STRING, c BIGINT] and bucket key [a, b].
+ * <p>Example 2: Prefix Key Lookuper using an InternalRow key. Given a table
with primary key
+ * columns [a INT, b STRING, c BIGINT] and bucket key [a, b]:
*
* <pre>{@code
* Lookuper lookuper = table.newLookup().lookupBy("a", "b").createLookuper();
* CompletableFuture<LookupResult> resultFuture =
lookuper.lookup(GenericRow.of(1, "b1"));
- * resultFuture.get().getRows().forEach(row -> {
- * System.out.println(row);
- * });
+ * resultFuture.get().getRowList().forEach(System.out::println);
+ * }</pre>
+ *
+ * <p>Example 3: Using a POJO key (conversion handled internally):
+ *
+ * <pre>{@code
+ * TypedLookuper<MyKeyPojo> lookuper =
table.newLookup().createTypedLookuper(MyKeyPojo.class);
+ * LookupResult result = lookuper.lookup(new MyKeyPojo(...)).get();
* }</pre>
*
* @since 0.6
@@ -96,4 +102,13 @@ public interface Lookup {
* @return the lookuper
*/
Lookuper createLookuper();
+
+ /**
+ * Creates a {@link TypedLookuper} instance to lookup rows of a primary
key table using POJOs.
+ *
+ * @param pojoClass the class of the POJO
+ * @param <T> the type of the POJO
+ * @return the typed lookuper
+ */
+ <T> TypedLookuper<T> createTypedLookuper(Class<T> pojoClass);
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupResult.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupResult.java
index b4536640e..83497d37c 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupResult.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupResult.java
@@ -27,7 +27,9 @@ import java.util.List;
import java.util.Objects;
/**
- * The result of {@link Lookuper#lookup(InternalRow)}.
+ * The result of a lookup operation performed by a {@link Lookuper}. It
carries zero, one, or many
+ * {@link org.apache.fluss.row.InternalRow} values depending on whether the
underlying lookup is a
+ * primary-key lookup (at most one) or a prefix-key lookup (zero or more).
*
* @since 0.1
*/
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java
index 37157d9b1..9998a2612 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java
@@ -25,11 +25,16 @@ import javax.annotation.concurrent.NotThreadSafe;
import java.util.concurrent.CompletableFuture;
/**
- * The lookup-er is used to lookup row of a primary key table by primary key
or prefix key. The
- * lookuper has retriable ability to handle transient errors during lookup
operations which is
- * configured by {@link
org.apache.fluss.config.ConfigOptions#CLIENT_LOOKUP_MAX_RETRIES}.
+ * A lookuper performs key-based lookups against a primary key table, using
either the full primary
+ * key or a prefix of the primary key (when configured via {@code
Lookup#lookupBy}).
*
- * <p>Note: Lookuper instances are not thread-safe.
+ * <p>Usage examples:
+ *
+ * <pre>{@code
+ * // Row-based key (InternalRow)
+ * Lookuper lookuper = table.newLookup().createLookuper();
+ * LookupResult res = lookuper.lookup(keyRow).get();
+ * }</pre>
*
* @since 0.6
*/
@@ -44,7 +49,7 @@ public interface Lookuper {
* {@code table.newLookup().createLookuper()}), or be the prefix key if
the lookuper is a Prefix
* Key Lookuper (created by {@code
table.newLookup().lookupBy(prefixKeys).createLookuper()}).
*
- * @param lookupKey the lookup key.
+ * @param lookupKey the lookup key
* @return the result of lookup.
*/
CompletableFuture<LookupResult> lookup(InternalRow lookupKey);
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java
index a2d1029d2..a7b3e6545 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java
@@ -46,7 +46,7 @@ import static
org.apache.fluss.client.utils.ClientUtils.getPartitionId;
* of the primary key.
*/
@NotThreadSafe
-class PrefixKeyLookuper extends AbstractLookuper {
+class PrefixKeyLookuper extends AbstractLookuper implements Lookuper {
/** Extract bucket key from prefix lookup key row. */
private final KeyEncoder bucketKeyEncoder;
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
index 6c3264763..d2cd94963 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
@@ -40,7 +40,7 @@ import static
org.apache.fluss.utils.Preconditions.checkArgument;
/** An implementation of {@link Lookuper} that lookups by primary key. */
@NotThreadSafe
-class PrimaryKeyLookuper extends AbstractLookuper {
+class PrimaryKeyLookuper extends AbstractLookuper implements Lookuper {
private final KeyEncoder primaryKeyEncoder;
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java
index 84c9bdb21..e0c92661f 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java
@@ -71,4 +71,9 @@ public class TableLookup implements Lookup {
tableInfo, schemaGetter, metadataUpdater, lookupClient,
lookupColumnNames);
}
}
+
+ @Override
+ public <T> TypedLookuper<T> createTypedLookuper(Class<T> pojoClass) {
+ return new TypedLookuperImpl<>(createLookuper(), tableInfo,
lookupColumnNames, pojoClass);
+ }
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriter.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/TypedLookuper.java
similarity index 68%
copy from
fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriter.java
copy to
fluss-client/src/main/java/org/apache/fluss/client/lookup/TypedLookuper.java
index 4edcc2471..2aa7624c8 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriter.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/TypedLookuper.java
@@ -15,26 +15,26 @@
* limitations under the License.
*/
-package org.apache.fluss.client.table.writer;
+package org.apache.fluss.client.lookup;
import org.apache.fluss.annotation.PublicEvolving;
-import org.apache.fluss.row.InternalRow;
import java.util.concurrent.CompletableFuture;
/**
- * The writer to write data to the log table.
+ * A typed lookuper performs key-based lookups against a primary key table
using POJOs.
*
- * @since 0.2
+ * @param <T> the type of the lookup key
+ * @since 0.6
*/
@PublicEvolving
-public interface AppendWriter extends TableWriter {
+public interface TypedLookuper<T> {
/**
- * Append row into a Log Table.
+ * Lookups certain row from the given lookup key.
*
- * @param row the row to append.
- * @return A {@link CompletableFuture} that always returns append result
when complete normally.
+ * @param lookupKey the lookup key
+ * @return the result of lookup.
*/
- CompletableFuture<AppendResult> append(InternalRow row);
+ CompletableFuture<LookupResult> lookup(T lookupKey);
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/TypedLookuperImpl.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/TypedLookuperImpl.java
new file mode 100644
index 000000000..c88eb15d1
--- /dev/null
+++
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/TypedLookuperImpl.java
@@ -0,0 +1,78 @@
+package org.apache.fluss.client.lookup;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.fluss.client.converter.PojoToRowConverter;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Decorator for {@link Lookuper} that enables generic key lookup via {@link
+ * TypedLookuper#lookup(Object)}. Converts POJO keys to {@link InternalRow}
using existing
+ * converters based on table schema and active lookup columns, and directly
delegates when the key
+ * is already an {@link InternalRow}.
+ */
+final class TypedLookuperImpl<K> implements TypedLookuper<K> {
+
+ private final Lookuper delegate;
+ private final TableInfo tableInfo;
+ @Nullable private final List<String> lookupColumnNames;
+ private final PojoToRowConverter<K> keyConv;
+
+ TypedLookuperImpl(
+ Lookuper delegate,
+ TableInfo tableInfo,
+ @Nullable List<String> lookupColumnNames,
+ Class<K> keyClass) {
+ this.delegate = delegate;
+ this.tableInfo = tableInfo;
+ this.lookupColumnNames = lookupColumnNames;
+ this.keyConv = createPojoToRowConverter(keyClass);
+ }
+
+ @Override
+ public CompletableFuture<LookupResult> lookup(K key) {
+ if (key == null) {
+ throw new IllegalArgumentException("key must not be null");
+ }
+ // Fast-path: already an InternalRow
+ if (key instanceof InternalRow) {
+ return delegate.lookup((InternalRow) key);
+ }
+
+ InternalRow keyRow = keyConv.toRow(key);
+ return delegate.lookup(keyRow);
+ }
+
+ private PojoToRowConverter<K> createPojoToRowConverter(Class<K> keyClass) {
+ RowType tableSchema = tableInfo.getRowType();
+ RowType keyProjection;
+ if (lookupColumnNames == null) {
+ keyProjection = tableSchema.project(tableInfo.getPrimaryKeys());
+ } else {
+ keyProjection = tableSchema.project(lookupColumnNames);
+ }
+ return PojoToRowConverter.of(keyClass, tableSchema, keyProjection);
+ }
+}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java
index 97e37847f..1c05bd391 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java
@@ -20,6 +20,7 @@ package org.apache.fluss.client.table.scanner;
import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.client.table.scanner.batch.BatchScanner;
import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.TypedLogScanner;
import org.apache.fluss.metadata.TableBucket;
import javax.annotation.Nullable;
@@ -65,6 +66,13 @@ public interface Scan {
*/
LogScanner createLogScanner();
+ /**
+ * Creates a {@link TypedLogScanner} to continuously read log data as
POJOs of the given class.
+ *
+ * <p>Note: this API doesn't support pre-configured with {@link
#limit(int)}.
+ */
+ <T> TypedLogScanner<T> createTypedLogScanner(Class<T> pojoClass);
+
/**
* Creates a {@link BatchScanner} to read current data in the given table
bucket for this scan.
*
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
index 0ca1952cf..0d1d28a0e 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
@@ -25,6 +25,8 @@ import
org.apache.fluss.client.table.scanner.batch.KvSnapshotBatchScanner;
import org.apache.fluss.client.table.scanner.batch.LimitBatchScanner;
import org.apache.fluss.client.table.scanner.log.LogScanner;
import org.apache.fluss.client.table.scanner.log.LogScannerImpl;
+import org.apache.fluss.client.table.scanner.log.TypedLogScanner;
+import org.apache.fluss.client.table.scanner.log.TypedLogScannerImpl;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.metadata.SchemaGetter;
@@ -113,6 +115,12 @@ public class TableScan implements Scan {
schemaGetter);
}
+ @Override
+ public <T> TypedLogScanner<T> createTypedLogScanner(Class<T> pojoClass) {
+ LogScanner base = createLogScanner();
+ return new TypedLogScannerImpl<>(base, pojoClass, tableInfo,
projectedColumns);
+ }
+
@Override
public BatchScanner createBatchScanner(TableBucket tableBucket) {
if (limit == null) {
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TypedScanRecord.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TypedScanRecord.java
new file mode 100644
index 000000000..5a2d3b8a3
--- /dev/null
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TypedScanRecord.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner;
+
+import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.row.InternalRow;
+
+import java.util.Objects;
+
+/**
+ * A record produced by a table scanner which contains a typed value.
+ *
+ * @param <T> The type of the value.
+ */
+@PublicEvolving
+public class TypedScanRecord<T> {
+
+ private final ScanRecord scanRecord;
+ private final T value;
+
+ public TypedScanRecord(ScanRecord scanRecord, T value) {
+ this.scanRecord = scanRecord;
+ this.value = value;
+ }
+
+ /** The position of this record in the corresponding fluss table bucket. */
+ public long logOffset() {
+ return scanRecord.logOffset();
+ }
+
+ /** The timestamp of this record. */
+ public long timestamp() {
+ return scanRecord.timestamp();
+ }
+
+ /** The change type of this record. */
+ public ChangeType getChangeType() {
+ return scanRecord.getChangeType();
+ }
+
+ /** Returns the record value. */
+ public T getValue() {
+ return value;
+ }
+
+ /** Returns the internal row of this record. */
+ public InternalRow getRow() {
+ return scanRecord.getRow();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TypedScanRecord<?> that = (TypedScanRecord<?>) o;
+ return Objects.equals(scanRecord, that.scanRecord) &&
Objects.equals(value, that.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(scanRecord, value);
+ }
+
+ @Override
+ public String toString() {
+ return scanRecord.getChangeType().shortString() + value + "@" +
scanRecord.logOffset();
+ }
+}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedLogScanner.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedLogScanner.java
new file mode 100644
index 000000000..a700ab434
--- /dev/null
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedLogScanner.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner.log;
+
+import org.apache.fluss.annotation.PublicEvolving;
+
+import java.time.Duration;
+
+/**
+ * A typed scanner is used to scan log data as POJOs of specify table from
Fluss.
+ *
+ * @param <T> the type of the POJO
+ * @since 0.6
+ */
+@PublicEvolving
+public interface TypedLogScanner<T> extends AutoCloseable {
+
+ /**
+ * Poll log data from tablet server.
+ *
+ * @param timeout the timeout to poll.
+ * @return the result of poll.
+ */
+ TypedScanRecords<T> poll(Duration timeout);
+
+ /**
+ * Subscribe to the given table bucket from beginning dynamically. If the
table bucket is
+ * already subscribed, the start offset will be updated.
+ *
+ * @param bucket the table bucket to subscribe.
+ */
+ void subscribeFromBeginning(int bucket);
+
+ /**
+ * Subscribe to the given partitioned table bucket from beginning
dynamically. If the table
+ * bucket is already subscribed, the start offset will be updated.
+ *
+ * @param partitionId the partition id of the table partition to subscribe.
+ * @param bucket the table bucket to subscribe.
+ */
+ void subscribeFromBeginning(long partitionId, int bucket);
+
+ /**
+ * Subscribe to the given table bucket in given offset dynamically. If the
table bucket is
+ * already subscribed, the offset will be updated.
+ *
+ * @param bucket the table bucket to subscribe.
+ * @param offset the offset to start from.
+ */
+ void subscribe(int bucket, long offset);
+
+ /**
+ * Subscribe to the given partitioned table bucket in given offset
dynamically. If the table
+ * bucket is already subscribed, the offset will be updated.
+ *
+ * @param partitionId the partition id of the table partition to subscribe.
+ * @param bucket the table bucket to subscribe.
+ * @param offset the offset to start from.
+ */
+ void subscribe(long partitionId, int bucket, long offset);
+
+ /**
+ * Unsubscribe from the given bucket of given partition dynamically.
+ *
+ * @param partitionId the partition id of the table partition to
unsubscribe.
+ * @param bucket the table bucket to unsubscribe.
+ */
+ void unsubscribe(long partitionId, int bucket);
+
+ /**
+ * Wake up the log scanner in case the fetcher thread in log scanner is
blocking in {@link
+ * #poll(Duration timeout)}.
+ */
+ void wakeup();
+}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedLogScannerImpl.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedLogScannerImpl.java
new file mode 100644
index 000000000..3b7ea32d4
--- /dev/null
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedLogScannerImpl.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner.log;
+
+import org.apache.fluss.client.converter.RowToPojoConverter;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.TypedScanRecord;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.RowType;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Adapter that converts {@link InternalRow} records from a {@link LogScanner}
into POJOs of type T.
+ */
+public class TypedLogScannerImpl<T> implements TypedLogScanner<T> {
+
+ private final LogScanner delegate;
+ private final RowToPojoConverter<T> converter;
+
+ public TypedLogScannerImpl(
+ LogScanner delegate, Class<T> pojoClass, TableInfo tableInfo,
int[] projectedColumns) {
+ this.delegate = delegate;
+ RowType tableSchema = tableInfo.getRowType();
+ RowType projection =
+ projectedColumns == null ? tableSchema :
tableSchema.project(projectedColumns);
+ this.converter = RowToPojoConverter.of(pojoClass, tableSchema,
projection);
+ }
+
+ @Override
+ public TypedScanRecords<T> poll(Duration timeout) {
+ ScanRecords records = delegate.poll(timeout);
+ if (records == null || records.isEmpty()) {
+ return TypedScanRecords.empty();
+ }
+ Map<TableBucket, List<TypedScanRecord<T>>> out = new HashMap<>();
+ for (TableBucket bucket : records.buckets()) {
+ List<ScanRecord> list = records.records(bucket);
+ List<TypedScanRecord<T>> converted = new ArrayList<>(list.size());
+ for (ScanRecord r : list) {
+ InternalRow row = r.getRow();
+ T pojo = converter.fromRow(row);
+ converted.add(new TypedScanRecord<>(r, pojo));
+ }
+ out.put(bucket, converted);
+ }
+ return new TypedScanRecords<>(out);
+ }
+
+ @Override
+ public void subscribeFromBeginning(int bucket) {
+ delegate.subscribeFromBeginning(bucket);
+ }
+
+ @Override
+ public void subscribeFromBeginning(long partitionId, int bucket) {
+ delegate.subscribeFromBeginning(partitionId, bucket);
+ }
+
+ @Override
+ public void subscribe(int bucket, long offset) {
+ delegate.subscribe(bucket, offset);
+ }
+
+ @Override
+ public void subscribe(long partitionId, int bucket, long offset) {
+ delegate.subscribe(partitionId, bucket, offset);
+ }
+
+ @Override
+ public void unsubscribe(long partitionId, int bucket) {
+ delegate.unsubscribe(partitionId, bucket);
+ }
+
+ @Override
+ public void wakeup() {
+ delegate.wakeup();
+ }
+
+ @Override
+ public void close() {
+ try {
+ delegate.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedScanRecords.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedScanRecords.java
new file mode 100644
index 000000000..f69347802
--- /dev/null
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedScanRecords.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner.log;
+
+import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.client.table.scanner.TypedScanRecord;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.utils.AbstractIterator;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A container that holds the list {@link TypedScanRecord} per bucket for a
particular table. There
+ * is one {@link TypedScanRecord} list for every bucket returned by a {@link
+ * TypedLogScanner#poll(java.time.Duration)} operation.
+ *
+ * @param <T> the type of the POJO
+ * @since 0.6
+ */
+@PublicEvolving
+public class TypedScanRecords<T> implements Iterable<TypedScanRecord<T>> {
+
+ public static <T> TypedScanRecords<T> empty() {
+ return new TypedScanRecords<>(Collections.emptyMap());
+ }
+
+ private final Map<TableBucket, List<TypedScanRecord<T>>> records;
+
+ public TypedScanRecords(Map<TableBucket, List<TypedScanRecord<T>>>
records) {
+ this.records = records;
+ }
+
+ /**
+ * Get just the records for the given bucketId.
+ *
+ * @param scanBucket The bucket to get records for
+ */
+ public List<TypedScanRecord<T>> records(TableBucket scanBucket) {
+ List<TypedScanRecord<T>> recs = records.get(scanBucket);
+ if (recs == null) {
+ return Collections.emptyList();
+ }
+ return Collections.unmodifiableList(recs);
+ }
+
+ /**
+ * Get the bucket ids which have records contained in this record set.
+ *
+ * @return the set of partitions with data in this record set (maybe empty
if no data was
+ * returned)
+ */
+ public Set<TableBucket> buckets() {
+ return Collections.unmodifiableSet(records.keySet());
+ }
+
+ /** The number of records for all buckets. */
+ public int count() {
+ int count = 0;
+ for (List<TypedScanRecord<T>> recs : records.values()) {
+ count += recs.size();
+ }
+ return count;
+ }
+
+ public boolean isEmpty() {
+ return records.isEmpty();
+ }
+
+ @Override
+ public Iterator<TypedScanRecord<T>> iterator() {
+ return new ConcatenatedIterable<>(records.values()).iterator();
+ }
+
+ private static class ConcatenatedIterable<T> implements
Iterable<TypedScanRecord<T>> {
+
+ private final Iterable<? extends Iterable<TypedScanRecord<T>>>
iterables;
+
+ public ConcatenatedIterable(Iterable<? extends
Iterable<TypedScanRecord<T>>> iterables) {
+ this.iterables = iterables;
+ }
+
+ @Override
+ public Iterator<TypedScanRecord<T>> iterator() {
+ return new AbstractIterator<TypedScanRecord<T>>() {
+ final Iterator<? extends Iterable<TypedScanRecord<T>>> iters =
iterables.iterator();
+ Iterator<TypedScanRecord<T>> current;
+
+ public TypedScanRecord<T> makeNext() {
+ while (current == null || !current.hasNext()) {
+ if (iters.hasNext()) {
+ current = iters.next().iterator();
+ } else {
+ return allDone();
+ }
+ }
+ return current.next();
+ }
+ };
+ }
+ }
+}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Append.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Append.java
index 6a5b10523..06a0b694e 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Append.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Append.java
@@ -32,6 +32,9 @@ public interface Append {
// TODO: Add more methods to configure the AppendWriter, such as apply
static partitions,
// apply overwrites, etc.
- /** Create a new {@link AppendWriter} to write data to a Log Table. */
+ /** Create a new {@link AppendWriter} to write data to a Log Table using
InternalRow. */
AppendWriter createWriter();
+
+ /** Create a new typed {@link AppendWriter} to write POJOs directly. */
+ <T> TypedAppendWriter<T> createTypedWriter(Class<T> pojoClass);
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriter.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriter.java
index 4edcc2471..1b5cee055 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriter.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriter.java
@@ -31,10 +31,10 @@ import java.util.concurrent.CompletableFuture;
public interface AppendWriter extends TableWriter {
/**
- * Append row into a Log Table.
+ * Append a record into a Log Table.
*
- * @param row the row to append.
+ * @param record the record to append.
* @return A {@link CompletableFuture} that always returns append result
when complete normally.
*/
- CompletableFuture<AppendResult> append(InternalRow row);
+ CompletableFuture<AppendResult> append(InternalRow record);
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableAppend.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableAppend.java
index 50e416d56..84ccaf319 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableAppend.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableAppend.java
@@ -38,4 +38,9 @@ public class TableAppend implements Append {
public AppendWriter createWriter() {
return new AppendWriterImpl(tablePath, tableInfo, writerClient);
}
+
+ @Override
+ public <T> TypedAppendWriter<T> createTypedWriter(Class<T> pojoClass) {
+ return new TypedAppendWriterImpl<>(createWriter(), pojoClass,
tableInfo);
+ }
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java
index 7d90a9ccd..fe865eac4 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java
@@ -95,4 +95,9 @@ public class TableUpsert implements Upsert {
public UpsertWriter createWriter() {
return new UpsertWriterImpl(tablePath, tableInfo, targetColumns,
writerClient);
}
+
+ @Override
+ public <T> TypedUpsertWriter<T> createTypedWriter(Class<T> pojoClass) {
+ return new TypedUpsertWriterImpl<>(createWriter(), pojoClass,
tableInfo, targetColumns);
+ }
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriter.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedAppendWriter.java
similarity index 77%
copy from
fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriter.java
copy to
fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedAppendWriter.java
index 4edcc2471..bff903913 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriter.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedAppendWriter.java
@@ -18,23 +18,23 @@
package org.apache.fluss.client.table.writer;
import org.apache.fluss.annotation.PublicEvolving;
-import org.apache.fluss.row.InternalRow;
import java.util.concurrent.CompletableFuture;
/**
- * The writer to write data to the log table.
+ * The typed writer to write data to the log table using POJOs.
*
- * @since 0.2
+ * @param <T> the type of the record
+ * @since 0.6
*/
@PublicEvolving
-public interface AppendWriter extends TableWriter {
+public interface TypedAppendWriter<T> extends TableWriter {
/**
- * Append row into a Log Table.
+ * Append a record into a Log Table.
*
- * @param row the row to append.
+ * @param record the record to append.
* @return A {@link CompletableFuture} that always returns append result
when complete normally.
*/
- CompletableFuture<AppendResult> append(InternalRow row);
+ CompletableFuture<AppendResult> append(T record);
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedAppendWriterImpl.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedAppendWriterImpl.java
new file mode 100644
index 000000000..3b964594b
--- /dev/null
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedAppendWriterImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.writer;
+
+import org.apache.fluss.client.converter.PojoToRowConverter;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.RowType;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A typed {@link AppendWriter} that converts POJOs to {@link InternalRow} and
delegates to the
+ * existing internal-row based writer implementation.
+ */
+class TypedAppendWriterImpl<T> implements TypedAppendWriter<T> {
+
+ private final AppendWriter delegate;
+ private final RowType tableSchema;
+ private final PojoToRowConverter<T> pojoToRowConverter;
+
+ TypedAppendWriterImpl(AppendWriter delegate, Class<T> pojoClass, TableInfo
tableInfo) {
+ this.delegate = delegate;
+ this.tableSchema = tableInfo.getRowType();
+ this.pojoToRowConverter = PojoToRowConverter.of(pojoClass,
tableSchema, tableSchema);
+ }
+
+ @Override
+ public void flush() {
+ delegate.flush();
+ }
+
+ @Override
+ public CompletableFuture<AppendResult> append(T record) {
+ if (record instanceof InternalRow) {
+ return delegate.append((InternalRow) record);
+ }
+ InternalRow row = pojoToRowConverter.toRow(record);
+ return delegate.append(row);
+ }
+}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriter.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedUpsertWriter.java
similarity index 68%
copy from
fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriter.java
copy to
fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedUpsertWriter.java
index 6648eebfc..4d241f5e6 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriter.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedUpsertWriter.java
@@ -18,32 +18,31 @@
package org.apache.fluss.client.table.writer;
import org.apache.fluss.annotation.PublicEvolving;
-import org.apache.fluss.row.InternalRow;
import java.util.concurrent.CompletableFuture;
/**
- * The writer to write data to the primary key table.
+ * The typed writer to write data to the primary key table using POJOs.
*
- * @since 0.2
+ * @param <T> the type of the record
+ * @since 0.6
*/
@PublicEvolving
-public interface UpsertWriter extends TableWriter {
+public interface TypedUpsertWriter<T> extends TableWriter {
/**
- * Inserts row into Fluss table if they do not already exist, or updates
them if they do exist.
+ * Inserts a record into Fluss table if it does not already exist, or
updates it if it does.
*
- * @param row the row to upsert.
+ * @param record the record to upsert.
* @return A {@link CompletableFuture} that always returns upsert result
when complete normally.
*/
- CompletableFuture<UpsertResult> upsert(InternalRow row);
+ CompletableFuture<UpsertResult> upsert(T record);
/**
- * Delete certain row by the input row in Fluss table, the input row must
contain the primary
- * key.
+ * Delete a certain record from the Fluss table. The input must contain
the primary key fields.
*
- * @param row the row to delete.
+ * @param record the record to delete.
* @return A {@link CompletableFuture} that always delete result when
complete normally.
*/
- CompletableFuture<DeleteResult> delete(InternalRow row);
+ CompletableFuture<DeleteResult> delete(T record);
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedUpsertWriterImpl.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedUpsertWriterImpl.java
new file mode 100644
index 000000000..45af9f0ef
--- /dev/null
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedUpsertWriterImpl.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.writer;
+
+import org.apache.fluss.client.converter.PojoToRowConverter;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A typed {@link UpsertWriter} that converts POJOs to {@link InternalRow} and
delegates to the
+ * existing internal-row based writer implementation.
+ */
+class TypedUpsertWriterImpl<T> implements TypedUpsertWriter<T> {
+
+ private final UpsertWriter delegate;
+ private final TableInfo tableInfo;
+ private final RowType tableSchema;
+ @Nullable private final int[] targetColumns;
+
+ private final RowType pkProjection;
+ @Nullable private final RowType targetProjection;
+
+ private final PojoToRowConverter<T> pojoToRowConverter;
+ private final PojoToRowConverter<T> pkConverter;
+ @Nullable private final PojoToRowConverter<T> targetConverter;
+
+ TypedUpsertWriterImpl(
+ UpsertWriter delegate, Class<T> pojoClass, TableInfo tableInfo,
int[] targetColumns) {
+ this.delegate = delegate;
+ this.tableInfo = tableInfo;
+ this.tableSchema = tableInfo.getRowType();
+ this.targetColumns = targetColumns;
+
+ // Precompute projections
+ this.pkProjection =
this.tableSchema.project(tableInfo.getPhysicalPrimaryKeys());
+ this.targetProjection =
+ (targetColumns == null) ? null :
this.tableSchema.project(targetColumns);
+
+ // Initialize reusable converters
+ this.pojoToRowConverter = PojoToRowConverter.of(pojoClass,
tableSchema, tableSchema);
+ this.pkConverter = PojoToRowConverter.of(pojoClass, tableSchema,
pkProjection);
+ this.targetConverter =
+ (targetProjection == null)
+ ? null
+ : PojoToRowConverter.of(pojoClass, tableSchema,
targetProjection);
+ }
+
+ @Override
+ public void flush() {
+ delegate.flush();
+ }
+
+ @Override
+ public CompletableFuture<UpsertResult> upsert(T record) {
+ if (record instanceof InternalRow) {
+ return delegate.upsert((InternalRow) record);
+ }
+ InternalRow row = convertPojo(record, /*forDelete=*/ false);
+ return delegate.upsert(row);
+ }
+
+ @Override
+ public CompletableFuture<DeleteResult> delete(T record) {
+ if (record instanceof InternalRow) {
+ return delegate.delete((InternalRow) record);
+ }
+ InternalRow pkOnly = convertPojo(record, /*forDelete=*/ true);
+ return delegate.delete(pkOnly);
+ }
+
+ private InternalRow convertPojo(T pojo, boolean forDelete) {
+ final RowType projection;
+ final PojoToRowConverter<T> converter;
+ if (forDelete) {
+ projection = pkProjection;
+ converter = pkConverter;
+ } else if (targetProjection != null && targetConverter != null) {
+ projection = targetProjection;
+ converter = targetConverter;
+ } else {
+ projection = tableSchema;
+ converter = pojoToRowConverter;
+ }
+
+ GenericRow projected = converter.toRow(pojo);
+ if (projection == tableSchema) {
+ return projected;
+ }
+ // expand projected row to full row if needed
+ GenericRow full = new GenericRow(tableSchema.getFieldCount());
+ if (forDelete) {
+ // set PK fields, others null
+ for (String pk : tableInfo.getPhysicalPrimaryKeys()) {
+ int projIndex = projection.getFieldIndex(pk);
+
+ // TODO: this can be optimized by pre-computing
+ // the index mapping in the constructor?
+ int fullIndex = tableSchema.getFieldIndex(pk);
+ full.setField(fullIndex, projected.getField(projIndex));
+ }
+ } else if (targetColumns != null) {
+ for (int i = 0; i < projection.getFieldCount(); i++) {
+ String name = projection.getFieldNames().get(i);
+ int fullIdx = tableSchema.getFieldIndex(name);
+ full.setField(fullIdx, projected.getField(i));
+ }
+ }
+ return full;
+ }
+}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java
index d3c3608ee..0843437fe 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java
@@ -18,7 +18,6 @@
package org.apache.fluss.client.table.writer;
import org.apache.fluss.annotation.PublicEvolving;
-import org.apache.fluss.row.InternalRow;
import javax.annotation.Nullable;
@@ -37,16 +36,14 @@ public interface Upsert {
/**
* Apply partial update columns and returns a new Upsert instance.
*
- * <p>For {@link UpsertWriter#upsert(InternalRow)} operation, only the
specified columns will be
- * updated and other columns will remain unchanged if the row exists or
set to null if the row
- * doesn't exist.
+ * <p>For upsert operations, only the specified columns will be updated
and other columns will
+ * remain unchanged if the row exists or set to null if the row doesn't
exist.
*
- * <p>For {@link UpsertWriter#delete(InternalRow)} operation, the entire
row will not be
- * removed, but only the specified columns except primary key will be set
to null. The entire
- * row will be removed when all columns except primary key are null after
a {@link
- * UpsertWriter#delete(InternalRow)} operation.
+ * <p>For delete operations, the entire row will not be removed
immediately, but only the
+ * specified columns except primary key will be set to null. The entire
row will be removed when
+ * all columns except primary key are null after a delete operation.
*
- * <p>Note: The specified columns must be a contains all columns of
primary key, and all columns
+ * <p>Note: The specified columns must contain all columns of primary key,
and all columns
* except primary key should be nullable.
*
* @param targetColumns the column indexes to partial update
@@ -60,8 +57,11 @@ public interface Upsert {
Upsert partialUpdate(String... targetColumnNames);
/**
- * Create a new {@link UpsertWriter} with the optional {@link
#partialUpdate(String...)}
- * information to upsert and delete data to a Primary Key Table.
+ * Create a new {@link UpsertWriter} using {@code InternalRow} with the
optional {@link
+ * #partialUpdate(String...)} information to upsert and delete data to a
Primary Key Table.
*/
UpsertWriter createWriter();
+
+ /** Create a new typed {@link UpsertWriter} to write POJOs directly. */
+ <T> TypedUpsertWriter<T> createTypedWriter(Class<T> pojoClass);
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriter.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriter.java
index 6648eebfc..e4d751747 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriter.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriter.java
@@ -31,19 +31,18 @@ import java.util.concurrent.CompletableFuture;
public interface UpsertWriter extends TableWriter {
/**
- * Inserts row into Fluss table if they do not already exist, or updates
them if they do exist.
+ * Inserts a record into Fluss table if it does not already exist, or
updates it if it does.
*
- * @param row the row to upsert.
+ * @param record the record to upsert.
* @return A {@link CompletableFuture} that always returns upsert result
when complete normally.
*/
- CompletableFuture<UpsertResult> upsert(InternalRow row);
+ CompletableFuture<UpsertResult> upsert(InternalRow record);
/**
- * Delete certain row by the input row in Fluss table, the input row must
contain the primary
- * key.
+ * Delete a certain record from the Fluss table. The input must contain
the primary key fields.
*
- * @param row the row to delete.
+ * @param record the record to delete.
* @return A {@link CompletableFuture} that always delete result when
complete normally.
*/
- CompletableFuture<DeleteResult> delete(InternalRow row);
+ CompletableFuture<DeleteResult> delete(InternalRow record);
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
index a62c1b70d..8270ed788 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
@@ -26,6 +26,7 @@ import org.apache.fluss.client.table.Table;
import org.apache.fluss.client.table.scanner.ScanRecord;
import org.apache.fluss.client.table.scanner.log.LogScanner;
import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.scanner.log.TypedLogScanner;
import org.apache.fluss.client.table.writer.UpsertWriter;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
@@ -144,6 +145,13 @@ public abstract class ClientToServerITCaseBase {
}
}
+ protected static void subscribeFromBeginning(TypedLogScanner<?>
logScanner, Table table) {
+ int bucketCount = table.getTableInfo().getNumBuckets();
+ for (int i = 0; i < bucketCount; i++) {
+ logScanner.subscribeFromBeginning(i);
+ }
+ }
+
protected static void subscribeFromTimestamp(
TablePath tablePath,
@Nullable String partitionName,
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTypedClientITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTypedClientITCase.java
new file mode 100644
index 000000000..c7724799b
--- /dev/null
+++
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTypedClientITCase.java
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table;
+
+import org.apache.fluss.client.admin.ClientToServerITCaseBase;
+import org.apache.fluss.client.converter.RowToPojoConverter;
+import org.apache.fluss.client.lookup.LookupResult;
+import org.apache.fluss.client.lookup.Lookuper;
+import org.apache.fluss.client.lookup.TypedLookuper;
+import org.apache.fluss.client.table.scanner.Scan;
+import org.apache.fluss.client.table.scanner.TypedScanRecord;
+import org.apache.fluss.client.table.scanner.log.TypedLogScanner;
+import org.apache.fluss.client.table.scanner.log.TypedScanRecords;
+import org.apache.fluss.client.table.writer.TypedAppendWriter;
+import org.apache.fluss.client.table.writer.TypedUpsertWriter;
+import org.apache.fluss.client.table.writer.Upsert;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** End-to-end tests for writing and scanning POJOs via client API. */
+public class FlussTypedClientITCase extends ClientToServerITCaseBase {
+
+ /** Test POJO containing all supported field types used by converters. */
+ public static class AllTypesPojo {
+ // primary key
+ public Integer a;
+ // all supported converter fields
+ public Boolean bool1;
+ public Byte tiny;
+ public Short small;
+ public Integer intv;
+ public Long big;
+ public Float flt;
+ public Double dbl;
+ public Character ch;
+ public String str;
+ public byte[] bin;
+ public byte[] bytes;
+ public BigDecimal dec;
+ public LocalDate dt;
+ public LocalTime tm;
+ public LocalDateTime tsNtz;
+ public Instant tsLtz;
+
+ public AllTypesPojo() {}
+
+ public AllTypesPojo(
+ Integer a,
+ Boolean bool1,
+ Byte tiny,
+ Short small,
+ Integer intv,
+ Long big,
+ Float flt,
+ Double dbl,
+ Character ch,
+ String str,
+ byte[] bin,
+ byte[] bytes,
+ BigDecimal dec,
+ LocalDate dt,
+ LocalTime tm,
+ LocalDateTime tsNtz,
+ Instant tsLtz) {
+ this.a = a;
+ this.bool1 = bool1;
+ this.tiny = tiny;
+ this.small = small;
+ this.intv = intv;
+ this.big = big;
+ this.flt = flt;
+ this.dbl = dbl;
+ this.ch = ch;
+ this.str = str;
+ this.bin = bin;
+ this.bytes = bytes;
+ this.dec = dec;
+ this.dt = dt;
+ this.tm = tm;
+ this.tsNtz = tsNtz;
+ this.tsLtz = tsLtz;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ AllTypesPojo that = (AllTypesPojo) o;
+ return Objects.equals(a, that.a)
+ && Objects.equals(bool1, that.bool1)
+ && Objects.equals(tiny, that.tiny)
+ && Objects.equals(small, that.small)
+ && Objects.equals(intv, that.intv)
+ && Objects.equals(big, that.big)
+ && Objects.equals(flt, that.flt)
+ && Objects.equals(dbl, that.dbl)
+ && Objects.equals(ch, that.ch)
+ && Objects.equals(str, that.str)
+ && Arrays.equals(bin, that.bin)
+ && Arrays.equals(bytes, that.bytes)
+ && Objects.equals(dec, that.dec)
+ && Objects.equals(dt, that.dt)
+ && Objects.equals(tm, that.tm)
+ && Objects.equals(tsNtz, that.tsNtz)
+ && Objects.equals(tsLtz, that.tsLtz);
+ }
+
+ @Override
+ public int hashCode() {
+ int result =
+ Objects.hash(
+ a, bool1, tiny, small, intv, big, flt, dbl, ch,
str, dec, dt, tm, tsNtz,
+ tsLtz);
+ result = 31 * result + Arrays.hashCode(bin);
+ result = 31 * result + Arrays.hashCode(bytes);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "AllTypesPojo{"
+ + "a="
+ + a
+ + ", bool1="
+ + bool1
+ + ", tiny="
+ + tiny
+ + ", small="
+ + small
+ + ", intv="
+ + intv
+ + ", big="
+ + big
+ + ", flt="
+ + flt
+ + ", dbl="
+ + dbl
+ + ", ch="
+ + ch
+ + ", str='"
+ + str
+ + '\''
+ + ", bin="
+ + Arrays.toString(bin)
+ + ", bytes="
+ + Arrays.toString(bytes)
+ + ", dec="
+ + dec
+ + ", dt="
+ + dt
+ + ", tm="
+ + tm
+ + ", tsNtz="
+ + tsNtz
+ + ", tsLtz="
+ + tsLtz
+ + '}';
+ }
+ }
+
+ /** Minimal POJO representing the primary key for {@link AllTypesPojo}. */
+ public static class PLookupKey {
+ public Integer a;
+
+ public PLookupKey() {}
+
+ public PLookupKey(Integer a) {
+ this.a = a;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PLookupKey that = (PLookupKey) o;
+ return Objects.equals(a, that.a);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(a);
+ }
+
+ @Override
+ public String toString() {
+ return "PLookupKey{" + "a=" + a + '}';
+ }
+ }
+
+ private static Schema allTypesLogSchema() {
+ return Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("bool1", DataTypes.BOOLEAN())
+ .column("tiny", DataTypes.TINYINT())
+ .column("small", DataTypes.SMALLINT())
+ .column("intv", DataTypes.INT())
+ .column("big", DataTypes.BIGINT())
+ .column("flt", DataTypes.FLOAT())
+ .column("dbl", DataTypes.DOUBLE())
+ .column("ch", DataTypes.CHAR(1))
+ .column("str", DataTypes.STRING())
+ .column("bin", DataTypes.BINARY(3))
+ .column("bytes", DataTypes.BYTES())
+ .column("dec", DataTypes.DECIMAL(10, 2))
+ .column("dt", DataTypes.DATE())
+ .column("tm", DataTypes.TIME())
+ .column("tsNtz", DataTypes.TIMESTAMP(3))
+ .column("tsLtz", DataTypes.TIMESTAMP_LTZ(3))
+ .build();
+ }
+
+ private static Schema allTypesPkSchema() {
+ // Same columns as log schema but with PK on 'a'
+ return Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("bool1", DataTypes.BOOLEAN())
+ .column("tiny", DataTypes.TINYINT())
+ .column("small", DataTypes.SMALLINT())
+ .column("intv", DataTypes.INT())
+ .column("big", DataTypes.BIGINT())
+ .column("flt", DataTypes.FLOAT())
+ .column("dbl", DataTypes.DOUBLE())
+ .column("ch", DataTypes.CHAR(1))
+ .column("str", DataTypes.STRING())
+ .column("bin", DataTypes.BINARY(3))
+ .column("bytes", DataTypes.BYTES())
+ .column("dec", DataTypes.DECIMAL(10, 2))
+ .column("dt", DataTypes.DATE())
+ .column("tm", DataTypes.TIME())
+ .column("tsNtz", DataTypes.TIMESTAMP(3))
+ .column("tsLtz", DataTypes.TIMESTAMP_LTZ(3))
+ .primaryKey("a")
+ .build();
+ }
+
+ private static AllTypesPojo newAllTypesPojo(int i) {
+ Integer a = i;
+ Boolean bool1 = (i % 2) == 0;
+ Byte tiny = (byte) (i - 5);
+ Short small = (short) (100 + i);
+ Integer intv = 1000 + i;
+ Long big = 100000L + i;
+ Float flt = 1.5f + i;
+ Double dbl = 2.5 + i;
+ Character ch = (char) ('a' + (i % 26));
+ String str = "s" + i;
+ byte[] bin = new byte[] {(byte) i, (byte) (i + 1), (byte) (i + 2)};
+ byte[] bytes = new byte[] {(byte) (10 + i), (byte) (20 + i)};
+ BigDecimal dec = new BigDecimal("12345." + (10 + i)).setScale(2,
RoundingMode.HALF_UP);
+ LocalDate dt = LocalDate.of(2024, 1, 1).plusDays(i);
+ LocalTime tm = LocalTime.of(12, (i * 7) % 60, (i * 11) % 60);
+ LocalDateTime tsNtz = LocalDateTime.of(2024, 1, 1, 0,
0).plusSeconds(i).withNano(0);
+ Instant tsLtz = Instant.ofEpochMilli(1700000000000L + (i * 1000L));
+ return new AllTypesPojo(
+ a, bool1, tiny, small, intv, big, flt, dbl, ch, str, bin,
bytes, dec, dt, tm, tsNtz,
+ tsLtz);
+ }
+
+ @Test
+ void testTypedAppendWriteAndScan() throws Exception {
+ // Build all-types log table schema
+ Schema schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("bool1", DataTypes.BOOLEAN())
+ .column("tiny", DataTypes.TINYINT())
+ .column("small", DataTypes.SMALLINT())
+ .column("intv", DataTypes.INT())
+ .column("big", DataTypes.BIGINT())
+ .column("flt", DataTypes.FLOAT())
+ .column("dbl", DataTypes.DOUBLE())
+ .column("ch", DataTypes.CHAR(1))
+ .column("str", DataTypes.STRING())
+ .column("bin", DataTypes.BINARY(3))
+ .column("bytes", DataTypes.BYTES())
+ .column("dec", DataTypes.DECIMAL(10, 2))
+ .column("dt", DataTypes.DATE())
+ .column("tm", DataTypes.TIME())
+ .column("tsNtz", DataTypes.TIMESTAMP(3))
+ .column("tsLtz", DataTypes.TIMESTAMP_LTZ(3))
+ .build();
+ TablePath path = TablePath.of("pojo_db", "all_types_log");
+ TableDescriptor td =
TableDescriptor.builder().schema(schema).distributedBy(2).build();
+ createTable(path, td, true);
+
+ try (Table table = conn.getTable(path)) {
+ // write
+ TypedAppendWriter<AllTypesPojo> writer =
+ table.newAppend().createTypedWriter(AllTypesPojo.class);
+ List<AllTypesPojo> expected = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ AllTypesPojo u = newAllTypesPojo(i);
+ expected.add(u);
+ writer.append(u);
+ }
+ writer.flush();
+
+ // read
+ Scan scan = table.newScan();
+ TypedLogScanner<AllTypesPojo> scanner =
scan.createTypedLogScanner(AllTypesPojo.class);
+ subscribeFromBeginning(scanner, table);
+
+ List<AllTypesPojo> actual = new ArrayList<>();
+ while (actual.size() < expected.size()) {
+ TypedScanRecords<AllTypesPojo> recs =
scanner.poll(Duration.ofSeconds(2));
+ for (TypedScanRecord<AllTypesPojo> r : recs) {
+
assertThat(r.getChangeType()).isEqualTo(ChangeType.APPEND_ONLY);
+ actual.add(r.getValue());
+ }
+ }
+ assertThat(actual)
+ .usingRecursiveFieldByFieldElementComparator()
+ .containsExactlyInAnyOrderElementsOf(expected);
+ }
+ }
+
+ @Test
+ void testTypedUpsertWriteAndScan() throws Exception {
+ // Build all-types PK table schema (PK on 'a')
+ Schema schema = allTypesPkSchema();
+ TablePath path = TablePath.of("pojo_db", "all_types_pk");
+ TableDescriptor td =
TableDescriptor.builder().schema(schema).distributedBy(2, "a").build();
+ createTable(path, td, true);
+
+ try (Table table = conn.getTable(path)) {
+ Upsert upsert = table.newUpsert();
+ TypedUpsertWriter<AllTypesPojo> writer =
upsert.createTypedWriter(AllTypesPojo.class);
+
+ AllTypesPojo p1 = newAllTypesPojo(1);
+ AllTypesPojo p2 = newAllTypesPojo(2);
+ writer.upsert(p1).get();
+ writer.upsert(p2).get();
+
+ // update key 1: change a couple of fields
+ AllTypesPojo p1Updated = newAllTypesPojo(1);
+ p1Updated.str = "a1";
+ p1Updated.dec = new java.math.BigDecimal("42.42");
+ writer.upsert(p1Updated).get();
+ writer.flush();
+
+ // scan as POJOs and verify change types and values
+ TypedLogScanner<AllTypesPojo> scanner =
+ table.newScan().createTypedLogScanner(AllTypesPojo.class);
+ subscribeFromBeginning(scanner, table);
+
+ List<ChangeType> changes = new ArrayList<>();
+ List<AllTypesPojo> values = new ArrayList<>();
+ while (values.size() < 4) { // INSERT 1, INSERT 2, UPDATE_BEFORE
1, UPDATE_AFTER 1
+ TypedScanRecords<AllTypesPojo> recs =
scanner.poll(Duration.ofSeconds(2));
+ for (TypedScanRecord<AllTypesPojo> r : recs) {
+ changes.add(r.getChangeType());
+ values.add(r.getValue());
+ }
+ }
+
+ assertThat(changes)
+ .containsExactlyInAnyOrder(
+ ChangeType.INSERT,
+ ChangeType.INSERT,
+ ChangeType.UPDATE_BEFORE,
+ ChangeType.UPDATE_AFTER);
+ // ensure the last update_after reflects new value
+ int lastIdx = changes.lastIndexOf(ChangeType.UPDATE_AFTER);
+ assertThat(values.get(lastIdx)).isEqualTo(p1Updated);
+ }
+ }
+
+ @Test
+ void testTypedLookups() throws Exception {
+ Schema schema = allTypesPkSchema();
+ TablePath path = TablePath.of("pojo_db", "lookup_pk");
+ TableDescriptor td =
TableDescriptor.builder().schema(schema).distributedBy(2, "a").build();
+ createTable(path, td, true);
+
+ try (Table table = conn.getTable(path)) {
+ TypedUpsertWriter<AllTypesPojo> writer =
+ table.newUpsert().createTypedWriter(AllTypesPojo.class);
+ writer.upsert(newAllTypesPojo(1)).get();
+ writer.upsert(newAllTypesPojo(2)).get();
+ writer.flush();
+
+ // primary key lookup using Lookuper API with POJO key
+ TypedLookuper<PLookupKey> lookuper =
+ table.newLookup().createTypedLookuper(PLookupKey.class);
+ RowType tableSchema = table.getTableInfo().getRowType();
+ RowToPojoConverter<AllTypesPojo> rowConv =
+ RowToPojoConverter.of(AllTypesPojo.class, tableSchema,
tableSchema);
+
+ LookupResult lr = lookuper.lookup(new PLookupKey(1)).get();
+ AllTypesPojo one = rowConv.fromRow(lr.getSingletonRow());
+ assertThat(one).isEqualTo(newAllTypesPojo(1));
+ }
+ }
+
+ @Test
+ void testInternalRowLookup() throws Exception {
+ Schema schema = allTypesPkSchema();
+ TablePath path = TablePath.of("pojo_db", "lookup_internalrow");
+ TableDescriptor td =
TableDescriptor.builder().schema(schema).distributedBy(2, "a").build();
+ createTable(path, td, true);
+
+ try (Table table = conn.getTable(path)) {
+ // write a couple of rows via POJO writer
+ TypedUpsertWriter<AllTypesPojo> writer =
+ table.newUpsert().createTypedWriter(AllTypesPojo.class);
+ writer.upsert(newAllTypesPojo(101)).get();
+ writer.upsert(newAllTypesPojo(202)).get();
+ writer.flush();
+
+ // now perform lookup using the raw InternalRow path to ensure
it's still supported
+ Lookuper lookuper = table.newLookup().createLookuper();
+ RowType tableSchema = table.getTableInfo().getRowType();
+ RowType keyProjection =
tableSchema.project(table.getTableInfo().getPrimaryKeys());
+
+ // Build the key row directly using GenericRow to avoid any POJO
conversion
+ GenericRow keyRow = new GenericRow(keyProjection.getFieldCount());
+ keyRow.setField(0, 101); // primary key field 'a'
+
+ LookupResult lr = lookuper.lookup(keyRow).get();
+ RowToPojoConverter<AllTypesPojo> rowConv =
+ RowToPojoConverter.of(AllTypesPojo.class, tableSchema,
tableSchema);
+ AllTypesPojo pojo = rowConv.fromRow(lr.getSingletonRow());
+ assertThat(pojo).isEqualTo(newAllTypesPojo(101));
+ }
+ }
+
+ @Test
+ void testTypedProjections() throws Exception {
+ TablePath path = TablePath.of("pojo_db", "proj_log");
+ TableDescriptor td =
+
TableDescriptor.builder().schema(allTypesLogSchema()).distributedBy(1).build();
+ createTable(path, td, true);
+
+ try (Table table = conn.getTable(path)) {
+ TypedAppendWriter<AllTypesPojo> writer =
+ table.newAppend().createTypedWriter(AllTypesPojo.class);
+ writer.append(newAllTypesPojo(10)).get();
+ writer.append(newAllTypesPojo(11)).get();
+ writer.flush();
+
+ // Project only a subset of fields
+ TypedLogScanner<AllTypesPojo> scanner =
+ table.newScan()
+ .project(Arrays.asList("a", "str"))
+ .createTypedLogScanner(AllTypesPojo.class);
+ subscribeFromBeginning(scanner, table);
+ TypedScanRecords<AllTypesPojo> recs =
scanner.poll(Duration.ofSeconds(2));
+ int i = 10;
+ for (TypedScanRecord<AllTypesPojo> r : recs) {
+ AllTypesPojo u = r.getValue();
+ AllTypesPojo expectedPojo = new AllTypesPojo();
+ expectedPojo.a = i;
+ expectedPojo.str = "s" + i;
+ assertThat(u).isEqualTo(expectedPojo);
+ i++;
+ }
+ }
+ }
+
+ @Test
+ void testTypedPartialUpdates() throws Exception {
+ // Use full PK schema and update a subset of fields
+ Schema schema = allTypesPkSchema();
+ TablePath path = TablePath.of("pojo_db", "pk_partial");
+ TableDescriptor td =
TableDescriptor.builder().schema(schema).distributedBy(1, "a").build();
+ createTable(path, td, true);
+
+ try (Table table = conn.getTable(path)) {
+ // 1. initial full row
+ TypedUpsertWriter<AllTypesPojo> fullWriter =
+ table.newUpsert().createTypedWriter(AllTypesPojo.class);
+ fullWriter.upsert(newAllTypesPojo(1)).get();
+ fullWriter.flush();
+
+ // 2. partial update: only PK + subset fields
+ Upsert upsert = table.newUpsert().partialUpdate("a", "str", "dec");
+ TypedUpsertWriter<AllTypesPojo> writer =
upsert.createTypedWriter(AllTypesPojo.class);
+
+ AllTypesPojo patch = new AllTypesPojo();
+ patch.a = 1;
+ patch.str = "second";
+ patch.dec = new BigDecimal("99.99");
+ writer.upsert(patch).get();
+ writer.flush();
+
+ // verify via lookup and scan using Lookuper + POJO key
+ TypedLookuper<PLookupKey> lookuper =
+ table.newLookup().createTypedLookuper(PLookupKey.class);
+ RowType tableSchema = table.getTableInfo().getRowType();
+ RowToPojoConverter<AllTypesPojo> rowConv =
+ RowToPojoConverter.of(AllTypesPojo.class, tableSchema,
tableSchema);
+ AllTypesPojo lookedUp =
+ rowConv.fromRow(lookuper.lookup(new
PLookupKey(1)).get().getSingletonRow());
+ AllTypesPojo expected = newAllTypesPojo(1);
+ expected.str = "second";
+ expected.dec = new BigDecimal("99.99");
+ assertThat(lookedUp).isEqualTo(expected);
+
+ TypedLogScanner<AllTypesPojo> scanner =
+ table.newScan().createTypedLogScanner(AllTypesPojo.class);
+ subscribeFromBeginning(scanner, table);
+ boolean sawUpdateAfter = false;
+ while (!sawUpdateAfter) {
+ TypedScanRecords<AllTypesPojo> recs =
scanner.poll(Duration.ofSeconds(2));
+ for (TypedScanRecord<AllTypesPojo> r : recs) {
+ if (r.getChangeType() == ChangeType.UPDATE_AFTER) {
+ assertThat(r.getValue()).isEqualTo(expected);
+ sawUpdateAfter = true;
+ }
+ }
+ }
+ }
+ }
+}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
index 24f264c92..efebb3278 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
@@ -562,6 +562,8 @@ class RecordAccumulatorTest {
Map<TablePath, Long> tableIdByPath = new HashMap<>();
tableIdByPath.put(DATA1_TABLE_PATH, DATA1_TABLE_ID);
+ Map<TablePath, TableInfo> tableInfoByPath = new HashMap<>();
+ tableInfoByPath.put(DATA1_TABLE_PATH, DATA1_TABLE_INFO);
return new Cluster(
aliveTabletServersById,
new ServerNode(0, "localhost", 89, ServerType.COORDINATOR),