wuchong commented on code in PR #1992:
URL: https://github.com/apache/fluss/pull/1992#discussion_r2642891636
##########
fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java:
##########
@@ -95,4 +95,11 @@ public Upsert partialUpdate(String... targetColumnNames) {
public UpsertWriter createWriter() {
return new UpsertWriterImpl(tablePath, tableInfo, targetColumns,
writerClient);
}
+
+ @Override
+ public <T> TypedUpsertWriter<T> createTypedWriter(Class<T> pojoClass) {
+ UpsertWriterImpl delegate =
+ new UpsertWriterImpl(tablePath, tableInfo, targetColumns,
writerClient);
Review Comment:
This can be simplified to just call `createWriter()`.
##########
fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableAppend.java:
##########
@@ -38,4 +38,10 @@ public TableAppend(TablePath tablePath, TableInfo tableInfo,
WriterClient writer
public AppendWriter createWriter() {
return new AppendWriterImpl(tablePath, tableInfo, writerClient);
}
+
+ @Override
+ public <T> TypedAppendWriter<T> createTypedWriter(Class<T> pojoClass) {
+ AppendWriterImpl delegate = new AppendWriterImpl(tablePath, tableInfo,
writerClient);
Review Comment:
This can be simplified to just call `createWriter()`.
##########
fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedUpsertWriterImpl.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.writer;
+
+import org.apache.fluss.client.converter.PojoToRowConverter;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A typed {@link UpsertWriter} that converts POJOs to {@link InternalRow} and
delegates to the
+ * existing internal-row based writer implementation.
+ */
+class TypedUpsertWriterImpl<T> implements TypedUpsertWriter<T> {
+
+ @Override
+ public void flush() {
+ delegate.flush();
+ }
+
+ private final UpsertWriterImpl delegate;
+
+ @Override
+ public void close() throws Exception {
+ delegate.close();
+ }
+
+ private final Class<T> pojoClass;
Review Comment:
The `pojoClass` field is not used and can be removed.
##########
fluss-client/src/test/java/org/apache/fluss/client/table/FlussTypedClientITCase.java:
##########
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table;
+
+import org.apache.fluss.client.admin.ClientToServerITCaseBase;
+import org.apache.fluss.client.converter.RowToPojoConverter;
+import org.apache.fluss.client.lookup.LookupResult;
+import org.apache.fluss.client.lookup.Lookuper;
+import org.apache.fluss.client.lookup.TypedLookuper;
+import org.apache.fluss.client.table.scanner.Scan;
+import org.apache.fluss.client.table.scanner.TypedScanRecord;
+import org.apache.fluss.client.table.scanner.log.TypedLogScanner;
+import org.apache.fluss.client.table.scanner.log.TypedScanRecords;
+import org.apache.fluss.client.table.writer.TypedAppendWriter;
+import org.apache.fluss.client.table.writer.TypedUpsertWriter;
+import org.apache.fluss.client.table.writer.Upsert;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** End-to-end tests for writing and scanning POJOs via client API. */
+public class FlussTypedClientITCase extends ClientToServerITCaseBase {
+
+ /** Test POJO containing all supported field types used by converters. */
+ public static class AllTypesPojo {
+ // primary key
+ public Integer a;
+ // all supported converter fields
+ public Boolean bool1;
+ public Byte tiny;
+ public Short small;
+ public Integer intv;
+ public Long big;
+ public Float flt;
+ public Double dbl;
+ public Character ch;
+ public String str;
+ public byte[] bin;
+ public byte[] bytes;
+ public BigDecimal dec;
+ public LocalDate dt;
+ public LocalTime tm;
+ public LocalDateTime tsNtz;
+ public Instant tsLtz;
+
+ public AllTypesPojo() {}
+
+ public AllTypesPojo(
+ Integer a,
+ Boolean bool1,
+ Byte tiny,
+ Short small,
+ Integer intv,
+ Long big,
+ Float flt,
+ Double dbl,
+ Character ch,
+ String str,
+ byte[] bin,
+ byte[] bytes,
+ BigDecimal dec,
+ LocalDate dt,
+ LocalTime tm,
+ LocalDateTime tsNtz,
+ Instant tsLtz) {
+ this.a = a;
+ this.bool1 = bool1;
+ this.tiny = tiny;
+ this.small = small;
+ this.intv = intv;
+ this.big = big;
+ this.flt = flt;
+ this.dbl = dbl;
+ this.ch = ch;
+ this.str = str;
+ this.bin = bin;
+ this.bytes = bytes;
+ this.dec = dec;
+ this.dt = dt;
+ this.tm = tm;
+ this.tsNtz = tsNtz;
+ this.tsLtz = tsLtz;
+ }
+ }
+
+ /** Minimal POJO representing the primary key for {@link AllTypesPojo}. */
+ public static class PLookupKey {
+ public Integer a;
+
+ public PLookupKey() {}
+
+ public PLookupKey(Integer a) {
+ this.a = a;
+ }
+ }
+
+ private static Schema allTypesLogSchema() {
+ return Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("bool1", DataTypes.BOOLEAN())
+ .column("tiny", DataTypes.TINYINT())
+ .column("small", DataTypes.SMALLINT())
+ .column("intv", DataTypes.INT())
+ .column("big", DataTypes.BIGINT())
+ .column("flt", DataTypes.FLOAT())
+ .column("dbl", DataTypes.DOUBLE())
+ .column("ch", DataTypes.CHAR(1))
+ .column("str", DataTypes.STRING())
+ .column("bin", DataTypes.BINARY(3))
+ .column("bytes", DataTypes.BYTES())
+ .column("dec", DataTypes.DECIMAL(10, 2))
+ .column("dt", DataTypes.DATE())
+ .column("tm", DataTypes.TIME())
+ .column("tsNtz", DataTypes.TIMESTAMP(3))
+ .column("tsLtz", DataTypes.TIMESTAMP_LTZ(3))
+ .build();
+ }
+
+ private static Schema allTypesPkSchema() {
+ // Same columns as log schema but with PK on 'a'
+ return Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("bool1", DataTypes.BOOLEAN())
+ .column("tiny", DataTypes.TINYINT())
+ .column("small", DataTypes.SMALLINT())
+ .column("intv", DataTypes.INT())
+ .column("big", DataTypes.BIGINT())
+ .column("flt", DataTypes.FLOAT())
+ .column("dbl", DataTypes.DOUBLE())
+ .column("ch", DataTypes.CHAR(1))
+ .column("str", DataTypes.STRING())
+ .column("bin", DataTypes.BINARY(3))
+ .column("bytes", DataTypes.BYTES())
+ .column("dec", DataTypes.DECIMAL(10, 2))
+ .column("dt", DataTypes.DATE())
+ .column("tm", DataTypes.TIME())
+ .column("tsNtz", DataTypes.TIMESTAMP(3))
+ .column("tsLtz", DataTypes.TIMESTAMP_LTZ(3))
+ .primaryKey("a")
+ .build();
+ }
+
+ private static AllTypesPojo newAllTypesPojo(int i) {
+ Integer a = i;
+ Boolean bool1 = (i % 2) == 0;
+ Byte tiny = (byte) (i - 5);
+ Short small = (short) (100 + i);
+ Integer intv = 1000 + i;
+ Long big = 100000L + i;
+ Float flt = 1.5f + i;
+ Double dbl = 2.5 + i;
+ Character ch = (char) ('a' + (i % 26));
+ String str = "s" + i;
+ byte[] bin = new byte[] {(byte) i, (byte) (i + 1), (byte) (i + 2)};
+ byte[] bytes = new byte[] {(byte) (10 + i), (byte) (20 + i)};
+ BigDecimal dec = new BigDecimal("12345." + (10 + i)).setScale(2,
RoundingMode.HALF_UP);
+ LocalDate dt = LocalDate.of(2024, 1, 1).plusDays(i);
+ LocalTime tm = LocalTime.of(12, (i * 7) % 60, (i * 11) % 60);
+ LocalDateTime tsNtz = LocalDateTime.of(2024, 1, 1, 0,
0).plusSeconds(i).withNano(0);
+ Instant tsLtz = Instant.ofEpochMilli(1700000000000L + (i * 1000L));
+ return new AllTypesPojo(
+ a, bool1, tiny, small, intv, big, flt, dbl, ch, str, bin,
bytes, dec, dt, tm, tsNtz,
+ tsLtz);
+ }
+
+ @Test
+ void testTypedAppendWriteAndScan() throws Exception {
+ // Build all-types log table schema
+ Schema schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("bool1", DataTypes.BOOLEAN())
+ .column("tiny", DataTypes.TINYINT())
+ .column("small", DataTypes.SMALLINT())
+ .column("intv", DataTypes.INT())
+ .column("big", DataTypes.BIGINT())
+ .column("flt", DataTypes.FLOAT())
+ .column("dbl", DataTypes.DOUBLE())
+ .column("ch", DataTypes.CHAR(1))
+ .column("str", DataTypes.STRING())
+ .column("bin", DataTypes.BINARY(3))
+ .column("bytes", DataTypes.BYTES())
+ .column("dec", DataTypes.DECIMAL(10, 2))
+ .column("dt", DataTypes.DATE())
+ .column("tm", DataTypes.TIME())
+ .column("tsNtz", DataTypes.TIMESTAMP(3))
+ .column("tsLtz", DataTypes.TIMESTAMP_LTZ(3))
+ .build();
+ TablePath path = TablePath.of("pojo_db", "all_types_log");
+ TableDescriptor td =
TableDescriptor.builder().schema(schema).distributedBy(2).build();
+ createTable(path, td, true);
+
+ try (Table table = conn.getTable(path)) {
+ // write
+ TypedAppendWriter<AllTypesPojo> writer =
+ table.newAppend().createTypedWriter(AllTypesPojo.class);
+ List<AllTypesPojo> expected = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ AllTypesPojo u = newAllTypesPojo(i);
+ expected.add(u);
+ writer.append(u).get();
+ }
+ writer.flush();
+
+ // read
+ Scan scan = table.newScan();
+ TypedLogScanner<AllTypesPojo> scanner =
scan.createTypedLogScanner(AllTypesPojo.class);
+ subscribeFromBeginning(scanner, table);
+
+ List<AllTypesPojo> actual = new ArrayList<>();
+ while (actual.size() < expected.size()) {
+ TypedScanRecords<AllTypesPojo> recs =
scanner.poll(Duration.ofSeconds(2));
+ for (TypedScanRecord<AllTypesPojo> r : recs) {
+
assertThat(r.getChangeType()).isEqualTo(ChangeType.APPEND_ONLY);
+ actual.add(r.getValue());
+ }
+ }
+ assertThat(actual)
+ .usingRecursiveFieldByFieldElementComparator()
+ .containsExactlyInAnyOrderElementsOf(expected);
+ }
+ }
+
+ @Test
+ void testTypedUpsertWriteAndScan() throws Exception {
+ // Build all-types PK table schema (PK on 'a')
+ Schema schema = allTypesPkSchema();
+ TablePath path = TablePath.of("pojo_db", "all_types_pk");
+ TableDescriptor td =
TableDescriptor.builder().schema(schema).distributedBy(2, "a").build();
+ createTable(path, td, true);
+
+ try (Table table = conn.getTable(path)) {
+ Upsert upsert = table.newUpsert();
+ TypedUpsertWriter<AllTypesPojo> writer =
upsert.createTypedWriter(AllTypesPojo.class);
+
+ AllTypesPojo p1 = newAllTypesPojo(1);
+ AllTypesPojo p2 = newAllTypesPojo(2);
+ writer.upsert(p1).get();
+ writer.upsert(p2).get();
+
+ // update key 1: change a couple of fields
+ AllTypesPojo p1Updated = newAllTypesPojo(1);
+ p1Updated.str = "a1";
+ p1Updated.dec = new java.math.BigDecimal("42.42");
+ writer.upsert(p1Updated).get();
+ writer.flush();
+
+ // scan as POJOs and verify change types and values
+ TypedLogScanner<AllTypesPojo> scanner =
+ table.newScan().createTypedLogScanner(AllTypesPojo.class);
+ subscribeFromBeginning(scanner, table);
+
+ List<ChangeType> changes = new ArrayList<>();
+ List<AllTypesPojo> values = new ArrayList<>();
+ while (values.size() < 4) { // INSERT 1, INSERT 2, UPDATE_BEFORE
1, UPDATE_AFTER 1
+ TypedScanRecords<AllTypesPojo> recs =
scanner.poll(Duration.ofSeconds(2));
+ for (TypedScanRecord<AllTypesPojo> r : recs) {
+ changes.add(r.getChangeType());
+ values.add(r.getValue());
+ }
+ }
+ assertThat(changes)
+ .contains(ChangeType.INSERT, ChangeType.UPDATE_BEFORE,
ChangeType.UPDATE_AFTER);
+ // ensure the last update_after reflects new value
+ int lastIdx = changes.lastIndexOf(ChangeType.UPDATE_AFTER);
+ assertThat(values.get(lastIdx).str).isEqualTo("a1");
+ }
+ }
+
+ @Test
+ void testTypedLookups() throws Exception {
+ Schema schema = allTypesPkSchema();
+ TablePath path = TablePath.of("pojo_db", "lookup_pk");
+ TableDescriptor td =
TableDescriptor.builder().schema(schema).distributedBy(2, "a").build();
+ createTable(path, td, true);
+
+ try (Table table = conn.getTable(path)) {
+ TypedUpsertWriter<AllTypesPojo> writer =
+ table.newUpsert().createTypedWriter(AllTypesPojo.class);
+ writer.upsert(newAllTypesPojo(1)).get();
+ writer.upsert(newAllTypesPojo(2)).get();
+ writer.close();
+
+ // primary key lookup using Lookuper API with POJO key
+ TypedLookuper<PLookupKey> lookuper =
+ table.newLookup().createTypedLookuper(PLookupKey.class);
+ RowType tableSchema = table.getTableInfo().getRowType();
+ RowToPojoConverter<AllTypesPojo> rowConv =
+ RowToPojoConverter.of(AllTypesPojo.class, tableSchema,
tableSchema);
+
+ LookupResult lr = lookuper.lookup(new PLookupKey(1)).get();
+ AllTypesPojo one = rowConv.fromRow(lr.getSingletonRow());
+ assertThat(one.str).isEqualTo("s1");
+ }
+ }
+
+ @Test
+ void testInternalRowLookup() throws Exception {
+ Schema schema = allTypesPkSchema();
+ TablePath path = TablePath.of("pojo_db", "lookup_internalrow");
+ TableDescriptor td =
TableDescriptor.builder().schema(schema).distributedBy(2, "a").build();
+ createTable(path, td, true);
+
+ try (Table table = conn.getTable(path)) {
+ // write a couple of rows via POJO writer
+ TypedUpsertWriter<AllTypesPojo> writer =
+ table.newUpsert().createTypedWriter(AllTypesPojo.class);
+ writer.upsert(newAllTypesPojo(101)).get();
+ writer.upsert(newAllTypesPojo(202)).get();
+ writer.close();
+
+ // now perform lookup using the raw InternalRow path to ensure
it's still supported
+ Lookuper lookuper = table.newLookup().createLookuper();
+ RowType tableSchema = table.getTableInfo().getRowType();
+ RowType keyProjection =
tableSchema.project(table.getTableInfo().getPrimaryKeys());
+
+ // Build the key row directly using GenericRow to avoid any POJO
conversion
+ GenericRow keyRow = new GenericRow(keyProjection.getFieldCount());
+ keyRow.setField(0, 101); // primary key field 'a'
+
+ LookupResult lr = lookuper.lookup(keyRow).get();
+ RowToPojoConverter<AllTypesPojo> rowConv =
+ RowToPojoConverter.of(AllTypesPojo.class, tableSchema,
tableSchema);
+ AllTypesPojo pojo = rowConv.fromRow(lr.getSingletonRow());
+ assertThat(pojo).isNotNull();
+ assertThat(pojo.a).isEqualTo(101);
+ assertThat(pojo.str).isEqualTo("s101");
+ }
+ }
+
+ @Test
+ void testTypedProjections() throws Exception {
+ TablePath path = TablePath.of("pojo_db", "proj_log");
+ TableDescriptor td =
+
TableDescriptor.builder().schema(allTypesLogSchema()).distributedBy(1).build();
+ createTable(path, td, true);
+
+ try (Table table = conn.getTable(path)) {
+ TypedAppendWriter<AllTypesPojo> writer =
+ table.newAppend().createTypedWriter(AllTypesPojo.class);
+ writer.append(newAllTypesPojo(10)).get();
+ writer.append(newAllTypesPojo(11)).get();
+ writer.flush();
+
+ // Project only a subset of fields
+ TypedLogScanner<AllTypesPojo> scanner =
+ table.newScan()
+ .project(Arrays.asList("a", "str"))
+ .createTypedLogScanner(AllTypesPojo.class);
+ subscribeFromBeginning(scanner, table);
+ TypedScanRecords<AllTypesPojo> recs =
scanner.poll(Duration.ofSeconds(2));
+ for (TypedScanRecord<AllTypesPojo> r : recs) {
+ AllTypesPojo u = r.getValue();
+ assertThat(u.a).isNotNull();
+ assertThat(u.str).isNotNull();
+ // non-projected fields should be null
+ assertThat(u.bool1).isNull();
+ assertThat(u.bin).isNull();
+ assertThat(u.bytes).isNull();
+ assertThat(u.dec).isNull();
+ assertThat(u.dt).isNull();
+ assertThat(u.tm).isNull();
+ assertThat(u.tsNtz).isNull();
+ assertThat(u.tsLtz).isNull();
+ }
+ }
+ }
+
+ @Test
+ void testTypedPartialUpdates() throws Exception {
+ // Use full PK schema and update a subset of fields
+ Schema schema = allTypesPkSchema();
+ TablePath path = TablePath.of("pojo_db", "pk_partial");
+ TableDescriptor td =
TableDescriptor.builder().schema(schema).distributedBy(1, "a").build();
+ createTable(path, td, true);
+
+ try (Table table = conn.getTable(path)) {
+ Upsert upsert = table.newUpsert().partialUpdate("a", "str", "dec");
+ TypedUpsertWriter<AllTypesPojo> writer =
upsert.createTypedWriter(AllTypesPojo.class);
+
+ // initial full row
+ writer.upsert(newAllTypesPojo(1)).get();
+
+ // partial update: only PK + subset fields
+ AllTypesPojo patch = new AllTypesPojo();
+ patch.a = 1;
+ patch.str = "second";
+ patch.dec = new BigDecimal("99.99");
+ writer.upsert(patch).get();
+ writer.close();
+
+ // verify via lookup and scan using Lookuper + POJO key
+ TypedLookuper<PLookupKey> lookuper =
+ table.newLookup().createTypedLookuper(PLookupKey.class);
+ RowType tableSchema = table.getTableInfo().getRowType();
+ RowToPojoConverter<AllTypesPojo> rowConv =
+ RowToPojoConverter.of(AllTypesPojo.class, tableSchema,
tableSchema);
+ AllTypesPojo lookedUp =
+ rowConv.fromRow(lookuper.lookup(new
PLookupKey(1)).get().getSingletonRow());
+ assertThat(lookedUp.str).isEqualTo("second");
+ assertThat(lookedUp.dec).isEqualByComparingTo("99.99");
+
+ TypedLogScanner<AllTypesPojo> scanner =
+ table.newScan().createTypedLogScanner(AllTypesPojo.class);
+ subscribeFromBeginning(scanner, table);
+ boolean sawUpdateAfter = false;
+ while (!sawUpdateAfter) {
+ TypedScanRecords<AllTypesPojo> recs =
scanner.poll(Duration.ofSeconds(2));
+ for (TypedScanRecord<AllTypesPojo> r : recs) {
+ if (r.getChangeType() == ChangeType.UPDATE_AFTER) {
+ assertThat(r.getValue().str).isEqualTo("second");
Review Comment:
ditto
##########
fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedUpsertWriterImpl.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.writer;
+
+import org.apache.fluss.client.converter.PojoToRowConverter;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A typed {@link UpsertWriter} that converts POJOs to {@link InternalRow} and
delegates to the
+ * existing internal-row based writer implementation.
+ */
+class TypedUpsertWriterImpl<T> implements TypedUpsertWriter<T> {
+
+ @Override
+ public void flush() {
+ delegate.flush();
+ }
+
+ private final UpsertWriterImpl delegate;
+
+ @Override
+ public void close() throws Exception {
+ delegate.close();
+ }
+
+ private final Class<T> pojoClass;
+ private final TableInfo tableInfo;
+ private final RowType tableSchema;
+ private final int[] targetColumns; // may be null
Review Comment:
We can add the `@Nullable` annotation to indicate that it is nullable.
##########
fluss-client/src/test/java/org/apache/fluss/client/table/FlussTypedClientITCase.java:
##########
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table;
+
+import org.apache.fluss.client.admin.ClientToServerITCaseBase;
+import org.apache.fluss.client.converter.RowToPojoConverter;
+import org.apache.fluss.client.lookup.LookupResult;
+import org.apache.fluss.client.lookup.Lookuper;
+import org.apache.fluss.client.lookup.TypedLookuper;
+import org.apache.fluss.client.table.scanner.Scan;
+import org.apache.fluss.client.table.scanner.TypedScanRecord;
+import org.apache.fluss.client.table.scanner.log.TypedLogScanner;
+import org.apache.fluss.client.table.scanner.log.TypedScanRecords;
+import org.apache.fluss.client.table.writer.TypedAppendWriter;
+import org.apache.fluss.client.table.writer.TypedUpsertWriter;
+import org.apache.fluss.client.table.writer.Upsert;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** End-to-end tests for writing and scanning POJOs via client API. */
+public class FlussTypedClientITCase extends ClientToServerITCaseBase {
+
+ /** Test POJO containing all supported field types used by converters. */
+ public static class AllTypesPojo {
+ // primary key
+ public Integer a;
+ // all supported converter fields
+ public Boolean bool1;
+ public Byte tiny;
+ public Short small;
+ public Integer intv;
+ public Long big;
+ public Float flt;
+ public Double dbl;
+ public Character ch;
+ public String str;
+ public byte[] bin;
+ public byte[] bytes;
+ public BigDecimal dec;
+ public LocalDate dt;
+ public LocalTime tm;
+ public LocalDateTime tsNtz;
+ public Instant tsLtz;
+
+ public AllTypesPojo() {}
+
+ public AllTypesPojo(
+ Integer a,
+ Boolean bool1,
+ Byte tiny,
+ Short small,
+ Integer intv,
+ Long big,
+ Float flt,
+ Double dbl,
+ Character ch,
+ String str,
+ byte[] bin,
+ byte[] bytes,
+ BigDecimal dec,
+ LocalDate dt,
+ LocalTime tm,
+ LocalDateTime tsNtz,
+ Instant tsLtz) {
+ this.a = a;
+ this.bool1 = bool1;
+ this.tiny = tiny;
+ this.small = small;
+ this.intv = intv;
+ this.big = big;
+ this.flt = flt;
+ this.dbl = dbl;
+ this.ch = ch;
+ this.str = str;
+ this.bin = bin;
+ this.bytes = bytes;
+ this.dec = dec;
+ this.dt = dt;
+ this.tm = tm;
+ this.tsNtz = tsNtz;
+ this.tsLtz = tsLtz;
+ }
+ }
+
+ /** Minimal POJO representing the primary key for {@link AllTypesPojo}. */
+ public static class PLookupKey {
+ public Integer a;
+
+ public PLookupKey() {}
+
+ public PLookupKey(Integer a) {
+ this.a = a;
+ }
+ }
+
+ private static Schema allTypesLogSchema() {
+ return Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("bool1", DataTypes.BOOLEAN())
+ .column("tiny", DataTypes.TINYINT())
+ .column("small", DataTypes.SMALLINT())
+ .column("intv", DataTypes.INT())
+ .column("big", DataTypes.BIGINT())
+ .column("flt", DataTypes.FLOAT())
+ .column("dbl", DataTypes.DOUBLE())
+ .column("ch", DataTypes.CHAR(1))
+ .column("str", DataTypes.STRING())
+ .column("bin", DataTypes.BINARY(3))
+ .column("bytes", DataTypes.BYTES())
+ .column("dec", DataTypes.DECIMAL(10, 2))
+ .column("dt", DataTypes.DATE())
+ .column("tm", DataTypes.TIME())
+ .column("tsNtz", DataTypes.TIMESTAMP(3))
+ .column("tsLtz", DataTypes.TIMESTAMP_LTZ(3))
+ .build();
+ }
+
+ private static Schema allTypesPkSchema() {
+ // Same columns as log schema but with PK on 'a'
+ return Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("bool1", DataTypes.BOOLEAN())
+ .column("tiny", DataTypes.TINYINT())
+ .column("small", DataTypes.SMALLINT())
+ .column("intv", DataTypes.INT())
+ .column("big", DataTypes.BIGINT())
+ .column("flt", DataTypes.FLOAT())
+ .column("dbl", DataTypes.DOUBLE())
+ .column("ch", DataTypes.CHAR(1))
+ .column("str", DataTypes.STRING())
+ .column("bin", DataTypes.BINARY(3))
+ .column("bytes", DataTypes.BYTES())
+ .column("dec", DataTypes.DECIMAL(10, 2))
+ .column("dt", DataTypes.DATE())
+ .column("tm", DataTypes.TIME())
+ .column("tsNtz", DataTypes.TIMESTAMP(3))
+ .column("tsLtz", DataTypes.TIMESTAMP_LTZ(3))
+ .primaryKey("a")
+ .build();
+ }
+
+ private static AllTypesPojo newAllTypesPojo(int i) {
+ Integer a = i;
+ Boolean bool1 = (i % 2) == 0;
+ Byte tiny = (byte) (i - 5);
+ Short small = (short) (100 + i);
+ Integer intv = 1000 + i;
+ Long big = 100000L + i;
+ Float flt = 1.5f + i;
+ Double dbl = 2.5 + i;
+ Character ch = (char) ('a' + (i % 26));
+ String str = "s" + i;
+ byte[] bin = new byte[] {(byte) i, (byte) (i + 1), (byte) (i + 2)};
+ byte[] bytes = new byte[] {(byte) (10 + i), (byte) (20 + i)};
+ BigDecimal dec = new BigDecimal("12345." + (10 + i)).setScale(2,
RoundingMode.HALF_UP);
+ LocalDate dt = LocalDate.of(2024, 1, 1).plusDays(i);
+ LocalTime tm = LocalTime.of(12, (i * 7) % 60, (i * 11) % 60);
+ LocalDateTime tsNtz = LocalDateTime.of(2024, 1, 1, 0,
0).plusSeconds(i).withNano(0);
+ Instant tsLtz = Instant.ofEpochMilli(1700000000000L + (i * 1000L));
+ return new AllTypesPojo(
+ a, bool1, tiny, small, intv, big, flt, dbl, ch, str, bin,
bytes, dec, dt, tm, tsNtz,
+ tsLtz);
+ }
+
+ @Test
+ void testTypedAppendWriteAndScan() throws Exception {
+ // Build all-types log table schema
+ Schema schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("bool1", DataTypes.BOOLEAN())
+ .column("tiny", DataTypes.TINYINT())
+ .column("small", DataTypes.SMALLINT())
+ .column("intv", DataTypes.INT())
+ .column("big", DataTypes.BIGINT())
+ .column("flt", DataTypes.FLOAT())
+ .column("dbl", DataTypes.DOUBLE())
+ .column("ch", DataTypes.CHAR(1))
+ .column("str", DataTypes.STRING())
+ .column("bin", DataTypes.BINARY(3))
+ .column("bytes", DataTypes.BYTES())
+ .column("dec", DataTypes.DECIMAL(10, 2))
+ .column("dt", DataTypes.DATE())
+ .column("tm", DataTypes.TIME())
+ .column("tsNtz", DataTypes.TIMESTAMP(3))
+ .column("tsLtz", DataTypes.TIMESTAMP_LTZ(3))
+ .build();
+ TablePath path = TablePath.of("pojo_db", "all_types_log");
+ TableDescriptor td =
TableDescriptor.builder().schema(schema).distributedBy(2).build();
+ createTable(path, td, true);
+
+ try (Table table = conn.getTable(path)) {
+ // write
+ TypedAppendWriter<AllTypesPojo> writer =
+ table.newAppend().createTypedWriter(AllTypesPojo.class);
+ List<AllTypesPojo> expected = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ AllTypesPojo u = newAllTypesPojo(i);
+ expected.add(u);
+ writer.append(u).get();
+ }
+ writer.flush();
+
+ // read
+ Scan scan = table.newScan();
+ TypedLogScanner<AllTypesPojo> scanner =
scan.createTypedLogScanner(AllTypesPojo.class);
+ subscribeFromBeginning(scanner, table);
+
+ List<AllTypesPojo> actual = new ArrayList<>();
+ while (actual.size() < expected.size()) {
+ TypedScanRecords<AllTypesPojo> recs =
scanner.poll(Duration.ofSeconds(2));
+ for (TypedScanRecord<AllTypesPojo> r : recs) {
+
assertThat(r.getChangeType()).isEqualTo(ChangeType.APPEND_ONLY);
+ actual.add(r.getValue());
+ }
+ }
+ assertThat(actual)
+ .usingRecursiveFieldByFieldElementComparator()
+ .containsExactlyInAnyOrderElementsOf(expected);
+ }
+ }
+
+ @Test
+ void testTypedUpsertWriteAndScan() throws Exception {
+ // Build all-types PK table schema (PK on 'a')
+ Schema schema = allTypesPkSchema();
+ TablePath path = TablePath.of("pojo_db", "all_types_pk");
+ TableDescriptor td =
TableDescriptor.builder().schema(schema).distributedBy(2, "a").build();
+ createTable(path, td, true);
+
+ try (Table table = conn.getTable(path)) {
+ Upsert upsert = table.newUpsert();
+ TypedUpsertWriter<AllTypesPojo> writer =
upsert.createTypedWriter(AllTypesPojo.class);
+
+ AllTypesPojo p1 = newAllTypesPojo(1);
+ AllTypesPojo p2 = newAllTypesPojo(2);
+ writer.upsert(p1).get();
+ writer.upsert(p2).get();
+
+ // update key 1: change a couple of fields
+ AllTypesPojo p1Updated = newAllTypesPojo(1);
+ p1Updated.str = "a1";
+ p1Updated.dec = new java.math.BigDecimal("42.42");
+ writer.upsert(p1Updated).get();
+ writer.flush();
+
+ // scan as POJOs and verify change types and values
+ TypedLogScanner<AllTypesPojo> scanner =
+ table.newScan().createTypedLogScanner(AllTypesPojo.class);
+ subscribeFromBeginning(scanner, table);
+
+ List<ChangeType> changes = new ArrayList<>();
+ List<AllTypesPojo> values = new ArrayList<>();
+ while (values.size() < 4) { // INSERT 1, INSERT 2, UPDATE_BEFORE
1, UPDATE_AFTER 1
+ TypedScanRecords<AllTypesPojo> recs =
scanner.poll(Duration.ofSeconds(2));
+ for (TypedScanRecord<AllTypesPojo> r : recs) {
+ changes.add(r.getChangeType());
+ values.add(r.getValue());
+ }
+ }
+ assertThat(changes)
+ .contains(ChangeType.INSERT, ChangeType.UPDATE_BEFORE,
ChangeType.UPDATE_AFTER);
Review Comment:
```suggestion
assertThat(changes)
.containsExactlyInAnyOrder(
ChangeType.INSERT,
ChangeType.INSERT,
ChangeType.UPDATE_BEFORE,
ChangeType.UPDATE_AFTER);
```
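Use `containsExactlyInAnyOrder` to verify all the changes; `contains` only checks that the listed change types are present, so a missing or unexpected record would go unnoticed.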
##########
fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedAppendWriterImpl.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.writer;
+
+import org.apache.fluss.client.converter.PojoToRowConverter;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.RowType;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A typed {@link AppendWriter} that converts POJOs to {@link InternalRow} and
delegates to the
+ * existing internal-row based writer implementation.
+ */
+class TypedAppendWriterImpl<T> implements TypedAppendWriter<T> {
+
+ private final AppendWriterImpl delegate;
+ private final Class<T> pojoClass;
+ private final RowType tableSchema;
Review Comment:
Not used, so it can be removed.
##########
fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedUpsertWriterImpl.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.writer;
+
+import org.apache.fluss.client.converter.PojoToRowConverter;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A typed {@link UpsertWriter} that converts POJOs to {@link InternalRow} and
delegates to the
+ * existing internal-row based writer implementation.
+ */
+class TypedUpsertWriterImpl<T> implements TypedUpsertWriter<T> {
+
+ @Override
+ public void flush() {
+ delegate.flush();
+ }
+
+ private final UpsertWriterImpl delegate;
+
+ @Override
+ public void close() throws Exception {
+ delegate.close();
+ }
+
+ private final Class<T> pojoClass;
+ private final TableInfo tableInfo;
+ private final RowType tableSchema;
+ private final int[] targetColumns; // may be null
+
+ private final RowType pkProjection;
+ @Nullable private final RowType targetProjection;
+
+ private final PojoToRowConverter<T> pojoToRowConverter;
+ private final PojoToRowConverter<T> pkConverter;
+ @Nullable private final PojoToRowConverter<T> targetConverter;
+
+ TypedUpsertWriterImpl(
+ UpsertWriterImpl delegate,
+ Class<T> pojoClass,
+ TableInfo tableInfo,
+ int[] targetColumns) {
+ this.delegate = delegate;
+ this.pojoClass = pojoClass;
+ this.tableInfo = tableInfo;
+ this.tableSchema = tableInfo.getRowType();
+ this.targetColumns = targetColumns;
+
+ // Precompute projections
+ this.pkProjection =
this.tableSchema.project(tableInfo.getPhysicalPrimaryKeys());
+ this.targetProjection =
+ (targetColumns == null) ? null :
this.tableSchema.project(targetColumns);
+
+ // Initialize reusable converters
+ this.pojoToRowConverter = PojoToRowConverter.of(pojoClass,
tableSchema, tableSchema);
+ this.pkConverter = PojoToRowConverter.of(pojoClass, tableSchema,
pkProjection);
+ this.targetConverter =
+ (targetProjection == null)
+ ? null
+ : PojoToRowConverter.of(pojoClass, tableSchema,
targetProjection);
+ }
+
+ @Override
+ public CompletableFuture<UpsertResult> upsert(T record) {
+ if (record instanceof InternalRow) {
+ return delegate.upsert((InternalRow) record);
+ }
+ InternalRow row = convertPojo(record, /*forDelete=*/ false);
+ return delegate.upsert(row);
+ }
+
+ @Override
+ public CompletableFuture<DeleteResult> delete(T record) {
+ if (record instanceof InternalRow) {
+ return delegate.delete((InternalRow) record);
+ }
+ InternalRow pkOnly = convertPojo(record, /*forDelete=*/ true);
+ return delegate.delete(pkOnly);
+ }
+
+ private InternalRow convertPojo(T pojo, boolean forDelete) {
+ final RowType projection;
+ final PojoToRowConverter<T> converter;
+ if (forDelete) {
+ projection = pkProjection;
+ converter = pkConverter;
+ } else if (targetProjection != null && targetConverter != null) {
+ projection = targetProjection;
+ converter = targetConverter;
+ } else {
+ projection = tableSchema;
+ converter = pojoToRowConverter;
+ }
+
+ GenericRow projected = converter.toRow(pojo);
+ if (projection == tableSchema) {
+ return projected;
+ }
+ // expand projected row to full row if needed
+ GenericRow full = new GenericRow(tableSchema.getFieldCount());
+ if (forDelete) {
+ // set PK fields, others null
+ for (String pk : tableInfo.getPhysicalPrimaryKeys()) {
+ int projIndex = projection.getFieldIndex(pk);
+ int fullIndex = tableSchema.getFieldIndex(pk);
Review Comment:
Could you please add a `TODO` comment above this code block noting that it
can be optimized by pre-computing the index mapping in the constructor?
##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java:
##########
@@ -37,7 +37,9 @@
*/
@PublicEvolving
public class ScanRecords implements Iterable<ScanRecord> {
- public static final ScanRecords EMPTY = new
ScanRecords(Collections.emptyMap());
+ public static final ScanRecords empty() {
+ return new ScanRecords(Collections.emptyMap());
+ }
Review Comment:
Not necessary? I still prefer the previous implementation because it can
avoid some small object overhead (GC).
##########
fluss-client/src/test/java/org/apache/fluss/client/table/FlussTypedClientITCase.java:
##########
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table;
+
+import org.apache.fluss.client.admin.ClientToServerITCaseBase;
+import org.apache.fluss.client.converter.RowToPojoConverter;
+import org.apache.fluss.client.lookup.LookupResult;
+import org.apache.fluss.client.lookup.Lookuper;
+import org.apache.fluss.client.lookup.TypedLookuper;
+import org.apache.fluss.client.table.scanner.Scan;
+import org.apache.fluss.client.table.scanner.TypedScanRecord;
+import org.apache.fluss.client.table.scanner.log.TypedLogScanner;
+import org.apache.fluss.client.table.scanner.log.TypedScanRecords;
+import org.apache.fluss.client.table.writer.TypedAppendWriter;
+import org.apache.fluss.client.table.writer.TypedUpsertWriter;
+import org.apache.fluss.client.table.writer.Upsert;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** End-to-end tests for writing and scanning POJOs via client API. */
+public class FlussTypedClientITCase extends ClientToServerITCaseBase {
+
+ /** Test POJO containing all supported field types used by converters. */
+ public static class AllTypesPojo {
+ // primary key
+ public Integer a;
+ // all supported converter fields
+ public Boolean bool1;
+ public Byte tiny;
+ public Short small;
+ public Integer intv;
+ public Long big;
+ public Float flt;
+ public Double dbl;
+ public Character ch;
+ public String str;
+ public byte[] bin;
+ public byte[] bytes;
+ public BigDecimal dec;
+ public LocalDate dt;
+ public LocalTime tm;
+ public LocalDateTime tsNtz;
+ public Instant tsLtz;
+
+ public AllTypesPojo() {}
+
+ public AllTypesPojo(
+ Integer a,
+ Boolean bool1,
+ Byte tiny,
+ Short small,
+ Integer intv,
+ Long big,
+ Float flt,
+ Double dbl,
+ Character ch,
+ String str,
+ byte[] bin,
+ byte[] bytes,
+ BigDecimal dec,
+ LocalDate dt,
+ LocalTime tm,
+ LocalDateTime tsNtz,
+ Instant tsLtz) {
+ this.a = a;
+ this.bool1 = bool1;
+ this.tiny = tiny;
+ this.small = small;
+ this.intv = intv;
+ this.big = big;
+ this.flt = flt;
+ this.dbl = dbl;
+ this.ch = ch;
+ this.str = str;
+ this.bin = bin;
+ this.bytes = bytes;
+ this.dec = dec;
+ this.dt = dt;
+ this.tm = tm;
+ this.tsNtz = tsNtz;
+ this.tsLtz = tsLtz;
+ }
+ }
+
+ /** Minimal POJO representing the primary key for {@link AllTypesPojo}. */
+ public static class PLookupKey {
+ public Integer a;
+
+ public PLookupKey() {}
+
+ public PLookupKey(Integer a) {
+ this.a = a;
+ }
+ }
+
+ private static Schema allTypesLogSchema() {
+ return Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("bool1", DataTypes.BOOLEAN())
+ .column("tiny", DataTypes.TINYINT())
+ .column("small", DataTypes.SMALLINT())
+ .column("intv", DataTypes.INT())
+ .column("big", DataTypes.BIGINT())
+ .column("flt", DataTypes.FLOAT())
+ .column("dbl", DataTypes.DOUBLE())
+ .column("ch", DataTypes.CHAR(1))
+ .column("str", DataTypes.STRING())
+ .column("bin", DataTypes.BINARY(3))
+ .column("bytes", DataTypes.BYTES())
+ .column("dec", DataTypes.DECIMAL(10, 2))
+ .column("dt", DataTypes.DATE())
+ .column("tm", DataTypes.TIME())
+ .column("tsNtz", DataTypes.TIMESTAMP(3))
+ .column("tsLtz", DataTypes.TIMESTAMP_LTZ(3))
+ .build();
+ }
+
+ private static Schema allTypesPkSchema() {
+ // Same columns as log schema but with PK on 'a'
+ return Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("bool1", DataTypes.BOOLEAN())
+ .column("tiny", DataTypes.TINYINT())
+ .column("small", DataTypes.SMALLINT())
+ .column("intv", DataTypes.INT())
+ .column("big", DataTypes.BIGINT())
+ .column("flt", DataTypes.FLOAT())
+ .column("dbl", DataTypes.DOUBLE())
+ .column("ch", DataTypes.CHAR(1))
+ .column("str", DataTypes.STRING())
+ .column("bin", DataTypes.BINARY(3))
+ .column("bytes", DataTypes.BYTES())
+ .column("dec", DataTypes.DECIMAL(10, 2))
+ .column("dt", DataTypes.DATE())
+ .column("tm", DataTypes.TIME())
+ .column("tsNtz", DataTypes.TIMESTAMP(3))
+ .column("tsLtz", DataTypes.TIMESTAMP_LTZ(3))
+ .primaryKey("a")
+ .build();
+ }
+
+ private static AllTypesPojo newAllTypesPojo(int i) {
+ Integer a = i;
+ Boolean bool1 = (i % 2) == 0;
+ Byte tiny = (byte) (i - 5);
+ Short small = (short) (100 + i);
+ Integer intv = 1000 + i;
+ Long big = 100000L + i;
+ Float flt = 1.5f + i;
+ Double dbl = 2.5 + i;
+ Character ch = (char) ('a' + (i % 26));
+ String str = "s" + i;
+ byte[] bin = new byte[] {(byte) i, (byte) (i + 1), (byte) (i + 2)};
+ byte[] bytes = new byte[] {(byte) (10 + i), (byte) (20 + i)};
+ BigDecimal dec = new BigDecimal("12345." + (10 + i)).setScale(2,
RoundingMode.HALF_UP);
+ LocalDate dt = LocalDate.of(2024, 1, 1).plusDays(i);
+ LocalTime tm = LocalTime.of(12, (i * 7) % 60, (i * 11) % 60);
+ LocalDateTime tsNtz = LocalDateTime.of(2024, 1, 1, 0,
0).plusSeconds(i).withNano(0);
+ Instant tsLtz = Instant.ofEpochMilli(1700000000000L + (i * 1000L));
+ return new AllTypesPojo(
+ a, bool1, tiny, small, intv, big, flt, dbl, ch, str, bin,
bytes, dec, dt, tm, tsNtz,
+ tsLtz);
+ }
+
+ @Test
+ void testTypedAppendWriteAndScan() throws Exception {
+ // Build all-types log table schema
+ Schema schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("bool1", DataTypes.BOOLEAN())
+ .column("tiny", DataTypes.TINYINT())
+ .column("small", DataTypes.SMALLINT())
+ .column("intv", DataTypes.INT())
+ .column("big", DataTypes.BIGINT())
+ .column("flt", DataTypes.FLOAT())
+ .column("dbl", DataTypes.DOUBLE())
+ .column("ch", DataTypes.CHAR(1))
+ .column("str", DataTypes.STRING())
+ .column("bin", DataTypes.BINARY(3))
+ .column("bytes", DataTypes.BYTES())
+ .column("dec", DataTypes.DECIMAL(10, 2))
+ .column("dt", DataTypes.DATE())
+ .column("tm", DataTypes.TIME())
+ .column("tsNtz", DataTypes.TIMESTAMP(3))
+ .column("tsLtz", DataTypes.TIMESTAMP_LTZ(3))
+ .build();
+ TablePath path = TablePath.of("pojo_db", "all_types_log");
+ TableDescriptor td =
TableDescriptor.builder().schema(schema).distributedBy(2).build();
+ createTable(path, td, true);
+
+ try (Table table = conn.getTable(path)) {
+ // write
+ TypedAppendWriter<AllTypesPojo> writer =
+ table.newAppend().createTypedWriter(AllTypesPojo.class);
+ List<AllTypesPojo> expected = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ AllTypesPojo u = newAllTypesPojo(i);
+ expected.add(u);
+ writer.append(u).get();
+ }
+ writer.flush();
+
+ // read
+ Scan scan = table.newScan();
+ TypedLogScanner<AllTypesPojo> scanner =
scan.createTypedLogScanner(AllTypesPojo.class);
+ subscribeFromBeginning(scanner, table);
+
+ List<AllTypesPojo> actual = new ArrayList<>();
+ while (actual.size() < expected.size()) {
+ TypedScanRecords<AllTypesPojo> recs =
scanner.poll(Duration.ofSeconds(2));
+ for (TypedScanRecord<AllTypesPojo> r : recs) {
+
assertThat(r.getChangeType()).isEqualTo(ChangeType.APPEND_ONLY);
+ actual.add(r.getValue());
+ }
+ }
+ assertThat(actual)
+ .usingRecursiveFieldByFieldElementComparator()
+ .containsExactlyInAnyOrderElementsOf(expected);
+ }
+ }
+
+ @Test
+ void testTypedUpsertWriteAndScan() throws Exception {
+ // Build all-types PK table schema (PK on 'a')
+ Schema schema = allTypesPkSchema();
+ TablePath path = TablePath.of("pojo_db", "all_types_pk");
+ TableDescriptor td =
TableDescriptor.builder().schema(schema).distributedBy(2, "a").build();
+ createTable(path, td, true);
+
+ try (Table table = conn.getTable(path)) {
+ Upsert upsert = table.newUpsert();
+ TypedUpsertWriter<AllTypesPojo> writer =
upsert.createTypedWriter(AllTypesPojo.class);
+
+ AllTypesPojo p1 = newAllTypesPojo(1);
+ AllTypesPojo p2 = newAllTypesPojo(2);
+ writer.upsert(p1).get();
+ writer.upsert(p2).get();
+
+ // update key 1: change a couple of fields
+ AllTypesPojo p1Updated = newAllTypesPojo(1);
+ p1Updated.str = "a1";
+ p1Updated.dec = new java.math.BigDecimal("42.42");
+ writer.upsert(p1Updated).get();
+ writer.flush();
+
+ // scan as POJOs and verify change types and values
+ TypedLogScanner<AllTypesPojo> scanner =
+ table.newScan().createTypedLogScanner(AllTypesPojo.class);
+ subscribeFromBeginning(scanner, table);
+
+ List<ChangeType> changes = new ArrayList<>();
+ List<AllTypesPojo> values = new ArrayList<>();
+ while (values.size() < 4) { // INSERT 1, INSERT 2, UPDATE_BEFORE
1, UPDATE_AFTER 1
+ TypedScanRecords<AllTypesPojo> recs =
scanner.poll(Duration.ofSeconds(2));
+ for (TypedScanRecord<AllTypesPojo> r : recs) {
+ changes.add(r.getChangeType());
+ values.add(r.getValue());
+ }
+ }
+ assertThat(changes)
+ .contains(ChangeType.INSERT, ChangeType.UPDATE_BEFORE,
ChangeType.UPDATE_AFTER);
+ // ensure the last update_after reflects new value
+ int lastIdx = changes.lastIndexOf(ChangeType.UPDATE_AFTER);
+ assertThat(values.get(lastIdx).str).isEqualTo("a1");
+ }
+ }
+
+ @Test
+ void testTypedLookups() throws Exception {
+ Schema schema = allTypesPkSchema();
+ TablePath path = TablePath.of("pojo_db", "lookup_pk");
+ TableDescriptor td =
TableDescriptor.builder().schema(schema).distributedBy(2, "a").build();
+ createTable(path, td, true);
+
+ try (Table table = conn.getTable(path)) {
+ TypedUpsertWriter<AllTypesPojo> writer =
+ table.newUpsert().createTypedWriter(AllTypesPojo.class);
+ writer.upsert(newAllTypesPojo(1)).get();
+ writer.upsert(newAllTypesPojo(2)).get();
+ writer.close();
+
+ // primary key lookup using Lookuper API with POJO key
+ TypedLookuper<PLookupKey> lookuper =
+ table.newLookup().createTypedLookuper(PLookupKey.class);
+ RowType tableSchema = table.getTableInfo().getRowType();
+ RowToPojoConverter<AllTypesPojo> rowConv =
+ RowToPojoConverter.of(AllTypesPojo.class, tableSchema,
tableSchema);
+
+ LookupResult lr = lookuper.lookup(new PLookupKey(1)).get();
+ AllTypesPojo one = rowConv.fromRow(lr.getSingletonRow());
+ assertThat(one.str).isEqualTo("s1");
Review Comment:
```
assertThat(one).isEqualTo(newAllTypesPojo(1));
```
After adding `equals` and `hashCode` methods to the `AllTypesPojo` class, we
can simply assert the full record, which checks the full POJO record
deserialization.
##########
fluss-client/src/test/java/org/apache/fluss/client/table/FlussTypedClientITCase.java:
##########
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table;
+
+import org.apache.fluss.client.admin.ClientToServerITCaseBase;
+import org.apache.fluss.client.converter.RowToPojoConverter;
+import org.apache.fluss.client.lookup.LookupResult;
+import org.apache.fluss.client.lookup.Lookuper;
+import org.apache.fluss.client.lookup.TypedLookuper;
+import org.apache.fluss.client.table.scanner.Scan;
+import org.apache.fluss.client.table.scanner.TypedScanRecord;
+import org.apache.fluss.client.table.scanner.log.TypedLogScanner;
+import org.apache.fluss.client.table.scanner.log.TypedScanRecords;
+import org.apache.fluss.client.table.writer.TypedAppendWriter;
+import org.apache.fluss.client.table.writer.TypedUpsertWriter;
+import org.apache.fluss.client.table.writer.Upsert;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** End-to-end tests for writing and scanning POJOs via client API. */
+public class FlussTypedClientITCase extends ClientToServerITCaseBase {
+
+ /** Test POJO containing all supported field types used by converters. */
+ public static class AllTypesPojo {
+ // primary key
+ public Integer a;
+ // all supported converter fields
+ public Boolean bool1;
+ public Byte tiny;
+ public Short small;
+ public Integer intv;
+ public Long big;
+ public Float flt;
+ public Double dbl;
+ public Character ch;
+ public String str;
+ public byte[] bin;
+ public byte[] bytes;
+ public BigDecimal dec;
+ public LocalDate dt;
+ public LocalTime tm;
+ public LocalDateTime tsNtz;
+ public Instant tsLtz;
+
+ public AllTypesPojo() {}
+
+ public AllTypesPojo(
+ Integer a,
+ Boolean bool1,
+ Byte tiny,
+ Short small,
+ Integer intv,
+ Long big,
+ Float flt,
+ Double dbl,
+ Character ch,
+ String str,
+ byte[] bin,
+ byte[] bytes,
+ BigDecimal dec,
+ LocalDate dt,
+ LocalTime tm,
+ LocalDateTime tsNtz,
+ Instant tsLtz) {
+ this.a = a;
+ this.bool1 = bool1;
+ this.tiny = tiny;
+ this.small = small;
+ this.intv = intv;
+ this.big = big;
+ this.flt = flt;
+ this.dbl = dbl;
+ this.ch = ch;
+ this.str = str;
+ this.bin = bin;
+ this.bytes = bytes;
+ this.dec = dec;
+ this.dt = dt;
+ this.tm = tm;
+ this.tsNtz = tsNtz;
+ this.tsLtz = tsLtz;
+ }
+ }
+
+ /** Minimal POJO representing the primary key for {@link AllTypesPojo}. */
+ public static class PLookupKey {
+ public Integer a;
+
+ public PLookupKey() {}
+
+ public PLookupKey(Integer a) {
+ this.a = a;
+ }
+ }
+
+ private static Schema allTypesLogSchema() {
+ return Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("bool1", DataTypes.BOOLEAN())
+ .column("tiny", DataTypes.TINYINT())
+ .column("small", DataTypes.SMALLINT())
+ .column("intv", DataTypes.INT())
+ .column("big", DataTypes.BIGINT())
+ .column("flt", DataTypes.FLOAT())
+ .column("dbl", DataTypes.DOUBLE())
+ .column("ch", DataTypes.CHAR(1))
+ .column("str", DataTypes.STRING())
+ .column("bin", DataTypes.BINARY(3))
+ .column("bytes", DataTypes.BYTES())
+ .column("dec", DataTypes.DECIMAL(10, 2))
+ .column("dt", DataTypes.DATE())
+ .column("tm", DataTypes.TIME())
+ .column("tsNtz", DataTypes.TIMESTAMP(3))
+ .column("tsLtz", DataTypes.TIMESTAMP_LTZ(3))
+ .build();
+ }
+
+ private static Schema allTypesPkSchema() {
+ // Same columns as log schema but with PK on 'a'
+ return Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("bool1", DataTypes.BOOLEAN())
+ .column("tiny", DataTypes.TINYINT())
+ .column("small", DataTypes.SMALLINT())
+ .column("intv", DataTypes.INT())
+ .column("big", DataTypes.BIGINT())
+ .column("flt", DataTypes.FLOAT())
+ .column("dbl", DataTypes.DOUBLE())
+ .column("ch", DataTypes.CHAR(1))
+ .column("str", DataTypes.STRING())
+ .column("bin", DataTypes.BINARY(3))
+ .column("bytes", DataTypes.BYTES())
+ .column("dec", DataTypes.DECIMAL(10, 2))
+ .column("dt", DataTypes.DATE())
+ .column("tm", DataTypes.TIME())
+ .column("tsNtz", DataTypes.TIMESTAMP(3))
+ .column("tsLtz", DataTypes.TIMESTAMP_LTZ(3))
+ .primaryKey("a")
+ .build();
+ }
+
+ private static AllTypesPojo newAllTypesPojo(int i) {
+ Integer a = i;
+ Boolean bool1 = (i % 2) == 0;
+ Byte tiny = (byte) (i - 5);
+ Short small = (short) (100 + i);
+ Integer intv = 1000 + i;
+ Long big = 100000L + i;
+ Float flt = 1.5f + i;
+ Double dbl = 2.5 + i;
+ Character ch = (char) ('a' + (i % 26));
+ String str = "s" + i;
+ byte[] bin = new byte[] {(byte) i, (byte) (i + 1), (byte) (i + 2)};
+ byte[] bytes = new byte[] {(byte) (10 + i), (byte) (20 + i)};
+ BigDecimal dec = new BigDecimal("12345." + (10 + i)).setScale(2,
RoundingMode.HALF_UP);
+ LocalDate dt = LocalDate.of(2024, 1, 1).plusDays(i);
+ LocalTime tm = LocalTime.of(12, (i * 7) % 60, (i * 11) % 60);
+ LocalDateTime tsNtz = LocalDateTime.of(2024, 1, 1, 0,
0).plusSeconds(i).withNano(0);
+ Instant tsLtz = Instant.ofEpochMilli(1700000000000L + (i * 1000L));
+ return new AllTypesPojo(
+ a, bool1, tiny, small, intv, big, flt, dbl, ch, str, bin,
bytes, dec, dt, tm, tsNtz,
+ tsLtz);
+ }
+
+ @Test
+ void testTypedAppendWriteAndScan() throws Exception {
+ // Build all-types log table schema
+ Schema schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("bool1", DataTypes.BOOLEAN())
+ .column("tiny", DataTypes.TINYINT())
+ .column("small", DataTypes.SMALLINT())
+ .column("intv", DataTypes.INT())
+ .column("big", DataTypes.BIGINT())
+ .column("flt", DataTypes.FLOAT())
+ .column("dbl", DataTypes.DOUBLE())
+ .column("ch", DataTypes.CHAR(1))
+ .column("str", DataTypes.STRING())
+ .column("bin", DataTypes.BINARY(3))
+ .column("bytes", DataTypes.BYTES())
+ .column("dec", DataTypes.DECIMAL(10, 2))
+ .column("dt", DataTypes.DATE())
+ .column("tm", DataTypes.TIME())
+ .column("tsNtz", DataTypes.TIMESTAMP(3))
+ .column("tsLtz", DataTypes.TIMESTAMP_LTZ(3))
+ .build();
+ TablePath path = TablePath.of("pojo_db", "all_types_log");
+ TableDescriptor td =
TableDescriptor.builder().schema(schema).distributedBy(2).build();
+ createTable(path, td, true);
+
+ try (Table table = conn.getTable(path)) {
+ // write
+ TypedAppendWriter<AllTypesPojo> writer =
+ table.newAppend().createTypedWriter(AllTypesPojo.class);
+ List<AllTypesPojo> expected = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ AllTypesPojo u = newAllTypesPojo(i);
+ expected.add(u);
+ writer.append(u).get();
Review Comment:
```suggestion
writer.append(u);
```
use async flush to reduce test time
##########
fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableWriter.java:
##########
@@ -38,4 +38,9 @@ public interface TableWriter {
* results in an error.
*/
void flush();
+
+ @Override
+ default void close() throws Exception {
+ // by default do nothing
+ }
Review Comment:
It seems the newly introduced `close()` method doesn’t currently have any
meaningful work to perform, as the writers don’t hold any resources at this
stage. I suggest holding off on introducing it for now.
Typically, a `close()` method should either **flush pending records** or
**ensure all previously submitted requests have completed**—otherwise, it may
mislead users into expecting cleanup or finalization behavior that isn’t
actually implemented. This introduces some complexity to this PR.
##########
fluss-client/src/test/java/org/apache/fluss/client/table/FlussTypedClientITCase.java:
##########
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table;
+
+import org.apache.fluss.client.admin.ClientToServerITCaseBase;
+import org.apache.fluss.client.converter.RowToPojoConverter;
+import org.apache.fluss.client.lookup.LookupResult;
+import org.apache.fluss.client.lookup.Lookuper;
+import org.apache.fluss.client.lookup.TypedLookuper;
+import org.apache.fluss.client.table.scanner.Scan;
+import org.apache.fluss.client.table.scanner.TypedScanRecord;
+import org.apache.fluss.client.table.scanner.log.TypedLogScanner;
+import org.apache.fluss.client.table.scanner.log.TypedScanRecords;
+import org.apache.fluss.client.table.writer.TypedAppendWriter;
+import org.apache.fluss.client.table.writer.TypedUpsertWriter;
+import org.apache.fluss.client.table.writer.Upsert;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** End-to-end tests for writing and scanning POJOs via client API. */
+public class FlussTypedClientITCase extends ClientToServerITCaseBase {
+
+ /** Test POJO containing all supported field types used by converters. */
+ public static class AllTypesPojo {
+ // primary key
+ public Integer a;
+ // all supported converter fields
+ public Boolean bool1;
+ public Byte tiny;
+ public Short small;
+ public Integer intv;
+ public Long big;
+ public Float flt;
+ public Double dbl;
+ public Character ch;
+ public String str;
+ public byte[] bin;
+ public byte[] bytes;
+ public BigDecimal dec;
+ public LocalDate dt;
+ public LocalTime tm;
+ public LocalDateTime tsNtz;
+ public Instant tsLtz;
+
+ public AllTypesPojo() {}
+
+ public AllTypesPojo(
+ Integer a,
+ Boolean bool1,
+ Byte tiny,
+ Short small,
+ Integer intv,
+ Long big,
+ Float flt,
+ Double dbl,
+ Character ch,
+ String str,
+ byte[] bin,
+ byte[] bytes,
+ BigDecimal dec,
+ LocalDate dt,
+ LocalTime tm,
+ LocalDateTime tsNtz,
+ Instant tsLtz) {
+ this.a = a;
+ this.bool1 = bool1;
+ this.tiny = tiny;
+ this.small = small;
+ this.intv = intv;
+ this.big = big;
+ this.flt = flt;
+ this.dbl = dbl;
+ this.ch = ch;
+ this.str = str;
+ this.bin = bin;
+ this.bytes = bytes;
+ this.dec = dec;
+ this.dt = dt;
+ this.tm = tm;
+ this.tsNtz = tsNtz;
+ this.tsLtz = tsLtz;
+ }
+ }
+
+ /** Minimal POJO representing the primary key for {@link AllTypesPojo}. */
+ public static class PLookupKey {
+ public Integer a;
+
+ public PLookupKey() {}
+
+ public PLookupKey(Integer a) {
+ this.a = a;
+ }
+ }
+
+ private static Schema allTypesLogSchema() {
+ return Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("bool1", DataTypes.BOOLEAN())
+ .column("tiny", DataTypes.TINYINT())
+ .column("small", DataTypes.SMALLINT())
+ .column("intv", DataTypes.INT())
+ .column("big", DataTypes.BIGINT())
+ .column("flt", DataTypes.FLOAT())
+ .column("dbl", DataTypes.DOUBLE())
+ .column("ch", DataTypes.CHAR(1))
+ .column("str", DataTypes.STRING())
+ .column("bin", DataTypes.BINARY(3))
+ .column("bytes", DataTypes.BYTES())
+ .column("dec", DataTypes.DECIMAL(10, 2))
+ .column("dt", DataTypes.DATE())
+ .column("tm", DataTypes.TIME())
+ .column("tsNtz", DataTypes.TIMESTAMP(3))
+ .column("tsLtz", DataTypes.TIMESTAMP_LTZ(3))
+ .build();
+ }
+
+ private static Schema allTypesPkSchema() {
+ // Same columns as log schema but with PK on 'a'
+ return Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("bool1", DataTypes.BOOLEAN())
+ .column("tiny", DataTypes.TINYINT())
+ .column("small", DataTypes.SMALLINT())
+ .column("intv", DataTypes.INT())
+ .column("big", DataTypes.BIGINT())
+ .column("flt", DataTypes.FLOAT())
+ .column("dbl", DataTypes.DOUBLE())
+ .column("ch", DataTypes.CHAR(1))
+ .column("str", DataTypes.STRING())
+ .column("bin", DataTypes.BINARY(3))
+ .column("bytes", DataTypes.BYTES())
+ .column("dec", DataTypes.DECIMAL(10, 2))
+ .column("dt", DataTypes.DATE())
+ .column("tm", DataTypes.TIME())
+ .column("tsNtz", DataTypes.TIMESTAMP(3))
+ .column("tsLtz", DataTypes.TIMESTAMP_LTZ(3))
+ .primaryKey("a")
+ .build();
+ }
+
+ private static AllTypesPojo newAllTypesPojo(int i) {
+ Integer a = i;
+ Boolean bool1 = (i % 2) == 0;
+ Byte tiny = (byte) (i - 5);
+ Short small = (short) (100 + i);
+ Integer intv = 1000 + i;
+ Long big = 100000L + i;
+ Float flt = 1.5f + i;
+ Double dbl = 2.5 + i;
+ Character ch = (char) ('a' + (i % 26));
+ String str = "s" + i;
+ byte[] bin = new byte[] {(byte) i, (byte) (i + 1), (byte) (i + 2)};
+ byte[] bytes = new byte[] {(byte) (10 + i), (byte) (20 + i)};
+ BigDecimal dec = new BigDecimal("12345." + (10 + i)).setScale(2,
RoundingMode.HALF_UP);
+ LocalDate dt = LocalDate.of(2024, 1, 1).plusDays(i);
+ LocalTime tm = LocalTime.of(12, (i * 7) % 60, (i * 11) % 60);
+ LocalDateTime tsNtz = LocalDateTime.of(2024, 1, 1, 0,
0).plusSeconds(i).withNano(0);
+ Instant tsLtz = Instant.ofEpochMilli(1700000000000L + (i * 1000L));
+ return new AllTypesPojo(
+ a, bool1, tiny, small, intv, big, flt, dbl, ch, str, bin,
bytes, dec, dt, tm, tsNtz,
+ tsLtz);
+ }
+
+ @Test
+ void testTypedAppendWriteAndScan() throws Exception {
+ // Build all-types log table schema
+ Schema schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("bool1", DataTypes.BOOLEAN())
+ .column("tiny", DataTypes.TINYINT())
+ .column("small", DataTypes.SMALLINT())
+ .column("intv", DataTypes.INT())
+ .column("big", DataTypes.BIGINT())
+ .column("flt", DataTypes.FLOAT())
+ .column("dbl", DataTypes.DOUBLE())
+ .column("ch", DataTypes.CHAR(1))
+ .column("str", DataTypes.STRING())
+ .column("bin", DataTypes.BINARY(3))
+ .column("bytes", DataTypes.BYTES())
+ .column("dec", DataTypes.DECIMAL(10, 2))
+ .column("dt", DataTypes.DATE())
+ .column("tm", DataTypes.TIME())
+ .column("tsNtz", DataTypes.TIMESTAMP(3))
+ .column("tsLtz", DataTypes.TIMESTAMP_LTZ(3))
+ .build();
+ TablePath path = TablePath.of("pojo_db", "all_types_log");
+ TableDescriptor td =
TableDescriptor.builder().schema(schema).distributedBy(2).build();
+ createTable(path, td, true);
+
+ try (Table table = conn.getTable(path)) {
+ // write
+ TypedAppendWriter<AllTypesPojo> writer =
+ table.newAppend().createTypedWriter(AllTypesPojo.class);
+ List<AllTypesPojo> expected = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ AllTypesPojo u = newAllTypesPojo(i);
+ expected.add(u);
+ writer.append(u).get();
+ }
+ writer.flush();
+
+ // read
+ Scan scan = table.newScan();
+ TypedLogScanner<AllTypesPojo> scanner =
scan.createTypedLogScanner(AllTypesPojo.class);
+ subscribeFromBeginning(scanner, table);
+
+ List<AllTypesPojo> actual = new ArrayList<>();
+ while (actual.size() < expected.size()) {
+ TypedScanRecords<AllTypesPojo> recs =
scanner.poll(Duration.ofSeconds(2));
+ for (TypedScanRecord<AllTypesPojo> r : recs) {
+
assertThat(r.getChangeType()).isEqualTo(ChangeType.APPEND_ONLY);
+ actual.add(r.getValue());
+ }
+ }
+ assertThat(actual)
+ .usingRecursiveFieldByFieldElementComparator()
+ .containsExactlyInAnyOrderElementsOf(expected);
+ }
+ }
+
+ @Test
+ void testTypedUpsertWriteAndScan() throws Exception {
+ // Build all-types PK table schema (PK on 'a')
+ Schema schema = allTypesPkSchema();
+ TablePath path = TablePath.of("pojo_db", "all_types_pk");
+ TableDescriptor td =
TableDescriptor.builder().schema(schema).distributedBy(2, "a").build();
+ createTable(path, td, true);
+
+ try (Table table = conn.getTable(path)) {
+ Upsert upsert = table.newUpsert();
+ TypedUpsertWriter<AllTypesPojo> writer =
upsert.createTypedWriter(AllTypesPojo.class);
+
+ AllTypesPojo p1 = newAllTypesPojo(1);
+ AllTypesPojo p2 = newAllTypesPojo(2);
+ writer.upsert(p1).get();
+ writer.upsert(p2).get();
+
+ // update key 1: change a couple of fields
+ AllTypesPojo p1Updated = newAllTypesPojo(1);
+ p1Updated.str = "a1";
+ p1Updated.dec = new java.math.BigDecimal("42.42");
+ writer.upsert(p1Updated).get();
+ writer.flush();
+
+ // scan as POJOs and verify change types and values
+ TypedLogScanner<AllTypesPojo> scanner =
+ table.newScan().createTypedLogScanner(AllTypesPojo.class);
+ subscribeFromBeginning(scanner, table);
+
+ List<ChangeType> changes = new ArrayList<>();
+ List<AllTypesPojo> values = new ArrayList<>();
+ while (values.size() < 4) { // INSERT 1, INSERT 2, UPDATE_BEFORE
1, UPDATE_AFTER 1
+ TypedScanRecords<AllTypesPojo> recs =
scanner.poll(Duration.ofSeconds(2));
+ for (TypedScanRecord<AllTypesPojo> r : recs) {
+ changes.add(r.getChangeType());
+ values.add(r.getValue());
+ }
+ }
+ assertThat(changes)
+ .contains(ChangeType.INSERT, ChangeType.UPDATE_BEFORE,
ChangeType.UPDATE_AFTER);
+ // ensure the last update_after reflects new value
+ int lastIdx = changes.lastIndexOf(ChangeType.UPDATE_AFTER);
+ assertThat(values.get(lastIdx).str).isEqualTo("a1");
+ }
+ }
+
+ @Test
+ void testTypedLookups() throws Exception {
+ Schema schema = allTypesPkSchema();
+ TablePath path = TablePath.of("pojo_db", "lookup_pk");
+ TableDescriptor td =
TableDescriptor.builder().schema(schema).distributedBy(2, "a").build();
+ createTable(path, td, true);
+
+ try (Table table = conn.getTable(path)) {
+ TypedUpsertWriter<AllTypesPojo> writer =
+ table.newUpsert().createTypedWriter(AllTypesPojo.class);
+ writer.upsert(newAllTypesPojo(1)).get();
+ writer.upsert(newAllTypesPojo(2)).get();
+ writer.close();
+
+ // primary key lookup using Lookuper API with POJO key
+ TypedLookuper<PLookupKey> lookuper =
+ table.newLookup().createTypedLookuper(PLookupKey.class);
+ RowType tableSchema = table.getTableInfo().getRowType();
+ RowToPojoConverter<AllTypesPojo> rowConv =
+ RowToPojoConverter.of(AllTypesPojo.class, tableSchema,
tableSchema);
+
+ LookupResult lr = lookuper.lookup(new PLookupKey(1)).get();
+ AllTypesPojo one = rowConv.fromRow(lr.getSingletonRow());
+ assertThat(one.str).isEqualTo("s1");
+ }
+ }
+
+ @Test
+ void testInternalRowLookup() throws Exception {
+ Schema schema = allTypesPkSchema();
+ TablePath path = TablePath.of("pojo_db", "lookup_internalrow");
+ TableDescriptor td =
TableDescriptor.builder().schema(schema).distributedBy(2, "a").build();
+ createTable(path, td, true);
+
+ try (Table table = conn.getTable(path)) {
+ // write a couple of rows via POJO writer
+ TypedUpsertWriter<AllTypesPojo> writer =
+ table.newUpsert().createTypedWriter(AllTypesPojo.class);
+ writer.upsert(newAllTypesPojo(101)).get();
+ writer.upsert(newAllTypesPojo(202)).get();
+ writer.close();
+
+ // now perform lookup using the raw InternalRow path to ensure
it's still supported
+ Lookuper lookuper = table.newLookup().createLookuper();
+ RowType tableSchema = table.getTableInfo().getRowType();
+ RowType keyProjection =
tableSchema.project(table.getTableInfo().getPrimaryKeys());
+
+ // Build the key row directly using GenericRow to avoid any POJO
conversion
+ GenericRow keyRow = new GenericRow(keyProjection.getFieldCount());
+ keyRow.setField(0, 101); // primary key field 'a'
+
+ LookupResult lr = lookuper.lookup(keyRow).get();
+ RowToPojoConverter<AllTypesPojo> rowConv =
+ RowToPojoConverter.of(AllTypesPojo.class, tableSchema,
tableSchema);
+ AllTypesPojo pojo = rowConv.fromRow(lr.getSingletonRow());
+ assertThat(pojo).isNotNull();
+ assertThat(pojo.a).isEqualTo(101);
+ assertThat(pojo.str).isEqualTo("s101");
+ }
+ }
+
+ @Test
+ void testTypedProjections() throws Exception {
+ TablePath path = TablePath.of("pojo_db", "proj_log");
+ TableDescriptor td =
+
TableDescriptor.builder().schema(allTypesLogSchema()).distributedBy(1).build();
+ createTable(path, td, true);
+
+ try (Table table = conn.getTable(path)) {
+ TypedAppendWriter<AllTypesPojo> writer =
+ table.newAppend().createTypedWriter(AllTypesPojo.class);
+ writer.append(newAllTypesPojo(10)).get();
+ writer.append(newAllTypesPojo(11)).get();
+ writer.flush();
+
+ // Project only a subset of fields
+ TypedLogScanner<AllTypesPojo> scanner =
+ table.newScan()
+ .project(Arrays.asList("a", "str"))
+ .createTypedLogScanner(AllTypesPojo.class);
+ subscribeFromBeginning(scanner, table);
+ TypedScanRecords<AllTypesPojo> recs =
scanner.poll(Duration.ofSeconds(2));
+ for (TypedScanRecord<AllTypesPojo> r : recs) {
+ AllTypesPojo u = r.getValue();
+ assertThat(u.a).isNotNull();
+ assertThat(u.str).isNotNull();
+ // non-projected fields should be null
+ assertThat(u.bool1).isNull();
+ assertThat(u.bin).isNull();
+ assertThat(u.bytes).isNull();
+ assertThat(u.dec).isNull();
+ assertThat(u.dt).isNull();
+ assertThat(u.tm).isNull();
+ assertThat(u.tsNtz).isNull();
+ assertThat(u.tsLtz).isNull();
+ }
+ }
+ }
+
+ @Test
+ void testTypedPartialUpdates() throws Exception {
+ // Use full PK schema and update a subset of fields
+ Schema schema = allTypesPkSchema();
+ TablePath path = TablePath.of("pojo_db", "pk_partial");
+ TableDescriptor td =
TableDescriptor.builder().schema(schema).distributedBy(1, "a").build();
+ createTable(path, td, true);
+
+ try (Table table = conn.getTable(path)) {
+ Upsert upsert = table.newUpsert().partialUpdate("a", "str", "dec");
+ TypedUpsertWriter<AllTypesPojo> writer =
upsert.createTypedWriter(AllTypesPojo.class);
+
+ // initial full row
+ writer.upsert(newAllTypesPojo(1)).get();
+
+ // partial update: only PK + subset fields
+ AllTypesPojo patch = new AllTypesPojo();
+ patch.a = 1;
+ patch.str = "second";
+ patch.dec = new BigDecimal("99.99");
+ writer.upsert(patch).get();
+ writer.close();
+
+ // verify via lookup and scan using Lookuper + POJO key
+ TypedLookuper<PLookupKey> lookuper =
+ table.newLookup().createTypedLookuper(PLookupKey.class);
+ RowType tableSchema = table.getTableInfo().getRowType();
+ RowToPojoConverter<AllTypesPojo> rowConv =
+ RowToPojoConverter.of(AllTypesPojo.class, tableSchema,
tableSchema);
+ AllTypesPojo lookedUp =
+ rowConv.fromRow(lookuper.lookup(new
PLookupKey(1)).get().getSingletonRow());
+ assertThat(lookedUp.str).isEqualTo("second");
+ assertThat(lookedUp.dec).isEqualByComparingTo("99.99");
Review Comment:
To properly test the partial update feature, we should also assert here that
the other (non-target) fields are kept unchanged.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]