danny0405 commented on code in PR #5830: URL: https://github.com/apache/hudi/pull/5830#discussion_r1023780152
########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkInternalSchemaManager.java: ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.format; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.util.InternalSchemaCache; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.Types; +import org.apache.hudi.internal.schema.action.InternalSchemaMerger; +import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; +import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; +import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; + +import org.apache.hadoop.fs.Path; + +import 
java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * This class is responsible for calculating names and types of fields that are actual at a certain point in time. + * If field is renamed in queried schema, its old name will be returned, which is relevant at the provided time. + * If type of field is changed, its old type will be returned, and projection will be created that will convert the old type to the queried one. + */ +public final class FlinkInternalSchemaManager implements Serializable { + private static final long serialVersionUID = 1L; + + private final HoodieTableMetaClient metaClient; + private final InternalSchema querySchema; + + /** + * Creates the manager if schema evolution enabled. + */ + public static Option<FlinkInternalSchemaManager> of(Configuration conf) { + if (conf.getBoolean(FlinkOptions.SCHEMA_EVOLUTION_ENABLED)) { + HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf); + return new TableSchemaResolver(metaClient) + .getTableInternalSchemaFromCommitMetadata() + .map(schema -> new FlinkInternalSchemaManager(metaClient, schema)); + } else { + return Option.empty(); + } + } + + FlinkInternalSchemaManager(HoodieTableMetaClient metaClient, InternalSchema querySchema) { + this.metaClient = metaClient; + this.querySchema = querySchema; + } + + /** + * Returns query schema as InternalSchema. + */ + public InternalSchema getQuerySchema() { + return querySchema; + } + + /** + * Returns schema of fileSplit. + */ + public InternalSchema getActualSchema(FileInputSplit fileSplit) { + return getActualSchema(FSUtils.getCommitTime(fileSplit.getPath().getName())); + } + + /** + * Returns schema of mor fileSplit. 
+ */ + public InternalSchema getActualSchema(MergeOnReadInputSplit split) { + Option<String> basePath = split.getBasePath(); + String commitTime; + if (basePath.isPresent()) { + String name = new Path(basePath.get()).getName(); + commitTime = FSUtils.getCommitTime(name); + } else { + commitTime = split.getLatestCommit(); + } + return getActualSchema(commitTime); + } + + /** + * Returns list of field names in internalSchema. + */ + public List<String> getFieldNames(InternalSchema internalSchema) { + return internalSchema.columns().stream().map(Types.Field::name).collect(Collectors.toList()); + } + + /** + * Returns list of field types in internalSchema. + */ + public List<DataType> getFieldTypes(InternalSchema internalSchema) { + return AvroSchemaConverter.convertToDataType( + AvroInternalSchemaConverter.convert(internalSchema, getTableName())).getChildren(); + } + + /** + * Returns castMap based on conversions from actualSchema to querySchema. + */ + public CastMap getCastMap(InternalSchema querySchema, InternalSchema actualSchema) { + return CastMap.of(getTableName(), querySchema, actualSchema); + } + + /** + * Returns array of types positioned in fieldTypes according to selectedFields. + */ + public LogicalType[] project(List<DataType> fieldTypes, int[] selectedFields) { + return Arrays.stream(selectedFields) Review Comment: The method does not belong here, may put in `DataTypeUtils`. 
########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java: ########## @@ -394,4 +416,22 @@ private InflaterInputStreamFactory<?> getInflaterInputStreamFactory(org.apache.h } } + private void setActualFields(FileInputSplit fileSplit) { + FlinkInternalSchemaManager sm = schemaManager.get(); + InternalSchema actualSchema = sm.getActualSchema(fileSplit); + List<DataType> fieldTypes = sm.getFieldTypes(actualSchema); + CastMap castMap = sm.getCastMap(sm.getQuerySchema(), actualSchema); + int[] shiftedSelectedFields = Arrays.stream(selectedFields).map(pos -> pos + HOODIE_META_COLUMNS.size()).toArray(); + if (castMap.containsAnyPos(shiftedSelectedFields)) { Review Comment: Caution that the metadata field may also be queried. ########## hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java: ########## @@ -286,4 +286,18 @@ public static Map<String, String> collectRenameCols(InternalSchema oldSchema, In return e.substring(lastDotIndex == -1 ? 0 : lastDotIndex + 1); })); } + + /** + * Returns whether passed types are the same. + * + * @param t1 first type + * @param t2 second type + * @return true if types are the same + */ + public static boolean isSameType(Type t1, Type t2) { + if (t1 instanceof Types.DecimalType && t2 instanceof Types.DecimalType) { + return t1.equals(t2); Review Comment: Can we implement the `Type#equals` correctly instead of add this tool ? ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkInternalSchemaManager.java: ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.format; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.util.InternalSchemaCache; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.Types; +import org.apache.hudi.internal.schema.action.InternalSchemaMerger; +import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; +import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; +import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; + +import org.apache.hadoop.fs.Path; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * This class is responsible for calculating names and types of fields that are actual at a certain point in time. + * If field is renamed in queried schema, its old name will be returned, which is relevant at the provided time. 
+ * If type of field is changed, its old type will be returned, and projection will be created that will convert the old type to the queried one. + */ +public final class FlinkInternalSchemaManager implements Serializable { + private static final long serialVersionUID = 1L; + + private final HoodieTableMetaClient metaClient; + private final InternalSchema querySchema; + + /** + * Creates the manager if schema evolution enabled. + */ + public static Option<FlinkInternalSchemaManager> of(Configuration conf) { + if (conf.getBoolean(FlinkOptions.SCHEMA_EVOLUTION_ENABLED)) { + HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf); + return new TableSchemaResolver(metaClient) + .getTableInternalSchemaFromCommitMetadata() + .map(schema -> new FlinkInternalSchemaManager(metaClient, schema)); + } else { + return Option.empty(); + } + } + + FlinkInternalSchemaManager(HoodieTableMetaClient metaClient, InternalSchema querySchema) { + this.metaClient = metaClient; + this.querySchema = querySchema; + } + + /** + * Returns query schema as InternalSchema. + */ + public InternalSchema getQuerySchema() { + return querySchema; + } + + /** + * Returns schema of fileSplit. + */ + public InternalSchema getActualSchema(FileInputSplit fileSplit) { + return getActualSchema(FSUtils.getCommitTime(fileSplit.getPath().getName())); + } + + /** + * Returns schema of mor fileSplit. + */ + public InternalSchema getActualSchema(MergeOnReadInputSplit split) { + Option<String> basePath = split.getBasePath(); + String commitTime; + if (basePath.isPresent()) { + String name = new Path(basePath.get()).getName(); + commitTime = FSUtils.getCommitTime(name); + } else { + commitTime = split.getLatestCommit(); Review Comment: How about the log files then ? Is the log file commit time can be considered here ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
