This is an automated email from the ASF dual-hosted git repository. satish pushed a commit to branch release-0.12.2 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 60973dcf56fff61172f76cf5e8bc6d95970ea698 Author: Alexander Trushev <[email protected]> AuthorDate: Wed Nov 30 15:37:53 2022 +0700 [HUDI-3981] Flink engine support for comprehensive schema evolution (#5830) --- .../org/apache/hudi/table/HoodieFlinkTable.java | 13 + .../apache/hudi/configuration/OptionsResolver.java | 8 + .../org/apache/hudi/table/HoodieTableSource.java | 16 +- .../java/org/apache/hudi/table/format/CastMap.java | 220 ++++++++++ .../org/apache/hudi/table/format/FormatUtils.java | 8 +- .../format/HoodieParquetEvolvedSplitReader.java | 53 +++ .../hudi/table/format/HoodieParquetReader.java | 99 +++++ .../table/format/HoodieParquetSplitReader.java | 51 +++ .../hudi/table/format/InternalSchemaManager.java | 170 ++++++++ .../table/format/cow/CopyOnWriteInputFormat.java | 14 +- .../table/format/mor/MergeOnReadInputFormat.java | 180 +++++--- .../apache/hudi/util/RowDataCastProjection.java | 49 +++ .../org/apache/hudi/util/RowDataProjection.java | 9 +- .../apache/hudi/table/ITTestSchemaEvolution.java | 461 +++++++++++++++++++++ .../org/apache/hudi/table/format/TestCastMap.java | 120 ++++++ .../org/apache/hudi/utils/TestConfigurations.java | 23 + 16 files changed, 1436 insertions(+), 58 deletions(-) diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java index 4e7dbe36c43..40a13c14d5b 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java @@ -26,12 +26,15 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; import 
org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.index.FlinkHoodieIndexFactory; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.utils.SerDeHelper; import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter; import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -57,6 +60,9 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload> .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig()) .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))) .setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build(); + if (config.getSchemaEvolutionEnable()) { + setLatestInternalSchema(config, metaClient); + } return HoodieFlinkTable.create(config, context, metaClient); } @@ -102,4 +108,11 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload> return Option.empty(); } } + + private static void setLatestInternalSchema(HoodieWriteConfig config, HoodieTableMetaClient metaClient) { + Option<InternalSchema> internalSchema = new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata(); + if (internalSchema.isPresent()) { + config.setInternalSchemaString(SerDeHelper.toJson(internalSchema.get())); + } + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java index 0dd31ee7538..dd272e17fb0 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java +++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java @@ -18,6 +18,7 @@ package org.apache.hudi.configuration; +import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.util.StringUtils; @@ -202,6 +203,13 @@ public class OptionsResolver { return !conf.contains(FlinkOptions.READ_START_COMMIT) && !conf.contains(FlinkOptions.READ_END_COMMIT); } + /** + * Returns whether comprehensive schema evolution enabled. + */ + public static boolean isSchemaEvolutionEnabled(Configuration conf) { + return conf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), false); + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index 4ea14c413cc..57c8b235a81 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -43,6 +43,7 @@ import org.apache.hudi.source.IncrementalInputSplits; import org.apache.hudi.source.StreamReadMonitoringFunction; import org.apache.hudi.source.StreamReadOperator; import org.apache.hudi.table.format.FilePathUtils; +import org.apache.hudi.table.format.InternalSchemaManager; import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat; import org.apache.hudi.table.format.mor.MergeOnReadInputFormat; import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; @@ -123,6 +124,7 @@ public class HoodieTableSource implements private final String defaultPartName; private final Configuration conf; private 
final FileIndex fileIndex; + private final InternalSchemaManager internalSchemaManager; private int[] requiredPos; private long limit; @@ -135,7 +137,7 @@ public class HoodieTableSource implements List<String> partitionKeys, String defaultPartName, Configuration conf) { - this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, null, null); + this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, null, null, null); } public HoodieTableSource( @@ -148,7 +150,8 @@ public class HoodieTableSource implements @Nullable List<Map<String, String>> requiredPartitions, @Nullable int[] requiredPos, @Nullable Long limit, - @Nullable HoodieTableMetaClient metaClient) { + @Nullable HoodieTableMetaClient metaClient, + @Nullable InternalSchemaManager internalSchemaManager) { this.schema = schema; this.tableRowType = (RowType) schema.toPhysicalRowDataType().notNull().getLogicalType(); this.path = path; @@ -166,6 +169,9 @@ public class HoodieTableSource implements this.hadoopConf = HadoopConfigurations.getHadoopConf(conf); this.metaClient = metaClient == null ? StreamerUtil.metaClientForReader(conf, hadoopConf) : metaClient; this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf); + this.internalSchemaManager = internalSchemaManager == null + ? 
InternalSchemaManager.get(this.conf, this.metaClient) + : internalSchemaManager; } @Override @@ -215,7 +221,7 @@ public class HoodieTableSource implements @Override public DynamicTableSource copy() { return new HoodieTableSource(schema, path, partitionKeys, defaultPartName, - conf, fileIndex, requiredPartitions, requiredPos, limit, metaClient); + conf, fileIndex, requiredPartitions, requiredPos, limit, metaClient, internalSchemaManager); } @Override @@ -439,6 +445,7 @@ public class HoodieTableSource implements .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME)) .limit(this.limit) .emitDelete(emitDelete) + .internalSchemaManager(internalSchemaManager) .build(); } @@ -462,7 +469,8 @@ public class HoodieTableSource implements this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME), this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value getParquetConf(this.conf, this.hadoopConf), - this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE) + this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE), + this.internalSchemaManager ); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java new file mode 100644 index 00000000000..5f29e85adc2 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.format; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.util.RowDataCastProjection; +import org.apache.hudi.util.RowDataProjection; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +import javax.annotation.Nullable; +import java.io.Serializable; +import java.math.BigDecimal; +import java.time.LocalDate; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DATE; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.FLOAT; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR; + +/** + * CastMap is responsible for conversion of flink types when full schema evolution enabled. 
+ * Supported cast conversions: + * Integer => Long, Float, Double, Decimal, String + * Long => Float, Double, Decimal, String + * Float => Double, Decimal, String + * Double => Decimal, String + * Decimal => Decimal, String + * String => Decimal, Date + * Date => String + */ +public final class CastMap implements Serializable { + + private static final long serialVersionUID = 1L; + + // Maps position to corresponding cast + private final Map<Integer, Cast> castMap = new HashMap<>(); + + private DataType[] fileFieldTypes; + + public Option<RowDataProjection> toRowDataProjection(int[] selectedFields) { + if (castMap.isEmpty()) { + return Option.empty(); + } + LogicalType[] requiredType = new LogicalType[selectedFields.length]; + for (int i = 0; i < selectedFields.length; i++) { + requiredType[i] = fileFieldTypes[selectedFields[i]].getLogicalType(); + } + return Option.of(new RowDataCastProjection(requiredType, this)); + } + + public Object castIfNeeded(int pos, Object val) { + Cast cast = castMap.get(pos); + if (cast == null) { + return val; + } + return cast.convert(val); + } + + public DataType[] getFileFieldTypes() { + return fileFieldTypes; + } + + public void setFileFieldTypes(DataType[] fileFieldTypes) { + this.fileFieldTypes = fileFieldTypes; + } + + @VisibleForTesting + void add(int pos, LogicalType fromType, LogicalType toType) { + Function<Object, Object> conversion = getConversion(fromType, toType); + if (conversion == null) { + throw new IllegalArgumentException(String.format("Cannot create cast %s => %s at pos %s", fromType, toType, pos)); + } + add(pos, new Cast(fromType, toType, conversion)); + } + + private @Nullable Function<Object, Object> getConversion(LogicalType fromType, LogicalType toType) { + LogicalTypeRoot from = fromType.getTypeRoot(); + LogicalTypeRoot to = toType.getTypeRoot(); + switch (to) { + case BIGINT: { + if (from == INTEGER) { + return val -> ((Number) val).longValue(); + } + break; + } + case FLOAT: { + if (from == INTEGER || 
from == BIGINT) { + return val -> ((Number) val).floatValue(); + } + break; + } + case DOUBLE: { + if (from == INTEGER || from == BIGINT) { + return val -> ((Number) val).doubleValue(); + } + if (from == FLOAT) { + return val -> Double.parseDouble(val.toString()); + } + break; + } + case DECIMAL: { + if (from == INTEGER || from == BIGINT || from == DOUBLE) { + return val -> toDecimalData((Number) val, toType); + } + if (from == FLOAT) { + return val -> toDecimalData(Double.parseDouble(val.toString()), toType); + } + if (from == VARCHAR) { + return val -> toDecimalData(Double.parseDouble(val.toString()), toType); + } + if (from == DECIMAL) { + return val -> toDecimalData(((DecimalData) val).toBigDecimal(), toType); + } + break; + } + case VARCHAR: { + if (from == INTEGER + || from == BIGINT + || from == FLOAT + || from == DOUBLE + || from == DECIMAL) { + return val -> new BinaryStringData(String.valueOf(val)); + } + if (from == DATE) { + return val -> new BinaryStringData(LocalDate.ofEpochDay(((Integer) val).longValue()).toString()); + } + break; + } + case DATE: { + if (from == VARCHAR) { + return val -> (int) LocalDate.parse(val.toString()).toEpochDay(); + } + break; + } + default: + } + return null; + } + + private void add(int pos, Cast cast) { + castMap.put(pos, cast); + } + + private DecimalData toDecimalData(Number val, LogicalType decimalType) { + BigDecimal valAsDecimal = BigDecimal.valueOf(val.doubleValue()); + return toDecimalData(valAsDecimal, decimalType); + } + + private DecimalData toDecimalData(BigDecimal valAsDecimal, LogicalType decimalType) { + return DecimalData.fromBigDecimal( + valAsDecimal, + ((DecimalType) decimalType).getPrecision(), + ((DecimalType) decimalType).getScale()); + } + + /** + * Fields {@link Cast#from} and {@link Cast#to} are redundant due to {@link Cast#convert(Object)} determines conversion. + * However, it is convenient to debug {@link CastMap} when {@link Cast#toString()} prints types. 
+ */ + private static final class Cast implements Serializable { + + private static final long serialVersionUID = 1L; + + private final LogicalType from; + private final LogicalType to; + private final Function<Object, Object> conversion; + + Cast(LogicalType from, LogicalType to, Function<Object, Object> conversion) { + this.from = from; + this.to = to; + this.conversion = conversion; + } + + Object convert(Object val) { + return conversion.apply(val); + } + + @Override + public String toString() { + return from + " => " + to; + } + } + + @Override + public String toString() { + return castMap.entrySet().stream() + .map(e -> e.getKey() + ": " + e.getValue()) + .collect(Collectors.joining(", ", "{", "}")); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java index 6357b898d49..672acf2b430 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java @@ -32,6 +32,7 @@ import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; +import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; import org.apache.hudi.util.FlinkWriteClients; import org.apache.hudi.util.StreamerUtil; @@ -122,6 +123,7 @@ public class FormatUtils { public static HoodieMergedLogRecordScanner logScanner( MergeOnReadInputSplit split, Schema logSchema, + InternalSchema internalSchema, org.apache.flink.configuration.Configuration flinkConf, Configuration hadoopConf) { HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(flinkConf); @@ -131,6 +133,7 @@ public class 
FormatUtils { .withBasePath(split.getTablePath()) .withLogFilePaths(split.getLogPaths().get()) .withReaderSchema(logSchema) + .withInternalSchema(internalSchema) .withLatestInstantTime(split.getLatestCommit()) .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled()) .withReverseReader(false) @@ -147,6 +150,7 @@ public class FormatUtils { private static HoodieUnMergedLogRecordScanner unMergedLogScanner( MergeOnReadInputSplit split, Schema logSchema, + InternalSchema internalSchema, org.apache.flink.configuration.Configuration flinkConf, Configuration hadoopConf, HoodieUnMergedLogRecordScanner.LogRecordScannerCallback callback) { @@ -156,6 +160,7 @@ public class FormatUtils { .withBasePath(split.getTablePath()) .withLogFilePaths(split.getLogPaths().get()) .withReaderSchema(logSchema) + .withInternalSchema(internalSchema) .withLatestInstantTime(split.getLatestCommit()) .withReadBlocksLazily( string2Boolean( @@ -186,6 +191,7 @@ public class FormatUtils { public BoundedMemoryRecords( MergeOnReadInputSplit split, Schema logSchema, + InternalSchema internalSchema, Configuration hadoopConf, org.apache.flink.configuration.Configuration flinkConf) { this.executor = new BoundedInMemoryExecutor<>( @@ -197,7 +203,7 @@ public class FormatUtils { Functions.noop()); // Consumer of this record reader this.iterator = this.executor.getQueue().iterator(); - this.scanner = FormatUtils.unMergedLogScanner(split, logSchema, flinkConf, hadoopConf, + this.scanner = FormatUtils.unMergedLogScanner(split, logSchema, internalSchema, flinkConf, hadoopConf, record -> executor.getQueue().insertRecord(record)); // Start reading and buffering this.executor.startProducers(); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetEvolvedSplitReader.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetEvolvedSplitReader.java new file mode 100644 index 00000000000..037a3776359 --- /dev/null +++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetEvolvedSplitReader.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.format; + +import org.apache.hudi.util.RowDataProjection; + +import org.apache.flink.table.data.RowData; + +import java.io.IOException; + +/** + * Decorates origin hoodie parquet reader with cast projection. 
+ */ +public final class HoodieParquetEvolvedSplitReader implements HoodieParquetReader { + private final HoodieParquetReader originReader; + private final RowDataProjection castProjection; + + public HoodieParquetEvolvedSplitReader(HoodieParquetReader originReader, RowDataProjection castProjection) { + this.originReader = originReader; + this.castProjection = castProjection; + } + + @Override + public boolean reachedEnd() throws IOException { + return originReader.reachedEnd(); + } + + @Override + public RowData nextRecord() { + return castProjection.project(originReader.nextRecord()); + } + + @Override + public void close() throws IOException { + originReader.close(); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetReader.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetReader.java new file mode 100644 index 00000000000..e762f03e983 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetReader.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.format; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil; +import org.apache.hudi.util.RowDataProjection; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; + +import org.apache.hadoop.conf.Configuration; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Map; + +/** + * Base interface for hoodie parquet readers. + */ +public interface HoodieParquetReader extends Closeable { + + boolean reachedEnd() throws IOException; + + RowData nextRecord(); + + static HoodieParquetReader getReader( + InternalSchemaManager internalSchemaManager, + boolean utcTimestamp, + boolean caseSensitive, + Configuration conf, + String[] fieldNames, + DataType[] fieldTypes, + Map<String, Object> partitionSpec, + int[] selectedFields, + int batchSize, + Path path, + long splitStart, + long splitLength) throws IOException { + Option<RowDataProjection> castProjection; + InternalSchema fileSchema = internalSchemaManager.getFileSchema(path.getName()); + if (fileSchema.isEmptySchema()) { + return new HoodieParquetSplitReader( + ParquetSplitReaderUtil.genPartColumnarRowReader( + utcTimestamp, + caseSensitive, + conf, + fieldNames, + fieldTypes, + partitionSpec, + selectedFields, + batchSize, + path, + splitStart, + splitLength)); + } else { + CastMap castMap = internalSchemaManager.getCastMap(fileSchema, fieldNames, fieldTypes, selectedFields); + castProjection = castMap.toRowDataProjection(selectedFields); + fieldNames = internalSchemaManager.getFileFieldNames(fileSchema, fieldNames); + fieldTypes = castMap.getFileFieldTypes(); + } + HoodieParquetReader reader = new HoodieParquetSplitReader( + ParquetSplitReaderUtil.genPartColumnarRowReader( + utcTimestamp, + caseSensitive, + conf, + fieldNames, + fieldTypes, + partitionSpec, + 
selectedFields, + batchSize, + path, + splitStart, + splitLength)); + if (castProjection.isPresent()) { + return new HoodieParquetEvolvedSplitReader(reader, castProjection.get()); + } else { + return reader; + } + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetSplitReader.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetSplitReader.java new file mode 100644 index 00000000000..d13c6c7c21a --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetSplitReader.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.format; + +import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader; + +import org.apache.flink.table.data.RowData; + +import java.io.IOException; + +/** + * Hoodie wrapper for flink parquet reader. 
+ */ +public final class HoodieParquetSplitReader implements HoodieParquetReader { + private final ParquetColumnarRowSplitReader reader; + + public HoodieParquetSplitReader(ParquetColumnarRowSplitReader reader) { + this.reader = reader; + } + + @Override + public boolean reachedEnd() throws IOException { + return reader.reachedEnd(); + } + + @Override + public RowData nextRecord() { + return reader.nextRecord(); + } + + @Override + public void close() throws IOException { + reader.close(); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java new file mode 100644 index 00000000000..abd405469d8 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.format; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.InternalSchemaCache; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.configuration.HadoopConfigurations; +import org.apache.hudi.configuration.OptionsResolver; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.Type; +import org.apache.hudi.internal.schema.Types; +import org.apache.hudi.internal.schema.action.InternalSchemaMerger; +import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; +import org.apache.hudi.internal.schema.utils.InternalSchemaUtils; +import org.apache.hudi.util.AvroSchemaConverter; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * This class is responsible for calculating names and types of fields that are actual at a certain point in time. + * If field is renamed in queried schema, its old name will be returned, which is relevant at the provided time. + * If type of field is changed, its old type will be returned, and projection will be created that will convert the old type to the queried one. 
+ */ +public class InternalSchemaManager implements Serializable { + + private static final long serialVersionUID = 1L; + + public static final InternalSchemaManager DISABLED = new InternalSchemaManager(null, InternalSchema.getEmptyInternalSchema(), null, null); + + private final Configuration conf; + private final InternalSchema querySchema; + private final String validCommits; + private final String tablePath; + private transient org.apache.hadoop.conf.Configuration hadoopConf; + + public static InternalSchemaManager get(Configuration conf, HoodieTableMetaClient metaClient) { + if (!OptionsResolver.isSchemaEvolutionEnabled(conf)) { + return DISABLED; + } + Option<InternalSchema> internalSchema = new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata(); + if (!internalSchema.isPresent() || internalSchema.get().isEmptySchema()) { + return DISABLED; + } + String validCommits = metaClient + .getCommitsAndCompactionTimeline() + .filterCompletedInstants() + .getInstantsAsStream() + .map(HoodieInstant::getFileName) + .collect(Collectors.joining(",")); + return new InternalSchemaManager(conf, internalSchema.get(), validCommits, metaClient.getBasePathV2().toString()); + } + + public InternalSchemaManager(Configuration conf, InternalSchema querySchema, String validCommits, String tablePath) { + this.conf = conf; + this.querySchema = querySchema; + this.validCommits = validCommits; + this.tablePath = tablePath; + } + + public InternalSchema getQuerySchema() { + return querySchema; + } + + InternalSchema getFileSchema(String fileName) { + if (querySchema.isEmptySchema()) { + return querySchema; + } + long commitInstantTime = Long.parseLong(FSUtils.getCommitTime(fileName)); + InternalSchema fileSchemaUnmerged = InternalSchemaCache.getInternalSchemaByVersionId( + commitInstantTime, tablePath, getHadoopConf(), validCommits); + if (querySchema.equals(fileSchemaUnmerged)) { + return InternalSchema.getEmptyInternalSchema(); + } + return new 
InternalSchemaMerger(fileSchemaUnmerged, querySchema, true, true).mergeSchema(); + } + + CastMap getCastMap(InternalSchema fileSchema, String[] queryFieldNames, DataType[] queryFieldTypes, int[] selectedFields) { + Preconditions.checkArgument(!querySchema.isEmptySchema(), "querySchema cannot be empty"); + Preconditions.checkArgument(!fileSchema.isEmptySchema(), "fileSchema cannot be empty"); + + CastMap castMap = new CastMap(); + Map<Integer, Integer> posProxy = getPosProxy(fileSchema, queryFieldNames); + if (posProxy.isEmpty()) { + castMap.setFileFieldTypes(queryFieldTypes); + return castMap; + } + List<Integer> selectedFieldList = IntStream.of(selectedFields).boxed().collect(Collectors.toList()); + List<DataType> fileSchemaAsDataTypes = AvroSchemaConverter.convertToDataType( + AvroInternalSchemaConverter.convert(fileSchema, "tableName")).getChildren(); + DataType[] fileFieldTypes = new DataType[queryFieldTypes.length]; + for (int i = 0; i < queryFieldTypes.length; i++) { + Integer posOfChangedType = posProxy.get(i); + if (posOfChangedType == null) { + fileFieldTypes[i] = queryFieldTypes[i]; + } else { + DataType fileType = fileSchemaAsDataTypes.get(posOfChangedType); + fileFieldTypes[i] = fileType; + int selectedPos = selectedFieldList.indexOf(i); + if (selectedPos != -1) { + castMap.add(selectedPos, fileType.getLogicalType(), queryFieldTypes[i].getLogicalType()); + } + } + } + castMap.setFileFieldTypes(fileFieldTypes); + return castMap; + } + + String[] getFileFieldNames(InternalSchema fileSchema, String[] queryFieldNames) { + Preconditions.checkArgument(!querySchema.isEmptySchema(), "querySchema cannot be empty"); + Preconditions.checkArgument(!fileSchema.isEmptySchema(), "fileSchema cannot be empty"); + + Map<String, String> renamedCols = InternalSchemaUtils.collectRenameCols(fileSchema, querySchema); + if (renamedCols.isEmpty()) { + return queryFieldNames; + } + return Arrays.stream(queryFieldNames).map(name -> renamedCols.getOrDefault(name, 
name)).toArray(String[]::new); + } + + private Map<Integer, Integer> getPosProxy(InternalSchema fileSchema, String[] queryFieldNames) { + Map<Integer, Pair<Type, Type>> changedCols = InternalSchemaUtils.collectTypeChangedCols(querySchema, fileSchema); + HashMap<Integer, Integer> posProxy = new HashMap<>(changedCols.size()); + List<String> fieldNameList = Arrays.asList(queryFieldNames); + List<Types.Field> columns = querySchema.columns(); + changedCols.forEach((posInSchema, typePair) -> { + String name = columns.get(posInSchema).name(); + int posInType = fieldNameList.indexOf(name); + posProxy.put(posInType, posInSchema); + }); + return Collections.unmodifiableMap(posProxy); + } + + private org.apache.hadoop.conf.Configuration getHadoopConf() { + if (hadoopConf == null) { + hadoopConf = HadoopConfigurations.getHadoopConf(conf); + } + return hadoopConf; + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java index c5ea3d4ab98..453d0fee232 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java @@ -20,7 +20,8 @@ package org.apache.hudi.table.format.cow; import java.util.Comparator; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader; +import org.apache.hudi.table.format.HoodieParquetReader; +import org.apache.hudi.table.format.InternalSchemaManager; import org.apache.hudi.util.DataTypeUtils; import org.apache.flink.api.common.io.FileInputFormat; @@ -74,7 +75,7 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> { private final SerializableConfiguration conf; private final long limit; - private transient 
ParquetColumnarRowSplitReader reader; + private transient HoodieParquetReader reader; private transient long currentReadCount; /** @@ -82,6 +83,8 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> { */ private FilePathFilter localFilesFilter = new GlobFilePathFilter(); + private final InternalSchemaManager internalSchemaManager; + public CopyOnWriteInputFormat( Path[] paths, String[] fullFieldNames, @@ -90,7 +93,8 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> { String partDefaultName, long limit, Configuration conf, - boolean utcTimestamp) { + boolean utcTimestamp, + InternalSchemaManager internalSchemaManager) { super.setFilePaths(paths); this.limit = limit; this.partDefaultName = partDefaultName; @@ -99,6 +103,7 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> { this.selectedFields = selectedFields; this.conf = new SerializableConfiguration(conf); this.utcTimestamp = utcTimestamp; + this.internalSchemaManager = internalSchemaManager; } @Override @@ -123,7 +128,8 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> { } }); - this.reader = ParquetSplitReaderUtil.genPartColumnarRowReader( + this.reader = HoodieParquetReader.getReader( + internalSchemaManager, utcTimestamp, true, conf.conf(), diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index c9b6561bdef..3103e27e857 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -29,11 +29,12 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.configuration.OptionsResolver; import 
org.apache.hudi.exception.HoodieException; +import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.table.format.FilePathUtils; import org.apache.hudi.table.format.FormatUtils; -import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil; -import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader; +import org.apache.hudi.table.format.HoodieParquetReader; +import org.apache.hudi.table.format.InternalSchemaManager; import org.apache.hudi.util.AvroToRowDataConverters; import org.apache.hudi.util.DataTypeUtils; import org.apache.hudi.util.RowDataProjection; @@ -66,6 +67,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Properties; import java.util.Set; +import java.util.function.Function; import java.util.stream.IntStream; import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS; @@ -137,13 +139,16 @@ public class MergeOnReadInputFormat */ private boolean closed = true; - private MergeOnReadInputFormat( + private final InternalSchemaManager internalSchemaManager; + + protected MergeOnReadInputFormat( Configuration conf, MergeOnReadTableState tableState, List<DataType> fieldTypes, String defaultPartName, long limit, - boolean emitDelete) { + boolean emitDelete, + InternalSchemaManager internalSchemaManager) { this.conf = conf; this.tableState = tableState; this.fieldNames = tableState.getRowType().getFieldNames(); @@ -154,6 +159,7 @@ public class MergeOnReadInputFormat this.requiredPos = tableState.getRequiredPositions(); this.limit = limit; this.emitDelete = emitDelete; + this.internalSchemaManager = internalSchemaManager; } /** @@ -199,6 +205,7 @@ public class MergeOnReadInputFormat this.tableState.getRequiredRowType(), new Schema.Parser().parse(this.tableState.getAvroSchema()), new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()), + internalSchemaManager.getQuerySchema(), this.requiredPos, 
this.emitDelete, this.tableState.getOperationPos(), @@ -284,15 +291,19 @@ public class MergeOnReadInputFormat } } - private ParquetColumnarRowSplitReader getFullSchemaReader(String path) throws IOException { - return getReader(path, IntStream.range(0, this.tableState.getRowType().getFieldCount()).toArray()); + protected HoodieParquetReader getFullSchemaReader(String path) { + try { + return getReader(path, IntStream.range(0, this.tableState.getRowType().getFieldCount()).toArray()); + } catch (IOException e) { + throw new HoodieException("Get reader error for path: " + path, e); + } } - private ParquetColumnarRowSplitReader getRequiredSchemaReader(String path) throws IOException { + protected HoodieParquetReader getRequiredSchemaReader(String path) throws IOException { return getReader(path, this.requiredPos); } - private ParquetColumnarRowSplitReader getReader(String path, int[] requiredPos) throws IOException { + private HoodieParquetReader getReader(String path, int[] requiredPos) throws IOException { // generate partition specs.
LinkedHashMap<String, String> partSpec = FilePathUtils.extractPartitionKeyValues( new org.apache.hadoop.fs.Path(path).getParent(), @@ -314,7 +325,8 @@ public class MergeOnReadInputFormat } }); - return ParquetSplitReaderUtil.genPartColumnarRowReader( + return HoodieParquetReader.getReader( + internalSchemaManager, this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE), true, HadoopConfigurations.getParquetConf(this.conf, hadoopConf), @@ -334,7 +346,7 @@ public class MergeOnReadInputFormat final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema); final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType()); - final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, conf, hadoopConf); + final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, internalSchemaManager.getQuerySchema(), conf, hadoopConf); final Iterator<String> logRecordsKeyIterator = scanner.getRecords().keySet().iterator(); final int[] pkOffset = tableState.getPkOffsetsInRequired(); // flag saying whether the pk semantics has been dropped by user specified @@ -414,7 +426,7 @@ public class MergeOnReadInputFormat final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema); final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType()); - final FormatUtils.BoundedMemoryRecords records = new FormatUtils.BoundedMemoryRecords(split, tableSchema, hadoopConf, conf); + final FormatUtils.BoundedMemoryRecords records = new FormatUtils.BoundedMemoryRecords(split, tableSchema, internalSchemaManager.getQuerySchema(), hadoopConf, conf); final Iterator<HoodieRecord<?>> recordsIterator = records.getRecordsIterator(); return new ClosableIterator<RowData>() { @@ -457,6 +469,59 @@ public class MergeOnReadInputFormat }; } + protected 
ClosableIterator<RowData> getFullLogFileIterator(MergeOnReadInputSplit split) { + final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema()); + final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter = + AvroToRowDataConverters.createRowConverter(tableState.getRowType()); + final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, InternalSchema.getEmptyInternalSchema(), conf, hadoopConf); + final Iterator<String> logRecordsKeyIterator = scanner.getRecords().keySet().iterator(); + + return new ClosableIterator<RowData>() { + private RowData currentRecord; + + @Override + public boolean hasNext() { + while (logRecordsKeyIterator.hasNext()) { + String curAvroKey = logRecordsKeyIterator.next(); + Option<IndexedRecord> curAvroRecord = null; + final HoodieAvroRecord<?> hoodieRecord = (HoodieAvroRecord) scanner.getRecords().get(curAvroKey); + try { + curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema); + } catch (IOException e) { + throw new HoodieException("Get avro insert value error for key: " + curAvroKey, e); + } + if (curAvroRecord.isPresent()) { + final IndexedRecord avroRecord = curAvroRecord.get(); + final RowKind rowKind = FormatUtils.getRowKindSafely(avroRecord, tableState.getOperationPos()); + if (rowKind == RowKind.DELETE) { + // skip the delete record + continue; + } + currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord); + currentRecord.setRowKind(rowKind); + return true; + } + // else: + // delete record found + // skipping if the condition is unsatisfied + // continue; + + } + return false; + } + + @Override + public RowData next() { + return currentRecord; + } + + @Override + public void close() { + scanner.close(); + } + }; + } + // ------------------------------------------------------------------------- // Inner Class // ------------------------------------------------------------------------- @@ -470,9 +535,9 @@ public class MergeOnReadInputFormat 
static class BaseFileOnlyIterator implements RecordIterator { // base file reader - private final ParquetColumnarRowSplitReader reader; + private final HoodieParquetReader reader; - BaseFileOnlyIterator(ParquetColumnarRowSplitReader reader) { + public BaseFileOnlyIterator(HoodieParquetReader reader) { this.reader = reader; } @@ -499,7 +564,7 @@ public class MergeOnReadInputFormat */ static class BaseFileOnlyFilteringIterator implements RecordIterator { // base file reader - private final ParquetColumnarRowSplitReader reader; + private final HoodieParquetReader reader; private final InstantRange instantRange; private final RowDataProjection projection; @@ -508,7 +573,7 @@ public class MergeOnReadInputFormat BaseFileOnlyFilteringIterator( Option<InstantRange> instantRange, RowType requiredRowType, - ParquetColumnarRowSplitReader reader) { + HoodieParquetReader reader) { this.reader = reader; this.instantRange = instantRange.orElse(null); int[] positions = IntStream.range(1, 1 + requiredRowType.getFieldCount()).toArray(); @@ -573,7 +638,7 @@ public class MergeOnReadInputFormat static class SkipMergeIterator implements RecordIterator { // base file reader - private final ParquetColumnarRowSplitReader reader; + private final HoodieParquetReader reader; // iterator for log files private final ClosableIterator<RowData> iterator; @@ -584,7 +649,7 @@ public class MergeOnReadInputFormat private RowData currentRecord; - SkipMergeIterator(ParquetColumnarRowSplitReader reader, ClosableIterator<RowData> iterator) { + SkipMergeIterator(HoodieParquetReader reader, ClosableIterator<RowData> iterator) { this.reader = reader; this.iterator = iterator; } @@ -621,22 +686,20 @@ public class MergeOnReadInputFormat static class MergeIterator implements RecordIterator { // base file reader - private final ParquetColumnarRowSplitReader reader; + private final HoodieParquetReader reader; // log keys used for merging private final Iterator<String> logKeysIterator; // scanner private final 
HoodieMergedLogRecordScanner scanner; private final Schema tableSchema; - private final Schema requiredSchema; - private final int[] requiredPos; private final boolean emitDelete; private final int operationPos; private final RowDataToAvroConverters.RowDataToAvroConverter rowDataToAvroConverter; private final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter; - private final GenericRecordBuilder recordBuilder; - private final RowDataProjection projection; + private final Option<RowDataProjection> projection; + private final Option<Function<IndexedRecord, GenericRecord>> avroProjection; private final InstantRange instantRange; @@ -651,7 +714,7 @@ public class MergeOnReadInputFormat private RowData currentRecord; - MergeIterator( + public MergeIterator( Configuration flinkConf, org.apache.hadoop.conf.Configuration hadoopConf, MergeOnReadInputSplit split, @@ -659,23 +722,42 @@ public class MergeOnReadInputFormat RowType requiredRowType, Schema tableSchema, Schema requiredSchema, + InternalSchema querySchema, int[] requiredPos, boolean emitDelete, int operationPos, - ParquetColumnarRowSplitReader reader) { // the reader should be with full schema + HoodieParquetReader reader) { // the reader should be with full schema + this(flinkConf, hadoopConf, split, tableRowType, requiredRowType, tableSchema, + querySchema, + Option.of(RowDataProjection.instance(requiredRowType, requiredPos)), + Option.of(record -> buildAvroRecordBySchema(record, requiredSchema, requiredPos, new GenericRecordBuilder(requiredSchema))), + emitDelete, operationPos, reader); + } + + public MergeIterator( + Configuration flinkConf, + org.apache.hadoop.conf.Configuration hadoopConf, + MergeOnReadInputSplit split, + RowType tableRowType, + RowType requiredRowType, + Schema tableSchema, + InternalSchema querySchema, + Option<RowDataProjection> projection, + Option<Function<IndexedRecord, GenericRecord>> avroProjection, + boolean emitDelete, + int operationPos, + HoodieParquetReader 
reader) { // the reader should be with full schema this.tableSchema = tableSchema; this.reader = reader; - this.scanner = FormatUtils.logScanner(split, tableSchema, flinkConf, hadoopConf); + this.scanner = FormatUtils.logScanner(split, tableSchema, querySchema, flinkConf, hadoopConf); this.payloadProps = StreamerUtil.getPayloadConfig(flinkConf).getProps(); this.logKeysIterator = scanner.getRecords().keySet().iterator(); - this.requiredSchema = requiredSchema; - this.requiredPos = requiredPos; this.emitDelete = emitDelete; this.operationPos = operationPos; - this.recordBuilder = new GenericRecordBuilder(requiredSchema); + this.avroProjection = avroProjection; this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter(tableRowType); this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType); - this.projection = RowDataProjection.instance(requiredRowType, requiredPos); + this.projection = projection; this.instantRange = split.getInstantRange().orElse(null); } @@ -703,18 +785,18 @@ public class MergeOnReadInputFormat // deleted continue; } - GenericRecord avroRecord = buildAvroRecordBySchema( - mergedAvroRecord.get(), - requiredSchema, - requiredPos, - recordBuilder); + IndexedRecord avroRecord = avroProjection.isPresent() + ? 
avroProjection.get().apply(mergedAvroRecord.get()) + : mergedAvroRecord.get(); this.currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord); this.currentRecord.setRowKind(rowKind); return false; } } // project the full record in base with required positions - currentRecord = projection.project(currentRecord); + if (projection.isPresent()) { + currentRecord = projection.get().project(currentRecord); + } return false; } // read the logs @@ -725,11 +807,9 @@ public class MergeOnReadInputFormat Option<IndexedRecord> insertAvroRecord = getInsertValue(curKey); if (insertAvroRecord.isPresent()) { // the record is a DELETE if insertAvroRecord not present, skipping - GenericRecord avroRecord = buildAvroRecordBySchema( - insertAvroRecord.get(), - requiredSchema, - requiredPos, - recordBuilder); + IndexedRecord avroRecord = avroProjection.isPresent() + ? avroProjection.get().apply(insertAvroRecord.get()) + : insertAvroRecord.get(); this.currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord); FormatUtils.setRowKind(this.currentRecord, insertAvroRecord.get(), this.operationPos); return false; @@ -775,12 +855,13 @@ public class MergeOnReadInputFormat * Builder for {@link MergeOnReadInputFormat}. 
*/ public static class Builder { - private Configuration conf; - private MergeOnReadTableState tableState; - private List<DataType> fieldTypes; - private String defaultPartName; - private long limit = -1; - private boolean emitDelete = false; + protected Configuration conf; + protected MergeOnReadTableState tableState; + protected List<DataType> fieldTypes; + protected String defaultPartName; + protected long limit = -1; + protected boolean emitDelete = false; + protected InternalSchemaManager internalSchemaManager = InternalSchemaManager.DISABLED; public Builder config(Configuration conf) { this.conf = conf; @@ -812,9 +893,14 @@ public class MergeOnReadInputFormat return this; } + public Builder internalSchemaManager(InternalSchemaManager internalSchemaManager) { + this.internalSchemaManager = internalSchemaManager; + return this; + } + public MergeOnReadInputFormat build() { return new MergeOnReadInputFormat(conf, tableState, fieldTypes, - defaultPartName, limit, emitDelete); + defaultPartName, limit, emitDelete, internalSchemaManager); } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataCastProjection.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataCastProjection.java new file mode 100644 index 00000000000..55e85aa1f60 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataCastProjection.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import org.apache.hudi.table.format.CastMap; + +import org.apache.flink.table.types.logical.LogicalType; + +import javax.annotation.Nullable; +import java.util.stream.IntStream; + +/** + * This class is responsible to project row as well as {@link RowDataProjection}. + * In addition, fields are converted according to the CastMap. + */ +public final class RowDataCastProjection extends RowDataProjection { + private static final long serialVersionUID = 1L; + + private final CastMap castMap; + + public RowDataCastProjection(LogicalType[] types, CastMap castMap) { + super(types, IntStream.range(0, types.length).toArray()); + this.castMap = castMap; + } + + @Override + protected @Nullable Object getVal(int pos, @Nullable Object val) { + if (val == null) { + return null; + } + return castMap.castIfNeeded(pos, val); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java index 8076d982b99..d6604159579 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java @@ -25,6 +25,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import javax.annotation.Nullable; import java.io.Serializable; import java.util.Arrays; import 
java.util.List; @@ -37,7 +38,7 @@ public class RowDataProjection implements Serializable { private final RowData.FieldGetter[] fieldGetters; - private RowDataProjection(LogicalType[] types, int[] positions) { + protected RowDataProjection(LogicalType[] types, int[] positions) { ValidationUtils.checkArgument(types.length == positions.length, "types and positions should have the equal number"); this.fieldGetters = new RowData.FieldGetter[types.length]; @@ -70,7 +71,7 @@ public class RowDataProjection implements Serializable { GenericRowData genericRowData = new GenericRowData(this.fieldGetters.length); for (int i = 0; i < this.fieldGetters.length; i++) { final Object val = this.fieldGetters[i].getFieldOrNull(rowData); - genericRowData.setField(i, val); + genericRowData.setField(i, getVal(i, val)); } return genericRowData; } @@ -86,4 +87,8 @@ public class RowDataProjection implements Serializable { } return values; } + + protected @Nullable Object getVal(int pos, @Nullable Object val) { + return val; + } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java new file mode 100644 index 00000000000..3a668514674 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java @@ -0,0 +1,461 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table; + +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.common.config.HoodieCommonConfig; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.internal.schema.Types; +import org.apache.hudi.keygen.ComplexAvroKeyGenerator; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.FlinkWriteClients; +import org.apache.hudi.utils.FlinkMiniCluster; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.Preconditions; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; 
+import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import static org.apache.hudi.internal.schema.action.TableChange.ColumnPositionChange.ColumnPositionType.AFTER; +import static org.apache.hudi.internal.schema.action.TableChange.ColumnPositionChange.ColumnPositionType.BEFORE; +import static org.apache.hudi.utils.TestConfigurations.ROW_TYPE_EVOLUTION_AFTER; +import static org.apache.hudi.utils.TestConfigurations.ROW_TYPE_EVOLUTION_BEFORE; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@SuppressWarnings({"SqlDialectInspection", "SqlNoDataSourceInspection"}) +@ExtendWith(FlinkMiniCluster.class) +public class ITTestSchemaEvolution { + + @TempDir File tempFile; + private StreamTableEnvironment tEnv; + + @BeforeEach + public void setUp() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); + tEnv = StreamTableEnvironment.create(env); + } + + @Test + public void testCopyOnWriteInputFormat() throws Exception { + testSchemaEvolution(defaultTableOptions(tempFile.getAbsolutePath())); + } + + @Test + public void testMergeOnReadInputFormatBaseFileOnlyIterator() throws Exception { + TableOptions tableOptions = defaultTableOptions(tempFile.getAbsolutePath()) + .withOption(FlinkOptions.READ_AS_STREAMING.key(), true) + .withOption(FlinkOptions.READ_START_COMMIT.key(), FlinkOptions.START_COMMIT_EARLIEST); + testSchemaEvolution(tableOptions); + } + + @Test + public void testMergeOnReadInputFormatBaseFileOnlyFilteringIterator() throws Exception { + TableOptions tableOptions = defaultTableOptions(tempFile.getAbsolutePath()) + .withOption(FlinkOptions.READ_AS_STREAMING.key(), true) + .withOption(FlinkOptions.READ_START_COMMIT.key(), 1); + testSchemaEvolution(tableOptions); + } + + @Test + public void testMergeOnReadInputFormatLogFileOnlyIteratorGetLogFileIterator() throws Exception { + TableOptions tableOptions = defaultTableOptions(tempFile.getAbsolutePath()) + 
.withOption(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + testSchemaEvolution(tableOptions); + } + + @Test + public void testMergeOnReadInputFormatLogFileOnlyIteratorGetUnMergedLogFileIterator() throws Exception { + TableOptions tableOptions = defaultTableOptions(tempFile.getAbsolutePath()) + .withOption(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ) + .withOption(FlinkOptions.READ_AS_STREAMING.key(), true) + .withOption(FlinkOptions.READ_START_COMMIT.key(), FlinkOptions.START_COMMIT_EARLIEST) + .withOption(FlinkOptions.CHANGELOG_ENABLED.key(), true); + testSchemaEvolution(tableOptions, false, EXPECTED_UNMERGED_RESULT); + } + + @Test + public void testMergeOnReadInputFormatMergeIterator() throws Exception { + TableOptions tableOptions = defaultTableOptions(tempFile.getAbsolutePath()) + .withOption(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ) + .withOption(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), 1); + testSchemaEvolution(tableOptions, true); + } + + @Test + public void testMergeOnReadInputFormatSkipMergeIterator() throws Exception { + TableOptions tableOptions = defaultTableOptions(tempFile.getAbsolutePath()) + .withOption(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ) + .withOption(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), 1) + .withOption(FlinkOptions.MERGE_TYPE.key(), FlinkOptions.REALTIME_SKIP_MERGE); + testSchemaEvolution(tableOptions, true, EXPECTED_UNMERGED_RESULT); + } + + @Test + public void testCompaction() throws Exception { + TableOptions tableOptions = defaultTableOptions(tempFile.getAbsolutePath()) + .withOption(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ) + .withOption(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), 1); + testSchemaEvolution(tableOptions); + try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(tableOptions.toConfig())) { + Option<String> compactionInstant = 
  /**
   * Runs the full schema-evolution scenario with the default merged expectation
   * and without compacting before the schema changes.
   */
  private void testSchemaEvolution(TableOptions tableOptions) throws Exception {
    testSchemaEvolution(tableOptions, false);
  }

  /**
   * Runs the full schema-evolution scenario, optionally compacting before the
   * schema changes; results are verified against {@link #EXPECTED_MERGED_RESULT}.
   */
  private void testSchemaEvolution(TableOptions tableOptions, boolean shouldCompact) throws Exception {
    testSchemaEvolution(tableOptions, shouldCompact, EXPECTED_MERGED_RESULT);
  }

  /**
   * Scenario driver: writes eight rows with the initial schema, evolves the table
   * schema (optionally compacting first), upserts three rows with the evolved
   * schema, then verifies the evolved projection, the count(*) changelog, and the
   * rows including Hoodie metadata columns.
   */
  private void testSchemaEvolution(TableOptions tableOptions, boolean shouldCompact, ExpectedResult expectedResult) throws Exception {
    writeTableWithSchema1(tableOptions);
    changeTableSchema(tableOptions, shouldCompact);
    writeTableWithSchema2(tableOptions);
    checkAnswerEvolved(expectedResult.evolvedRows);
    checkAnswerCount(expectedResult.rowCount);
    checkAnswerWithMeta(tableOptions, expectedResult.rowsWithMeta);
  }

  /**
   * Creates t1 with the pre-evolution schema (uuid, name, gender, age, ts,
   * partition) and inserts rows id1..id8 across partitions par1..par4.
   */
  private void writeTableWithSchema1(TableOptions tableOptions) throws ExecutionException, InterruptedException {
    //language=SQL
    tEnv.executeSql(""
        + "create table t1 ("
        + " uuid string,"
        + " name string,"
        + " gender char,"
        + " age int,"
        + " ts timestamp,"
        + " `partition` string"
        + ") partitioned by (`partition`) with (" + tableOptions + ")"
    );
    //language=SQL
    tEnv.executeSql(""
        + "insert into t1 select "
        + " cast(uuid as string),"
        + " cast(name as string),"
        + " cast(gender as char),"
        + " cast(age as int),"
        + " cast(ts as timestamp),"
        + " cast(`partition` as string) "
        + "from (values "
        + " ('id1', 'Danny', 'M', 23, '2000-01-01 00:00:01', 'par1'),"
        + " ('id2', 'Stephen', 'M', 33, '2000-01-01 00:00:02', 'par1'),"
        + " ('id3', 'Julian', 'M', 53, '2000-01-01 00:00:03', 'par2'),"
        + " ('id4', 'Fabian', 'M', 31, '2000-01-01 00:00:04', 'par2'),"
        + " ('id5', 'Sophia', 'F', 18, '2000-01-01 00:00:05', 'par3'),"
        + " ('id6', 'Emma', 'F', 20, '2000-01-01 00:00:06', 'par3'),"
        + " ('id7', 'Bob', 'M', 44, '2000-01-01 00:00:07', 'par4'),"
        + " ('id8', 'Han', 'M', 56, '2000-01-01 00:00:08', 'par4')"
        + ") as A(uuid, name, gender, age, ts, `partition`)"
    ).await();
  }

  /**
   * Applies the comprehensive schema changes through the write client:
   * add column, delete column, rename, type change, positioned add, reorder.
   * Optionally compacts first so the changes land on top of compacted files.
   * NOTE(review): compaction applies to MOR tables; callers passing true
   * presumably override the COW default from {@link #defaultTableOptions} — confirm.
   */
  private void changeTableSchema(TableOptions tableOptions, boolean shouldCompactBeforeSchemaChanges) throws IOException {
    try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(tableOptions.toConfig())) {
      if (shouldCompactBeforeSchemaChanges) {
        Option<String> compactionInstant = writeClient.scheduleCompaction(Option.empty());
        writeClient.compact(compactionInstant.get());
      }
      // Nullable avro types: union(null, primitive).
      Schema doubleType = SchemaBuilder.unionOf().nullType().and().doubleType().endUnion();
      Schema stringType = SchemaBuilder.unionOf().nullType().and().stringType().endUnion();
      writeClient.addColumn("salary", doubleType, null, "name", AFTER); // new nullable column after "name"
      writeClient.deleteColumns("gender");
      writeClient.renameColumn("name", "first_name");
      writeClient.updateColumnType("age", Types.StringType.get()); // int -> string promotion
      writeClient.addColumn("last_name", stringType, "empty allowed", "salary", BEFORE);
      writeClient.reOrderColPosition("age", "first_name", BEFORE);
    }
  }

  /**
   * Re-creates t1 with the evolved schema (age reordered and retyped to string,
   * name renamed to first_name, new last_name/salary columns) and upserts id1 and
   * id3 plus a brand-new id9.
   */
  private void writeTableWithSchema2(TableOptions tableOptions) throws ExecutionException, InterruptedException {
    // The writer must produce records in the evolved avro schema.
    tableOptions.withOption(
        FlinkOptions.SOURCE_AVRO_SCHEMA.key(),
        AvroSchemaConverter.convertToSchema(ROW_TYPE_EVOLUTION_AFTER, "hoodie.t1.t1_record"));

    //language=SQL
    tEnv.executeSql("drop table t1");
    //language=SQL
    tEnv.executeSql(""
        + "create table t1 ("
        + " uuid string,"
        + " age string,"
        + " first_name string,"
        + " last_name string,"
        + " salary double,"
        + " ts timestamp,"
        + " `partition` string"
        + ") partitioned by (`partition`) with (" + tableOptions + ")"
    );
    //language=SQL
    tEnv.executeSql(""
        + "insert into t1 select "
        + " cast(uuid as string),"
        + " cast(age as string),"
        + " cast(first_name as string),"
        + " cast(last_name as string),"
        + " cast(salary as double),"
        + " cast(ts as timestamp),"
        + " cast(`partition` as string) "
        + "from (values "
        + " ('id1', '23', 'Danny', '', 10000.1, '2000-01-01 00:00:01', 'par1'),"
        + " ('id9', 'unknown', 'Alice', '', 90000.9, '2000-01-01 00:00:09', 'par1'),"
        + " ('id3', '53', 'Julian', '', 30000.3, '2000-01-01 00:00:03', 'par2')"
        + ") as A(uuid, age, first_name, last_name, salary, ts, `partition`)"
    ).await();
  }

  /**
   * Baseline table options shared by all cases: single-parallelism batch
   * snapshot reads on a COW table named t1 with schema evolution enabled.
   */
  private TableOptions defaultTableOptions(String tablePath) {
    return new TableOptions(
        FactoryUtil.CONNECTOR.key(), HoodieTableFactory.FACTORY_ID,
        FlinkOptions.PATH.key(), tablePath,
        FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_COPY_ON_WRITE,
        HoodieTableConfig.NAME.key(), "t1",
        FlinkOptions.READ_AS_STREAMING.key(), false,
        FlinkOptions.QUERY_TYPE.key(), FlinkOptions.QUERY_TYPE_SNAPSHOT,
        KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid",
        KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition",
        KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key(), true,
        HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), ComplexAvroKeyGenerator.class.getName(),
        FlinkOptions.WRITE_BATCH_SIZE.key(), 0.000001, // each record triggers flush
        FlinkOptions.SOURCE_AVRO_SCHEMA.key(), AvroSchemaConverter.convertToSchema(ROW_TYPE_EVOLUTION_BEFORE),
        FlinkOptions.READ_TASKS.key(), 1,
        FlinkOptions.WRITE_TASKS.key(), 1,
        FlinkOptions.INDEX_BOOTSTRAP_TASKS.key(), 1,
        FlinkOptions.BUCKET_ASSIGN_TASKS.key(), 1,
        FlinkOptions.COMPACTION_TASKS.key(), 1,
        FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), false,
        HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), true);
  }

  /** Verifies the evolved projection: renamed, added and retyped columns. */
  private void checkAnswerEvolved(String... expectedResult) throws Exception {
    //language=SQL
    checkAnswer("select first_name, salary, age from t1", expectedResult);
  }

  /** Verifies the count(*) changelog emitted while scanning the table. */
  private void checkAnswerCount(String... expectedResult) throws Exception {
    //language=SQL
    checkAnswer("select count(*) from t1", expectedResult);
  }
expectedResult) throws Exception { + //language=SQL + tEnv.executeSql("drop table t1"); + //language=SQL + tEnv.executeSql("" + + "create table t1 (" + + " `_hoodie_commit_time` string," + + " `_hoodie_commit_seqno` string," + + " `_hoodie_record_key` string," + + " `_hoodie_partition_path` string," + + " `_hoodie_file_name` string," + + " uuid string," + + " age string," + + " first_name string," + + " last_name string," + + " salary double," + + " ts timestamp," + + " `partition` string" + + ") partitioned by (`partition`) with (" + tableOptions + ")" + ); + //language=SQL + checkAnswer("select `_hoodie_record_key`, first_name, salary from t1", expectedResult); + } + + private void checkAnswer(String query, String... expectedResult) throws Exception { + TableResult actualResult = tEnv.executeSql(query); + Set<String> expected = new HashSet<>(Arrays.asList(expectedResult)); + Set<String> actual = new HashSet<>(expected.size()); + try (CloseableIterator<Row> iterator = actualResult.collect()) { + for (int i = 0; i < expected.size() && iterator.hasNext(); i++) { + actual.add(iterator.next().toString()); + } + } + assertEquals(expected, actual); + } + + private static final class TableOptions { + private final Map<String, String> map = new HashMap<>(); + + TableOptions(Object... 
  /**
   * A small builder for Flink table options; renders as the body of a SQL
   * {@code with (...)} clause via {@link #toString()} and converts to a
   * {@link Configuration} for the write client via {@link #toConfig()}.
   */
  private static final class TableOptions {
    private final Map<String, String> map = new HashMap<>();

    // Varargs come as alternating key/value pairs, hence the even-length check.
    TableOptions(Object... options) {
      Preconditions.checkArgument(options.length % 2 == 0);
      for (int i = 0; i < options.length; i += 2) {
        withOption(options[i].toString(), options[i + 1]);
      }
    }

    // Values are stored via toString(), so a null optionValue would NPE here.
    TableOptions withOption(String optionName, Object optionValue) {
      if (StringUtils.isNullOrEmpty(optionName)) {
        throw new IllegalArgumentException("optionName must be presented");
      }
      map.put(optionName, optionValue.toString());
      return this;
    }

    Configuration toConfig() {
      return FlinkOptions.fromMap(map);
    }

    @Override
    public String toString() {
      // e.g. 'connector' = 'hudi', 'path' = '/tmp/...'
      return map.entrySet().stream()
          .map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue()))
          .collect(Collectors.joining(", "));
    }
  }

  /**
   * Expected rows for one scenario: the evolved projection, the projection with
   * Hoodie metadata, and the count(*) changelog.
   */
  private static final class ExpectedResult {
    final String[] evolvedRows;
    final String[] rowsWithMeta;
    final String[] rowCount;

    private ExpectedResult(String[] evolvedRows, String[] rowsWithMeta, String[] rowCount) {
      this.evolvedRows = evolvedRows;
      this.rowsWithMeta = rowsWithMeta;
      this.rowCount = rowCount;
    }
  }

  // Expectation when the second-commit rows are merged with the first commit:
  // id1/id3 carry their updated salary, id9 is new, the rest read nulls for the
  // added columns.
  private static final ExpectedResult EXPECTED_MERGED_RESULT = new ExpectedResult(
      new String[] {
          "+I[Danny, 10000.1, 23]",
          "+I[Stephen, null, 33]",
          "+I[Julian, 30000.3, 53]",
          "+I[Fabian, null, 31]",
          "+I[Sophia, null, 18]",
          "+I[Emma, null, 20]",
          "+I[Bob, null, 44]",
          "+I[Han, null, 56]",
          "+I[Alice, 90000.9, unknown]",
      },
      new String[] {
          "+I[uuid:id1, Danny, 10000.1]",
          "+I[uuid:id2, Stephen, null]",
          "+I[uuid:id3, Julian, 30000.3]",
          "+I[uuid:id4, Fabian, null]",
          "+I[uuid:id5, Sophia, null]",
          "+I[uuid:id6, Emma, null]",
          "+I[uuid:id7, Bob, null]",
          "+I[uuid:id8, Han, null]",
          "+I[uuid:id9, Alice, 90000.9]",
      },
      // count(*) changelog: retract/update pairs up to the final count of 9.
      new String[] {
          "+I[1]",
          "-U[1]",
          "+U[2]",
          "-U[2]",
          "+U[3]",
          "-U[3]",
          "+U[4]",
          "-U[4]",
          "+U[5]",
          "-U[5]",
          "+U[6]",
          "-U[6]",
          "+U[7]",
          "-U[7]",
          "+U[8]",
          "-U[8]",
          "+U[9]",
      }
  );

  // Expectation when log records are NOT merged with base files: the id1/id3
  // updates appear as additional rows, giving 11 rows total.
  private static final ExpectedResult EXPECTED_UNMERGED_RESULT = new ExpectedResult(
      new String[] {
          "+I[Danny, null, 23]",
          "+I[Stephen, null, 33]",
          "+I[Julian, null, 53]",
          "+I[Fabian, null, 31]",
          "+I[Sophia, null, 18]",
          "+I[Emma, null, 20]",
          "+I[Bob, null, 44]",
          "+I[Han, null, 56]",
          "+I[Alice, 90000.9, unknown]",
          "+I[Danny, 10000.1, 23]",
          "+I[Julian, 30000.3, 53]",
      },
      new String[] {
          "+I[uuid:id1, Danny, null]",
          "+I[uuid:id2, Stephen, null]",
          "+I[uuid:id3, Julian, null]",
          "+I[uuid:id4, Fabian, null]",
          "+I[uuid:id5, Sophia, null]",
          "+I[uuid:id6, Emma, null]",
          "+I[uuid:id7, Bob, null]",
          "+I[uuid:id8, Han, null]",
          "+I[uuid:id9, Alice, 90000.9]",
          "+I[uuid:id1, Danny, 10000.1]",
          "+I[uuid:id3, Julian, 30000.3]",
      },
      // count(*) changelog up to the final count of 11.
      new String[] {
          "+I[1]",
          "-U[1]",
          "+U[2]",
          "-U[2]",
          "+U[3]",
          "-U[3]",
          "+U[4]",
          "-U[4]",
          "+U[5]",
          "-U[5]",
          "+U[6]",
          "-U[6]",
          "+U[7]",
          "-U[7]",
          "+U[8]",
          "-U[8]",
          "+U[9]",
          "-U[9]",
          "+U[10]",
          "-U[10]",
          "+U[11]",
      }
  );
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.format; + +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.VarCharType; + +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.time.LocalDate; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Tests for {@link CastMap}. 
+ */ +public class TestCastMap { + + @Test + public void testCastInt() { + CastMap castMap = new CastMap(); + castMap.add(0, new IntType(), new BigIntType()); + castMap.add(1, new IntType(), new FloatType()); + castMap.add(2, new IntType(), new DoubleType()); + castMap.add(3, new IntType(), new DecimalType()); + castMap.add(4, new IntType(), new VarCharType()); + int val = 1; + assertEquals(1L, castMap.castIfNeeded(0, val)); + assertEquals(1.0F, castMap.castIfNeeded(1, val)); + assertEquals(1.0, castMap.castIfNeeded(2, val)); + assertEquals(DecimalData.fromBigDecimal(BigDecimal.ONE, 1, 0), castMap.castIfNeeded(3, val)); + assertEquals(BinaryStringData.fromString("1"), castMap.castIfNeeded(4, val)); + } + + @Test + public void testCastLong() { + CastMap castMap = new CastMap(); + castMap.add(0, new BigIntType(), new FloatType()); + castMap.add(1, new BigIntType(), new DoubleType()); + castMap.add(2, new BigIntType(), new DecimalType()); + castMap.add(3, new BigIntType(), new VarCharType()); + long val = 1L; + assertEquals(1.0F, castMap.castIfNeeded(0, val)); + assertEquals(1.0, castMap.castIfNeeded(1, val)); + assertEquals(DecimalData.fromBigDecimal(BigDecimal.ONE, 1, 0), castMap.castIfNeeded(2, val)); + assertEquals(BinaryStringData.fromString("1"), castMap.castIfNeeded(3, val)); + } + + @Test + public void testCastFloat() { + CastMap castMap = new CastMap(); + castMap.add(0, new FloatType(), new DoubleType()); + castMap.add(1, new FloatType(), new DecimalType()); + castMap.add(2, new FloatType(), new VarCharType()); + float val = 1F; + assertEquals(1.0, castMap.castIfNeeded(0, val)); + assertEquals(DecimalData.fromBigDecimal(BigDecimal.ONE, 1, 0), castMap.castIfNeeded(1, val)); + assertEquals(BinaryStringData.fromString("1.0"), castMap.castIfNeeded(2, val)); + } + + @Test + public void testCastDouble() { + CastMap castMap = new CastMap(); + castMap.add(0, new DoubleType(), new DecimalType()); + castMap.add(1, new DoubleType(), new VarCharType()); + double val = 1; 
+ assertEquals(DecimalData.fromBigDecimal(BigDecimal.ONE, 1, 0), castMap.castIfNeeded(0, val)); + assertEquals(BinaryStringData.fromString("1.0"), castMap.castIfNeeded(1, val)); + } + + @Test + public void testCastDecimal() { + CastMap castMap = new CastMap(); + castMap.add(0, new DecimalType(2, 1), new DecimalType(3, 2)); + castMap.add(1, new DecimalType(), new VarCharType()); + DecimalData val = DecimalData.fromBigDecimal(BigDecimal.ONE, 2, 1); + assertEquals(DecimalData.fromBigDecimal(BigDecimal.ONE, 3, 2), castMap.castIfNeeded(0, val)); + assertEquals(BinaryStringData.fromString("1.0"), castMap.castIfNeeded(1, val)); + } + + @Test + public void testCastString() { + CastMap castMap = new CastMap(); + castMap.add(0, new VarCharType(), new DecimalType()); + castMap.add(1, new VarCharType(), new DateType()); + assertEquals(DecimalData.fromBigDecimal(BigDecimal.ONE, 1, 0), castMap.castIfNeeded(0, BinaryStringData.fromString("1.0"))); + assertEquals((int) LocalDate.parse("2022-05-12").toEpochDay(), castMap.castIfNeeded(1, BinaryStringData.fromString("2022-05-12"))); + } + + @Test + public void testCastDate() { + CastMap castMap = new CastMap(); + castMap.add(0, new DateType(), new VarCharType()); + assertEquals(BinaryStringData.fromString("2022-05-12"), castMap.castIfNeeded(0, (int) LocalDate.parse("2022-05-12").toEpochDay())); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java index a5b7e368a88..bff2553df81 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java @@ -84,6 +84,29 @@ public class TestConfigurations { public static final RowType ROW_TYPE_DATE = (RowType) ROW_DATA_TYPE_DATE.getLogicalType(); + public static final DataType ROW_DATA_TYPE_EVOLUTION_BEFORE = 
DataTypes.ROW( + DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)), + DataTypes.FIELD("name", DataTypes.VARCHAR(10)), + DataTypes.FIELD("gender", DataTypes.CHAR(1)), // removed field + DataTypes.FIELD("age", DataTypes.INT()), + DataTypes.FIELD("ts", DataTypes.TIMESTAMP(6)), + DataTypes.FIELD("partition", DataTypes.VARCHAR(10))) + .notNull(); + + public static final RowType ROW_TYPE_EVOLUTION_BEFORE = (RowType) ROW_DATA_TYPE_EVOLUTION_BEFORE.getLogicalType(); + + public static final DataType ROW_DATA_TYPE_EVOLUTION_AFTER = DataTypes.ROW( + DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)), + DataTypes.FIELD("age", DataTypes.VARCHAR(10)), // changed type, reordered + DataTypes.FIELD("first_name", DataTypes.VARCHAR(10)), // renamed + DataTypes.FIELD("last_name", DataTypes.VARCHAR(10)), // new field + DataTypes.FIELD("salary", DataTypes.DOUBLE()), // new field + DataTypes.FIELD("ts", DataTypes.TIMESTAMP(6)), + DataTypes.FIELD("partition", DataTypes.VARCHAR(10))) + .notNull(); + + public static final RowType ROW_TYPE_EVOLUTION_AFTER = (RowType) ROW_DATA_TYPE_EVOLUTION_AFTER.getLogicalType(); + public static String getCreateHoodieTableDDL(String tableName, Map<String, String> options) { return getCreateHoodieTableDDL(tableName, options, true, "partition"); }
