This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch ci-add-column in repository https://gitbox.apache.org/repos/asf/fluss.git
commit e715276f23a517c8640a267c85d0aba0e040c874 Author: Jark Wu <[email protected]> AuthorDate: Sun Nov 30 21:14:28 2025 +0800 WIP --- .../org/apache/fluss/client/table/FlussTable.java | 6 +--- .../client/table/scanner/batch/BatchScanUtils.java | 3 +- .../table/scanner/batch/LimitBatchScanner.java | 22 ++++++++++--- .../table/scanner/batch/SnapshotFilesReader.java | 19 +++++++++-- .../client/table/scanner/log/BucketScanStatus.java | 3 +- .../fluss/client/table/scanner/log/LogFetcher.java | 3 +- .../client/table/writer/AbstractTableWriter.java | 2 -- .../fluss/client/write/RecordAccumulator.java | 3 +- .../java/org/apache/fluss/client/write/Sender.java | 6 ++-- .../fluss/client/admin/FlussAdminITCase.java | 37 ++++++++++++++++++++-- .../batch/KvSnapshotBatchScannerITCase.java | 4 +-- .../table/scanner/log/LogFetchCollectorTest.java | 3 +- .../client/table/scanner/log/LogFetcherTest.java | 1 + .../fluss/client/write/RecordAccumulatorTest.java | 1 - .../org/apache/fluss/exception/ApiException.java | 16 +++++++++- .../fluss/exception/SchemaChangeException.java | 4 +++ .../java/org/apache/fluss/metadata/Schema.java | 1 + .../org/apache/fluss/metadata/TableChange.java | 2 +- .../java/org/apache/fluss/metadata/TableInfo.java | 5 +++ .../java/org/apache/fluss/utils/Projection.java | 6 +--- .../java/org/apache/fluss/utils/SchemaUtil.java | 21 ++++++++---- .../server/metadata/SchemaMetadataManager.java | 1 + 22 files changed, 122 insertions(+), 47 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/FlussTable.java b/fluss-client/src/main/java/org/apache/fluss/client/table/FlussTable.java index 629abe6ab..8532f2a85 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/FlussTable.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/FlussTable.java @@ -29,7 +29,6 @@ import org.apache.fluss.client.table.writer.TableAppend; import org.apache.fluss.client.table.writer.TableUpsert; import org.apache.fluss.client.table.writer.Upsert; import org.apache.fluss.metadata.SchemaGetter; -import org.apache.fluss.metadata.SchemaInfo; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; @@ -55,10 +54,7 @@ public class FlussTable implements Table { this.tableInfo = tableInfo; this.hasPrimaryKey = tableInfo.hasPrimaryKey(); this.schemaGetter = - new ClientSchemaGetter( - tableInfo.getTablePath(), - new SchemaInfo(tableInfo.getSchema(), tableInfo.getSchemaId()), - conn.getAdmin()); + new ClientSchemaGetter(tablePath, tableInfo.getSchemaInfo(), conn.getAdmin()); } @Override diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/BatchScanUtils.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/BatchScanUtils.java index f4e150f30..d36bf9a7d 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/BatchScanUtils.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/BatchScanUtils.java @@ -71,8 +71,7 @@ public class BatchScanUtils { // If the scanner has more data, add it back to the queue scannerQueue.add(scanner); } else { - // Close the scanner if it has no more data, and not add it back to the - // queue + // Close the scanner if it has no more data, and not add it back to the queue scanner.close(); } } catch (Exception e) { diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java index e6208f368..06c79fce8 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java @@ -39,6 +39,7 @@ import org.apache.fluss.rpc.messages.LimitScanRequest; import org.apache.fluss.rpc.messages.LimitScanResponse; import org.apache.fluss.types.RowType; import org.apache.fluss.utils.CloseableIterator; +import org.apache.fluss.utils.SchemaUtil; import javax.annotation.Nullable; @@ -47,7 +48,9 @@ import java.nio.ByteBuffer; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -64,6 +67,12 @@ public class LimitBatchScanner implements BatchScanner { private final KvFormat kvFormat; private final int targetSchemaId; + /** + * A cache for schema projection mapping from source schema to target. Use HashMap here, because + * LimitBatchScanner is used in single thread only. + */ + private final Map<Short, int[]> schemaProjectionCache = new HashMap<>(); + private boolean endOfInput; public LimitBatchScanner( @@ -142,11 +151,14 @@ public class LimitBatchScanner implements BatchScanner { for (ValueRecord record : valueRecords.records(readContext)) { InternalRow row = record.getRow(); if (targetSchemaId != record.schemaId()) { - row = - ProjectedRow.from( - schemaGetter.getSchema(record.schemaId()), - schemaGetter.getSchema(targetSchemaId)) - .replaceRow(row); + int[] indexMapping = + schemaProjectionCache.computeIfAbsent( + record.schemaId(), + sourceSchemaId -> + SchemaUtil.getIndexMapping( + schemaGetter.getSchema(sourceSchemaId), + schemaGetter.getSchema(targetSchemaId))); + row = ProjectedRow.from(indexMapping).replaceRow(row); } scanRows.add(maybeProject(row)); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/SnapshotFilesReader.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/SnapshotFilesReader.java index 10e773609..a7c5e2969 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/SnapshotFilesReader.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/SnapshotFilesReader.java @@ -30,6 +30,7 @@ import org.apache.fluss.row.encode.ValueDecoder; import org.apache.fluss.utils.CloseableIterator; import org.apache.fluss.utils.CloseableRegistry; import org.apache.fluss.utils.IOUtils; +import org.apache.fluss.utils.SchemaUtil; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; @@ -43,6 +44,8 @@ import javax.annotation.concurrent.NotThreadSafe; import java.io.IOException; import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; /** * A reader to read kv snapshot files to {@link ScanRecord}s. It will return the {@link ScanRecord}s @@ -58,6 +61,12 @@ class SnapshotFilesReader implements CloseableIterator<InternalRow> { @Nullable private final int[] projectedFields; private RocksIteratorWrapper rocksIteratorWrapper; + /** + * A cache for schema projection mapping from source schema to target. Use HashMap here, because + * SnapshotFilesReader is used in single thread only. + */ + private final Map<Short, int[]> schemaProjectionCache = new HashMap<>(); + private Snapshot snapshot; private RocksDBHandle rocksDBHandle; private boolean isClose = false; @@ -158,9 +167,13 @@ class SnapshotFilesReader implements CloseableIterator<InternalRow> { ValueDecoder.Value originValue = valueDecoder.decodeValue(value); InternalRow originRow = originValue.row; if (targetSchemaId != originValue.schemaId) { - originRow = - ProjectedRow.from(schemaGetter.getSchema(originValue.schemaId), targetSchema) - .replaceRow(originRow); + int[] indexMapping = + schemaProjectionCache.computeIfAbsent( + originValue.schemaId, + sourceSchemaId -> + SchemaUtil.getIndexMapping( + schemaGetter.getSchema(sourceSchemaId), targetSchema)); + originRow = ProjectedRow.from(indexMapping).replaceRow(originRow); } if (projectedFields != null) { diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/BucketScanStatus.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/BucketScanStatus.java index bee0910e1..738e04a49 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/BucketScanStatus.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/BucketScanStatus.java @@ -24,8 +24,7 @@ import org.apache.fluss.annotation.Internal; class BucketScanStatus { private long offset; // last consumed position private long highWatermark; // the high watermark from last fetch - // TODO add - // resetStrategy and nextAllowedRetryTimeMs. + // TODO add resetStrategy and nextAllowedRetryTimeMs. public BucketScanStatus() { this.offset = 0L; diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java index e8067dd4c..b19347772 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java @@ -364,8 +364,7 @@ public class LogFetcher implements Closeable { if (!MemoryLogRecords.EMPTY.equals(logRecords) || fetchResultForBucket.getErrorCode() != Errors.NONE.code()) { // In oder to not signal notEmptyCondition, add completed - // fetch to - // buffer until log records is not empty. + // fetch to buffer until log records is not empty. DefaultCompletedFetch completedFetch = new DefaultCompletedFetch( tb, diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AbstractTableWriter.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AbstractTableWriter.java index f1811dfa2..98a93fdd2 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AbstractTableWriter.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AbstractTableWriter.java @@ -37,13 +37,11 @@ public abstract class AbstractTableWriter implements TableWriter { protected final TablePath tablePath; protected final WriterClient writerClient; protected final int fieldCount; - protected TableInfo tableInfo; private final @Nullable PartitionGetter partitionFieldGetter; protected AbstractTableWriter( TablePath tablePath, TableInfo tableInfo, WriterClient writerClient) { this.tablePath = tablePath; - this.tableInfo = tableInfo; this.writerClient = writerClient; this.fieldCount = tableInfo.getRowType().getFieldCount(); this.partitionFieldGetter = diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java index f7d7d3ef0..3a282ba92 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java @@ -868,8 +868,7 @@ public final class RecordAccumulator { // Either we have reached a point where there are batches without a sequence (i.e. never // been drained and are hence in order by default), or the batch at the front of the // queue has a sequence greater than the incoming batch. This is the right place to - // add - // the incoming batch. + // add the incoming batch. deque.addFirst(batch); // Now we have to re-insert the previously queued batches in the right order. diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java index 1b9c79397..e9c285306 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java @@ -130,7 +130,7 @@ public class Sender implements Runnable { this.idempotenceManager = idempotenceManager; this.writerMetricGroup = writerMetricGroup; - // TODOadd retry logic while send failed. See FLUSS-56364375 + // TODO add retry logic while send failed. See FLUSS-56364375 } @VisibleForTesting @@ -167,7 +167,7 @@ public class Sender implements Runnable { } } - // TODO if force close failed,add logic to abort incomplete batches. + // TODO if force close failed, add logic to abort incomplete batches. LOG.debug("Shutdown of Fluss write sender I/O thread has completed."); } @@ -245,7 +245,7 @@ public class Sender implements Runnable { if (!batches.isEmpty()) { addToInflightBatches(batches); - // TODOadd logic for batch expire. + // TODO add logic for batch expire. sendWriteRequests(batches); diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java index 05f196fdd..bd7954284 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java @@ -354,13 +354,43 @@ class FlussAdminITCase extends ClientToServerITCaseBase { .get()) .hasMessageContaining("Column c1 must be nullable"); + assertThatThrownBy( + () -> + admin.alterTable( + tablePath, + Collections.singletonList( + TableChange.addColumn( + "c1", + DataTypes.STRING().copy(false), + null, + TableChange.ColumnPosition + .first())), + false) + .get()) + .hasMessageContaining("Unsupported ColumnPositionType: FIRST"); + + assertThatThrownBy( + () -> + admin.alterTable( + tablePath, + Collections.singletonList( + TableChange.addColumn( + "c1", + DataTypes.STRING().copy(false), + null, + TableChange.ColumnPosition.after( + "name"))), + false) + .get()) + .hasMessageContaining("Unsupported ColumnPositionType: AFTER(name)"); + admin.alterTable( tablePath, Collections.singletonList( TableChange.addColumn( "c1", DataTypes.STRING(), - null, + "new column c1", TableChange.ColumnPosition.last())), false) .get(); @@ -380,7 +410,10 @@ class FlussAdminITCase extends ClientToServerITCaseBase { new Schema.Column( "age", DataTypes.INT(), "person age", (short) 2), new Schema.Column( - "c1", DataTypes.STRING(), null, (short) 3))) + "c1", + DataTypes.STRING(), + "new column c1", + (short) 3))) .build(); SchemaInfo schemaInfo = admin.getTableSchema(tablePath).get(); assertThat(schemaInfo).isEqualTo(new SchemaInfo(expectedSchema, 2)); diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java index 527994777..62b5bdc13 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java @@ -134,7 +134,7 @@ class KvSnapshotBatchScannerITCase extends ClientToServerITCaseBase { Collections.singletonList( TableChange.addColumn( "new_column", - DataTypes.BIGINT().copy(false).copy(true), + DataTypes.BIGINT().copy(true), null, TableChange.ColumnPosition.last())), false) @@ -148,7 +148,7 @@ class KvSnapshotBatchScannerITCase extends ClientToServerITCaseBase { .column("name", DataTypes.STRING()) .column("new_column", DataTypes.BIGINT()) .build(); - // put into values with old schema. + // put into values with new schema. List<InternalRow> rows = new ArrayList<>(); for (int i = 10; i < 20; i++) { InternalRow row = diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java index 83878a816..99a108e2f 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java @@ -93,8 +93,7 @@ public class LogFetchCollectorTest { logFetchBuffer.add(completedFetch); assertThat(logFetchBuffer.isEmpty()).isFalse(); - // Validate that the completed fetch isn't initialized just because we add it to the - // buffer. + // Validate that the completed fetch isn't initialized just because we add it to the buffer assertThat(completedFetch.isInitialized()).isFalse(); // Fetch the data and validate that we get all the records we want back. diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java index 03f908733..e5a8c8ff0 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java @@ -199,6 +199,7 @@ public class LogFetcherTest extends ClientToServerITCaseBase { scanRecords = records.get(tb0); assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList())) .isEqualTo(expectedRows); + newSchemaLogFetcher.close(); } @Test diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java index bc4e53b50..47a9c2414 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java @@ -137,7 +137,6 @@ class RecordAccumulatorTest { conf = new Configuration(); // init cluster. cluster = updateCluster(Arrays.asList(bucket1, bucket2, bucket3)); - // todo: 从cluster中获取schema getter schemaGetter = new TestingSchemaGetter(new SchemaInfo(DATA1_SCHEMA, (short) 1)); } diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/ApiException.java b/fluss-common/src/main/java/org/apache/fluss/exception/ApiException.java index 14b0b7f96..b577b8633 100644 --- a/fluss-common/src/main/java/org/apache/fluss/exception/ApiException.java +++ b/fluss-common/src/main/java/org/apache/fluss/exception/ApiException.java @@ -25,21 +25,35 @@ public class ApiException extends FlussRuntimeException { private static final long serialVersionUID = 1L; + private final boolean stackTraceEnabled; + + public ApiException(String message, Throwable cause, boolean stackTraceEnabled) { + super(message, cause); + this.stackTraceEnabled = stackTraceEnabled; + } + public ApiException(String message, Throwable cause) { super(message, cause); + this.stackTraceEnabled = false; } public ApiException(String message) { super(message); + this.stackTraceEnabled = false; } public ApiException(Throwable cause) { super(cause); + this.stackTraceEnabled = false; } /* avoid the expensive and useless stack trace for api exceptions */ @Override public Throwable fillInStackTrace() { - return this; + if (stackTraceEnabled) { + return super.fillInStackTrace(); + } else { + return this; + } } } diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/SchemaChangeException.java b/fluss-common/src/main/java/org/apache/fluss/exception/SchemaChangeException.java index 8c9446682..77983e441 100644 --- a/fluss-common/src/main/java/org/apache/fluss/exception/SchemaChangeException.java +++ b/fluss-common/src/main/java/org/apache/fluss/exception/SchemaChangeException.java @@ -27,4 +27,8 @@ public class SchemaChangeException extends ApiException { public SchemaChangeException(String message) { this(message, null); } + + public SchemaChangeException(String message, boolean stackTraceEnabled) { + super(message, null, stackTraceEnabled); + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java index 367013363..c065a3711 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java @@ -132,6 +132,7 @@ public final class Schema implements Serializable { return columns.stream().map(Column::getName).collect(Collectors.toList()); } + /** Returns all column ids for top-level columns, the nested field ids are not included. */ public List<Integer> getColumnIds() { return columns.stream().map(Column::getColumnId).collect(Collectors.toList()); } diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java b/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java index 36162f30c..e012a38f6 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java @@ -402,7 +402,7 @@ public interface TableChange { @Override public String toString() { - return String.format("AFTER %s", columnName); + return String.format("AFTER(%s)", columnName); } } } diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java b/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java index af293f120..eba41e078 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java @@ -137,6 +137,11 @@ public final class TableInfo { return schema; } + /** Returns the schema info of the table, including schema and schema id. */ + public SchemaInfo getSchemaInfo() { + return new SchemaInfo(schema, schemaId); + } + /** * Returns the row type of the table. The row type is the schema of the table, which defines the * columns and types of the table. diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/Projection.java b/fluss-common/src/main/java/org/apache/fluss/utils/Projection.java index c597caf81..7c392b055 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/Projection.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/Projection.java @@ -112,11 +112,7 @@ public class Projection { return projectionPositions; } - /** - * The id of the projection, which is used for fluss server scan. - * - * @return - */ + /** The id of the projection, which is used for fluss server scan. */ public int[] getProjectionIdInOrder() { return projectionIdsInOrder; } diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/SchemaUtil.java b/fluss-common/src/main/java/org/apache/fluss/utils/SchemaUtil.java index 6b66c0cd6..1815e71cd 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/SchemaUtil.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/SchemaUtil.java @@ -39,6 +39,7 @@ public class SchemaUtil { * @param expectedSchema * @return */ + // TODO public static int[] getTargetColumns( int[] targetColumns, Schema originSchema, Schema expectedSchema) { if (targetColumns == null) { @@ -57,12 +58,17 @@ public class SchemaUtil { } /** - * Get the index of each value of expectedSchema from originSchema. + * Get the index mapping from origin schema to expected schema. This mapping can be used to + * convert the row with origin schema to the row with expected schema via {@code + * ProjectedRow.from(getIndexMapping(inputSchema, targetSchema)).replace(inputRow)}. * - * @param originSchema - * @param expectedSchema - * @return indexMapping the value of i-th means the expectedSchema's i-th column is the - * originSchema's indexMapping[i]-th column. + * @param originSchema The origin schema. + * @param expectedSchema The expected schema. + * @return The index mapping array. The length of the array is the number of columns in the + * expected schema. Each element in the array is the index of the corresponding column in + * the origin schema. If a column in the expected schema does not exist in the origin + * schema, the corresponding element is UNEXIST_MAPPING (-1). + * @throws SchemaChangeException if there is a datatype mismatch between the two schemas. */ public static int[] getIndexMapping(Schema originSchema, Schema expectedSchema) { List<Schema.Column> originColumns = originSchema.getColumns(); @@ -84,7 +90,7 @@ public class SchemaUtil { if (originColumn != null && !Objects.equals( expectedColumn.getDataType().copy(true), - originColumns.get(indexMapping[i]).getDataType().copy(true))) { + originColumn.getDataType().copy(true))) { throw new SchemaChangeException( String.format( @@ -92,7 +98,8 @@ public class SchemaUtil { expectedColumn.getColumnId(), expectedColumn.getName(), expectedColumn.getDataType(), - originColumn.getDataType())); + originColumn.getDataType()), + true); } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metadata/SchemaMetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/metadata/SchemaMetadataManager.java index 8b6be5e39..6bed8dfe3 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metadata/SchemaMetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metadata/SchemaMetadataManager.java @@ -41,6 +41,7 @@ import java.util.concurrent.ExecutionException; * 1. latest schema of each subscribed table, updated by UpdateMetadata request. 2. history schemas * of each table, updated by lookup from tablet server. */ +// TODO SchemaManager public class SchemaMetadataManager { private MetadataManager metadataManager;
