This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 256bd7f38 [common] Add field_id for Nested Row. (#2322)
256bd7f38 is described below
commit 256bd7f3838a72965720eb2a8c6c7c0575f46b07
Author: Hongshun Wang <[email protected]>
AuthorDate: Thu Jan 8 19:03:23 2026 +0800
[common] Add field_id for Nested Row. (#2322)
---
.../fluss/client/table/scanner/log/LogFetcher.java | 2 +-
.../client/table/scanner/log/LogScannerImpl.java | 2 +-
.../fluss/client/admin/FlussAdminITCase.java | 35 ++--
.../scanner/log/DefaultCompletedFetchTest.java | 6 +-
.../scanner/log/RemoteCompletedFetchTest.java | 10 +-
.../java/org/apache/fluss/metadata/Schema.java | 228 ++++++++++++++++-----
.../org/apache/fluss/record/FileLogProjection.java | 49 +----
.../apache/fluss/record/LogRecordReadContext.java | 22 +-
.../fluss/record/ProjectionPushdownCache.java | 5 +-
.../java/org/apache/fluss/types/DataField.java | 24 ++-
.../org/apache/fluss/types/DataTypeChecks.java | 39 ++++
.../java/org/apache/fluss/types/DataTypes.java | 27 ++-
.../org/apache/fluss/types/ReassignFieldId.java | 74 +++++++
.../main/java/org/apache/fluss/types/RowType.java | 5 +
.../java/org/apache/fluss/utils/Projection.java | 84 +++-----
.../apache/fluss/utils/json/DataTypeJsonSerde.java | 22 +-
.../org/apache/fluss/metadata/TableSchemaTest.java | 137 +++++++++++++
.../apache/fluss/record/FileLogProjectionTest.java | 51 ++++-
.../java/org/apache/fluss/types/DataTypesTest.java | 12 +-
.../org/apache/fluss/utils/ProjectionTest.java | 26 +--
.../fluss/utils/json/ColumnJsonSerdeTest.java | 17 +-
.../fluss/utils/json/DataTypeJsonSerdeTest.java | 43 +++-
.../fluss/utils/json/SchemaJsonSerdeTest.java | 38 +++-
.../apache/fluss/flink/utils/FlinkConversions.java | 20 +-
.../fluss/flink/sink/FlinkComplexTypeITCase.java | 75 +++++++
.../fluss/flink/utils/FlinkConversionsTest.java | 67 +++++-
.../fluss/server/coordinator/SchemaUpdate.java | 11 +-
.../server/coordinator/TableManagerITCase.java | 28 ++-
.../fluss/server/tablet/TabletServiceITCase.java | 2 +-
29 files changed, 904 insertions(+), 257 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
index a97bffac0..b5a03a1ca 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
@@ -530,7 +530,7 @@ public class LogFetcher implements Closeable {
assert projection != null;
reqForTable
.setProjectionPushdownEnabled(true)
-
.setProjectedFields(projection.getProjectionIdInOrder());
+
.setProjectedFields(projection.getProjectionInOrder());
} else {
reqForTable.setProjectionPushdownEnabled(false);
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
index b50bc3622..ebcae05c5 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
@@ -123,7 +123,7 @@ public class LogScannerImpl implements LogScanner {
+ tableRowType);
}
}
- return Projection.of(projectedFields, tableInfo.getSchema());
+ return Projection.of(projectedFields);
} else {
return null;
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index a7fb7e6ac..aaf06592e 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -71,6 +71,7 @@ import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
import org.apache.fluss.server.kv.snapshot.KvSnapshotHandle;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.data.ServerTags;
+import org.apache.fluss.types.DataTypeChecks;
import org.apache.fluss.types.DataTypes;
import org.junit.jupiter.api.BeforeEach;
@@ -394,7 +395,12 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
admin.alterTable(
tablePath,
- Collections.singletonList(
+ Arrays.asList(
+ TableChange.addColumn(
+ "nested_row",
+ DataTypes.ROW(DataTypes.STRING(),
DataTypes.INT()),
+ "new nested column",
+ TableChange.ColumnPosition.last()),
TableChange.addColumn(
"c1",
DataTypes.STRING(),
@@ -408,23 +414,28 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
.primaryKey("id")
.fromColumns(
Arrays.asList(
+ new Schema.Column("id",
DataTypes.INT(), "person id", 0),
new Schema.Column(
- "id", DataTypes.INT(), "person
id", (short) 0),
+ "name", DataTypes.STRING(),
"person name", 1),
+ new Schema.Column("age",
DataTypes.INT(), "person age", 2),
new Schema.Column(
- "name",
- DataTypes.STRING(),
- "person name",
- (short) 1),
+ "nested_row",
+ DataTypes.ROW(
+ DataTypes.FIELD(
+ "f0",
DataTypes.STRING(), 4),
+ DataTypes.FIELD("f1",
DataTypes.INT(), 5)),
+ "new nested column",
+ 3),
new Schema.Column(
- "age", DataTypes.INT(),
"person age", (short) 2),
- new Schema.Column(
- "c1",
- DataTypes.STRING(),
- "new column c1",
- (short) 3)))
+ "c1", DataTypes.STRING(), "new
column c1", 6)))
.build();
SchemaInfo schemaInfo = admin.getTableSchema(tablePath).get();
assertThat(schemaInfo).isEqualTo(new SchemaInfo(expectedSchema, 2));
+ // test field_id of rowType
+ assertThat(
+ DataTypeChecks.equalsWithFieldId(
+ schemaInfo.getSchema().getRowType(),
expectedSchema.getRowType()))
+ .isTrue();
}
@Test
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
index 81463004c..9f50fe59e 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
@@ -174,7 +174,7 @@ public class DefaultCompletedFetchTest {
long fetchOffset = 0L;
int bucketId = 0; // records for 0-10.
TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId);
- Projection projection = Projection.of(new int[] {0, 2}, schema);
+ Projection projection = Projection.of(new int[] {0, 2});
MemoryLogRecords memoryLogRecords;
if (logFormat == LogFormat.ARROW) {
memoryLogRecords = genRecordsWithProjection(DATA2, projection,
magic);
@@ -210,7 +210,7 @@ public class DefaultCompletedFetchTest {
// test projection reorder.
defaultCompletedFetch =
makeCompletedFetch(
- tb, resultForBucket0, fetchOffset, Projection.of(new
int[] {2, 0}, schema));
+ tb, resultForBucket0, fetchOffset, Projection.of(new
int[] {2, 0}));
scanRecords = defaultCompletedFetch.fetchRecords(8);
assertThat(scanRecords.size()).isEqualTo(8);
for (int i = 0; i < scanRecords.size(); i++) {
@@ -427,7 +427,7 @@ public class DefaultCompletedFetchTest {
DATA2_TABLE_ID,
testingSchemaGetter,
DEFAULT_COMPRESSION,
- projection.getProjectionIdInOrder());
+ projection.getProjectionInOrder());
ByteBuffer buffer =
toByteBuffer(
fileLogProjection
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetchTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetchTest.java
index edc47a7ec..f3022c91b 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetchTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetchTest.java
@@ -226,10 +226,7 @@ class RemoteCompletedFetchTest {
createFileLogRecords(tableBucket, DATA2_PHYSICAL_TABLE_PATH,
DATA2, logFormat);
RemoteCompletedFetch completedFetch =
makeCompletedFetch(
- tableBucket,
- fileLogRecords,
- fetchOffset,
- Projection.of(new int[] {0, 2}, schema));
+ tableBucket, fileLogRecords, fetchOffset,
Projection.of(new int[] {0, 2}));
List<ScanRecord> scanRecords = completedFetch.fetchRecords(8);
List<Object[]> expectedObjects =
@@ -255,10 +252,7 @@ class RemoteCompletedFetchTest {
completedFetch =
makeCompletedFetch(
- tableBucket,
- fileLogRecords,
- fetchOffset,
- Projection.of(new int[] {2, 0}, schema));
+ tableBucket, fileLogRecords, fetchOffset,
Projection.of(new int[] {2, 0}));
scanRecords = completedFetch.fetchRecords(8);
assertThat(scanRecords.size()).isEqualTo(8);
for (int i = 0; i < scanRecords.size(); i++) {
diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
index 63066849b..21a3a3c93 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
@@ -19,9 +19,12 @@ package org.apache.fluss.metadata;
import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.annotation.PublicStable;
+import org.apache.fluss.types.ArrayType;
import org.apache.fluss.types.DataField;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.DataTypeRoot;
+import org.apache.fluss.types.MapType;
+import org.apache.fluss.types.ReassignFieldId;
import org.apache.fluss.types.RowType;
import org.apache.fluss.utils.EncodingUtils;
import org.apache.fluss.utils.StringUtils;
@@ -75,7 +78,8 @@ public final class Schema implements Serializable {
@Nullable PrimaryKey primaryKey,
int highestFieldId,
List<String> autoIncrementColumnNames) {
- this.columns = normalizeColumns(columns, primaryKey,
autoIncrementColumnNames);
+ this.columns =
+ normalizeColumns(columns, primaryKey,
autoIncrementColumnNames, highestFieldId);
this.primaryKey = primaryKey;
this.autoIncrementColumnNames = autoIncrementColumnNames;
// pre-create the row type as it is the most frequently used part of
the schema
@@ -85,7 +89,9 @@ public final class Schema implements Serializable {
.map(
column ->
new DataField(
- column.getName(),
column.getDataType()))
+ column.getName(),
+ column.getDataType(),
+ column.columnId))
.collect(Collectors.toList()));
this.highestFieldId = highestFieldId;
}
@@ -248,34 +254,32 @@ public final class Schema implements Serializable {
return this;
}
- public Builder highestFieldId(int highestFieldId) {
- this.highestFieldId = new AtomicInteger(highestFieldId);
- return this;
- }
-
- public Builder fromRowType(RowType rowType) {
- checkNotNull(rowType, "rowType must not be null.");
- final List<DataType> fieldDataTypes = rowType.getChildren();
- final List<String> fieldNames = rowType.getFieldNames();
- IntStream.range(0, fieldDataTypes.size())
- .forEach(i -> column(fieldNames.get(i),
fieldDataTypes.get(i)));
- return this;
- }
-
- /** Adopts the given field names and field data types as physical
columns of the schema. */
- public Builder fromFields(
- List<String> fieldNames, List<? extends DataType>
fieldDataTypes) {
- checkNotNull(fieldNames, "Field names must not be null.");
- checkNotNull(fieldDataTypes, "Field data types must not be null.");
- checkArgument(
- fieldNames.size() == fieldDataTypes.size(),
- "Field names and field data types must have the same
length.");
- IntStream.range(0, fieldNames.size())
- .forEach(i -> column(fieldNames.get(i),
fieldDataTypes.get(i)));
- return this;
- }
-
- /** Adopts all columns from the given list. */
+ /**
+ * Adopts all columns from the given list.
+ *
+ * <p>This method directly uses the columns as-is, preserving their
existing column IDs and
+ * all nested field IDs within their data types (e.g., field IDs in
{@link RowType}, {@link
+ * ArrayType}, {@link MapType}). No field ID reassignment will occur.
+ *
+ * <p>This behavior is different from {@link #column(String,
DataType)}, which automatically
+ * assigns new column IDs and reassigns all nested field IDs to ensure
global uniqueness.
+ *
+ * <p>Use this method when:
+ *
+ * <ul>
+ * <li>Loading existing schema from storage where IDs are already
assigned
+ * <li>Preserving schema identity during schema evolution
+ * <li>Reconstructing schema from serialized format
+ * </ul>
+ *
+ * <p>Note: All input columns must either have column IDs set or none
of them should have
+ * column IDs. Mixed states are not allowed.
+ *
+ * @param inputColumns the list of columns to adopt
+ * @return this builder for fluent API
+ * @throws IllegalStateException if columns have inconsistent column
ID states (some set,
+ * some not set)
+ */
public Builder fromColumns(List<Column> inputColumns) {
boolean nonSetColumnId =
inputColumns.stream()
@@ -289,9 +293,9 @@ public final class Schema implements Serializable {
if (allSetColumnId) {
columns.addAll(inputColumns);
+ List<Integer> allFieldIds = collectAllFieldIds(inputColumns);
highestFieldId =
- new AtomicInteger(
-
columns.stream().mapToInt(Column::getColumnId).max().orElse(-1));
+ new
AtomicInteger(allFieldIds.stream().max(Integer::compareTo).orElse(-1));
} else {
// if all columnId is not set, this maybe from old version
schema. Just use its
// position as columnId.
@@ -310,6 +314,56 @@ public final class Schema implements Serializable {
return this;
}
+ public Builder highestFieldId(int highestFieldId) {
+ this.highestFieldId = new AtomicInteger(highestFieldId);
+ return this;
+ }
+
+ /**
+ * Adopts the field names and data types from the given {@link
RowType} as physical columns
+ * of the schema.
+ *
+ * <p>This method internally calls {@link #column(String, DataType)}
for each field, which
+ * means: The original field IDs in the RowType will be ignored and
replaced with new ones.
+ * If you need to preserve existing field IDs, use {@link
#fromColumns(List)} or {@link
+ * #fromSchema(Schema)} instead.
+ *
+ * @param rowType the row type to adopt fields from
+ * @return this builder for fluent API
+ */
+ public Builder fromRowType(RowType rowType) {
+ checkNotNull(rowType, "rowType must not be null.");
+ final List<DataType> fieldDataTypes = rowType.getChildren();
+ final List<String> fieldNames = rowType.getFieldNames();
+ IntStream.range(0, fieldDataTypes.size())
+ .forEach(i -> column(fieldNames.get(i),
fieldDataTypes.get(i)));
+ return this;
+ }
+
+ /**
+ * Adopts the given field names and field data types as physical
columns of the schema.
+ *
+ * <p>This method internally calls {@link #column(String, DataType)}
for each field, which
+ * means: The original field IDs in the RowType will be ignored and
replaced with new ones.
+ * If you need to preserve existing field IDs, use {@link
#fromColumns(List)} or {@link
+ * #fromSchema(Schema)} instead.
+ *
+ * @param fieldNames the list of field names
+ * @param fieldDataTypes the list of field data types
+ * @return this builder for fluent API
+ */
+ public Builder fromFields(
+ List<String> fieldNames, List<? extends DataType>
fieldDataTypes) {
+ checkNotNull(fieldNames, "Field names must not be null.");
+ checkNotNull(fieldDataTypes, "Field data types must not be null.");
+ checkArgument(
+ fieldNames.size() == fieldDataTypes.size(),
+ "Field names and field data types must have the same
length.");
+ IntStream.range(0, fieldNames.size())
+ .forEach(i -> column(fieldNames.get(i),
fieldDataTypes.get(i)));
+ return this;
+ }
+
/**
* Declares a column that is appended to this schema.
*
@@ -317,13 +371,23 @@ public final class Schema implements Serializable {
* and the order of fields in the data. Thus, columns represent the
payload that is read
* from and written to an external system.
*
+ * <p>Note: If the data type contains nested types (e.g., {@link
RowType}, {@link
+ * ArrayType}, {@link MapType}), all nested field IDs will be
automatically reassigned to
+ * ensure global uniqueness. This is essential for schema evolution
support. If you need to
+ * preserve existing field IDs, use {@link #fromColumns(List)} or
{@link
+ * #fromSchema(Schema)} instead.
+ *
* @param columnName column name
+ * @param dataType column data type
+ * @return this builder for fluent API
*/
public Builder column(String columnName, DataType dataType) {
checkNotNull(columnName, "Column name must not be null.");
checkNotNull(dataType, "Data type must not be null.");
- columns.add(
- new Column(columnName, dataType, null,
highestFieldId.incrementAndGet(), null));
+ int id = highestFieldId.incrementAndGet();
+ // Reassign field id especially for nested types.
+ DataType reassignDataType = ReassignFieldId.reassign(dataType,
highestFieldId);
+ columns.add(new Column(columnName, reassignDataType, null, id,
null));
return this;
}
@@ -346,13 +410,10 @@ public final class Schema implements Serializable {
checkNotNull(dataType, "Data type must not be null.");
checkNotNull(aggFunction, "Aggregation function must not be
null.");
- columns.add(
- new Column(
- columnName,
- dataType,
- null,
- highestFieldId.incrementAndGet(),
- aggFunction));
+ int id = highestFieldId.incrementAndGet();
+ // Reassign field id especially for nested types.
+ DataType reassignDataType = ReassignFieldId.reassign(dataType,
highestFieldId);
+ columns.add(new Column(columnName, reassignDataType, null, id,
aggFunction));
return this;
}
@@ -441,17 +502,6 @@ public final class Schema implements Serializable {
/** Returns an instance of an {@link Schema}. */
public Schema build() {
- Integer maximumColumnId =
-
columns.stream().map(Column::getColumnId).max(Integer::compareTo).orElse(0);
-
- checkState(
- columns.isEmpty() || highestFieldId.get() >=
maximumColumnId,
- "Highest field id must be greater than or equal to the
maximum column id.");
-
- checkState(
-
columns.stream().map(Column::getColumnId).distinct().count() == columns.size(),
- "Column ids must be unique.");
-
return new Schema(columns, primaryKey, highestFieldId.get(),
autoIncrementColumnNames);
}
}
@@ -629,8 +679,10 @@ public final class Schema implements Serializable {
private static List<Column> normalizeColumns(
List<Column> columns,
@Nullable PrimaryKey primaryKey,
- List<String> autoIncrementColumnNames) {
+ List<String> autoIncrementColumnNames,
+ int highestFieldId) {
+ checkFieldIds(columns, highestFieldId);
List<String> columnNames =
columns.stream().map(Column::getName).collect(Collectors.toList());
@@ -728,4 +780,74 @@ public final class Schema implements Serializable {
}
return new RowType(keyRowFields);
}
+
+ /**
+ * Validates field IDs in the schema, including both top-level column IDs
and nested field IDs.
+ *
+ * <p>This method performs the following checks:
+ *
+ * <ul>
+ * <li>Ensures all top-level column IDs are unique
+ * <li>Ensures all field IDs (including nested fields in ROW, ARRAY, MAP
types) are globally
+ * unique
+ * <li>Verifies that the highest field ID is greater than or equal to
all existing field IDs
+ * </ul>
+ *
+ * @param columns the list of columns to validate
+ * @param highestFieldId the highest field ID that should be greater than
or equal to all field
+ * IDs
+ * @throws IllegalStateException if any validation fails
+ */
+ private static void checkFieldIds(List<Column> columns, int
highestFieldId) {
+
+ // Collect all field IDs (including nested fields) for validation
+ List<Integer> allFieldIds = collectAllFieldIds(columns);
+
+ // Validate all field IDs (including nested fields) are unique
+ long uniqueFieldIdsCount = allFieldIds.stream().distinct().count();
+ checkState(
+ uniqueFieldIdsCount == allFieldIds.size(),
+ "All field IDs (including nested fields) must be unique. Found
%s unique IDs but expected %s.",
+ uniqueFieldIdsCount,
+ allFieldIds.size());
+
+ // Validate the highest field ID is greater than or equal to all field
IDs
+ Integer maximumFieldId =
allFieldIds.stream().max(Integer::compareTo).orElse(-1);
+ checkState(
+ columns.isEmpty() || highestFieldId >= maximumFieldId,
+ "Highest field ID (%s) must be greater than or equal to the
maximum field ID (%s) including nested fields. Current columns is %s",
+ highestFieldId,
+ maximumFieldId,
+ columns);
+ }
+
+ /**
+ * Recursively collects all field IDs from a data type, including nested
fields in ROW, ARRAY,
+ * and MAP types.
+ */
+ private static List<Integer> collectAllFieldIds(List<Column> columns) {
+ List<Integer> allFieldIds = new ArrayList<>();
+ for (Column column : columns) {
+ allFieldIds.add(column.getColumnId());
+ collectAllFieldIds(column.getDataType(), allFieldIds);
+ }
+ return allFieldIds;
+ }
+
+ private static void collectAllFieldIds(DataType dataType, List<Integer>
fieldIds) {
+ if (dataType instanceof RowType) {
+ RowType rowType = (RowType) dataType;
+ for (DataField field : rowType.getFields()) {
+ fieldIds.add(field.getFieldId());
+ collectAllFieldIds(field.getType(), fieldIds);
+ }
+ } else if (dataType instanceof ArrayType) {
+ ArrayType arrayType = (ArrayType) dataType;
+ collectAllFieldIds(arrayType.getElementType(), fieldIds);
+ } else if (dataType instanceof MapType) {
+ MapType mapType = (MapType) dataType;
+ collectAllFieldIds(mapType.getKeyType(), fieldIds);
+ collectAllFieldIds(mapType.getValueType(), fieldIds);
+ }
+ }
}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java
b/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java
index 9f227adb1..4717271c2 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java
@@ -51,9 +51,7 @@ import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import static
org.apache.fluss.record.DefaultLogRecordBatch.APPEND_ONLY_FLAG_MASK;
import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_OFFSET;
@@ -101,7 +99,7 @@ public class FileLogProjection {
private SchemaGetter schemaGetter;
private long tableId;
private ArrowCompressionInfo compressionInfo;
- private int[] selectedFieldIds;
+ private int[] selectedFieldPositions;
public FileLogProjection(ProjectionPushdownCache projectionsCache) {
this.projectionsCache = projectionsCache;
@@ -117,11 +115,11 @@ public class FileLogProjection {
long tableId,
SchemaGetter schemaGetter,
ArrowCompressionInfo compressionInfo,
- int[] selectedFieldIds) {
+ int[] selectedFieldPositions) {
this.tableId = tableId;
this.schemaGetter = schemaGetter;
this.compressionInfo = compressionInfo;
- this.selectedFieldIds = selectedFieldIds;
+ this.selectedFieldPositions = selectedFieldPositions;
}
/**
@@ -405,19 +403,17 @@ public class FileLogProjection {
private ProjectionInfo getOrCreateProjectionInfo(short schemaId) {
ProjectionInfo cachedProjection =
- projectionsCache.getProjectionInfo(tableId, schemaId,
selectedFieldIds);
+ projectionsCache.getProjectionInfo(tableId, schemaId,
selectedFieldPositions);
if (cachedProjection == null) {
- cachedProjection = createProjectionInfo(schemaId,
selectedFieldIds);
+ cachedProjection = createProjectionInfo(schemaId,
selectedFieldPositions);
projectionsCache.setProjectionInfo(
- tableId, schemaId, selectedFieldIds, cachedProjection);
+ tableId, schemaId, selectedFieldPositions,
cachedProjection);
}
return cachedProjection;
}
- private ProjectionInfo createProjectionInfo(short schemaId, int[]
selectedFieldIds) {
+ private ProjectionInfo createProjectionInfo(short schemaId, int[]
selectedFieldPositions) {
org.apache.fluss.metadata.Schema schema =
schemaGetter.getSchema(schemaId);
- int[] selectedFieldPositions =
- selectedFieldPositions(schemaGetter.getSchema(schemaId),
selectedFieldIds);
RowType rowType = schema.getRowType();
// initialize the projection util information
@@ -460,37 +456,6 @@ public class FileLogProjection {
selectedFieldPositions);
}
- int[] selectedFieldPositions(org.apache.fluss.metadata.Schema schema,
int[] projectedFields) {
- Map<Integer, Integer> columnIdPositions = new HashMap<>();
- List<Integer> columnIds = schema.getColumnIds();
- for (int i = 0; i < columnIds.size(); i++) {
- columnIdPositions.put(columnIds.get(i), i);
- }
-
- int prev = -1;
- int[] selectedFieldPositions = new int[projectedFields.length];
- for (int i = 0; i < projectedFields.length; i++) {
- int fieldId = projectedFields[i];
- Integer position = columnIdPositions.get(fieldId);
- if (position == null) {
- throw new InvalidColumnProjectionException(
- String.format(
- "Projected field id %s is not contained in %s",
- fieldId, columnIds));
- }
-
- selectedFieldPositions[i] = position;
- if (position < prev) {
- throw new InvalidColumnProjectionException(
- "The projection indexes should be in field order, but
is "
- + Arrays.toString(projectedFields));
- }
-
- prev = position;
- }
- return selectedFieldPositions;
- }
-
/** Projection pushdown information for a specific schema and selected
fields. */
public static final class ProjectionInfo {
final BitSet nodesProjection;
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
index 6d3ee99af..2b0f695ec 100644
---
a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
+++
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
@@ -75,17 +75,14 @@ public class LogRecordReadContext implements
LogRecordBatch.ReadContext, AutoClo
int schemaId = tableInfo.getSchemaId();
if (projection == null) {
// set a default dummy projection to simplify code
- projection =
- Projection.of(
- IntStream.range(0,
rowType.getFieldCount()).toArray(),
- tableInfo.getSchema());
+ projection = Projection.of(IntStream.range(0,
rowType.getFieldCount()).toArray());
}
if (logFormat == LogFormat.ARROW) {
if (readFromRemote) {
// currently, for remote read, arrow log doesn't support
projection pushdown,
// so set the rowType as is.
- int[] selectedFields = projection.getProjectionPositions();
+ int[] selectedFields = projection.getProjection();
return createArrowReadContext(
rowType, schemaId, selectedFields, false,
schemaGetter);
} else {
@@ -101,10 +98,10 @@ public class LogRecordReadContext implements
LogRecordBatch.ReadContext, AutoClo
schemaGetter);
}
} else if (logFormat == LogFormat.INDEXED) {
- int[] selectedFields = projection.getProjectionPositions();
+ int[] selectedFields = projection.getProjection();
return createIndexedReadContext(rowType, schemaId, selectedFields,
schemaGetter);
} else if (logFormat == LogFormat.COMPACTED) {
- int[] selectedFields = projection.getProjectionPositions();
+ int[] selectedFields = projection.getProjection();
return createCompactedRowReadContext(rowType, schemaId,
selectedFields);
} else {
throw new IllegalArgumentException("Unsupported log format: " +
logFormat);
@@ -145,6 +142,17 @@ public class LogRecordReadContext implements
LogRecordBatch.ReadContext, AutoClo
return createArrowReadContext(rowType, schemaId, selectedFields,
false, schemaGetter);
}
+ @VisibleForTesting
+ public static LogRecordReadContext createArrowReadContext(
+ RowType rowType,
+ int schemaId,
+ SchemaGetter schemaGetter,
+ boolean projectionPushDowned) {
+ int[] selectedFields = IntStream.range(0,
rowType.getFieldCount()).toArray();
+ return createArrowReadContext(
+ rowType, schemaId, selectedFields, projectionPushDowned,
schemaGetter);
+ }
+
/**
* Creates a LogRecordReadContext for INDEXED log format.
*
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/ProjectionPushdownCache.java
b/fluss-common/src/main/java/org/apache/fluss/record/ProjectionPushdownCache.java
index a2d3da1ae..71f12192b 100644
---
a/fluss-common/src/main/java/org/apache/fluss/record/ProjectionPushdownCache.java
+++
b/fluss-common/src/main/java/org/apache/fluss/record/ProjectionPushdownCache.java
@@ -49,8 +49,9 @@ public class ProjectionPushdownCache {
}
@Nullable
- public ProjectionInfo getProjectionInfo(long tableId, short schemaId,
int[] selectedColumnIds) {
- ProjectionKey key = new ProjectionKey(tableId, schemaId,
selectedColumnIds);
+ public ProjectionInfo getProjectionInfo(
+ long tableId, short schemaId, int[] selectedFieldPositions) {
+ ProjectionKey key = new ProjectionKey(tableId, schemaId,
selectedFieldPositions);
return projectionCache.getIfPresent(key);
}
diff --git a/fluss-common/src/main/java/org/apache/fluss/types/DataField.java
b/fluss-common/src/main/java/org/apache/fluss/types/DataField.java
index c4f935680..296f6215b 100644
--- a/fluss-common/src/main/java/org/apache/fluss/types/DataField.java
+++ b/fluss-common/src/main/java/org/apache/fluss/types/DataField.java
@@ -47,14 +47,25 @@ public class DataField implements Serializable {
private final @Nullable String description;
- public DataField(String name, DataType type, @Nullable String description)
{
+ private final int fieldId;
+
+ public DataField(String name, DataType type, @Nullable String description,
int fieldId) {
this.name = checkNotNull(name, "Field name must not be null.");
this.type = checkNotNull(type, "Field type must not be null.");
this.description = description;
+ this.fieldId = fieldId;
+ }
+
+ public DataField(String name, DataType type, Integer fieldId) {
+ this(name, type, null, fieldId);
}
public DataField(String name, DataType type) {
- this(name, type, null);
+ this(name, type, -1);
+ }
+
+ public DataField(String name, DataType type, @Nullable String description)
{
+ this(name, type, description, -1);
}
public String getName() {
@@ -65,12 +76,16 @@ public class DataField implements Serializable {
return type;
}
+ public int getFieldId() {
+ return fieldId;
+ }
+
public Optional<String> getDescription() {
return Optional.ofNullable(description);
}
public DataField copy() {
- return new DataField(name, type.copy(), description);
+ return new DataField(name, type.copy(), description, fieldId);
}
public String asSummaryString() {
@@ -90,6 +105,9 @@ public class DataField implements Serializable {
return false;
}
DataField rowField = (DataField) o;
+ // ignore field id in equality check, because field id is not part of
type definition,
+ // use RowType#
+ // we may ignore description too in the future.
return name.equals(rowField.name)
&& type.equals(rowField.type)
&& Objects.equals(description, rowField.description);
diff --git
a/fluss-common/src/main/java/org/apache/fluss/types/DataTypeChecks.java
b/fluss-common/src/main/java/org/apache/fluss/types/DataTypeChecks.java
index 76e399568..15ec476a3 100644
--- a/fluss-common/src/main/java/org/apache/fluss/types/DataTypeChecks.java
+++ b/fluss-common/src/main/java/org/apache/fluss/types/DataTypeChecks.java
@@ -57,6 +57,11 @@ public final class DataTypeChecks {
return dataType.accept(FIELD_TYPES_EXTRACTOR);
}
+ /** Checks whether two data types are equal including field ids for row
types. */
+ public static boolean equalsWithFieldId(DataType original, DataType that) {
+ return that.accept(new DataTypeEqualsWithFieldId(original));
+ }
+
private DataTypeChecks() {
// no instantiation
}
@@ -155,4 +160,38 @@ public final class DataTypeChecks {
return rowType.getFieldTypes();
}
}
+
+ private static class DataTypeEqualsWithFieldId extends
DataTypeDefaultVisitor<Boolean> {
+ private final DataType original;
+
+ private DataTypeEqualsWithFieldId(DataType original) {
+ this.original = original;
+ }
+
+ @Override
+ public Boolean visit(RowType that) {
+ if (!original.equals(that)) {
+ return false;
+ }
+
+ // compare field ids.
+ List<DataField> originalFields = ((RowType) original).getFields();
+ List<DataField> thatFields = that.getFields();
+ for (int i = 0; i < that.getFieldCount(); i++) {
+ DataField originalField = originalFields.get(i);
+ DataField thatField = thatFields.get(i);
+ if (originalField.getFieldId() != thatField.getFieldId()
+ || !equalsWithFieldId(originalField.getType(),
thatField.getType())) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ protected Boolean defaultMethod(DataType that) {
+ return original.equals(that);
+ }
+ }
}
diff --git a/fluss-common/src/main/java/org/apache/fluss/types/DataTypes.java
b/fluss-common/src/main/java/org/apache/fluss/types/DataTypes.java
index 8993b146f..d0bc0d627 100644
--- a/fluss-common/src/main/java/org/apache/fluss/types/DataTypes.java
+++ b/fluss-common/src/main/java/org/apache/fluss/types/DataTypes.java
@@ -301,16 +301,39 @@ public class DataTypes {
return new MapType(keyType, valueType);
}
- /** Field definition with field name and data type. */
public static DataField FIELD(String name, DataType type) {
return new DataField(name, type);
}
- /** Field definition with field name, data type, and a description. */
+ /**
+ * Creates a field definition with field name, data type, and field ID.
+ *
+ * @param name the field name
+ * @param type the field data type
+ * @param fieldId the field ID for schema evolution
+ * @return a new data field without description
+ */
+ public static DataField FIELD(String name, DataType type, int fieldId) {
+ return new DataField(name, type, fieldId);
+ }
+
public static DataField FIELD(String name, DataType type, String
description) {
return new DataField(name, type, description);
}
+ /**
+ * Creates a field definition with field name, data type, description, and
field ID.
+ *
+ * @param name the field name
+ * @param type the field data type
+ * @param description the field description
+ * @param fieldId the field ID for schema evolution
+ * @return a new data field with all properties
+ */
+ public static DataField FIELD(String name, DataType type, String
description, int fieldId) {
+ return new DataField(name, type, description, fieldId);
+ }
+
/**
* Data type of a sequence of fields. A field consists of a field name,
field type, and an
* optional description. The most specific type of a row of a table is a
row type. In this case,
diff --git
a/fluss-common/src/main/java/org/apache/fluss/types/ReassignFieldId.java
b/fluss-common/src/main/java/org/apache/fluss/types/ReassignFieldId.java
new file mode 100644
index 000000000..3c095f6df
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/types/ReassignFieldId.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.types;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Visitor that recursively reassigns field IDs in nested data types using a
provided counter. */
+public class ReassignFieldId extends DataTypeDefaultVisitor<DataType> {
+
+ private final AtomicInteger highestFieldId;
+
+ public ReassignFieldId(AtomicInteger highestFieldId) {
+ this.highestFieldId = highestFieldId;
+ }
+
+ public static DataType reassign(DataType input, AtomicInteger
highestFieldId) {
+ return input.accept(new ReassignFieldId(highestFieldId));
+ }
+
+ @Override
+ public DataType visit(ArrayType arrayType) {
+ return new ArrayType(arrayType.isNullable(),
arrayType.getElementType().accept(this));
+ }
+
+ @Override
+ public DataType visit(MapType mapType) {
+ return new MapType(
+ mapType.isNullable(),
+ mapType.getKeyType().accept(this),
+ mapType.getValueType().accept(this));
+ }
+
+ @Override
+ public DataType visit(RowType rowType) {
+ List<DataField> originalDataFields = rowType.getFields();
+
+ List<DataField> dataFields = new ArrayList<>();
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ DataField dataField = originalDataFields.get(i);
+ int id = highestFieldId.incrementAndGet();
+ DataType dataType = dataField.getType().accept(this);
+ dataFields.add(
+ new DataField(
+ dataField.getName(),
+ dataType,
+ dataField.getDescription().orElse(null),
+ id));
+ }
+
+ return new RowType(rowType.isNullable(), dataFields);
+ }
+
+ @Override
+ protected DataType defaultMethod(DataType dataType) {
+ return dataType;
+ }
+}
diff --git a/fluss-common/src/main/java/org/apache/fluss/types/RowType.java
b/fluss-common/src/main/java/org/apache/fluss/types/RowType.java
index 090fe9655..fe2cfae05 100644
--- a/fluss-common/src/main/java/org/apache/fluss/types/RowType.java
+++ b/fluss-common/src/main/java/org/apache/fluss/types/RowType.java
@@ -167,6 +167,11 @@ public final class RowType extends DataType {
return fields.equals(rowType.fields);
}
+    /** Checks whether this row type equals another data type, also comparing field ids. */
+ public boolean equalsWithFieldId(DataType other) {
+ return DataTypeChecks.equalsWithFieldId(this, other);
+ }
+
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), fields);
diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/Projection.java
b/fluss-common/src/main/java/org/apache/fluss/utils/Projection.java
index 7c392b055..c16c6b10b 100644
--- a/fluss-common/src/main/java/org/apache/fluss/utils/Projection.java
+++ b/fluss-common/src/main/java/org/apache/fluss/utils/Projection.java
@@ -17,11 +17,9 @@
package org.apache.fluss.utils;
-import org.apache.fluss.metadata.Schema;
import org.apache.fluss.types.RowType;
import java.util.Arrays;
-import java.util.List;
import static org.apache.fluss.utils.Preconditions.checkState;
@@ -30,14 +28,12 @@ import static
org.apache.fluss.utils.Preconditions.checkState;
* projection includes both reducing the accessible fields and reordering
them. Currently, this only
* supports top-level projection. Nested projection will be supported in the
future.
*
- * <p>For example, given a row with fields [a, b, c, d, e] with id [0, 1, 3,
4, 5], a projection [2,
- * 0, 3] will project the row to [c, a, d] with id [3, 0, 4]. The projection
indexes are 0-based.
+ * <p>For example, given a row with fields [a, b, c, d, e], a projection [2,
0, 3] will project the
+ * row to [c, a, d]. The projection indexes are 0-based.
*
* <ul>
- * <li>The {@link #projectionPositions} indexes will be [2, 0, 3]
- * <li>The {@link #projectionPositions} id will be [3, 0, 4]
- * <li>The {@link #projectionIdsInOrder} indexes will be [0, 3, 4]
- * <li>The {@link #projectionPositionsOrderById} indexes will be [0, 2, 3]
+ * <li>The {@link #projection} indexes will be [2, 0, 3]
+ * <li>The {@link #projectionInOrder} indexes will be [0, 2, 3]
* <li>The {@link #reorderingIndexes} indexes will be [1, 0, 2]
* </ul>
*
@@ -45,76 +41,42 @@ import static
org.apache.fluss.utils.Preconditions.checkState;
*/
public class Projection {
/** the projection indexes including both selected fields and reordering
them. */
- final int[] projectionPositions;
+ final int[] projection;
/** the projection indexes that only select fields but not reordering
them. */
- final int[] projectionPositionsOrderById;
- /** the projection indexes that only select fields but not reordering
them. */
- final int[] projectionIdsInOrder;
- /**
- * the indexes to reorder the fields of {@link
#projectionPositionsOrderById} to {@link
- * #projectionPositions}.
- */
+ final int[] projectionInOrder;
+ /** the indexes to reorder the fields of {@link #projectionInOrder} to
{@link #projection}. */
final int[] reorderingIndexes;
/** the flag to indicate whether reordering is needed. */
final boolean reorderingNeeded;
- public static Projection of(int[] indexes, Schema schema) {
- int[] ids = new int[indexes.length];
- List<Integer> columnIds = schema.getColumnIds();
- for (int i = 0; i < indexes.length; i++) {
- ids[i] = columnIds.get(indexes[i]);
- }
-
- return new Projection(indexes, ids);
+ /** Create a {@link Projection} of the provided {@code indexes}. */
+ public static Projection of(int[] indexes) {
+ return new Projection(indexes);
}
- /**
- * Create a {@link Projection} of the provided {@code indexes} and {@code
projectionIds}.
- *
- * <p>Typically, {@code projectionIndexes} and {@code projectionIds} are
the same. But when
- * removing a middle column or reordering columns, they won't be the same.
In this case, when
- * the schema on the fluss server side differs from the schema on the
client side during user
- * queries, using {@code projectionIds} ensures correctness by mapping to
the actual server-side
- * column positions.
- *
- * @param projectionIndexes the indexes of the projection, which is used
for user to read data
- * from client side. These indexes are based on the schema visible to
the client side and
- * are used to parse the data returned by the fluss server.
- * @param projectionIds the ids of the projection, which is used for fluss
server scan. These
- * ids are based on the actual schema on the fluss server side and are
pushed down to the
- * server for querying.
- */
- private Projection(int[] projectionIndexes, int[] projectionIds) {
- checkState(
- projectionIds.length == projectionIndexes.length,
- "The number of ids must be equal to the number of indexes.");
- this.projectionPositions = projectionIndexes;
-
- // reorder the projection id for lookup.
- this.projectionIdsInOrder = Arrays.copyOf(projectionIds,
projectionIds.length);
- Arrays.sort(projectionIdsInOrder);
- this.reorderingNeeded = !Arrays.equals(projectionIds,
projectionIdsInOrder);
- this.reorderingIndexes = new int[projectionPositions.length];
- this.projectionPositionsOrderById = new
int[projectionPositions.length];
- for (int i = 0; i < projectionIds.length; i++) {
- int index = Arrays.binarySearch(projectionIdsInOrder,
projectionIds[i]);
+ private Projection(int[] projection) {
+ this.projection = projection;
+ this.projectionInOrder = Arrays.copyOf(projection, projection.length);
+ Arrays.sort(projectionInOrder);
+ this.reorderingNeeded = !Arrays.equals(projection, projectionInOrder);
+ this.reorderingIndexes = new int[projection.length];
+ for (int i = 0; i < projection.length; i++) {
+ int index = Arrays.binarySearch(projectionInOrder, projection[i]);
checkState(index >= 0, "The projection index is invalid.");
reorderingIndexes[i] = index;
- projectionPositionsOrderById[i] = projectionIndexes[index];
}
}
public RowType projectInOrder(RowType rowType) {
- return rowType.project(projectionPositionsOrderById);
+ return rowType.project(projectionInOrder);
}
- public int[] getProjectionPositions() {
- return projectionPositions;
+ public int[] getProjection() {
+ return projection;
}
- /** The id of the projection, which is used for fluss server scan. */
- public int[] getProjectionIdInOrder() {
- return projectionIdsInOrder;
+ public int[] getProjectionInOrder() {
+ return projectionInOrder;
}
public boolean isReorderingNeeded() {
diff --git
a/fluss-common/src/main/java/org/apache/fluss/utils/json/DataTypeJsonSerde.java
b/fluss-common/src/main/java/org/apache/fluss/utils/json/DataTypeJsonSerde.java
index 9bcd3262b..12e6fb1c5 100644
---
a/fluss-common/src/main/java/org/apache/fluss/utils/json/DataTypeJsonSerde.java
+++
b/fluss-common/src/main/java/org/apache/fluss/utils/json/DataTypeJsonSerde.java
@@ -70,6 +70,7 @@ public class DataTypeJsonSerde implements
JsonSerializer<DataType>, JsonDeserial
static final String FIELD_NAME_FIELDS = "fields";
static final String FIELD_NAME_FIELD_NAME = "name";
static final String FIELD_NAME_FIELD_TYPE = "field_type";
+ static final String FIELD_NAME_FIELD_ID = "field_id";
static final String FIELD_NAME_FIELD_DESCRIPTION = "description";
@Override
@@ -190,6 +191,8 @@ public class DataTypeJsonSerde implements
JsonSerializer<DataType>, JsonDeserial
jsonGenerator.writeStringField(
FIELD_NAME_FIELD_DESCRIPTION,
dataField.getDescription().get());
}
+
+ jsonGenerator.writeNumberField(FIELD_NAME_FIELD_ID,
dataField.getFieldId());
jsonGenerator.writeEndObject();
}
jsonGenerator.writeEndArray();
@@ -291,18 +294,23 @@ public class DataTypeJsonSerde implements
JsonSerializer<DataType>, JsonDeserial
private static DataType deserializeRow(JsonNode dataTypeNode) {
final ArrayNode fieldNodes = (ArrayNode)
dataTypeNode.get(FIELD_NAME_FIELDS);
final List<DataField> fields = new ArrayList<>();
+
for (JsonNode fieldNode : fieldNodes) {
final String fieldName =
fieldNode.get(FIELD_NAME_FIELD_NAME).asText();
final DataType fieldType =
DataTypeJsonSerde.INSTANCE.deserialize(fieldNode.get(FIELD_NAME_FIELD_TYPE));
- final String fieldDescription;
- if (fieldNode.has(FIELD_NAME_FIELD_DESCRIPTION)) {
- fieldDescription =
fieldNode.get(FIELD_NAME_FIELD_DESCRIPTION).asText();
- } else {
- fieldDescription = null;
- }
- fields.add(new DataField(fieldName, fieldType, fieldDescription));
+ final String fieldDescription =
+ fieldNode.has(FIELD_NAME_FIELD_DESCRIPTION)
+ ?
fieldNode.get(FIELD_NAME_FIELD_DESCRIPTION).asText()
+ : null;
+
+ final int fieldId =
+ fieldNode.has(FIELD_NAME_FIELD_ID)
+ ? fieldNode.get(FIELD_NAME_FIELD_ID).asInt()
+ : -1;
+ fields.add(new DataField(fieldName, fieldType, fieldDescription,
fieldId));
}
+
return new RowType(fields);
}
}
diff --git
a/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java
b/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java
index 32ba62b06..82e2fd76e 100644
--- a/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java
@@ -18,10 +18,13 @@
package org.apache.fluss.metadata;
import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
import org.junit.jupiter.api.Test;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -122,6 +125,140 @@ class TableSchemaTest {
.hasMessage("Auto increment column can only be used in
primary-key table.");
}
+ @Test
+ void testReassignFieldId() {
+ // Schema.Builder.column will reassign field id in flatten order.
+ Schema schema =
+ Schema.newBuilder()
+ .column("f0", DataTypes.STRING().copy(false))
+ .column(
+ "f1",
+ DataTypes.ROW(
+ DataTypes.FIELD("n0",
DataTypes.TINYINT(), 0),
+ DataTypes.FIELD("n1",
DataTypes.STRING(), 1),
+ DataTypes.FIELD(
+ "n2",
+ DataTypes.ROW(
+ DataTypes.FIELD(
+ "m1",
DataTypes.TINYINT(), 0)),
+ 2)))
+ .column(
+ "f2",
+ DataTypes.ROW(
+ DataTypes.FIELD("n0",
DataTypes.TINYINT(), 0),
+ DataTypes.FIELD("n1",
DataTypes.STRING(), 1)))
+ .column("f3", DataTypes.STRING())
+ .primaryKey("f0")
+ .build();
+ assertThat(schema.getColumnIds()).containsExactly(0, 1, 6, 9);
+ RowType expectedType =
+ new RowType(
+ true,
+ Arrays.asList(
+ DataTypes.FIELD("f0",
DataTypes.STRING().copy(false), 0),
+ DataTypes.FIELD(
+ "f1",
+ DataTypes.ROW(
+ DataTypes.FIELD("n0",
DataTypes.TINYINT(), 2),
+ DataTypes.FIELD("n1",
DataTypes.STRING(), 3),
+ DataTypes.FIELD(
+ "n2",
+ DataTypes.ROW(
+
DataTypes.FIELD(
+ "m1",
+
DataTypes.TINYINT(),
+ 5)),
+ 4)),
+ 1),
+ DataTypes.FIELD(
+ "f2",
+ DataTypes.ROW(
+ DataTypes.FIELD("n0",
DataTypes.TINYINT(), 7),
+ DataTypes.FIELD("n1",
DataTypes.STRING(), 8)),
+ 6),
+ DataTypes.FIELD("f3", DataTypes.STRING(), 9)));
+
assertThat(schema.getRowType().equalsWithFieldId(expectedType)).isTrue();
+
+ // Schema.Builder.fromColumns won't reassign field id.
+ List<Schema.Column> columns =
+ Arrays.asList(
+ new Schema.Column("f0",
DataTypes.STRING().copy(false), null, 0),
+ new Schema.Column(
+ "f1",
+ DataTypes.ROW(
+ DataTypes.FIELD("n0",
DataTypes.TINYINT(), 0),
+ DataTypes.FIELD("n1",
DataTypes.STRING(), 1),
+ DataTypes.FIELD(
+ "n2",
+ DataTypes.ROW(
+ DataTypes.FIELD(
+ "m1",
DataTypes.TINYINT(), 1)),
+ 2)),
+ null,
+ 1),
+ new Schema.Column(
+ "f2",
+ DataTypes.ROW(
+ DataTypes.FIELD("n0",
DataTypes.TINYINT(), 0),
+ DataTypes.FIELD("n1",
DataTypes.STRING(), 1)),
+ null,
+ 2));
+ assertThatThrownBy(() ->
Schema.newBuilder().fromColumns(columns).build())
+ .hasMessageContaining(
+ "All field IDs (including nested fields) must be
unique. Found 3 unique IDs but expected 9");
+ List<Schema.Column> columns2 =
+ Arrays.asList(
+ new Schema.Column("f0",
DataTypes.STRING().copy(false), null, 0),
+ new Schema.Column(
+ "f1",
+ DataTypes.ROW(
+ DataTypes.FIELD("n0",
DataTypes.TINYINT(), 6),
+ DataTypes.FIELD("n1",
DataTypes.STRING(), 7),
+ DataTypes.FIELD(
+ "n2",
+ DataTypes.ROW(
+ DataTypes.FIELD(
+ "m1",
DataTypes.TINYINT(), 11)),
+ 8)),
+ null,
+ 1),
+ new Schema.Column(
+ "f2",
+ DataTypes.ROW(
+ DataTypes.FIELD("n0",
DataTypes.TINYINT(), 9),
+ DataTypes.FIELD("n1",
DataTypes.STRING(), 10)),
+ null,
+ 2));
+ schema = Schema.newBuilder().fromColumns(columns2).build();
+ assertThat(schema.getColumnIds()).containsExactly(0, 1, 2);
+ expectedType =
+ new RowType(
+ true,
+ Arrays.asList(
+ DataTypes.FIELD("f0",
DataTypes.STRING().copy(false), 0),
+ DataTypes.FIELD(
+ "f1",
+ DataTypes.ROW(
+ DataTypes.FIELD("n0",
DataTypes.TINYINT(), 6),
+ DataTypes.FIELD("n1",
DataTypes.STRING(), 7),
+ DataTypes.FIELD(
+ "n2",
+ DataTypes.ROW(
+
DataTypes.FIELD(
+ "m1",
+
DataTypes.TINYINT(),
+ 11)),
+ 8)),
+ 1),
+ DataTypes.FIELD(
+ "f2",
+ DataTypes.ROW(
+ DataTypes.FIELD("n0",
DataTypes.TINYINT(), 9),
+ DataTypes.FIELD("n1",
DataTypes.STRING(), 10)),
+ 2)));
+
assertThat(schema.getRowType().equalsWithFieldId(expectedType)).isTrue();
+ }
+
@Test
void testSchemaBuilderColumnWithAggFunction() {
Schema schema =
diff --git
a/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java
b/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java
index 9a27089ac..b85e4db32 100644
---
a/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java
+++
b/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java
@@ -134,7 +134,7 @@ class FileLogProjectionTest {
new int[] {3},
recordsOfData2RowType.sizeInBytes()))
.isInstanceOf(InvalidColumnProjectionException.class)
- .hasMessage("Projected field id 3 is not contained in [0, 1,
2]");
+ .hasMessage("Projected fields [3] is out of bound for schema
with 3 fields.");
assertThatThrownBy(
() ->
@@ -162,6 +162,51 @@ class FileLogProjectionTest {
"The projection indexes should not contain duplicated
fields, but is [0, 0, 0]");
}
+ @Test
+ void testProjectionOldDataWithNewSchema() throws Exception {
+        // Currently, we only support adding columns at the end of the schema.
+ short schemaId = 1;
+ try (FileLogRecords records =
+ createFileLogRecords(
+ schemaId, LOG_MAGIC_VALUE_V1, TestData.DATA1_ROW_TYPE,
TestData.DATA1)) {
+
+ ProjectionPushdownCache cache = new ProjectionPushdownCache();
+ FileLogProjection projection = new FileLogProjection(cache);
+ assertThat(
+ doProjection(
+ 2L,
+ 2,
+ projection,
+ records,
+ new int[] {1},
+ records.sizeInBytes()))
+ .containsExactly(
+ new Object[] {"a"},
+ new Object[] {"b"},
+ new Object[] {"c"},
+ new Object[] {"d"},
+ new Object[] {"e"},
+ new Object[] {"f"},
+ new Object[] {"g"},
+ new Object[] {"h"},
+ new Object[] {"i"},
+ new Object[] {"j"});
+
+ assertThatThrownBy(
+ () ->
+ doProjection(
+ 1L,
+ 2,
+ projection,
+ records,
+ new int[] {0, 2},
+ records.sizeInBytes()))
+ .isInstanceOf(InvalidColumnProjectionException.class)
+ .hasMessage(
+ "Projected fields [0, 2] is out of bound for
schema with 2 fields.");
+ }
+ }
+
static Stream<Arguments> projectedFieldsArgs() {
return Stream.of(
Arguments.of((Object) new int[] {0}, LOG_MAGIC_VALUE_V0,
(short) 1),
@@ -420,7 +465,7 @@ class FileLogProjectionTest {
private List<Object[]> doProjection(
long tableId,
- short schemaId,
+ int schemaId,
FileLogProjection projection,
FileLogRecords fileLogRecords,
int[] projectedFields,
@@ -437,7 +482,7 @@ class FileLogProjectionTest {
List<Object[]> results = new ArrayList<>();
long expectedOffset = 0L;
try (LogRecordReadContext context =
- createArrowReadContext(projectedType, schemaId,
testingSchemaGetter)) {
+ createArrowReadContext(projectedType, schemaId,
testingSchemaGetter, true)) {
for (LogRecordBatch batch : project.batches()) {
try (CloseableIterator<LogRecord> records =
batch.records(context)) {
while (records.hasNext()) {
diff --git
a/fluss-common/src/test/java/org/apache/fluss/types/DataTypesTest.java
b/fluss-common/src/test/java/org/apache/fluss/types/DataTypesTest.java
index e9f0dc8cc..309f0fabd 100644
--- a/fluss-common/src/test/java/org/apache/fluss/types/DataTypesTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/types/DataTypesTest.java
@@ -608,6 +608,16 @@ public class DataTypesTest {
new DataField("a1", new IntType(), "column
a1"),
new DataField("b", new CharType(5), "column
b"))));
+        // verify that equals() ignores field_id differences
+ dataTypeBaseAssert(
+ rowType,
+ true,
+ "ROW<`a` INT 'column a', `b` CHAR(5) 'column b'>",
+ new RowType(
+ Arrays.asList(
+ new DataField("a1", new IntType(), "column
a1"),
+ new DataField("b", new CharType(5), "column
b"))));
+
rowType =
new RowType(
false,
@@ -654,7 +664,7 @@ public class DataTypesTest {
new RowType(
Arrays.asList(
new DataField("a1", new IntType(), "column
a1"),
- new DataField("b", new CharType(5)))));
+ new DataField("b", new CharType(5), 1))));
rowType = RowType.of(DataTypes.CHAR(1), DataTypes.CHAR(2));
dataTypeBaseAssert(
diff --git
a/fluss-common/src/test/java/org/apache/fluss/utils/ProjectionTest.java
b/fluss-common/src/test/java/org/apache/fluss/utils/ProjectionTest.java
index 48fc2ac67..4f85e9516 100644
--- a/fluss-common/src/test/java/org/apache/fluss/utils/ProjectionTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/utils/ProjectionTest.java
@@ -17,7 +17,6 @@
package org.apache.fluss.utils;
-import org.apache.fluss.metadata.Schema;
import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.ProjectedRow;
@@ -33,15 +32,8 @@ class ProjectionTest {
@Test
void testProjection() {
- Schema schema =
- Schema.newBuilder()
- .column("f0", DataTypes.INT())
- .column("f1", DataTypes.BIGINT())
- .column("f2", DataTypes.STRING())
- .column("f3", DataTypes.DOUBLE())
- .build();
- Projection projection = Projection.of(new int[] {2, 0, 3}, schema);
- assertThat(projection.getProjectionIdInOrder()).isEqualTo(new int[]
{0, 2, 3});
+ Projection projection = Projection.of(new int[] {2, 0, 3});
+ assertThat(projection.getProjectionInOrder()).isEqualTo(new int[] {0,
2, 3});
RowType rowType =
projection.projectInOrder(
@@ -53,21 +45,21 @@ class ProjectionTest {
assertThat(rowType)
.isEqualTo(
DataTypes.ROW(
- DataTypes.FIELD("f0", DataTypes.INT()),
- DataTypes.FIELD("f2", DataTypes.STRING()),
- DataTypes.FIELD("f3", DataTypes.DOUBLE())));
+ DataTypes.FIELD("f0", DataTypes.INT(), 0),
+ DataTypes.FIELD("f2", DataTypes.STRING(), 2),
+ DataTypes.FIELD("f3", DataTypes.DOUBLE(), 3)));
assertThat(projection.isReorderingNeeded()).isTrue();
assertThat(projection.getReorderingIndexes()).isEqualTo(new int[] {1,
0, 2});
assertThat(rowType.project(projection.getReorderingIndexes()))
.isEqualTo(
DataTypes.ROW(
- DataTypes.FIELD("f2", DataTypes.STRING()),
- DataTypes.FIELD("f0", DataTypes.INT()),
- DataTypes.FIELD("f3", DataTypes.DOUBLE())));
+ DataTypes.FIELD("f2", DataTypes.STRING(), 2),
+ DataTypes.FIELD("f0", DataTypes.INT(), 0),
+ DataTypes.FIELD("f3", DataTypes.DOUBLE(), 3)));
GenericRow row = GenericRow.of(0, 1L, BinaryString.fromString("2"),
3.0d);
- ProjectedRow p1 =
ProjectedRow.from(projection.getProjectionIdInOrder());
+ ProjectedRow p1 = ProjectedRow.from(projection.getProjectionInOrder());
p1.replaceRow(row);
ProjectedRow p2 = ProjectedRow.from(projection.getReorderingIndexes());
p2.replaceRow(p1);
diff --git
a/fluss-common/src/test/java/org/apache/fluss/utils/json/ColumnJsonSerdeTest.java
b/fluss-common/src/test/java/org/apache/fluss/utils/json/ColumnJsonSerdeTest.java
index 716e04347..eba8159fb 100644
---
a/fluss-common/src/test/java/org/apache/fluss/utils/json/ColumnJsonSerdeTest.java
+++
b/fluss-common/src/test/java/org/apache/fluss/utils/json/ColumnJsonSerdeTest.java
@@ -20,11 +20,13 @@ package org.apache.fluss.utils.json;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.types.DataTypes;
import org.apache.fluss.types.IntType;
+import org.apache.fluss.types.RowType;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import static org.assertj.core.api.Assertions.assertThat;
@@ -36,11 +38,21 @@ public class ColumnJsonSerdeTest extends
JsonSerdeTestBase<Schema.Column> {
@Override
protected Schema.Column[] createObjects() {
- Schema.Column[] columns = new Schema.Column[4];
+ Schema.Column[] columns = new Schema.Column[5];
columns[0] = new Schema.Column("a", DataTypes.STRING());
columns[1] = new Schema.Column("b", DataTypes.INT(), "hello b");
columns[2] = new Schema.Column("c", new IntType(false), "hello c");
columns[3] = new Schema.Column("d", new IntType(false), "hello c",
(short) 2);
+ columns[4] =
+ new Schema.Column(
+ "e",
+ new RowType(
+ true,
+ Arrays.asList(
+ DataTypes.FIELD("f",
DataTypes.STRING()),
+ DataTypes.FIELD("g",
DataTypes.STRING(), 1))),
+ "hello c",
+ (short) 2);
return columns;
}
@@ -50,7 +62,8 @@ public class ColumnJsonSerdeTest extends
JsonSerdeTestBase<Schema.Column> {
"{\"name\":\"a\",\"data_type\":{\"type\":\"STRING\"},\"id\":-1}",
"{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\"},\"comment\":\"hello
b\",\"id\":-1}",
"{\"name\":\"c\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"hello
c\",\"id\":-1}",
-
"{\"name\":\"d\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"hello
c\",\"id\":2}"
+
"{\"name\":\"d\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"hello
c\",\"id\":2}",
+
"{\"name\":\"e\",\"data_type\":{\"type\":\"ROW\",\"fields\":[{\"name\":\"f\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":-1},{\"name\":\"g\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":1}]},\"comment\":\"hello
c\",\"id\":2}"
};
}
diff --git
a/fluss-common/src/test/java/org/apache/fluss/utils/json/DataTypeJsonSerdeTest.java
b/fluss-common/src/test/java/org/apache/fluss/utils/json/DataTypeJsonSerdeTest.java
index 82d6ca4e8..20e338f60 100644
---
a/fluss-common/src/test/java/org/apache/fluss/utils/json/DataTypeJsonSerdeTest.java
+++
b/fluss-common/src/test/java/org/apache/fluss/utils/json/DataTypeJsonSerdeTest.java
@@ -24,6 +24,8 @@ import org.apache.fluss.types.BooleanType;
import org.apache.fluss.types.BytesType;
import org.apache.fluss.types.CharType;
import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeChecks;
+import org.apache.fluss.types.DataTypes;
import org.apache.fluss.types.DateType;
import org.apache.fluss.types.DecimalType;
import org.apache.fluss.types.DoubleType;
@@ -38,10 +40,15 @@ import org.apache.fluss.types.TimeType;
import org.apache.fluss.types.TimestampType;
import org.apache.fluss.types.TinyIntType;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import static org.assertj.core.api.Assertions.assertThat;
+
/** Test for {@link DataTypeJsonSerde}. */
public class DataTypeJsonSerdeTest extends JsonSerdeTestBase<DataType> {
@@ -49,6 +56,12 @@ public class DataTypeJsonSerdeTest extends
JsonSerdeTestBase<DataType> {
super(DataTypeJsonSerde.INSTANCE);
}
+ @Override
+ protected void assertEquals(DataType actual, DataType expected) {
+ // compare with field_id.
+ assertThat(DataTypeChecks.equalsWithFieldId(actual,
expected)).isTrue();
+ }
+
@Override
protected DataType[] createObjects() {
final List<DataType> types =
@@ -77,7 +90,12 @@ public class DataTypeJsonSerdeTest extends
JsonSerdeTestBase<DataType> {
new LocalZonedTimestampType(3),
new ArrayType(new IntType(false)),
new MapType(new BigIntType(false), new IntType(false)),
- RowType.of(new BigIntType(), new IntType(false), new
StringType()));
+ new RowType(
+ true,
+ Arrays.asList(
+ DataTypes.FIELD("f0", new
BigIntType(), null),
+ DataTypes.FIELD("f1", new
IntType(false), null, 1),
+ DataTypes.FIELD("f2", new
StringType(), null, 2))));
final List<DataType> allTypes = new ArrayList<>();
// consider nullable
@@ -139,8 +157,27 @@ public class DataTypeJsonSerdeTest extends
JsonSerdeTestBase<DataType> {
"{\"type\":\"ARRAY\",\"nullable\":false,\"element_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
"{\"type\":\"MAP\",\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
"{\"type\":\"MAP\",\"nullable\":false,\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
-
"{\"type\":\"ROW\",\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"}},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false}},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"}}]}",
-
"{\"type\":\"ROW\",\"nullable\":false,\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"}},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false}},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"}}]}",
+
"{\"type\":\"ROW\",\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"},\"field_id\":-1},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"field_id\":1},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":2}]}",
+
"{\"type\":\"ROW\",\"nullable\":false,\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"},\"field_id\":-1},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"field_id\":1},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":2}]}"
};
}
+
+ @Test
+ void testJsonLackOfFieldId() {
+        // some fields have a field_id while others do not.
+ String testJsonWithInconsistencyFieldId =
+
"{\"type\":\"ROW\",\"nullable\":false,\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"}},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"field_id\":1},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":2}]}";
+ DataType dataType =
+ JsonSerdeUtils.readValue(
+
testJsonWithInconsistencyFieldId.getBytes(StandardCharsets.UTF_8),
+ DataTypeJsonSerde.INSTANCE);
+ assertThat(dataType).isInstanceOf(RowType.class);
+ assertEquals(
+ dataType,
+ DataTypes.ROW(
+ DataTypes.FIELD("f0", DataTypes.BIGINT()),
+ DataTypes.FIELD("f1",
DataTypes.INT().copy(false), 1),
+ DataTypes.FIELD("f2", DataTypes.STRING(), 2))
+ .copy(false));
+ }
}
diff --git
a/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
b/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
index 018214b3f..54c5f69ea 100644
---
a/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
+++
b/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
@@ -79,6 +79,18 @@ public class SchemaJsonSerdeTest extends
JsonSerdeTestBase<Schema> {
.enableAutoIncrement("b")
.build();
+ static final Schema SCHEMA_5 =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .withComment("a is first column")
+ .column(
+ "b",
+ DataTypes.ROW(
+ DataTypes.FIELD("c", DataTypes.INT(), "a
is first column", 0),
+ DataTypes.FIELD("d", DataTypes.INT(), "a
is first column", 1)))
+ .withComment("b is second column")
+ .build();
+
static final String SCHEMA_JSON_0 =
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a
is first
column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"STRING\"},\"comment\":\"b
is second
column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c
is third
column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"highest_field_id\":2}";
static final String SCHEMA_JSON_1 =
@@ -90,6 +102,9 @@ public class SchemaJsonSerdeTest extends
JsonSerdeTestBase<Schema> {
static final String SCHEMA_JSON_4 =
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a
is first
column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"b
is second
column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c
is third
column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}";
+ static final String SCHEMA_JSON_5 =
+
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\"},\"comment\":\"a
is first
column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"ROW\",\"fields\":[{\"name\":\"c\",\"field_type\":{\"type\":\"INTEGER\"},\"description\":\"a
is first
column\",\"field_id\":2},{\"name\":\"d\",\"field_type\":{\"type\":\"INTEGER\"},\"description\":\"a
is first column\",\"field_id\":3}]},\"comment\":\"b is second
column\",\"id\":1}],\"highest_field_id\":3}";
+
static final Schema SCHEMA_WITH_AGG =
Schema.newBuilder()
.column("product_id", DataTypes.BIGINT().copy(false))
@@ -112,9 +127,18 @@ public class SchemaJsonSerdeTest extends
JsonSerdeTestBase<Schema> {
super(SchemaJsonSerde.INSTANCE);
}
+ @Override
+ protected void assertEquals(Schema actual, Schema expected) {
+ assertThat(actual).isEqualTo(expected);
+ // compare field ids.
+
assertThat(actual.getRowType().equalsWithFieldId(expected.getRowType())).isTrue();
+ }
+
@Override
protected Schema[] createObjects() {
- return new Schema[] {SCHEMA_0, SCHEMA_1, SCHEMA_2, SCHEMA_3, SCHEMA_4,
SCHEMA_WITH_AGG};
+ return new Schema[] {
+ SCHEMA_0, SCHEMA_1, SCHEMA_2, SCHEMA_3, SCHEMA_4, SCHEMA_5,
SCHEMA_WITH_AGG
+ };
}
@Override
@@ -125,6 +149,7 @@ public class SchemaJsonSerdeTest extends
JsonSerdeTestBase<Schema> {
SCHEMA_JSON_1,
SCHEMA_JSON_3,
SCHEMA_JSON_4,
+ SCHEMA_JSON_5,
SCHEMA_JSON_WITH_AGG
};
}
@@ -132,13 +157,12 @@ public class SchemaJsonSerdeTest extends
JsonSerdeTestBase<Schema> {
@Test
void testCompatibilityFromJsonLackOfColumnId() {
String[] jsons = jsonLackOfColumnId();
- Schema[] expectedSchema = new Schema[] {SCHEMA_0, SCHEMA_1, SCHEMA_2,
SCHEMA_3};
+ Schema[] expectedSchema = new Schema[] {SCHEMA_0, SCHEMA_1, SCHEMA_2,
SCHEMA_3, SCHEMA_5};
for (int i = 0; i < jsons.length; i++) {
- assertThat(
- JsonSerdeUtils.readValue(
- jsons[i].getBytes(StandardCharsets.UTF_8),
- SchemaJsonSerde.INSTANCE))
- .isEqualTo(expectedSchema[i]);
+ assertEquals(
+ JsonSerdeUtils.readValue(
+ jsons[i].getBytes(StandardCharsets.UTF_8),
SchemaJsonSerde.INSTANCE),
+ expectedSchema[i]);
}
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
index 3088c9d30..9aef6c931 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
@@ -198,16 +198,16 @@ public class FlinkConversions {
}
// first build schema with physical columns
- schemBuilder.fromColumns(
- resolvedSchema.getColumns().stream()
- .filter(Column::isPhysical)
- .map(
- column ->
- new Schema.Column(
- column.getName(),
-
FlinkConversions.toFlussType(column.getDataType()),
-
column.getComment().orElse(null)))
- .collect(Collectors.toList()));
+ resolvedSchema.getColumns().stream()
+ .filter(Column::isPhysical)
+ .forEachOrdered(
+ column -> {
+ schemBuilder
+ .column(
+ column.getName(),
+
FlinkConversions.toFlussType(column.getDataType()))
+
.withComment(column.getComment().orElse(null));
+ });
// convert some flink options to fluss table configs.
Map<String, String> storageProperties =
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java
index 0c4432986..70d581fda 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java
@@ -17,7 +17,11 @@
package org.apache.fluss.flink.sink;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.table.Table;
import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.server.testutils.FlussClusterExtension;
import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -46,6 +50,7 @@ import java.util.List;
import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
import static
org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Integration tests for Array type support in Flink connector. */
@@ -592,4 +597,74 @@ abstract class FlinkComplexTypeITCase extends
AbstractTestBase {
"Bucket key column 'info' has unsupported data type
ROW<`name` STRING, `age` INT>. "
+ "Currently, bucket key column does not
support types: [ARRAY, MAP, ROW].");
}
+
+ @Test
+ void testProjectionAndAddColumnInLogTable() throws Exception {
+ tEnv.executeSql(
+ "create table row_log_test ("
+ + "id int, "
+ + "simple_row row<a int, b string>, "
+ + "nested_row row<x int, y row<z int, w string>, v
string>, "
+ + "array_of_rows array<row<a int, b string>>, "
+ + "data string "
+ + ") with ('bucket.num' = '3')");
+
+ tEnv.executeSql(
+ "INSERT INTO row_log_test VALUES "
+ + "(1, ROW(10, 'hello'), ROW(20, ROW(30,
'nested'), 'row1'), "
+ + "ARRAY[ROW(1, 'a'), ROW(2, 'b')], 'aa'), "
+ + "(2, ROW(40, 'world'), ROW(50, ROW(60,
'test'), 'row2'), "
+ + "ARRAY[ROW(3, 'c')], 'bb')")
+ .await();
+
+ CloseableIterator<Row> rowIter = tEnv.executeSql("select * from
row_log_test").collect();
+ List<String> expectedRows =
+ Arrays.asList(
+ "+I[1, +I[10, hello], +I[20, +I[30, nested], row1],
[+I[1, a], +I[2, b]], aa]",
+ "+I[2, +I[40, world], +I[50, +I[60, test], row2],
[+I[3, c]], bb]");
+ assertResultsIgnoreOrder(rowIter, expectedRows, true);
+
+ // Currently, flink not supported push down nested row projection
because
+ // FlinkTableSource.supportsNestedProjection returns false.
+ // Todo: support nested row projection pushdown in
+ // https://github.com/apache/fluss/issues/2311 later.
+ String s =
+ tEnv.explainSql("select id, simple_row.a, nested_row.y.z, data
from row_log_test");
+ assertThat(s)
+ .contains(
+ "TableSourceScan(table=[[testcatalog, defaultdb,
row_log_test, project=[id, simple_row, nested_row, data]]]");
+ rowIter =
+ tEnv.executeSql("select id, simple_row.a, nested_row.y.z, data
from row_log_test")
+ .collect();
+ expectedRows = Arrays.asList("+I[1, 10, 30, aa]", "+I[2, 40, 60, bb]");
+ assertResultsIgnoreOrder(rowIter, expectedRows, true);
+ // Test add columns
+ tEnv.executeSql(
+ "alter table row_log_test add ("
+ + "simple_row2 row<a int, b string>, "
+ + "nested_row2 row<x int, y row<z int, w string>, v
string>, "
+ + "array_of_rows2 array<row<a int, b string>>)");
+
+ tEnv.executeSql(
+ "INSERT INTO row_log_test VALUES "
+ + "(1, ROW(10, 'hello'), ROW(20, ROW(30,
'nested'), 'row1'), ARRAY[ROW(1, 'a'), ROW(2, 'b')], 'aa', ROW(10, 'hello'),
ROW(20, ROW(30, 'nested'), 'row1'), ARRAY[ROW(1, 'a'), ROW(2, 'b')]),"
+ + "(2, ROW(40, 'world'), ROW(50, ROW(60,
'test'), 'row2'), ARRAY[ROW(3, 'c')], 'bb', ROW(40, 'world'), ROW(50, ROW(60,
'test'), 'row2'), ARRAY[ROW(3, 'c')])")
+ .await();
+ rowIter = tEnv.executeSql("select * from row_log_test").collect();
+ expectedRows =
+ Arrays.asList(
+ "+I[1, +I[10, hello], +I[20, +I[30, nested], row1],
[+I[1, a], +I[2, b]], aa, null, null, null]",
+ "+I[2, +I[40, world], +I[50, +I[60, test], row2],
[+I[3, c]], bb, null, null, null]",
+ "+I[1, +I[10, hello], +I[20, +I[30, nested], row1],
[+I[1, a], +I[2, b]], aa, +I[10, hello], +I[20, +I[30, nested], row1], [+I[1,
a], +I[2, b]]]",
+ "+I[2, +I[40, world], +I[50, +I[60, test], row2],
[+I[3, c]], bb, +I[40, world], +I[50, +I[60, test], row2], [+I[3, c]]]");
+ assertResultsIgnoreOrder(rowIter, expectedRows, true);
+ try (Connection conn =
+ ConnectionFactory.createConnection(
+ FLUSS_CLUSTER_EXTENSION.getClientConfig());
+ Table table = conn.getTable(TablePath.of(DEFAULT_DB,
"row_log_test"))) {
+ // check field id
+ org.apache.fluss.metadata.Schema schema =
table.getTableInfo().getSchema();
+ assertThat(schema.getColumnIds()).containsExactly(0, 1, 4, 10, 13,
14, 17, 23);
+ }
+ }
}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
index fffb0c878..ad142a6c5 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
@@ -24,7 +24,9 @@ import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeChecks;
import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.api.Schema;
@@ -55,6 +57,7 @@ import static
org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_KEY;
import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_NUMBER;
import static org.apache.fluss.flink.utils.CatalogTableTestUtils.addOptions;
import static
org.apache.fluss.flink.utils.CatalogTableTestUtils.checkEqualsIgnoreSchema;
+import static org.apache.fluss.types.DataTypes.FIELD;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -85,8 +88,8 @@ public class FlinkConversionsTest {
DataTypes.ARRAY(DataTypes.STRING()),
DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()),
DataTypes.ROW(
- DataTypes.FIELD("a",
DataTypes.STRING().copy(false)),
- DataTypes.FIELD("b", DataTypes.INT())));
+ FIELD("a", DataTypes.STRING().copy(false)),
+ FIELD("b", DataTypes.INT())));
// flink types
List<org.apache.flink.table.types.DataType> flinkTypes =
@@ -133,6 +136,13 @@ public class FlinkConversionsTest {
}
assertThat(actualFlussTypes).isEqualTo(flussTypes);
+ // check the field id of rowType.
+ assertThat(
+ DataTypeChecks.equalsWithFieldId(
+ actualFlussTypes.get(actualFlussTypes.size() -
1),
+ flussTypes.get(flinkTypes.size() - 1)))
+ .isTrue();
+
// test conversion for data types not supported in Fluss
assertThatThrownBy(() -> FlinkConversions.toFlussType(VARCHAR(10)))
.isInstanceOf(UnsupportedOperationException.class)
@@ -158,6 +168,34 @@ public class FlinkConversionsTest {
Column.physical(
"order_id",
org.apache.flink.table.api.DataTypes.STRING().notNull()),
+ Column.physical(
+ "item",
+
org.apache.flink.table.api.DataTypes.ROW(
+
org.apache.flink.table.api.DataTypes.FIELD(
+ "item_id",
+
org.apache.flink.table.api.DataTypes
+ .STRING()),
+
org.apache.flink.table.api.DataTypes.FIELD(
+ "item_price",
+
org.apache.flink.table.api.DataTypes
+ .STRING()),
+
org.apache.flink.table.api.DataTypes.FIELD(
+ "item_details",
+
org.apache.flink.table.api.DataTypes.ROW(
+
org.apache.flink.table.api.DataTypes
+ .FIELD(
+
"category",
+
org.apache.flink
+
.table.api
+
.DataTypes
+
.STRING()),
+
org.apache.flink.table.api.DataTypes
+ .FIELD(
+
"specifications",
+
org.apache.flink
+
.table.api
+
.DataTypes
+
.STRING()))))),
Column.physical(
"orig_ts",
org.apache.flink.table.api.DataTypes.TIMESTAMP()),
@@ -181,18 +219,37 @@ public class FlinkConversionsTest {
String expectFlussTableString =
"TableDescriptor{schema=("
+ "order_id STRING NOT NULL,"
+ + "item ROW<`item_id` STRING, `item_price` STRING,
`item_details` ROW<`category` STRING, `specifications` STRING>>,"
+ "orig_ts TIMESTAMP(6),"
+ "CONSTRAINT PK_order_id PRIMARY KEY (order_id)"
+ "), comment='test comment', partitionKeys=[], "
+ "tableDistribution={bucketKeys=[order_id]
bucketCount=null}, "
+ "properties={}, "
- +
"customProperties={schema.watermark.0.strategy.expr=orig_ts, "
- + "schema.2.expr=orig_ts,
schema.2.data-type=TIMESTAMP(3), "
+ + "customProperties={"
+ + "schema.3.data-type=TIMESTAMP(3), "
+ + "schema.watermark.0.strategy.expr=orig_ts, "
+ + "schema.3.name=compute_ts, "
+ "schema.watermark.0.rowtime=orig_ts, "
+ "schema.watermark.0.strategy.data-type=TIMESTAMP(3),
"
+ "k1=v1, k2=v2, "
- + "schema.2.name=compute_ts}}";
+ + "schema.3.expr=orig_ts}}";
assertThat(flussTable.toString()).isEqualTo(expectFlussTableString);
+ assertThat(flussTable.getSchema().getColumnIds()).containsExactly(0,
1, 7);
+ // check the nested row column "item"
+ org.apache.fluss.metadata.Schema.Column column =
flussTable.getSchema().getColumns().get(1);
+ assertThat(column.getName()).isEqualTo("item");
+ assertThat(column.getDataType()).isInstanceOf(RowType.class);
+ RowType rowType = (RowType) column.getDataType();
+ assertThat(rowType.getFields())
+ .containsExactly(
+ FIELD("item_id", DataTypes.STRING(), 2),
+ FIELD("item_price", DataTypes.STRING(), 3),
+ FIELD(
+ "item_details",
+ DataTypes.ROW(
+ FIELD("category", DataTypes.STRING(),
5),
+ FIELD("specifications",
DataTypes.STRING(), 6)),
+ 4));
// test convert fluss table to flink table
TablePath tablePath = TablePath.of("db", "table");
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
index 3162e4bed..eff79605a 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
@@ -21,6 +21,8 @@ import org.apache.fluss.exception.SchemaChangeException;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.ReassignFieldId;
import java.util.ArrayList;
import java.util.HashMap;
@@ -106,12 +108,11 @@ public class SchemaUpdate {
"Column " + addColumn.getName() + " must be nullable.");
}
+ int columnId = highestFieldId.incrementAndGet();
+ DataType dataType = ReassignFieldId.reassign(addColumn.getDataType(),
highestFieldId);
+
Schema.Column newColumn =
- new Schema.Column(
- addColumn.getName(),
- addColumn.getDataType(),
- addColumn.getComment(),
- (byte) highestFieldId.incrementAndGet());
+ new Schema.Column(addColumn.getName(), dataType,
addColumn.getComment(), columnId);
columns.add(newColumn);
existedColumns.put(newColumn.getName(), newColumn);
return this;
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
index 8f12e43c4..f21d5bdd8 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
@@ -63,7 +63,9 @@ import
org.apache.fluss.server.testutils.FlussClusterExtension;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.data.BucketAssignment;
import org.apache.fluss.server.zk.data.TableAssignment;
+import org.apache.fluss.types.DataTypeChecks;
import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
import org.apache.fluss.utils.json.DataTypeJsonSerde;
import org.apache.fluss.utils.json.JsonSerdeUtils;
@@ -699,7 +701,19 @@ class TableManagerITCase {
pbTableMetadata.getCreatedTime(),
pbTableMetadata.getModifiedTime());
List<Schema.Column> columns = tableInfo.getSchema().getColumns();
- assertThat(columns.size()).isEqualTo(3);
+ assertThat(columns.size()).isEqualTo(4);
+ assertThat(tableInfo.getSchema().getColumnIds()).containsExactly(0, 1,
2, 5);
+ // check nested row's field_id.
+ assertThat(columns.get(2).getName()).isEqualTo("new_nested_column");
+ assertThat(
+ DataTypeChecks.equalsWithFieldId(
+ columns.get(2).getDataType(),
+ new RowType(
+ true,
+ Arrays.asList(
+ DataTypes.FIELD("f0",
DataTypes.STRING(), 3),
+ DataTypes.FIELD("f1",
DataTypes.INT(), 4)))))
+ .isTrue();
}
private void checkBucketMetadata(int expectBucketCount,
List<PbBucketMetadata> bucketMetadata) {
@@ -830,6 +844,17 @@ class TableManagerITCase {
private static List<PbAddColumn> alterTableAddColumns() {
List<PbAddColumn> addColumns = new ArrayList<>();
+ PbAddColumn newNestedColumn = new PbAddColumn();
+ newNestedColumn
+ .setColumnName("new_nested_column")
+ .setDataTypeJson(
+ JsonSerdeUtils.writeValueAsBytes(
+ DataTypes.ROW(DataTypes.STRING(),
DataTypes.INT()),
+ DataTypeJsonSerde.INSTANCE))
+ .setComment("new_nested_column")
+ .setColumnPositionType(0);
+ addColumns.add(newNestedColumn);
+
PbAddColumn newColumn = new PbAddColumn();
newColumn
.setColumnName("new_column")
@@ -839,6 +864,7 @@ class TableManagerITCase {
.setComment("new_column")
.setColumnPositionType(0);
addColumns.add(newColumn);
+
return addColumns;
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
index 01cf2701a..5a28f9e2b 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
@@ -330,7 +330,7 @@ public class TabletServiceITCase {
tableId,
0,
Errors.INVALID_COLUMN_PROJECTION.code(),
- "Projected field id 2 is not contained in [0, 1]");
+ "Projected fields [2, 3] is out of bound for schema with 2
fields.");
}
@Test