wuchong commented on code in PR #2010:
URL: https://github.com/apache/fluss/pull/2010#discussion_r2571099751
##########
fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java:
##########
@@ -434,14 +398,130 @@ ByteBuffer getLogHeaderBuffer() {
return logHeaderBuffer;
}
+ private void setCurrentSchema(short schemaId) {
+ org.apache.fluss.metadata.Schema schema =
schemaGetter.getSchema(schemaId);
+ RowType rowType = schema.getRowType();
+ ProjectionKey projectionKey = new ProjectionKey(tableId, schemaId);
+ if (projectionsCache.containsKey(projectionKey)) {
+ // the schema and projection should identical for the same table
id.
+ currentProjection = projectionsCache.get(projectionKey);
+ if (!Arrays.equals(currentProjection.selectedFieldIds,
selectedFieldIds)
+ || !currentProjection.schema.equals(rowType)) {
+ throw new InvalidColumnProjectionException(
+ "The schema and projection should be identical for the
same table id.");
+ }
+ return;
+ }
+
+ int[] selectedFieldPositions = checkProjection(schema,
selectedFieldIds);
+
+ // initialize the projection util information
+ Schema arrowSchema = ArrowUtils.toArrowSchema(rowType);
+ BitSet selection = toBitSet(arrowSchema.getFields().size(),
selectedFieldPositions);
+ List<Tuple2<Field, Boolean>> flattenedFields = new ArrayList<>();
+ flattenFields(arrowSchema.getFields(), selection, flattenedFields);
+ int totalFieldNodes = flattenedFields.size();
+ int[] bufferLayoutCount = new int[totalFieldNodes];
+ BitSet nodesProjection = new BitSet(totalFieldNodes);
+ int totalBuffers = 0;
+ for (int i = 0; i < totalFieldNodes; i++) {
+ Field fieldNode = flattenedFields.get(i).f0;
+ boolean selected = flattenedFields.get(i).f1;
+ nodesProjection.set(i, selected);
+ bufferLayoutCount[i] =
TypeLayout.getTypeBufferCount(fieldNode.getType());
+ totalBuffers += bufferLayoutCount[i];
+ }
+ BitSet buffersProjection = new BitSet(totalBuffers);
+ int bufferIndex = 0;
+ for (int i = 0; i < totalFieldNodes; i++) {
+ if (nodesProjection.get(i)) {
+ buffersProjection.set(bufferIndex, bufferIndex +
bufferLayoutCount[i]);
+ }
+ bufferIndex += bufferLayoutCount[i];
+ }
+
+ Schema projectedArrowSchema =
+
ArrowUtils.toArrowSchema(rowType.project(selectedFieldPositions));
+ ArrowBodyCompression bodyCompression =
+
CompressionUtil.createBodyCompression(compressionInfo.createCompressionCodec());
+ int metadataLength =
+ ArrowUtils.estimateArrowMetadataLength(projectedArrowSchema,
bodyCompression);
+ currentProjection =
+ new ProjectionInfo(
+ nodesProjection,
+ buffersProjection,
+ bufferIndex,
+ rowType,
+ metadataLength,
+ bodyCompression,
+ selectedFieldPositions,
+ selectedFieldIds);
+ projectionsCache.put(projectionKey, currentProjection);
+ }
+
+ int[] checkProjection(org.apache.fluss.metadata.Schema schema, int[]
projectedFields) {
+ Map<Integer, Integer> columnIdPositions = new HashMap<>();
+ List<Integer> columnIds = schema.getColumnIds();
+ for (int i = 0; i < columnIds.size(); i++) {
+ columnIdPositions.put(columnIds.get(i), i);
+ }
+
+ int prev = -1;
+ int[] selectedFieldPositions = new int[projectedFields.length];
+ for (int i = 0; i < projectedFields.length; i++) {
+ int fieldId = projectedFields[i];
+ Integer position = columnIdPositions.get(fieldId);
+ if (position == null) {
+ throw new InvalidColumnProjectionException(
+ String.format(
+ "Projected field id %s is not contains in %s",
fieldId, columnIds));
+ }
+
+ selectedFieldPositions[i] = position;
+ if (position < prev) {
+ throw new InvalidColumnProjectionException(
+ "The projection indexes should be in field order, but
is "
+ + Arrays.toString(projectedFields));
+ }
+
+ prev = position;
+ }
+ return selectedFieldPositions;
Review Comment:
We can remove this for now, as it may affect projection performance, and we
don't need the remapping, as we only support add column add end. We can rename
the method into `toSelectedFieldPositions()`.
##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -915,6 +918,32 @@ message PbAlterConfig {
required int32 op_type = 3; // SET=0, DELETE=1, APPEND=2, SUBTRACT=3
}
+message PbAddColumn{
+ required string column_name = 1;
+ required bytes data_type_json = 2;
+ optional string comment = 3;
+ required int32 column_position_type = 4; // LAST=0,FIRST=1,AFTER=3
+}
+
+message PbDropColumn{
+ required string column_name = 1;
+}
+
+message PbRenameColumn{
+ required string old_column_name = 1;
+ required string new_column_name = 2;
+}
+
+
+message PbModifyColumn{
+ required string column_name = 1;
+ required bytes data_type_json = 2;
Review Comment:
should be `optional`. The following alter column is also valid.
```
ALTER TABLE prod.db.sample ALTER COLUMN measurement COMMENT 'unit is
kilobytes per second';
```
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java:
##########
@@ -231,4 +242,31 @@ private void processCreatePartition(
tablePath, tableId, partitionId, partitionName,
partitionAssignment));
}
}
+
+ private void processSchemaChange(TablePath tablePath, int schemaId) {
+
+ try {
+ int currentSchemaId =
zooKeeperClient.getCurrentSchemaId(tablePath);
+ SchemaInfo schemaInfo;
+ if (schemaId != currentSchemaId) {
+ LOG.warn(
+ "Schema id {} is not equal to current schema id {}.
Skipping schema change processing.",
+ schemaId,
+ currentSchemaId);
+ return;
+ }
Review Comment:
Why check current schema id? This is a heavy operation. Even if this is an
old schema, I think it is still fine to process the schema.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]