This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch ci-add-column in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 5d65a20b79fbefaae07955ffc296267dbde39b43 Author: Jark Wu <[email protected]> AuthorDate: Mon Dec 1 01:01:42 2025 +0800 WIP2 --- .../src/main/java/org/apache/fluss/metadata/Schema.java | 14 ++++++-------- .../java/org/apache/fluss/record/KvRecordReadContext.java | 1 + .../java/org/apache/fluss/record/LogRecordReadContext.java | 1 + .../src/main/java/org/apache/fluss/row/ProjectedRow.java | 6 +++--- .../java/org/apache/fluss/row/encode/ValueDecoder.java | 1 + .../java/org/apache/fluss/row/encode/ValueEncoder.java | 1 + .../src/test/java/org/apache/fluss/record/LogTestBase.java | 1 - .../test/java/org/apache/fluss/row/ProjectedRowTest.java | 1 + .../org/apache/fluss/flink/row/FlinkAsFlussRowTest.java | 3 +-- 9 files changed, 15 insertions(+), 14 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java index c065a3711..0ae0c8b97 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java @@ -155,6 +155,7 @@ public final class Schema implements Serializable { return keyIndexes; } + /** Returns the highest field ID in this schema. */ public int getHighestFieldId() { return highestFieldId; } @@ -250,24 +251,21 @@ public final class Schema implements Serializable { /** Adopts all columns from the given list. */ public Builder fromColumns(List<Column> inputColumns) { - boolean nonMatchColumnId = + boolean nonSetColumnId = inputColumns.stream() .noneMatch(column -> column.columnId != Column.UNKNOWN_COLUMN_ID); - boolean allMatchColumnId = + boolean allSetColumnId = inputColumns.stream() .allMatch(column -> column.columnId != Column.UNKNOWN_COLUMN_ID); checkState( - nonMatchColumnId || allMatchColumnId, + nonSetColumnId || allSetColumnId, "All columns must have columnId or none of them must have columnId."); - if (allMatchColumnId) { + if (allSetColumnId) { columns.addAll(inputColumns); highestFieldId = new AtomicInteger( - inputColumns.stream() - .mapToInt(Column::getColumnId) - .max() - .orElse(-1)); + columns.stream().mapToInt(Column::getColumnId).max().orElse(-1)); } else { // if all columnId is not set, this maybe from old version schema. Just use its // position as columnId. diff --git a/fluss-common/src/main/java/org/apache/fluss/record/KvRecordReadContext.java b/fluss-common/src/main/java/org/apache/fluss/record/KvRecordReadContext.java index 5fffc3f4f..195312cef 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/KvRecordReadContext.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/KvRecordReadContext.java @@ -31,6 +31,7 @@ public class KvRecordReadContext implements KvRecordBatch.ReadContext { private final KvFormat kvFormat; private final SchemaGetter schemaGetter; + // TODO reuse private final Map<Integer, RowDecoder> rowDecoderCache; private KvRecordReadContext(KvFormat kvFormat, SchemaGetter schemaGetter) { diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java index 571e75221..5ec5c7c77 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java @@ -244,6 +244,7 @@ public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoClo if (isSameRowType(schemaId)) { return null; } + // TODO: should we cache the projection? Schema originSchema = schemaGetter.getSchema(schemaId); Schema expectedSchema = schemaGetter.getSchema(targetSchemaId); return ProjectedRow.from(originSchema, expectedSchema); diff --git a/fluss-common/src/main/java/org/apache/fluss/row/ProjectedRow.java b/fluss-common/src/main/java/org/apache/fluss/row/ProjectedRow.java index 6b4c516d1..d793f342a 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/ProjectedRow.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/ProjectedRow.java @@ -36,11 +36,11 @@ import static org.apache.fluss.utils.SchemaUtil.getIndexMapping; public class ProjectedRow implements InternalRow { public static final int UNEXIST_MAPPING = -1; - protected final int[] indexMapping; + private final int[] indexMapping; - protected InternalRow row; + private InternalRow row; - protected ProjectedRow(int[] indexMapping) { + private ProjectedRow(int[] indexMapping) { this.indexMapping = indexMapping; } diff --git a/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java b/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java index e0252ae11..5f90ddaef 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java @@ -36,6 +36,7 @@ import static org.apache.fluss.utils.MapUtils.newConcurrentHashMap; */ public class ValueDecoder { + // TODO: reuse? private final Map<Short, RowDecoder> rowDecoders; private final SchemaGetter schemaGetter; private final KvFormat kvFormat; diff --git a/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueEncoder.java b/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueEncoder.java index 415fac0c2..74eb10094 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueEncoder.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueEncoder.java @@ -42,6 +42,7 @@ public class ValueEncoder { return values; } + // TODO: ??? public static byte[] encodeRow( short schemaId, KvFormat kvFormat, RowType currentRowType, InternalRow row) throws Exception { diff --git a/fluss-common/src/test/java/org/apache/fluss/record/LogTestBase.java b/fluss-common/src/test/java/org/apache/fluss/record/LogTestBase.java index 108ab2acf..36aef5b9d 100644 --- a/fluss-common/src/test/java/org/apache/fluss/record/LogTestBase.java +++ b/fluss-common/src/test/java/org/apache/fluss/record/LogTestBase.java @@ -72,7 +72,6 @@ public abstract class LogTestBase { assertLogRecordsListEquals(expected, actual, baseRowType); } - // todo: 在这个基础上可以加更多最简单的schema evolution测试,非常好 protected void assertIndexedLogRecordBatchAndRowEquals( LogRecordBatch actual, LogRecordBatch expected, diff --git a/fluss-common/src/test/java/org/apache/fluss/row/ProjectedRowTest.java b/fluss-common/src/test/java/org/apache/fluss/row/ProjectedRowTest.java index 09467d31a..80b21a719 100644 --- a/fluss-common/src/test/java/org/apache/fluss/row/ProjectedRowTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/row/ProjectedRowTest.java @@ -183,6 +183,7 @@ class ProjectedRowTest { .isExactlyInstanceOf(SchemaChangeException.class) .hasMessage( "Expected datatype of column(id=0,name=a) is [INT], while the actual datatype is [BIGINT]"); + assertThatThrownBy( () -> ProjectedRow.from( diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/row/FlinkAsFlussRowTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/row/FlinkAsFlussRowTest.java index 1f147751a..73ba044d8 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/row/FlinkAsFlussRowTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/row/FlinkAsFlussRowTest.java @@ -31,7 +31,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.time.Instant; -import java.util.stream.IntStream; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.offset; @@ -60,7 +59,7 @@ class FlinkAsFlussRowTest { TimestampData.fromEpochMillis(1672531200000L, 3), new byte[] {1, 2, 3}, null); - row = new FlinkAsFlussRow(IntStream.range(0, 14).toArray()).replace(flinkRow); + row = new FlinkAsFlussRow().replace(flinkRow); } @Test
