cshuo commented on code in PR #13078:
URL: https://github.com/apache/hudi/pull/13078#discussion_r2029889715
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java:
##########
@@ -138,6 +139,15 @@ private FlinkOptions() {
+ "These merger impls will filter by record.merger.strategy. "
+ "Hudi will pick most efficient implementation to perform
merging/combining of the records (during update, reading MOR table, etc)");
+ @AdvancedConfig
+ public static final ConfigOption<String> RECORD_MERGE_MODE = ConfigOptions
Review Comment:
Currently, users can configure the merging strategy via the option `payload.class` (default value is `EventTimeAvroPayload`, i.e. event-time merging semantics), which is Avro-based.
After we introduce the FG reader based compaction, users should not rely on the legacy Avro-payload-based config; the merge mode configs should be exposed so users can choose the expected merging behavior.
Btw, the compatibility work for the payload config is also included in this PR.
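To make the intent concrete, here is a minimal sketch of the two configuration styles being contrasted; the merge-mode option key below is a placeholder for illustration, not necessarily the exact name added in this PR:
```java
import org.apache.flink.configuration.Configuration;

public class MergeConfigSketch {
  public static void main(String[] args) {
    // Legacy style: Avro-payload-based merging, event-time semantics by default.
    Configuration legacy = new Configuration();
    legacy.setString("payload.class", "org.apache.hudi.common.model.EventTimeAvroPayload");

    // New style: choose the merging behavior via a record merge mode
    // (EVENT_TIME_ORDERING / COMMIT_TIME_ORDERING / CUSTOM).
    // The key name here is assumed for illustration only.
    Configuration mergeMode = new Configuration();
    mergeMode.setString("write.record.merge.mode", "EVENT_TIME_ORDERING");
  }
}
```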
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/FlinkRowDataReaderContext.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.common;
+
+import org.apache.hudi.client.model.CommitTimeFlinkRecordMerger;
+import org.apache.hudi.client.model.EventTimeFlinkRecordMerger;
+import org.apache.hudi.client.model.HoodieFlinkRecord;
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.util.HoodieRecordUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.io.storage.row.RowDataFileReader;
+import org.apache.hudi.io.storage.row.RowDataFileReaderFactories;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.expression.ExpressionPredicates.Predicate;
+import org.apache.hudi.table.format.InternalSchemaManager;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieRowDataUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY;
+import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
+import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
+
+/**
+ * Implementation of {@link HoodieReaderContext} to read {@link RowData}s from base files or
+ * log files with Flink parquet reader.
+ */
+public class FlinkRowDataReaderContext extends HoodieReaderContext<RowData> {
+ private final List<Predicate> predicates;
+ private final InternalSchemaManager internalSchemaManager;
+ private RowDataSerializer rowDataSerializer;
+ private final Configuration flinkConf;
+
+ public FlinkRowDataReaderContext(
+ Configuration conf,
+ InternalSchemaManager internalSchemaManager,
+ List<Predicate> predicates) {
+ this.flinkConf = conf;
+ this.internalSchemaManager = internalSchemaManager;
+ this.predicates = predicates;
+ }
+
+ @Override
+ public ClosableIterator<RowData> getFileRecordIterator(
+ StoragePath filePath,
+ long start,
+ long length,
+ Schema dataSchema,
+ Schema requiredSchema,
+ HoodieStorage storage) throws IOException {
+ RowDataFileReaderFactories.Factory readerFactory = RowDataFileReaderFactories.getFactory(HoodieFileFormat.PARQUET);
+ RowDataFileReader fileReader = readerFactory.createFileReader(internalSchemaManager, flinkConf);
+
+ List<String> fieldNames = dataSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
+ List<DataType> fieldTypes = dataSchema.getFields().stream().map(
+ f -> AvroSchemaConverter.convertToDataType(f.schema())).collect(Collectors.toList());
+ int[] selectedFields = requiredSchema.getFields().stream().map(Schema.Field::name)
+ .map(fieldNames::indexOf)
+ .mapToInt(i -> i)
+ .toArray();
+
+ return fileReader.getRowDataIterator(
+ fieldNames,
+ fieldTypes,
+ selectedFields,
+ predicates,
+ filePath,
+ start,
+ length);
+ }
+
+ @Override
+ public Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode mergeMode, String mergeStrategyId, String mergeImplClasses) {
+ switch (mergeMode) {
+ case EVENT_TIME_ORDERING:
+ return Option.of(new EventTimeFlinkRecordMerger());
+ case COMMIT_TIME_ORDERING:
+ return Option.of(new CommitTimeFlinkRecordMerger());
+ default:
+ Option<HoodieRecordMerger> mergerClass =
+ HoodieRecordUtils.createValidRecordMerger(EngineType.FLINK, mergeImplClasses, mergeStrategyId);
+ if (mergerClass.isEmpty()) {
+ throw new HoodieValidationException("No valid flink merger implementation set for `"
+ + RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY + "`");
+ }
+ return mergerClass;
+ }
+ }
+
+ @Override
+ public Object getValue(RowData record, Schema schema, String fieldName) {
+ Option<RowData.FieldGetter> fieldGetterOpt = HoodieRowDataUtil.getFieldGetter(schema, fieldName);
+ return fieldGetterOpt.isEmpty() ? null : fieldGetterOpt.get().getFieldOrNull(record);
+ }
+
+ @Override
+ public String getRecordKey(RowData record, Schema schema) {
+ return Objects.toString(getValue(record, schema, RECORD_KEY_METADATA_FIELD));
+ }
+
+ @Override
+ public Map<String, Object> generateMetadataForRecord(RowData record, Schema schema, Option<String> orderingFieldName) {
+ Map<String, Object> metadata = super.generateMetadataForRecord(record, schema, orderingFieldName);
+ if (orderingFieldName.isEmpty() || metadata.containsKey(INTERNAL_META_ORDERING_FIELD)) {
+ return metadata;
+ }
+ Comparable orderingValue = getOrderingValue(Option.of(record), metadata, schema, orderingFieldName);
+ metadata.put(INTERNAL_META_ORDERING_FIELD, orderingValue);
+ return metadata;
+ }
+
+ @Override
+ public HoodieRecord<RowData> constructHoodieRecord(Option<RowData> recordOption, Map<String, Object> metadataMap) {
+ HoodieKey hoodieKey = new HoodieKey(
+ (String) metadataMap.get(INTERNAL_META_RECORD_KEY),
+ (String) metadataMap.get(INTERNAL_META_PARTITION_PATH));
+ // delete record
+ if (recordOption.isEmpty()) {
+ return new HoodieEmptyRecord<>(hoodieKey, HoodieRecord.HoodieRecordType.FLINK);
+ }
+ RowData rowData = recordOption.get();
+ HoodieOperation operation = HoodieOperation.fromValue(rowData.getRowKind().toByteValue());
+
+ Comparable orderingValue;
+ if (metadataMap.containsKey(INTERNAL_META_ORDERING_FIELD)) {
+ orderingValue = (Comparable) metadataMap.get(INTERNAL_META_ORDERING_FIELD);
+ } else {
+ throw new HoodieException("There should be ordering value in metadataMap.");
+ }
+ return new HoodieFlinkRecord(hoodieKey, operation, orderingValue, rowData);
+ }
+
+ @Override
+ public Comparable getOrderingValue(Option<RowData> recordOption, Map<String, Object> metadataMap, Schema schema, Option<String> orderingFieldName) {
+ if (metadataMap.containsKey(INTERNAL_META_ORDERING_FIELD)) {
+ return (Comparable) metadataMap.get(INTERNAL_META_ORDERING_FIELD);
+ }
+ if (!recordOption.isPresent() || orderingFieldName.isEmpty()) {
+ return DEFAULT_ORDERING_VALUE;
+ }
+ Object value = getValue(recordOption.get(), schema, orderingFieldName.get());
+ // currently the ordering value stored in DeleteRecord is converted to Avro value because Flink reader currently uses AVRO payload to merge.
+ // So here we align the data format with reader until RowData reader is supported, HUDI-9146.
+ UnaryOperator<Object> fieldConverter = HoodieRowDataUtil.getFieldConverter(schema, orderingFieldName.get(), flinkConf);
+ Comparable finalOrderingVal = value != null ? (Comparable) fieldConverter.apply(value) : DEFAULT_ORDERING_VALUE;
+ metadataMap.put(INTERNAL_META_ORDERING_FIELD, finalOrderingVal);
+ return finalOrderingVal;
+ }
+
+ @Override
+ public RowData seal(RowData rowData) {
+ if (rowDataSerializer == null) {
+ RowType requiredRowType = (RowType) AvroSchemaConverter.convertToDataType(getSchemaHandler().getRequiredSchema()).getLogicalType();
+ rowDataSerializer = new RowDataSerializer(requiredRowType);
Review Comment:
The serializer here is not created at the record level; it's a member field of `FlinkRowDataReaderContext`.
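For clarity, a simplified sketch of that pattern (not the exact PR code): the serializer lives on the context and is built lazily once, then reused for every `seal` call.
```java
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.RowType;

class SerializerHolderSketch {
  // Member field: created at most once per reader context, not per record.
  private RowDataSerializer rowDataSerializer;

  RowDataSerializer getOrCreateSerializer(RowType requiredRowType) {
    if (rowDataSerializer == null) {
      rowDataSerializer = new RowDataSerializer(requiredRowType);
    }
    return rowDataSerializer;
  }
}
```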
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java:
##########
@@ -60,4 +79,19 @@ public void preCompact(
public void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
// No OP
}
+
+ @Override
+ public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
+ HoodieTableMetaClient metaClient,
+ HoodieWriteConfig writeConfig,
+ CompactionOperation operation,
+ String instantTime,
+ Option<EngineBroadcastManager> broadcastManagerOpt) throws IOException {
+ Configuration conf = metaClient.getStorage().getConf().unwrapAs(Configuration.class);
+ FlinkRowDataReaderContext readerContext = new FlinkRowDataReaderContext(
Review Comment:
Besides the usage you mentioned, the Flink conf stored in `FlinkRowDataReaderContext` is also used to:
* generate partition specs for `FlinkParquetReader`;
* get the config `read.utc-timezone` to create the field converter (Flink value -> Avro value) in `getOrderingValue` (see the sketch below).

Is there a recommended cleaner way to achieve these?
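As a concrete example of the second point, a rough sketch of how the stored conf is consulted; the key and default below mirror Hudi's `read.utc-timezone` option but are written out here as assumptions:
```java
import org.apache.flink.configuration.Configuration;

class OrderingValueConverterSketch {
  // Hypothetical helper: the field converter (Flink value -> Avro value) needs to know
  // whether timestamps should be interpreted in UTC, which comes from the Flink conf.
  static boolean useUtcTimezone(Configuration flinkConf) {
    return flinkConf.getBoolean("read.utc-timezone", true);
  }
}
```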
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##########
@@ -309,7 +309,7 @@ public Map<String, Object> generateMetadataForRecord(
* @param schema The Avro schema of the record.
* @return A mapping containing the metadata.
*/
- public Map<String, Object> generateMetadataForRecord(T record, Schema schema) {
+ public Map<String, Object> generateMetadataForRecord(T record, Schema schema, Option<String> orderingFieldName) {
Review Comment:
`orderingFieldName` is added to generate the ordering value in `FlinkRowDataReaderContext`.
`generateMetadataForRecord` only generates the record key by default, and the generated metadata map is then used to construct a `HoodieFlinkRecord` in `constructHoodieRecord(Option<RowData> recordOption, Map<String, Object> metadataMap)`, where the ordering value is necessary.
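A hedged pseudo-driver of that flow (the variable names are made up; the method names follow the diff): the metadata map produced with the ordering field is exactly what `constructHoodieRecord` later relies on.
```java
// Sketch only: threading the ordering field through metadata generation so that
// constructHoodieRecord can read INTERNAL_META_ORDERING_FIELD from the map.
Map<String, Object> metadata =
    readerContext.generateMetadataForRecord(rowData, schema, Option.of("ts"));
HoodieRecord<RowData> record =
    readerContext.constructHoodieRecord(Option.of(rowData), metadata);
```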
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/FlinkRowDataReaderContext.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.common;
+
+import org.apache.hudi.client.model.CommitTimeFlinkRecordMerger;
+import org.apache.hudi.client.model.EventTimeFlinkRecordMerger;
+import org.apache.hudi.client.model.HoodieFlinkRecord;
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.util.HoodieRecordUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.io.storage.row.RowDataFileReader;
+import org.apache.hudi.io.storage.row.RowDataFileReaderFactories;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.expression.ExpressionPredicates.Predicate;
+import org.apache.hudi.table.format.InternalSchemaManager;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieRowDataUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY;
+import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
+import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
+
+/**
+ * Implementation of {@link HoodieReaderContext} to read {@link RowData}s from base files or
+ * log files with Flink parquet reader.
+ */
+public class FlinkRowDataReaderContext extends HoodieReaderContext<RowData> {
+ private final List<Predicate> predicates;
+ private final InternalSchemaManager internalSchemaManager;
+ private RowDataSerializer rowDataSerializer;
+ private final Configuration flinkConf;
+
+ public FlinkRowDataReaderContext(
+ Configuration conf,
+ InternalSchemaManager internalSchemaManager,
+ List<Predicate> predicates) {
+ this.flinkConf = conf;
+ this.internalSchemaManager = internalSchemaManager;
+ this.predicates = predicates;
+ }
+
+ @Override
+ public ClosableIterator<RowData> getFileRecordIterator(
+ StoragePath filePath,
+ long start,
+ long length,
+ Schema dataSchema,
+ Schema requiredSchema,
+ HoodieStorage storage) throws IOException {
+ RowDataFileReaderFactories.Factory readerFactory = RowDataFileReaderFactories.getFactory(HoodieFileFormat.PARQUET);
+ RowDataFileReader fileReader = readerFactory.createFileReader(internalSchemaManager, flinkConf);
+
+ List<String> fieldNames = dataSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
+ List<DataType> fieldTypes = dataSchema.getFields().stream().map(
+ f -> AvroSchemaConverter.convertToDataType(f.schema())).collect(Collectors.toList());
+ int[] selectedFields = requiredSchema.getFields().stream().map(Schema.Field::name)
+ .map(fieldNames::indexOf)
+ .mapToInt(i -> i)
+ .toArray();
+
+ return fileReader.getRowDataIterator(
+ fieldNames,
+ fieldTypes,
+ selectedFields,
+ predicates,
+ filePath,
+ start,
+ length);
+ }
+
+ @Override
+ public Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode mergeMode, String mergeStrategyId, String mergeImplClasses) {
+ switch (mergeMode) {
+ case EVENT_TIME_ORDERING:
+ return Option.of(new EventTimeFlinkRecordMerger());
+ case COMMIT_TIME_ORDERING:
+ return Option.of(new CommitTimeFlinkRecordMerger());
+ default:
+ Option<HoodieRecordMerger> mergerClass =
+ HoodieRecordUtils.createValidRecordMerger(EngineType.FLINK, mergeImplClasses, mergeStrategyId);
+ if (mergerClass.isEmpty()) {
+ throw new HoodieValidationException("No valid flink merger implementation set for `"
+ + RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY + "`");
+ }
+ return mergerClass;
+ }
+ }
+
+ @Override
+ public Object getValue(RowData record, Schema schema, String fieldName) {
+ Option<RowData.FieldGetter> fieldGetterOpt = HoodieRowDataUtil.getFieldGetter(schema, fieldName);
+ return fieldGetterOpt.isEmpty() ? null : fieldGetterOpt.get().getFieldOrNull(record);
+ }
+
+ @Override
+ public String getRecordKey(RowData record, Schema schema) {
+ return Objects.toString(getValue(record, schema, RECORD_KEY_METADATA_FIELD));
+ }
+
+ @Override
+ public Map<String, Object> generateMetadataForRecord(RowData record, Schema schema, Option<String> orderingFieldName) {
+ Map<String, Object> metadata = super.generateMetadataForRecord(record, schema, orderingFieldName);
+ if (orderingFieldName.isEmpty() || metadata.containsKey(INTERNAL_META_ORDERING_FIELD)) {
+ return metadata;
+ }
+ Comparable orderingValue = getOrderingValue(Option.of(record), metadata, schema, orderingFieldName);
+ metadata.put(INTERNAL_META_ORDERING_FIELD, orderingValue);
+ return metadata;
+ }
+
+ @Override
+ public HoodieRecord<RowData> constructHoodieRecord(Option<RowData> recordOption, Map<String, Object> metadataMap) {
+ HoodieKey hoodieKey = new HoodieKey(
+ (String) metadataMap.get(INTERNAL_META_RECORD_KEY),
+ (String) metadataMap.get(INTERNAL_META_PARTITION_PATH));
+ // delete record
+ if (recordOption.isEmpty()) {
+ return new HoodieEmptyRecord<>(hoodieKey, HoodieRecord.HoodieRecordType.FLINK);
Review Comment:
Nice catch, will update.
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java:
##########
@@ -112,15 +113,24 @@ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) {
@Override
public HoodieRecord prependMetaFields(Schema recordSchema, Schema targetSchema, MetadataValues metadataValues, Properties props) {
- int metaFieldSize = targetSchema.getFields().size() - recordSchema.getFields().size();
- GenericRowData metaRow = new GenericRowData(metaFieldSize);
+ boolean withMetaFields = recordSchema.getField(RECORD_KEY_METADATA_FIELD) != null;
+ boolean withOperationField = targetSchema.getField(OPERATION_METADATA_FIELD) != null;
+ int metaFieldSize = HOODIE_META_COLUMNS.size();
+ String[] metaFields = new String[metaFieldSize];
+ if (withMetaFields) {
+ for (int i = 0; i < metaFieldSize; i++) {
+ metaFields[i] = data.getString(i).toString();
+ }
+ }
+
+ AbstractHoodieRowData rowWithMetaFields = HoodieRowDataCreation.create(metaFields, data, withOperationField, withMetaFields);
Review Comment:
We can do that for memory efficiency.
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/FlinkRowDataReaderContext.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.common;
+
+import org.apache.hudi.client.model.CommitTimeFlinkRecordMerger;
+import org.apache.hudi.client.model.EventTimeFlinkRecordMerger;
+import org.apache.hudi.client.model.HoodieFlinkRecord;
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.util.HoodieRecordUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.io.storage.row.RowDataFileReader;
+import org.apache.hudi.io.storage.row.RowDataFileReaderFactories;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.expression.ExpressionPredicates.Predicate;
+import org.apache.hudi.table.format.InternalSchemaManager;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieRowDataUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY;
+import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
+import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
+
+/**
+ * Implementation of {@link HoodieReaderContext} to read {@link RowData}s from base files or
+ * log files with Flink parquet reader.
+ */
+public class FlinkRowDataReaderContext extends HoodieReaderContext<RowData> {
+ private final List<Predicate> predicates;
+ private final InternalSchemaManager internalSchemaManager;
+ private RowDataSerializer rowDataSerializer;
+ private final Configuration flinkConf;
+
+ public FlinkRowDataReaderContext(
+ Configuration conf,
+ InternalSchemaManager internalSchemaManager,
+ List<Predicate> predicates) {
+ this.flinkConf = conf;
+ this.internalSchemaManager = internalSchemaManager;
+ this.predicates = predicates;
+ }
+
+ @Override
+ public ClosableIterator<RowData> getFileRecordIterator(
+ StoragePath filePath,
+ long start,
+ long length,
+ Schema dataSchema,
+ Schema requiredSchema,
+ HoodieStorage storage) throws IOException {
+ RowDataFileReaderFactories.Factory readerFactory = RowDataFileReaderFactories.getFactory(HoodieFileFormat.PARQUET);
+ RowDataFileReader fileReader = readerFactory.createFileReader(internalSchemaManager, flinkConf);
+
+ List<String> fieldNames = dataSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
+ List<DataType> fieldTypes = dataSchema.getFields().stream().map(
+ f -> AvroSchemaConverter.convertToDataType(f.schema())).collect(Collectors.toList());
+ int[] selectedFields = requiredSchema.getFields().stream().map(Schema.Field::name)
+ .map(fieldNames::indexOf)
+ .mapToInt(i -> i)
+ .toArray();
+
+ return fileReader.getRowDataIterator(
+ fieldNames,
+ fieldTypes,
+ selectedFields,
+ predicates,
+ filePath,
+ start,
+ length);
+ }
+
+ @Override
+ public Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode mergeMode, String mergeStrategyId, String mergeImplClasses) {
+ switch (mergeMode) {
+ case EVENT_TIME_ORDERING:
+ return Option.of(new EventTimeFlinkRecordMerger());
+ case COMMIT_TIME_ORDERING:
+ return Option.of(new CommitTimeFlinkRecordMerger());
+ default:
+ Option<HoodieRecordMerger> mergerClass =
+ HoodieRecordUtils.createValidRecordMerger(EngineType.FLINK, mergeImplClasses, mergeStrategyId);
+ if (mergerClass.isEmpty()) {
+ throw new HoodieValidationException("No valid flink merger implementation set for `"
+ + RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY + "`");
+ }
+ return mergerClass;
+ }
+ }
+
+ @Override
+ public Object getValue(RowData record, Schema schema, String fieldName) {
+ Option<RowData.FieldGetter> fieldGetterOpt = HoodieRowDataUtil.getFieldGetter(schema, fieldName);
+ return fieldGetterOpt.isEmpty() ? null : fieldGetterOpt.get().getFieldOrNull(record);
+ }
+
+ @Override
+ public String getRecordKey(RowData record, Schema schema) {
+ return Objects.toString(getValue(record, schema, RECORD_KEY_METADATA_FIELD));
Review Comment:
This just follows the current FG reader based compaction in `HoodieCompactor`:
https://github.com/apache/hudi/blob/45dedd819e56e521148bde51a3dfa4e472ea70cd/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java#L152
i.e., one of the prerequisites for FG reader based compaction is that `populateMetaFields` is enabled.
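For illustration, the kind of guard that prerequisite implies; this is a hypothetical check written against existing Hudi APIs, not code from this PR:
```java
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils;

class CompactionPreconditionSketch {
  // The RowData read path resolves record keys from the _hoodie_record_key meta column,
  // so meta fields must be populated before FG reader based compaction can run.
  static void checkMetaFieldsPopulated(HoodieTableMetaClient metaClient) {
    ValidationUtils.checkArgument(
        metaClient.getTableConfig().populateMetaFields(),
        "FG reader based compaction requires populated meta fields");
  }
}
```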
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]