This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.12.2-shadow
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 1b8e5ec12e300ed76c74fdddb585ede55c66b28a Author: Danny Chan <[email protected]> AuthorDate: Thu Dec 1 17:13:59 2022 +0800 [HUDI-5306] Unify RecordIterator and HoodieParquetReader with ClosableIterator (#7340) * Unify RecordIterator and HoodieParquetReader with ClosableIterator * Add a factory clazz for RecordIterator * Add more documents --- .../apache/hudi/configuration/OptionsResolver.java | 18 ++ .../java/org/apache/hudi/table/format/CastMap.java | 223 ++++++++++++++++ .../org/apache/hudi/table/format/FormatUtils.java | 28 ++ .../hudi/table/format/InternalSchemaManager.java | 170 ++++++++++++ .../table/format/ParquetSplitRecordIterator.java | 61 +++++ .../apache/hudi/table/format/RecordIterators.java | 91 +++++++ .../table/format/SchemaEvolvedRecordIterator.java | 52 ++++ .../table/format/cow/CopyOnWriteInputFormat.java | 27 +- .../table/format/mor/MergeOnReadInputFormat.java | 290 +++++++++++---------- .../apache/hudi/util/RowDataCastProjection.java | 49 ++++ .../org/apache/hudi/util/RowDataProjection.java | 9 +- 11 files changed, 867 insertions(+), 151 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java index 0dd31ee7538..029ac3c4027 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java @@ -202,6 +202,24 @@ public class OptionsResolver { return !conf.contains(FlinkOptions.READ_START_COMMIT) && !conf.contains(FlinkOptions.READ_END_COMMIT); } +<<<<<<< HEAD +======= + /** + * Returns the supplemental logging mode. + */ + public static HoodieCDCSupplementalLoggingMode getCDCSupplementalLoggingMode(Configuration conf) { + String mode = conf.getString(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE); + return HoodieCDCSupplementalLoggingMode.parse(mode); + } + + /** + * Returns whether comprehensive schema evolution enabled. + */ + public static boolean isSchemaEvolutionEnabled(Configuration conf) { + return conf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue()); + } + +>>>>>>> 07cc3e89a7 ([HUDI-5306] Unify RecordIterator and HoodieParquetReader with ClosableIterator (#7340)) // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java new file mode 100644 index 00000000000..36cf8708875 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.format; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.util.RowDataCastProjection; +import org.apache.hudi.util.RowDataProjection; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +import javax.annotation.Nullable; +import java.io.Serializable; +import java.math.BigDecimal; +import java.time.LocalDate; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DATE; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.FLOAT; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR; + +/** + * CastMap is responsible for conversion of flink types when full schema evolution enabled. 
+ * + * <p>Supported cast conversions: + * <ul> + * <li>Integer => Long, Float, Double, Decimal, String</li> + * <li>Long => Float, Double, Decimal, String</li> + * <li>Float => Double, Decimal, String</li> + * <li>Double => Decimal, String</li> + * <li>Decimal => Decimal, String</li> + * <li>String => Decimal, Date</li> + * <li>Date => String</li> + * </ul> + */ +public final class CastMap implements Serializable { + + private static final long serialVersionUID = 1L; + + // Maps position to corresponding cast + private final Map<Integer, Cast> castMap = new HashMap<>(); + + private DataType[] fileFieldTypes; + + public Option<RowDataProjection> toRowDataProjection(int[] selectedFields) { + if (castMap.isEmpty()) { + return Option.empty(); + } + LogicalType[] requiredType = new LogicalType[selectedFields.length]; + for (int i = 0; i < selectedFields.length; i++) { + requiredType[i] = fileFieldTypes[selectedFields[i]].getLogicalType(); + } + return Option.of(new RowDataCastProjection(requiredType, this)); + } + + public Object castIfNeeded(int pos, Object val) { + Cast cast = castMap.get(pos); + if (cast == null) { + return val; + } + return cast.convert(val); + } + + public DataType[] getFileFieldTypes() { + return fileFieldTypes; + } + + public void setFileFieldTypes(DataType[] fileFieldTypes) { + this.fileFieldTypes = fileFieldTypes; + } + + @VisibleForTesting + void add(int pos, LogicalType fromType, LogicalType toType) { + Function<Object, Object> conversion = getConversion(fromType, toType); + if (conversion == null) { + throw new IllegalArgumentException(String.format("Cannot create cast %s => %s at pos %s", fromType, toType, pos)); + } + add(pos, new Cast(fromType, toType, conversion)); + } + + private @Nullable Function<Object, Object> getConversion(LogicalType fromType, LogicalType toType) { + LogicalTypeRoot from = fromType.getTypeRoot(); + LogicalTypeRoot to = toType.getTypeRoot(); + switch (to) { + case BIGINT: { + if (from == INTEGER) { + return val -> ((Number) val).longValue(); + } + break; + } + case FLOAT: { + if (from == INTEGER || from == BIGINT) { + return val -> ((Number) val).floatValue(); + } + break; + } + case DOUBLE: { + if (from == INTEGER || from == BIGINT) { + return val -> ((Number) val).doubleValue(); + } + if (from == FLOAT) { + return val -> Double.parseDouble(val.toString()); + } + break; + } + case DECIMAL: { + if (from == INTEGER || from == BIGINT || from == DOUBLE) { + return val -> toDecimalData((Number) val, toType); + } + if (from == FLOAT) { + return val -> toDecimalData(Double.parseDouble(val.toString()), toType); + } + if (from == VARCHAR) { + return val -> toDecimalData(Double.parseDouble(val.toString()), toType); + } + if (from == DECIMAL) { + return val -> toDecimalData(((DecimalData) val).toBigDecimal(), toType); + } + break; + } + case VARCHAR: { + if (from == INTEGER + || from == BIGINT + || from == FLOAT + || from == DOUBLE + || from == DECIMAL) { + return val -> new BinaryStringData(String.valueOf(val)); + } + if (from == DATE) { + return val -> new BinaryStringData(LocalDate.ofEpochDay(((Integer) val).longValue()).toString()); + } + break; + } + case DATE: { + if (from == VARCHAR) { + return val -> (int) LocalDate.parse(val.toString()).toEpochDay(); + } + break; + } + default: + } + return null; + } + + private void add(int pos, Cast cast) { + castMap.put(pos, cast); + } + + private DecimalData toDecimalData(Number val, LogicalType decimalType) { + BigDecimal valAsDecimal = BigDecimal.valueOf(val.doubleValue()); + return 
toDecimalData(valAsDecimal, decimalType); + } + + private DecimalData toDecimalData(BigDecimal valAsDecimal, LogicalType decimalType) { + return DecimalData.fromBigDecimal( + valAsDecimal, + ((DecimalType) decimalType).getPrecision(), + ((DecimalType) decimalType).getScale()); + } + + /** + * Fields {@link Cast#from} and {@link Cast#to} are redundant due to {@link Cast#convert(Object)} determines conversion. + * However, it is convenient to debug {@link CastMap} when {@link Cast#toString()} prints types. + */ + private static final class Cast implements Serializable { + + private static final long serialVersionUID = 1L; + + private final LogicalType from; + private final LogicalType to; + private final Function<Object, Object> conversion; + + Cast(LogicalType from, LogicalType to, Function<Object, Object> conversion) { + this.from = from; + this.to = to; + this.conversion = conversion; + } + + Object convert(Object val) { + return conversion.apply(val); + } + + @Override + public String toString() { + return from + " => " + to; + } + } + + @Override + public String toString() { + return castMap.entrySet().stream() + .map(e -> e.getKey() + ": " + e.getValue()) + .collect(Collectors.joining(", ", "{", "}")); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java index 6357b898d49..49cb3cec5bf 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java @@ -32,6 +32,7 @@ import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; +import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; import org.apache.hudi.util.FlinkWriteClients; import org.apache.hudi.util.StreamerUtil; @@ -170,6 +171,33 @@ public class FormatUtils { .build(); } + public static HoodieMergedLogRecordScanner logScanner( + MergeOnReadInputSplit split, + Schema logSchema, + InternalSchema internalSchema, + org.apache.flink.configuration.Configuration flinkConf, + Configuration hadoopConf) { + HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(flinkConf); + FileSystem fs = FSUtils.getFs(split.getTablePath(), hadoopConf); + return HoodieMergedLogRecordScanner.newBuilder() + .withFileSystem(fs) + .withBasePath(split.getTablePath()) + .withLogFilePaths(split.getLogPaths().get()) + .withReaderSchema(logSchema) + .withInternalSchema(internalSchema) + .withLatestInstantTime(split.getLatestCommit()) + .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled()) + .withReverseReader(false) + .withBufferSize(writeConfig.getMaxDFSStreamBufferSize()) + .withMaxMemorySizeInBytes(split.getMaxCompactionMemoryInBytes()) + .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType()) + .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled()) + .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath()) + .withInstantRange(split.getInstantRange()) + .withOperationField(flinkConf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) + .build(); + } + /** * Utility to read and buffer the records in the unMerged log record scanner. 
*/ diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java new file mode 100644 index 00000000000..abd405469d8 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.format; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.InternalSchemaCache; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.configuration.HadoopConfigurations; +import org.apache.hudi.configuration.OptionsResolver; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.Type; +import org.apache.hudi.internal.schema.Types; +import org.apache.hudi.internal.schema.action.InternalSchemaMerger; +import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; +import org.apache.hudi.internal.schema.utils.InternalSchemaUtils; +import org.apache.hudi.util.AvroSchemaConverter; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * This class is responsible for calculating names and types of fields that are actual at a certain point in time. + * If field is renamed in queried schema, its old name will be returned, which is relevant at the provided time. + * If type of field is changed, its old type will be returned, and projection will be created that will convert the old type to the queried one. 
+ */ +public class InternalSchemaManager implements Serializable { + + private static final long serialVersionUID = 1L; + + public static final InternalSchemaManager DISABLED = new InternalSchemaManager(null, InternalSchema.getEmptyInternalSchema(), null, null); + + private final Configuration conf; + private final InternalSchema querySchema; + private final String validCommits; + private final String tablePath; + private transient org.apache.hadoop.conf.Configuration hadoopConf; + + public static InternalSchemaManager get(Configuration conf, HoodieTableMetaClient metaClient) { + if (!OptionsResolver.isSchemaEvolutionEnabled(conf)) { + return DISABLED; + } + Option<InternalSchema> internalSchema = new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata(); + if (!internalSchema.isPresent() || internalSchema.get().isEmptySchema()) { + return DISABLED; + } + String validCommits = metaClient + .getCommitsAndCompactionTimeline() + .filterCompletedInstants() + .getInstantsAsStream() + .map(HoodieInstant::getFileName) + .collect(Collectors.joining(",")); + return new InternalSchemaManager(conf, internalSchema.get(), validCommits, metaClient.getBasePathV2().toString()); + } + + public InternalSchemaManager(Configuration conf, InternalSchema querySchema, String validCommits, String tablePath) { + this.conf = conf; + this.querySchema = querySchema; + this.validCommits = validCommits; + this.tablePath = tablePath; + } + + public InternalSchema getQuerySchema() { + return querySchema; + } + + InternalSchema getFileSchema(String fileName) { + if (querySchema.isEmptySchema()) { + return querySchema; + } + long commitInstantTime = Long.parseLong(FSUtils.getCommitTime(fileName)); + InternalSchema fileSchemaUnmerged = InternalSchemaCache.getInternalSchemaByVersionId( + commitInstantTime, tablePath, getHadoopConf(), validCommits); + if (querySchema.equals(fileSchemaUnmerged)) { + return InternalSchema.getEmptyInternalSchema(); + } + return new InternalSchemaMerger(fileSchemaUnmerged, querySchema, true, true).mergeSchema(); + } + + CastMap getCastMap(InternalSchema fileSchema, String[] queryFieldNames, DataType[] queryFieldTypes, int[] selectedFields) { + Preconditions.checkArgument(!querySchema.isEmptySchema(), "querySchema cannot be empty"); + Preconditions.checkArgument(!fileSchema.isEmptySchema(), "fileSchema cannot be empty"); + + CastMap castMap = new CastMap(); + Map<Integer, Integer> posProxy = getPosProxy(fileSchema, queryFieldNames); + if (posProxy.isEmpty()) { + castMap.setFileFieldTypes(queryFieldTypes); + return castMap; + } + List<Integer> selectedFieldList = IntStream.of(selectedFields).boxed().collect(Collectors.toList()); + List<DataType> fileSchemaAsDataTypes = AvroSchemaConverter.convertToDataType( + AvroInternalSchemaConverter.convert(fileSchema, "tableName")).getChildren(); + DataType[] fileFieldTypes = new DataType[queryFieldTypes.length]; + for (int i = 0; i < queryFieldTypes.length; i++) { + Integer posOfChangedType = posProxy.get(i); + if (posOfChangedType == null) { + fileFieldTypes[i] = queryFieldTypes[i]; + } else { + DataType fileType = fileSchemaAsDataTypes.get(posOfChangedType); + fileFieldTypes[i] = fileType; + int selectedPos = selectedFieldList.indexOf(i); + if (selectedPos != -1) { + castMap.add(selectedPos, fileType.getLogicalType(), queryFieldTypes[i].getLogicalType()); + } + } + } + castMap.setFileFieldTypes(fileFieldTypes); + return castMap; + } + + String[] getFileFieldNames(InternalSchema fileSchema, String[] queryFieldNames) { + 
Preconditions.checkArgument(!querySchema.isEmptySchema(), "querySchema cannot be empty"); + Preconditions.checkArgument(!fileSchema.isEmptySchema(), "fileSchema cannot be empty"); + + Map<String, String> renamedCols = InternalSchemaUtils.collectRenameCols(fileSchema, querySchema); + if (renamedCols.isEmpty()) { + return queryFieldNames; + } + return Arrays.stream(queryFieldNames).map(name -> renamedCols.getOrDefault(name, name)).toArray(String[]::new); + } + + private Map<Integer, Integer> getPosProxy(InternalSchema fileSchema, String[] queryFieldNames) { + Map<Integer, Pair<Type, Type>> changedCols = InternalSchemaUtils.collectTypeChangedCols(querySchema, fileSchema); + HashMap<Integer, Integer> posProxy = new HashMap<>(changedCols.size()); + List<String> fieldNameList = Arrays.asList(queryFieldNames); + List<Types.Field> columns = querySchema.columns(); + changedCols.forEach((posInSchema, typePair) -> { + String name = columns.get(posInSchema).name(); + int posInType = fieldNameList.indexOf(name); + posProxy.put(posInType, posInSchema); + }); + return Collections.unmodifiableMap(posProxy); + } + + private org.apache.hadoop.conf.Configuration getHadoopConf() { + if (hadoopConf == null) { + hadoopConf = HadoopConfigurations.getHadoopConf(conf); + } + return hadoopConf; + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/ParquetSplitRecordIterator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/ParquetSplitRecordIterator.java new file mode 100644 index 00000000000..7b26d71f115 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/ParquetSplitRecordIterator.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.format; + +import org.apache.hudi.common.util.ClosableIterator; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader; + +import org.apache.flink.table.data.RowData; + +import java.io.IOException; + +/** + * Hoodie wrapper for flink parquet reader. 
+ */ +public final class ParquetSplitRecordIterator implements ClosableIterator<RowData> { + private final ParquetColumnarRowSplitReader reader; + + public ParquetSplitRecordIterator(ParquetColumnarRowSplitReader reader) { + this.reader = reader; + } + + @Override + public boolean hasNext() { + try { + return !reader.reachedEnd(); + } catch (IOException e) { + throw new HoodieIOException("Decides whether the parquet columnar row split reader reached end exception", e); + } + } + + @Override + public RowData next() { + return reader.nextRecord(); + } + + @Override + public void close() { + try { + reader.close(); + } catch (IOException e) { + throw new HoodieIOException("Close the parquet columnar row split reader exception", e); + } + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java new file mode 100644 index 00000000000..8657f16ddc9 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.format; + +import org.apache.hudi.common.util.ClosableIterator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil; +import org.apache.hudi.util.RowDataProjection; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; +import java.util.Map; + +/** + * Factory clazz for record iterators. 
+ */ +public abstract class RecordIterators { + + public static ClosableIterator<RowData> getParquetRecordIterator( + InternalSchemaManager internalSchemaManager, + boolean utcTimestamp, + boolean caseSensitive, + Configuration conf, + String[] fieldNames, + DataType[] fieldTypes, + Map<String, Object> partitionSpec, + int[] selectedFields, + int batchSize, + Path path, + long splitStart, + long splitLength) throws IOException { + InternalSchema fileSchema = internalSchemaManager.getFileSchema(path.getName()); + if (fileSchema.isEmptySchema()) { + return new ParquetSplitRecordIterator( + ParquetSplitReaderUtil.genPartColumnarRowReader( + utcTimestamp, + caseSensitive, + conf, + fieldNames, + fieldTypes, + partitionSpec, + selectedFields, + batchSize, + path, + splitStart, + splitLength)); + } else { + CastMap castMap = internalSchemaManager.getCastMap(fileSchema, fieldNames, fieldTypes, selectedFields); + Option<RowDataProjection> castProjection = castMap.toRowDataProjection(selectedFields); + ClosableIterator<RowData> itr = new ParquetSplitRecordIterator( + ParquetSplitReaderUtil.genPartColumnarRowReader( + utcTimestamp, + caseSensitive, + conf, + internalSchemaManager.getFileFieldNames(fileSchema, fieldNames), // the reconciled field names + castMap.getFileFieldTypes(), // the reconciled field types + partitionSpec, + selectedFields, + batchSize, + path, + splitStart, + splitLength)); + if (castProjection.isPresent()) { + return new SchemaEvolvedRecordIterator(itr, castProjection.get()); + } else { + return itr; + } + } + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/SchemaEvolvedRecordIterator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/SchemaEvolvedRecordIterator.java new file mode 100644 index 00000000000..739512c7b55 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/SchemaEvolvedRecordIterator.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.format; + +import org.apache.hudi.common.util.ClosableIterator; +import org.apache.hudi.util.RowDataProjection; + +import org.apache.flink.table.data.RowData; + +/** + * Decorates origin record iterator with cast projection. 
+ */ +public final class SchemaEvolvedRecordIterator implements ClosableIterator<RowData> { + private final ClosableIterator<RowData> nested; + private final RowDataProjection castProjection; + + public SchemaEvolvedRecordIterator(ClosableIterator<RowData> nested, RowDataProjection castProjection) { + this.nested = nested; + this.castProjection = castProjection; + } + + @Override + public boolean hasNext() { + return nested.hasNext(); + } + + @Override + public RowData next() { + return castProjection.project(nested.next()); + } + + @Override + public void close() { + nested.close(); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java index c5ea3d4ab98..820424549f0 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java @@ -20,7 +20,9 @@ package org.apache.hudi.table.format.cow; import java.util.Comparator; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader; +import org.apache.hudi.common.util.ClosableIterator; +import org.apache.hudi.table.format.InternalSchemaManager; +import org.apache.hudi.table.format.RecordIterators; import org.apache.hudi.util.DataTypeUtils; import org.apache.flink.api.common.io.FileInputFormat; @@ -74,7 +76,7 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> { private final SerializableConfiguration conf; private final long limit; - private transient ParquetColumnarRowSplitReader reader; + private transient ClosableIterator<RowData> itr; private transient long currentReadCount; /** @@ -82,6 +84,8 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> { */ private FilePathFilter localFilesFilter = new GlobFilePathFilter(); + private final InternalSchemaManager internalSchemaManager; + public CopyOnWriteInputFormat( Path[] paths, String[] fullFieldNames, @@ -90,7 +94,8 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> { String partDefaultName, long limit, Configuration conf, - boolean utcTimestamp) { + boolean utcTimestamp, + InternalSchemaManager internalSchemaManager) { super.setFilePaths(paths); this.limit = limit; this.partDefaultName = partDefaultName; @@ -99,6 +104,7 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> { this.selectedFields = selectedFields; this.conf = new SerializableConfiguration(conf); this.utcTimestamp = utcTimestamp; + this.internalSchemaManager = internalSchemaManager; } @Override @@ -123,7 +129,8 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> { } }); - this.reader = ParquetSplitReaderUtil.genPartColumnarRowReader( + this.itr = RecordIterators.getParquetRecordIterator( + internalSchemaManager, utcTimestamp, true, conf.conf(), @@ -270,26 +277,26 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> { } @Override - public boolean reachedEnd() throws IOException { + public boolean reachedEnd() { if (currentReadCount >= limit) { return true; } else { - return reader.reachedEnd(); + return !itr.hasNext(); } } @Override public RowData nextRecord(RowData reuse) { currentReadCount++; - return reader.nextRecord(); + return itr.next(); } @Override public void close() throws IOException { - if 
(reader != null) { - this.reader.close(); + if (itr != null) { + this.itr.close(); } - this.reader = null; + this.itr = null; } /** diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index c9b6561bdef..e1e7025ff07 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -29,11 +29,13 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.table.format.FilePathUtils; import org.apache.hudi.table.format.FormatUtils; -import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil; -import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader; +import org.apache.hudi.table.format.InternalSchemaManager; +import org.apache.hudi.table.format.RecordIterators; import org.apache.hudi.util.AvroToRowDataConverters; import org.apache.hudi.util.DataTypeUtils; import org.apache.hudi.util.RowDataProjection; @@ -66,6 +68,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Properties; import java.util.Set; +import java.util.function.Function; import java.util.stream.IntStream; import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS; @@ -92,7 +95,7 @@ public class MergeOnReadInputFormat /** * Uniform iterator view for the underneath records. 
*/ - private transient RecordIterator iterator; + private transient ClosableIterator<RowData> iterator; // for project push down /** @@ -137,13 +140,16 @@ public class MergeOnReadInputFormat */ private boolean closed = true; + private final InternalSchemaManager internalSchemaManager; + private MergeOnReadInputFormat( Configuration conf, MergeOnReadTableState tableState, List<DataType> fieldTypes, String defaultPartName, long limit, - boolean emitDelete) { + boolean emitDelete, + InternalSchemaManager internalSchemaManager) { this.conf = conf; this.tableState = tableState; this.fieldNames = tableState.getRowType().getFieldNames(); @@ -154,6 +160,7 @@ public class MergeOnReadInputFormat this.requiredPos = tableState.getRequiredPositions(); this.limit = limit; this.emitDelete = emitDelete; + this.internalSchemaManager = internalSchemaManager; } /** @@ -168,30 +175,35 @@ public class MergeOnReadInputFormat this.currentReadCount = 0L; this.closed = false; this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf); + this.iterator = initIterator(split); + mayShiftInputSplit(split); + } + + protected ClosableIterator<RowData> initIterator(MergeOnReadInputSplit split) throws IOException { if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() > 0)) { if (split.getInstantRange() != null) { // base file only with commit time filtering - this.iterator = new BaseFileOnlyFilteringIterator( + return new BaseFileOnlyFilteringIterator( split.getInstantRange(), this.tableState.getRequiredRowType(), - getReader(split.getBasePath().get(), getRequiredPosWithCommitTime(this.requiredPos))); + getBaseFileIterator(split.getBasePath().get(), getRequiredPosWithCommitTime(this.requiredPos))); } else { // base file only - this.iterator = new BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get())); + return getBaseFileIterator(split.getBasePath().get()); } } else if (!split.getBasePath().isPresent()) { // log files only if (OptionsResolver.emitChangelog(conf)) { - this.iterator = new LogFileOnlyIterator(getUnMergedLogFileIterator(split)); + return new LogFileOnlyIterator(getUnMergedLogFileIterator(split)); } else { - this.iterator = new LogFileOnlyIterator(getLogFileIterator(split)); + return new LogFileOnlyIterator(getLogFileIterator(split)); } } else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) { - this.iterator = new SkipMergeIterator( - getRequiredSchemaReader(split.getBasePath().get()), + return new SkipMergeIterator( + getBaseFileIterator(split.getBasePath().get()), getLogFileIterator(split)); } else if (split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) { - this.iterator = new MergeIterator( + return new MergeIterator( conf, hadoopConf, split, @@ -199,10 +211,11 @@ public class MergeOnReadInputFormat this.tableState.getRequiredRowType(), new Schema.Parser().parse(this.tableState.getAvroSchema()), new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()), + internalSchemaManager.getQuerySchema(), this.requiredPos, this.emitDelete, this.tableState.getOperationPos(), - getFullSchemaReader(split.getBasePath().get())); + getBaseFileIteratorWithMetadata(split.getBasePath().get())); } else { throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for " + "file path: " + split.getBasePath() @@ -211,7 +224,6 @@ public class MergeOnReadInputFormat + "spark partition Index: " + split.getSplitNumber() + "merge type: " + split.getMergeType()); } - mayShiftInputSplit(split); } @Override @@ -242,14 
+254,14 @@ public class MergeOnReadInputFormat return true; } else { // log file reaches end ? - return this.iterator.reachedEnd(); + return !this.iterator.hasNext(); } } @Override public RowData nextRecord(RowData o) { currentReadCount++; - return this.iterator.nextRecord(); + return this.iterator.next(); } @Override @@ -284,15 +296,19 @@ public class MergeOnReadInputFormat } } - private ParquetColumnarRowSplitReader getFullSchemaReader(String path) throws IOException { - return getReader(path, IntStream.range(0, this.tableState.getRowType().getFieldCount()).toArray()); + protected ClosableIterator<RowData> getBaseFileIteratorWithMetadata(String path) { + try { + return getBaseFileIterator(path, IntStream.range(0, this.tableState.getRowType().getFieldCount()).toArray()); + } catch (IOException e) { + throw new HoodieException("Get reader error for path: " + path); + } } - private ParquetColumnarRowSplitReader getRequiredSchemaReader(String path) throws IOException { - return getReader(path, this.requiredPos); + protected ClosableIterator<RowData> getBaseFileIterator(String path) throws IOException { + return getBaseFileIterator(path, this.requiredPos); } - private ParquetColumnarRowSplitReader getReader(String path, int[] requiredPos) throws IOException { + private ClosableIterator<RowData> getBaseFileIterator(String path, int[] requiredPos) throws IOException { // generate partition specs. LinkedHashMap<String, String> partSpec = FilePathUtils.extractPartitionKeyValues( new org.apache.hadoop.fs.Path(path).getParent(), @@ -314,7 +330,8 @@ public class MergeOnReadInputFormat } }); - return ParquetSplitReaderUtil.genPartColumnarRowReader( + return RecordIterators.getParquetRecordIterator( + internalSchemaManager, this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE), true, HadoopConfigurations.getParquetConf(this.conf, hadoopConf), @@ -460,46 +477,12 @@ public class MergeOnReadInputFormat // ------------------------------------------------------------------------- // Inner Class // ------------------------------------------------------------------------- - private interface RecordIterator { - boolean reachedEnd() throws IOException; - - RowData nextRecord(); - - void close() throws IOException; - } - - static class BaseFileOnlyIterator implements RecordIterator { - // base file reader - private final ParquetColumnarRowSplitReader reader; - - BaseFileOnlyIterator(ParquetColumnarRowSplitReader reader) { - this.reader = reader; - } - - @Override - public boolean reachedEnd() throws IOException { - return this.reader.reachedEnd(); - } - - @Override - public RowData nextRecord() { - return this.reader.nextRecord(); - } - - @Override - public void close() throws IOException { - if (this.reader != null) { - this.reader.close(); - } - } - } - /** - * Similar with {@link BaseFileOnlyIterator} but with instant time filtering. + * Base record iterator with instant time filtering. 
*/ - static class BaseFileOnlyFilteringIterator implements RecordIterator { - // base file reader - private final ParquetColumnarRowSplitReader reader; + static class BaseFileOnlyFilteringIterator implements ClosableIterator<RowData> { + // base file record iterator + private final ClosableIterator<RowData> nested; private final InstantRange instantRange; private final RowDataProjection projection; @@ -508,44 +491,44 @@ public class MergeOnReadInputFormat BaseFileOnlyFilteringIterator( Option<InstantRange> instantRange, RowType requiredRowType, - ParquetColumnarRowSplitReader reader) { - this.reader = reader; + ClosableIterator<RowData> nested) { + this.nested = nested; this.instantRange = instantRange.orElse(null); int[] positions = IntStream.range(1, 1 + requiredRowType.getFieldCount()).toArray(); projection = RowDataProjection.instance(requiredRowType, positions); } @Override - public boolean reachedEnd() throws IOException { - while (!this.reader.reachedEnd()) { - currentRecord = this.reader.nextRecord(); + public boolean hasNext() { + while (this.nested.hasNext()) { + currentRecord = this.nested.next(); if (instantRange != null) { boolean isInRange = instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString()); if (isInRange) { - return false; + return true; } } else { - return false; + return true; } } - return true; + return false; } @Override - public RowData nextRecord() { + public RowData next() { // can promote: no need to project with null instant range return projection.project(currentRecord); } @Override - public void close() throws IOException { - if (this.reader != null) { - this.reader.close(); + public void close() { + if (this.nested != null) { + this.nested.close(); } } } - static class LogFileOnlyIterator implements RecordIterator { + protected static class LogFileOnlyIterator implements ClosableIterator<RowData> { // iterator for log files private final ClosableIterator<RowData> iterator; @@ -554,12 +537,12 @@ public class MergeOnReadInputFormat } @Override - public boolean reachedEnd() { - return !this.iterator.hasNext(); + public boolean hasNext() { + return this.iterator.hasNext(); } @Override - public RowData nextRecord() { + public RowData next() { return this.iterator.next(); } @@ -571,9 +554,9 @@ public class MergeOnReadInputFormat } } - static class SkipMergeIterator implements RecordIterator { - // base file reader - private final ParquetColumnarRowSplitReader reader; + static class SkipMergeIterator implements ClosableIterator<RowData> { + // base file record iterator + private final ClosableIterator<RowData> nested; // iterator for log files private final ClosableIterator<RowData> iterator; @@ -584,34 +567,34 @@ public class MergeOnReadInputFormat private RowData currentRecord; - SkipMergeIterator(ParquetColumnarRowSplitReader reader, ClosableIterator<RowData> iterator) { - this.reader = reader; + SkipMergeIterator(ClosableIterator<RowData> nested, ClosableIterator<RowData> iterator) { + this.nested = nested; this.iterator = iterator; } @Override - public boolean reachedEnd() throws IOException { - if (!readLogs && !this.reader.reachedEnd()) { - currentRecord = this.reader.nextRecord(); - return false; + public boolean hasNext() { + if (!readLogs && this.nested.hasNext()) { + currentRecord = this.nested.next(); + return true; } readLogs = true; if (this.iterator.hasNext()) { currentRecord = this.iterator.next(); - return false; + return true; } - return true; + return false; } @Override - public RowData nextRecord() { + public 
RowData next() { return currentRecord; } @Override - public void close() throws IOException { - if (this.reader != null) { - this.reader.close(); + public void close() { + if (this.nested != null) { + this.nested.close(); } if (this.iterator != null) { this.iterator.close(); @@ -619,24 +602,22 @@ public class MergeOnReadInputFormat } } - static class MergeIterator implements RecordIterator { - // base file reader - private final ParquetColumnarRowSplitReader reader; + protected static class MergeIterator implements ClosableIterator<RowData> { + // base file record iterator + private final ClosableIterator<RowData> nested; // log keys used for merging private final Iterator<String> logKeysIterator; // scanner private final HoodieMergedLogRecordScanner scanner; private final Schema tableSchema; - private final Schema requiredSchema; - private final int[] requiredPos; private final boolean emitDelete; private final int operationPos; private final RowDataToAvroConverters.RowDataToAvroConverter rowDataToAvroConverter; private final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter; - private final GenericRecordBuilder recordBuilder; - private final RowDataProjection projection; + private final Option<RowDataProjection> projection; + private final Option<Function<IndexedRecord, GenericRecord>> avroProjection; private final InstantRange instantRange; @@ -659,30 +640,49 @@ public class MergeOnReadInputFormat RowType requiredRowType, Schema tableSchema, Schema requiredSchema, + InternalSchema querySchema, int[] requiredPos, boolean emitDelete, int operationPos, - ParquetColumnarRowSplitReader reader) { // the reader should be with full schema + ClosableIterator<RowData> nested) { // the iterator should be with full schema + this(flinkConf, hadoopConf, split, tableRowType, requiredRowType, tableSchema, + querySchema, + Option.of(RowDataProjection.instance(requiredRowType, requiredPos)), + Option.of(record -> buildAvroRecordBySchema(record, requiredSchema, requiredPos, new GenericRecordBuilder(requiredSchema))), + emitDelete, operationPos, nested); + } + + public MergeIterator( + Configuration flinkConf, + org.apache.hadoop.conf.Configuration hadoopConf, + MergeOnReadInputSplit split, + RowType tableRowType, + RowType requiredRowType, + Schema tableSchema, + InternalSchema querySchema, + Option<RowDataProjection> projection, + Option<Function<IndexedRecord, GenericRecord>> avroProjection, + boolean emitDelete, + int operationPos, + ClosableIterator<RowData> nested) { // the iterator should be with full schema this.tableSchema = tableSchema; - this.reader = reader; - this.scanner = FormatUtils.logScanner(split, tableSchema, flinkConf, hadoopConf); + this.nested = nested; + this.scanner = FormatUtils.logScanner(split, tableSchema, querySchema, flinkConf, hadoopConf); this.payloadProps = StreamerUtil.getPayloadConfig(flinkConf).getProps(); this.logKeysIterator = scanner.getRecords().keySet().iterator(); - this.requiredSchema = requiredSchema; - this.requiredPos = requiredPos; this.emitDelete = emitDelete; this.operationPos = operationPos; - this.recordBuilder = new GenericRecordBuilder(requiredSchema); + this.avroProjection = avroProjection; this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter(tableRowType); this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType); - this.projection = RowDataProjection.instance(requiredRowType, requiredPos); + this.projection = projection; this.instantRange = split.getInstantRange().orElse(null); } 
@Override - public boolean reachedEnd() throws IOException { - while (!readLogs && !this.reader.reachedEnd()) { - currentRecord = this.reader.nextRecord(); + public boolean hasNext() { + while (!readLogs && this.nested.hasNext()) { + currentRecord = this.nested.next(); if (instantRange != null) { boolean isInRange = instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString()); if (!isInRange) { @@ -703,19 +703,19 @@ public class MergeOnReadInputFormat // deleted continue; } - GenericRecord avroRecord = buildAvroRecordBySchema( - mergedAvroRecord.get(), - requiredSchema, - requiredPos, - recordBuilder); + IndexedRecord avroRecord = avroProjection.isPresent() + ? avroProjection.get().apply(mergedAvroRecord.get()) + : mergedAvroRecord.get(); this.currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord); this.currentRecord.setRowKind(rowKind); - return false; + return true; } } // project the full record in base with required positions - currentRecord = projection.project(currentRecord); - return false; + if (projection.isPresent()) { + currentRecord = projection.get().project(currentRecord); + } + return true; } // read the logs readLogs = true; @@ -725,49 +725,53 @@ public class MergeOnReadInputFormat Option<IndexedRecord> insertAvroRecord = getInsertValue(curKey); if (insertAvroRecord.isPresent()) { // the record is a DELETE if insertAvroRecord not present, skipping - GenericRecord avroRecord = buildAvroRecordBySchema( - insertAvroRecord.get(), - requiredSchema, - requiredPos, - recordBuilder); + IndexedRecord avroRecord = avroProjection.isPresent() + ? avroProjection.get().apply(insertAvroRecord.get()) + : insertAvroRecord.get(); this.currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord); FormatUtils.setRowKind(this.currentRecord, insertAvroRecord.get(), this.operationPos); - return false; + return true; } } } - return true; + return false; } - private Option<IndexedRecord> getInsertValue(String curKey) throws IOException { + private Option<IndexedRecord> getInsertValue(String curKey) { final HoodieAvroRecord<?> record = (HoodieAvroRecord) scanner.getRecords().get(curKey); if (!emitDelete && HoodieOperation.isDelete(record.getOperation())) { return Option.empty(); } - return record.getData().getInsertValue(tableSchema); + try { + return record.getData().getInsertValue(tableSchema); + } catch (IOException e) { + throw new HoodieIOException("Get insert value from payload exception", e); + } } @Override - public RowData nextRecord() { + public RowData next() { return currentRecord; } @Override - public void close() throws IOException { - if (this.reader != null) { - this.reader.close(); + public void close() { + if (this.nested != null) { + this.nested.close(); } if (this.scanner != null) { this.scanner.close(); } } - private Option<IndexedRecord> mergeRowWithLog( - RowData curRow, - String curKey) throws IOException { + private Option<IndexedRecord> mergeRowWithLog(RowData curRow, String curKey) { final HoodieAvroRecord<?> record = (HoodieAvroRecord) scanner.getRecords().get(curKey); GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow); - return record.getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema, payloadProps); + try { + return record.getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema, payloadProps); + } catch (IOException e) { + throw new HoodieIOException("Merge base and delta payloads exception", e); + } } } @@ -775,12 +779,13 @@ public class 
MergeOnReadInputFormat * Builder for {@link MergeOnReadInputFormat}. */ public static class Builder { - private Configuration conf; - private MergeOnReadTableState tableState; - private List<DataType> fieldTypes; - private String defaultPartName; - private long limit = -1; - private boolean emitDelete = false; + protected Configuration conf; + protected MergeOnReadTableState tableState; + protected List<DataType> fieldTypes; + protected String defaultPartName; + protected long limit = -1; + protected boolean emitDelete = false; + protected InternalSchemaManager internalSchemaManager = InternalSchemaManager.DISABLED; public Builder config(Configuration conf) { this.conf = conf; @@ -812,9 +817,14 @@ public class MergeOnReadInputFormat return this; } + public Builder internalSchemaManager(InternalSchemaManager internalSchemaManager) { + this.internalSchemaManager = internalSchemaManager; + return this; + } + public MergeOnReadInputFormat build() { return new MergeOnReadInputFormat(conf, tableState, fieldTypes, - defaultPartName, limit, emitDelete); + defaultPartName, limit, emitDelete, internalSchemaManager); } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataCastProjection.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataCastProjection.java new file mode 100644 index 00000000000..55e85aa1f60 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataCastProjection.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import org.apache.hudi.table.format.CastMap; + +import org.apache.flink.table.types.logical.LogicalType; + +import javax.annotation.Nullable; +import java.util.stream.IntStream; + +/** + * This class is responsible to project row as well as {@link RowDataProjection}. + * In addition, fields are converted according to the CastMap. 
+ */ +public final class RowDataCastProjection extends RowDataProjection { + private static final long serialVersionUID = 1L; + + private final CastMap castMap; + + public RowDataCastProjection(LogicalType[] types, CastMap castMap) { + super(types, IntStream.range(0, types.length).toArray()); + this.castMap = castMap; + } + + @Override + protected @Nullable Object getVal(int pos, @Nullable Object val) { + if (val == null) { + return null; + } + return castMap.castIfNeeded(pos, val); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java index 8076d982b99..2e3e8a2ed32 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java @@ -25,6 +25,8 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import javax.annotation.Nullable; + import java.io.Serializable; import java.util.Arrays; import java.util.List; @@ -37,7 +39,7 @@ public class RowDataProjection implements Serializable { private final RowData.FieldGetter[] fieldGetters; - private RowDataProjection(LogicalType[] types, int[] positions) { + protected RowDataProjection(LogicalType[] types, int[] positions) { ValidationUtils.checkArgument(types.length == positions.length, "types and positions should have the equal number"); this.fieldGetters = new RowData.FieldGetter[types.length]; @@ -86,4 +88,9 @@ public class RowDataProjection implements Serializable { } return values; } + + protected @Nullable + Object getVal(int pos, @Nullable Object val) { + return val; + } }
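
A minimal consumer sketch (illustrative, not part of the commit) of the unified org.apache.hudi.common.util.ClosableIterator<RowData> contract that the input formats above now program against: hasNext() takes the place of reader.reachedEnd(), next() takes the place of reader.nextRecord(), and close() releases the underlying reader or scanner. The drain() helper and class name below are hypothetical.

    import org.apache.flink.table.data.RowData;
    import org.apache.hudi.common.util.ClosableIterator;

    final class ClosableIteratorSketch {

      // Drains any of the iterators introduced in this patch
      // (ParquetSplitRecordIterator, SchemaEvolvedRecordIterator, MergeIterator, ...).
      static long drain(ClosableIterator<RowData> itr) {
        long count = 0;
        try {
          while (itr.hasNext()) {      // was: !reader.reachedEnd()
            RowData row = itr.next();  // was: reader.nextRecord()
            count++;                   // e.g. forward the row to the Flink collector
          }
        } finally {
          itr.close();                 // the iterator owns the underlying resource
        }
        return count;
      }
    }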
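
For the schema-evolution path, CastMap keeps one conversion per selected field position. A hedged usage sketch, assuming the caller sits in the org.apache.hudi.table.format package (add(int, LogicalType, LogicalType) is package-private and marked @VisibleForTesting; production code populates the map through InternalSchemaManager#getCastMap instead). The class name and field positions are illustrative.

    package org.apache.hudi.table.format;

    import org.apache.flink.table.types.logical.BigIntType;
    import org.apache.flink.table.types.logical.DateType;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.VarCharType;

    final class CastMapSketch {
      public static void main(String[] args) {
        CastMap castMap = new CastMap();
        // field 0: the parquet file stores INT, the query schema evolved it to BIGINT
        castMap.add(0, new IntType(), new BigIntType());
        // field 1: the file stores DATE (epoch days), the query schema evolved it to STRING
        castMap.add(1, new DateType(), new VarCharType(10));

        Object asLong = castMap.castIfNeeded(0, 42);      // -> 42L
        Object asString = castMap.castIfNeeded(1, 19327); // -> "2022-12-01" as BinaryStringData
        System.out.println(asLong + " / " + asString);
      }
    }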
