This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 57c3f632a203 refactor: consolidate common utility classes for Flink
CDC read (#18436)
57c3f632a203 is described below
commit 57c3f632a203402b3b6d64d72389f2bf5d94f84e
Author: Peter Huang <[email protected]>
AuthorDate: Thu Apr 2 18:53:52 2026 -0700
refactor: consolidate common utility classes for Flink CDC read (#18436)
* refactor: consolidate common utility classes for Flink CDC read
* resolve shuo's comments
---
.../function/HoodieCdcSplitReaderFunction.java | 785 +-------------------
.../hudi/table/format/cdc/CdcImageManager.java | 197 ++++++
.../hudi/table/format/cdc/CdcInputFormat.java | 788 +--------------------
.../apache/hudi/table/format/cdc/CdcIterators.java | 667 +++++++++++++++++
.../function/TestHoodieCdcSplitReaderFunction.java | 10 +-
5 files changed, 938 insertions(+), 1509 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java
index 4d57ce18f3ca..0b0d4f6191bc 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java
@@ -18,97 +18,58 @@
package org.apache.hudi.source.reader.function;
-import org.apache.hudi.client.model.HoodieFlinkRecord;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaCache;
-import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
-import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator;
-import org.apache.hudi.common.table.read.BufferedRecord;
-import org.apache.hudi.common.table.read.BufferedRecordMerger;
-import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
-import org.apache.hudi.common.table.read.BufferedRecords;
-import org.apache.hudi.common.table.read.DeleteContext;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
-import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.ConfigUtils;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.source.ExpressionPredicates;
import org.apache.hudi.source.reader.BatchRecords;
import org.apache.hudi.source.reader.HoodieRecordWithPosition;
import org.apache.hudi.source.split.HoodieCdcSourceSplit;
import org.apache.hudi.source.split.HoodieSourceSplit;
-import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.HoodieStorageUtils;
-import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.table.format.FilePathUtils;
-import org.apache.hudi.table.format.FlinkReaderContextFactory;
+import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.table.format.InternalSchemaManager;
import org.apache.hudi.table.format.RecordIterators;
+import org.apache.hudi.table.format.cdc.CdcImageManager;
import org.apache.hudi.table.format.cdc.CdcInputFormat;
+import org.apache.hudi.table.format.cdc.CdcIterators;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadTableState;
-import org.apache.hudi.util.AvroToRowDataConverters;
-import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
-import org.apache.hudi.util.RowDataProjection;
import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.RowKind;
import org.apache.hadoop.fs.Path;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.Arrays;
import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.TreeMap;
+import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
-import static
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
-import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
-
/**
* CDC reader function for source V2. Reads CDC splits ({@link
HoodieCdcSourceSplit}) and
- * emits change-log {@link RowData} records tagged with the appropriate {@link
RowKind}.
+ * emits change-log {@link RowData} records tagged with the appropriate {@link
org.apache.flink.types.RowKind}.
*
* <p>The implementation mirrors the logic in {@link CdcInputFormat}, adapted
for the
* {@link SplitReaderFunction} contract.
@@ -158,10 +119,9 @@ public class HoodieCdcSplitReaderFunction extends
AbstractSplitReaderFunction {
HoodieCdcSourceSplit cdcSplit = (HoodieCdcSourceSplit) split;
HoodieCDCSupplementalLoggingMode mode =
OptionsResolver.getCDCSupplementalLoggingMode(conf);
- HoodieTableMetaClient client = getMetaClient();
- HoodieWriteConfig wConfig = getWriteConfig();
- ImageManager imageManager = new ImageManager(tableState.getRowType(),
wConfig, this::getFileSliceIterator);
+ CdcImageManager imageManager = new CdcImageManager(
+ tableState.getRowType(), getWriteConfig(), this::getFileSliceIterator);
Function<HoodieCDCFileSplit, ClosableIterator<RowData>> recordIteratorFunc
=
cdcFileSplit -> createRecordIteratorSafe(
@@ -169,10 +129,9 @@ public class HoodieCdcSplitReaderFunction extends
AbstractSplitReaderFunction {
cdcSplit.getMaxCompactionMemoryInBytes(),
cdcFileSplit,
mode,
- imageManager,
- client);
+ imageManager);
- currentIterator = new CdcFileSplitsIterator(cdcSplit.getChanges(),
imageManager, recordIteratorFunc);
+ currentIterator = new
CdcIterators.CdcFileSplitsIterator(cdcSplit.getChanges(), imageManager,
recordIteratorFunc);
BatchRecords<RowData> records = BatchRecords.forRecords(
split.splitId(), currentIterator, split.getFileOffset(),
split.getConsumed());
records.seek(split.getConsumed());
@@ -212,10 +171,9 @@ public class HoodieCdcSplitReaderFunction extends
AbstractSplitReaderFunction {
long maxCompactionMemoryInBytes,
HoodieCDCFileSplit fileSplit,
HoodieCDCSupplementalLoggingMode mode,
- ImageManager imageManager,
- HoodieTableMetaClient client) {
+ CdcImageManager imageManager) {
try {
- return createRecordIterator(tablePath, maxCompactionMemoryInBytes,
fileSplit, mode, imageManager, client);
+ return createRecordIterator(tablePath, maxCompactionMemoryInBytes,
fileSplit, mode, imageManager);
} catch (IOException e) {
throw new HoodieException("Failed to create CDC record iterator for
split: " + fileSplit, e);
}
@@ -226,8 +184,7 @@ public class HoodieCdcSplitReaderFunction extends
AbstractSplitReaderFunction {
long maxCompactionMemoryInBytes,
HoodieCDCFileSplit fileSplit,
HoodieCDCSupplementalLoggingMode mode,
- ImageManager imageManager,
- HoodieTableMetaClient client) throws IOException {
+ CdcImageManager imageManager) throws IOException {
final HoodieSchema tableSchema =
HoodieSchema.parse(tableState.getTableSchema());
final HoodieSchema requiredSchema =
HoodieSchema.parse(tableState.getRequiredSchema());
@@ -236,29 +193,33 @@ public class HoodieCdcSplitReaderFunction extends
AbstractSplitReaderFunction {
ValidationUtils.checkState(fileSplit.getCdcFiles() != null &&
fileSplit.getCdcFiles().size() == 1,
"CDC file path should exist and be singleton for
BASE_FILE_INSERT");
String path = new Path(tablePath,
fileSplit.getCdcFiles().get(0)).toString();
- return new AddBaseFileIterator(getBaseFileIterator(path));
+ return new CdcIterators.AddBaseFileIterator(getBaseFileIterator(path));
}
case BASE_FILE_DELETE: {
ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
"Before file slice should exist for BASE_FILE_DELETE");
- FileSlice fileSlice = fileSplit.getBeforeFileSlice().get();
- MergeOnReadInputSplit inputSplit =
CdcInputFormat.fileSlice2Split(tablePath, fileSlice,
maxCompactionMemoryInBytes);
- return new RemoveBaseFileIterator(tableState.getRequiredRowType(),
tableState.getRequiredPositions(), getFileSliceIterator(inputSplit));
+ MergeOnReadInputSplit inputSplit = CdcIterators.fileSlice2Split(
+ tablePath, fileSplit.getBeforeFileSlice().get(),
maxCompactionMemoryInBytes);
+ return new CdcIterators.RemoveBaseFileIterator(
+ tableState.getRequiredRowType(),
tableState.getRequiredPositions(), getFileSliceIterator(inputSplit));
}
case AS_IS: {
HoodieSchema dataSchema =
HoodieSchemaUtils.removeMetadataFields(tableSchema);
HoodieSchema cdcSchema =
HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema);
switch (mode) {
case DATA_BEFORE_AFTER:
- return new BeforeAfterImageIterator(
- getHadoopConf(), tablePath, tableSchema, requiredSchema,
tableState.getRequiredRowType(), cdcSchema, fileSplit);
+ return new CdcIterators.BeforeAfterImageIterator(
+ getHadoopConf(), tablePath, tableSchema, requiredSchema,
+ tableState.getRequiredRowType(), cdcSchema, fileSplit);
case DATA_BEFORE:
- return new BeforeImageIterator(
- conf, getHadoopConf(), tablePath, tableSchema, requiredSchema,
tableState.getRequiredRowType(),
+ return new CdcIterators.BeforeImageIterator(
+ getHadoopConf(), tablePath, tableSchema, requiredSchema,
+ tableState.getRequiredRowType(),
tableState.getRequiredPositions(),
maxCompactionMemoryInBytes, cdcSchema, fileSplit,
imageManager);
case OP_KEY_ONLY:
- return new RecordKeyImageIterator(
- conf, getHadoopConf(), tablePath, tableSchema, requiredSchema,
tableState.getRequiredRowType(),
+ return new CdcIterators.RecordKeyImageIterator(
+ getHadoopConf(), tablePath, tableSchema, requiredSchema,
+ tableState.getRequiredRowType(),
tableState.getRequiredPositions(),
maxCompactionMemoryInBytes, cdcSchema, fileSplit,
imageManager);
default:
throw new AssertionError("Unexpected CDC supplemental logging
mode: " + mode);
@@ -268,16 +229,17 @@ public class HoodieCdcSplitReaderFunction extends
AbstractSplitReaderFunction {
ValidationUtils.checkState(fileSplit.getCdcFiles() != null &&
fileSplit.getCdcFiles().size() == 1,
"CDC file path should exist and be singleton for LOG_FILE");
String logFilePath = new Path(tablePath,
fileSplit.getCdcFiles().get(0)).toString();
- MergeOnReadInputSplit split =
CdcInputFormat.singleLogFile2Split(tablePath, logFilePath,
maxCompactionMemoryInBytes);
+ MergeOnReadInputSplit split =
CdcIterators.singleLogFile2Split(tablePath, logFilePath,
maxCompactionMemoryInBytes);
ClosableIterator<HoodieRecord<RowData>> recordIterator =
getFileSliceHoodieRecordIterator(split);
- return new DataLogFileIterator(
- maxCompactionMemoryInBytes, imageManager, fileSplit, tableSchema,
tableState.getRequiredRowType(), tableState.getRequiredPositions(),
- recordIterator, client, getWriteConfig());
+ return new CdcIterators.DataLogFileIterator(
+ maxCompactionMemoryInBytes, imageManager, fileSplit, tableSchema,
+ tableState.getRequiredRowType(), tableState.getRequiredPositions(),
+ recordIterator, getMetaClient(), getWriteConfig());
}
case REPLACE_COMMIT: {
- return new ReplaceCommitIterator(
- conf, tablePath, tableState.getRequiredRowType(),
tableState.getRequiredPositions(), maxCompactionMemoryInBytes,
- fileSplit, this::getFileSliceIterator);
+ return new CdcIterators.ReplaceCommitIterator(
+ tablePath, tableState.getRequiredRowType(),
tableState.getRequiredPositions(),
+ maxCompactionMemoryInBytes, fileSplit, this::getFileSliceIterator);
}
default:
throw new AssertionError("Unexpected CDC file split infer case: " +
fileSplit.getCdcInferCase());
@@ -355,689 +317,10 @@ public class HoodieCdcSplitReaderFunction extends
AbstractSplitReaderFunction {
.orElse(Collections.emptyList()));
}
- private static int[] computeRequiredPositions(RowType rowType, RowType
requiredRowType) {
- List<String> allNames = rowType.getFieldNames();
- return requiredRowType.getFieldNames().stream()
- .map(allNames::indexOf)
- .mapToInt(i -> i)
- .toArray();
- }
-
private HoodieTableMetaClient getMetaClient() {
if (metaClient == null) {
metaClient = StreamerUtil.metaClientForReader(conf, getHadoopConf());
}
return metaClient;
}
-
- // -------------------------------------------------------------------------
- // Inner iterators (adapted from CdcInputFormat inner classes)
- // -------------------------------------------------------------------------
-
- /** Iterates over an ordered list of {@link HoodieCDCFileSplit}s, delegating
record reading to a factory. */
- private static class CdcFileSplitsIterator implements
ClosableIterator<RowData> {
- private ImageManager imageManager;
- private final Iterator<HoodieCDCFileSplit> fileSplitIterator;
- private final Function<HoodieCDCFileSplit, ClosableIterator<RowData>>
recordIteratorFunc;
- private ClosableIterator<RowData> recordIterator;
-
- CdcFileSplitsIterator(
- HoodieCDCFileSplit[] changes,
- ImageManager imageManager,
- Function<HoodieCDCFileSplit, ClosableIterator<RowData>>
recordIteratorFunc) {
- this.fileSplitIterator = Arrays.asList(changes).iterator();
- this.imageManager = imageManager;
- this.recordIteratorFunc = recordIteratorFunc;
- }
-
- @Override
- public boolean hasNext() {
- if (recordIterator != null) {
- if (recordIterator.hasNext()) {
- return true;
- } else {
- recordIterator.close();
- recordIterator = null;
- }
- }
- if (fileSplitIterator.hasNext()) {
- recordIterator = recordIteratorFunc.apply(fileSplitIterator.next());
- return recordIterator.hasNext();
- }
- return false;
- }
-
- @Override
- public RowData next() {
- return recordIterator.next();
- }
-
- @Override
- public void close() {
- if (recordIterator != null) {
- recordIterator.close();
- }
- if (imageManager != null) {
- imageManager.close();
- imageManager = null;
- }
- }
- }
-
- /** Wraps a base-file parquet iterator and marks every record as {@link
RowKind#INSERT}. */
- private static class AddBaseFileIterator implements
ClosableIterator<RowData> {
- private ClosableIterator<RowData> nested;
- private RowData currentRecord;
-
- AddBaseFileIterator(ClosableIterator<RowData> nested) {
- this.nested = nested;
- }
-
- @Override
- public boolean hasNext() {
- if (nested.hasNext()) {
- currentRecord = nested.next();
- currentRecord.setRowKind(RowKind.INSERT);
- return true;
- }
- return false;
- }
-
- @Override
- public RowData next() {
- return currentRecord;
- }
-
- @Override
- public void close() {
- if (nested != null) {
- nested.close();
- nested = null;
- }
- }
- }
-
- /** Wraps a file-slice iterator and marks every record as {@link
RowKind#DELETE}, with projection. */
- private static class RemoveBaseFileIterator implements
ClosableIterator<RowData> {
- private ClosableIterator<RowData> nested;
- private final RowDataProjection projection;
-
- RemoveBaseFileIterator(RowType requiredRowType, int[] requiredPositions,
ClosableIterator<RowData> iterator) {
- this.nested = iterator;
- this.projection = RowDataProjection.instance(requiredRowType,
requiredPositions);
- }
-
- @Override
- public boolean hasNext() {
- return nested.hasNext();
- }
-
- @Override
- public RowData next() {
- RowData row = nested.next();
- row.setRowKind(RowKind.DELETE);
- return projection.project(row);
- }
-
- @Override
- public void close() {
- if (nested != null) {
- nested.close();
- nested = null;
- }
- }
- }
-
- /**
- * Handles the {@code LOG_FILE} CDC inference case: compares records from
the log file
- * against before-image snapshots to emit
INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE events.
- */
- private static class DataLogFileIterator implements
ClosableIterator<RowData> {
- private final HoodieSchema tableSchema;
- private final long maxCompactionMemoryInBytes;
- private final ImageManager imageManager;
- private final RowDataProjection projection;
- private final BufferedRecordMerger recordMerger;
- private final ClosableIterator<HoodieRecord<RowData>> logRecordIterator;
- private final DeleteContext deleteContext;
- private final HoodieReaderContext<RowData> readerContext;
- private final String[] orderingFields;
- private final TypedProperties props;
-
- private ExternalSpillableMap<String, byte[]> beforeImages;
- private RowData currentImage;
- private RowData sideImage;
-
- DataLogFileIterator(
- long maxCompactionMemoryInBytes,
- ImageManager imageManager,
- HoodieCDCFileSplit cdcFileSplit,
- HoodieSchema tableSchema,
- RowType requiredRowType,
- int[] requiredPositions,
- ClosableIterator<HoodieRecord<RowData>> logRecordIterator,
- HoodieTableMetaClient metaClient,
- HoodieWriteConfig writeConfig) throws IOException {
- this.tableSchema = tableSchema;
- this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
- this.imageManager = imageManager;
- this.projection =
HoodieSchemaConverter.convertToRowType(tableSchema).equals(requiredRowType)
- ? null : RowDataProjection.instance(requiredRowType,
requiredPositions);
- this.props = writeConfig.getProps();
- this.readerContext = new
FlinkReaderContextFactory(metaClient).getContext();
- readerContext.initRecordMerger(props);
- this.orderingFields = ConfigUtils.getOrderingFields(props);
- this.recordMerger = BufferedRecordMergerFactory.create(
- readerContext,
- readerContext.getMergeMode(),
- false,
- Option.of(writeConfig.getRecordMerger()),
- tableSchema,
-
Option.ofNullable(Pair.of(metaClient.getTableConfig().getPayloadClass(),
writeConfig.getPayloadClass())),
- props,
- metaClient.getTableConfig().getPartialUpdateMode());
- this.logRecordIterator = logRecordIterator;
- this.deleteContext = new DeleteContext(props,
tableSchema).withReaderSchema(tableSchema);
- initImages(cdcFileSplit, writeConfig);
- }
-
- private void initImages(HoodieCDCFileSplit fileSplit, HoodieWriteConfig
writeConfig) throws IOException {
- if (fileSplit.getBeforeFileSlice().isPresent() &&
!fileSplit.getBeforeFileSlice().get().isEmpty()) {
- this.beforeImages = this.imageManager.getOrLoadImages(
- maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
- } else {
- this.beforeImages = FormatUtils.spillableMap(writeConfig,
maxCompactionMemoryInBytes, getClass().getSimpleName());
- }
- }
-
- @Override
- public boolean hasNext() {
- if (sideImage != null) {
- currentImage = sideImage;
- sideImage = null;
- return true;
- }
- while (logRecordIterator.hasNext()) {
- HoodieRecord<RowData> record = logRecordIterator.next();
- RowData existed =
imageManager.removeImageRecord(record.getRecordKey(), beforeImages);
- if (isDelete(record)) {
- if (existed != null) {
- existed.setRowKind(RowKind.DELETE);
- currentImage = existed;
- return true;
- }
- } else {
- if (existed == null) {
- RowData newRow = record.getData();
- newRow.setRowKind(RowKind.INSERT);
- currentImage = newRow;
- return true;
- } else {
- HoodieOperation operation =
HoodieOperation.fromValue(existed.getRowKind().toByteValue());
- HoodieRecord<RowData> historyRecord = new
HoodieFlinkRecord(record.getKey(), operation, existed);
- HoodieRecord<RowData> merged = mergeRowWithLog(historyRecord,
record).get();
- if (merged.getData() != existed) {
- existed.setRowKind(RowKind.UPDATE_BEFORE);
- currentImage = existed;
- RowData mergedRow = merged.getData();
- mergedRow.setRowKind(RowKind.UPDATE_AFTER);
- imageManager.updateImageRecord(record.getRecordKey(),
beforeImages, mergedRow);
- sideImage = mergedRow;
- return true;
- }
- }
- }
- }
- return false;
- }
-
- @Override
- public RowData next() {
- return projection != null ? projection.project(currentImage) :
currentImage;
- }
-
- @Override
- public void close() {
- logRecordIterator.close();
- imageManager.close();
- }
-
- @SuppressWarnings("unchecked")
- private Option<HoodieRecord<RowData>> mergeRowWithLog(
- HoodieRecord<RowData> historyRecord, HoodieRecord<RowData> newRecord) {
- try {
- BufferedRecord<RowData> histBuf = BufferedRecords.fromHoodieRecord(
- historyRecord, tableSchema, readerContext.getRecordContext(),
props, orderingFields, deleteContext);
- BufferedRecord<RowData> newBuf = BufferedRecords.fromHoodieRecord(
- newRecord, tableSchema, readerContext.getRecordContext(), props,
orderingFields, deleteContext);
- BufferedRecord<RowData> merged = recordMerger.finalMerge(histBuf,
newBuf);
- return Option.ofNullable(readerContext.getRecordContext()
- .constructHoodieRecord(merged, historyRecord.getPartitionPath()));
- } catch (IOException e) {
- throw new HoodieIOException("Merge base and delta payloads exception",
e);
- }
- }
-
- private boolean isDelete(HoodieRecord<RowData> record) {
- return record.isDelete(deleteContext, CollectionUtils.emptyProps());
- }
- }
-
- /**
- * Base iterator for CDC log files stored with supplemental logging (AS_IS
inference case).
- * Reads {@link HoodieCDCLogRecordIterator} and resolves before/after images
using
- * subclass-specific logic.
- */
- private abstract static class BaseImageIterator implements
ClosableIterator<RowData> {
- private final HoodieSchema requiredSchema;
- private final int[] requiredPos;
- private final GenericRecordBuilder recordBuilder;
- private final AvroToRowDataConverters.AvroToRowDataConverter
avroToRowDataConverter;
- private HoodieCDCLogRecordIterator cdcItr;
-
- private GenericRecord cdcRecord;
- private RowData sideImage;
- private RowData currentImage;
-
- BaseImageIterator(
- org.apache.hadoop.conf.Configuration hadoopConf,
- String tablePath,
- HoodieSchema tableSchema,
- HoodieSchema requiredSchema,
- RowType requiredRowType,
- HoodieSchema cdcSchema,
- HoodieCDCFileSplit fileSplit) {
- this.requiredSchema = requiredSchema;
- this.requiredPos = computeRequiredPos(tableSchema, requiredSchema);
- this.recordBuilder = new
GenericRecordBuilder(requiredSchema.getAvroSchema());
- this.avroToRowDataConverter =
AvroToRowDataConverters.createRowConverter(requiredRowType);
-
- StoragePath hadoopTablePath = new StoragePath(tablePath);
- HoodieStorage storage = HoodieStorageUtils.getStorage(
- tablePath, HadoopFSUtils.getStorageConf(hadoopConf));
- HoodieLogFile[] cdcLogFiles = fileSplit.getCdcFiles().stream()
- .map(cdcFile -> {
- try {
- return new HoodieLogFile(storage.getPathInfo(new
StoragePath(hadoopTablePath, cdcFile)));
- } catch (IOException e) {
- throw new HoodieIOException("Failed to get file status for CDC
log: " + cdcFile, e);
- }
- })
- .toArray(HoodieLogFile[]::new);
- this.cdcItr = new HoodieCDCLogRecordIterator(storage, cdcLogFiles,
cdcSchema);
- }
-
- private static int[] computeRequiredPos(HoodieSchema tableSchema,
HoodieSchema requiredSchema) {
- HoodieSchema dataSchema =
HoodieSchemaUtils.removeMetadataFields(tableSchema);
- List<String> fields = dataSchema.getFields().stream()
- .map(HoodieSchemaField::name)
- .collect(Collectors.toList());
- return requiredSchema.getFields().stream()
- .map(f -> fields.indexOf(f.name()))
- .mapToInt(i -> i)
- .toArray();
- }
-
- @Override
- public boolean hasNext() {
- if (sideImage != null) {
- currentImage = sideImage;
- sideImage = null;
- return true;
- } else if (cdcItr.hasNext()) {
- cdcRecord = (GenericRecord) cdcItr.next();
- String op = String.valueOf(cdcRecord.get(0));
- resolveImage(op);
- return true;
- }
- return false;
- }
-
- protected abstract RowData getAfterImage(RowKind rowKind, GenericRecord
cdcRecord);
-
- protected abstract RowData getBeforeImage(RowKind rowKind, GenericRecord
cdcRecord);
-
- @Override
- public RowData next() {
- return currentImage;
- }
-
- @Override
- public void close() {
- if (cdcItr != null) {
- cdcItr.close();
- cdcItr = null;
- }
- }
-
- private void resolveImage(String op) {
- switch (op) {
- case "i":
- currentImage = getAfterImage(RowKind.INSERT, cdcRecord);
- break;
- case "u":
- currentImage = getBeforeImage(RowKind.UPDATE_BEFORE, cdcRecord);
- sideImage = getAfterImage(RowKind.UPDATE_AFTER, cdcRecord);
- break;
- case "d":
- currentImage = getBeforeImage(RowKind.DELETE, cdcRecord);
- break;
- default:
- throw new AssertionError("Unexpected CDC operation: " + op);
- }
- }
-
- protected RowData resolveAvro(RowKind rowKind, GenericRecord avroRecord) {
- GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
- avroRecord, requiredSchema, requiredPos, recordBuilder);
- RowData resolved = (RowData)
avroToRowDataConverter.convert(requiredAvroRecord);
- resolved.setRowKind(rowKind);
- return resolved;
- }
- }
-
- /** Reads CDC log files that contain both before and after images ({@code
DATA_BEFORE_AFTER} mode). */
- private static class BeforeAfterImageIterator extends BaseImageIterator {
- BeforeAfterImageIterator(
- org.apache.hadoop.conf.Configuration hadoopConf,
- String tablePath,
- HoodieSchema tableSchema,
- HoodieSchema requiredSchema,
- RowType requiredRowType,
- HoodieSchema cdcSchema,
- HoodieCDCFileSplit fileSplit) {
- super(hadoopConf, tablePath, tableSchema, requiredSchema,
requiredRowType, cdcSchema, fileSplit);
- }
-
- @Override
- protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
- return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(3));
- }
-
- @Override
- protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord)
{
- return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(2));
- }
- }
-
- /**
- * Reads CDC log files containing op + key + before_image ({@code
DATA_BEFORE} mode).
- * The after-image is loaded from the after file-slice via the {@link
ImageManager}.
- */
- private static class BeforeImageIterator extends BaseImageIterator {
- protected ExternalSpillableMap<String, byte[]> afterImages;
- protected final long maxCompactionMemoryInBytes;
- protected final RowDataProjection projection;
- protected final ImageManager imageManager;
-
- BeforeImageIterator(
- org.apache.flink.configuration.Configuration flinkConf,
- org.apache.hadoop.conf.Configuration hadoopConf,
- String tablePath,
- HoodieSchema tableSchema,
- HoodieSchema requiredSchema,
- RowType requiredRowType,
- long maxCompactionMemoryInBytes,
- HoodieSchema cdcSchema,
- HoodieCDCFileSplit fileSplit,
- ImageManager imageManager) throws IOException {
- super(hadoopConf, tablePath, tableSchema, requiredSchema,
requiredRowType, cdcSchema, fileSplit);
- this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
- this.projection = RowDataProjection.instance(requiredRowType,
- computePositions(tableSchema, requiredRowType));
- this.imageManager = imageManager;
- initImages(fileSplit);
- }
-
- protected void initImages(HoodieCDCFileSplit fileSplit) throws IOException
{
- ValidationUtils.checkState(fileSplit.getAfterFileSlice().isPresent(),
- "Current file slice does not exist for instant: " +
fileSplit.getInstant());
- this.afterImages = imageManager.getOrLoadImages(
- maxCompactionMemoryInBytes, fileSplit.getAfterFileSlice().get());
- }
-
- @Override
- protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
- String recordKey = cdcRecord.get(1).toString();
- RowData row = imageManager.getImageRecord(recordKey, afterImages,
rowKind);
- row.setRowKind(rowKind);
- return projection.project(row);
- }
-
- @Override
- protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord)
{
- return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(2));
- }
-
- private static int[] computePositions(HoodieSchema tableSchema, RowType
requiredRowType) {
- List<String> allFields = tableSchema.getFields().stream()
- .map(HoodieSchemaField::name)
- .collect(Collectors.toList());
- return requiredRowType.getFieldNames().stream()
- .map(allFields::indexOf)
- .mapToInt(i -> i)
- .toArray();
- }
- }
-
- /**
- * Reads CDC log files containing only op + key ({@code OP_KEY_ONLY} mode).
- * Both before and after images are loaded from file-slice snapshots via
{@link ImageManager}.
- */
- private static class RecordKeyImageIterator extends BeforeImageIterator {
- protected ExternalSpillableMap<String, byte[]> beforeImages;
-
- RecordKeyImageIterator(
- org.apache.flink.configuration.Configuration flinkConf,
- org.apache.hadoop.conf.Configuration hadoopConf,
- String tablePath,
- HoodieSchema tableSchema,
- HoodieSchema requiredSchema,
- RowType requiredRowType,
- long maxCompactionMemoryInBytes,
- HoodieSchema cdcSchema,
- HoodieCDCFileSplit fileSplit,
- ImageManager imageManager) throws IOException {
- super(flinkConf, hadoopConf, tablePath, tableSchema, requiredSchema,
requiredRowType,
- maxCompactionMemoryInBytes, cdcSchema, fileSplit, imageManager);
- }
-
- @Override
- protected void initImages(HoodieCDCFileSplit fileSplit) throws IOException
{
- super.initImages(fileSplit);
- ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
- "Before file slice does not exist for instant: " +
fileSplit.getInstant());
- this.beforeImages = imageManager.getOrLoadImages(
- maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
- }
-
- @Override
- protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord)
{
- String recordKey = cdcRecord.get(1).toString();
- RowData row = imageManager.getImageRecord(recordKey, beforeImages,
rowKind);
- row.setRowKind(rowKind);
- return projection.project(row);
- }
- }
-
- /** Handles the {@code REPLACE_COMMIT} CDC inference case: emits all records
from before-slice as DELETE. */
- private static class ReplaceCommitIterator implements
ClosableIterator<RowData> {
- private final ClosableIterator<RowData> itr;
- private final RowDataProjection projection;
-
- ReplaceCommitIterator(
- org.apache.flink.configuration.Configuration flinkConf,
- String tablePath,
- RowType requiredRowType,
- int[] requiredPositions,
- long maxCompactionMemoryInBytes,
- HoodieCDCFileSplit fileSplit,
- Function<MergeOnReadInputSplit, ClosableIterator<RowData>>
splitIteratorFunc) {
- ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
- "Before file slice does not exist for instant: " +
fileSplit.getInstant());
- MergeOnReadInputSplit inputSplit = CdcInputFormat.fileSlice2Split(
- tablePath, fileSplit.getBeforeFileSlice().get(),
maxCompactionMemoryInBytes);
- this.itr = splitIteratorFunc.apply(inputSplit);
- this.projection = RowDataProjection.instance(requiredRowType,
requiredPositions);
- }
-
- @Override
- public boolean hasNext() {
- return itr.hasNext();
- }
-
- @Override
- public RowData next() {
- RowData row = itr.next();
- row.setRowKind(RowKind.DELETE);
- return projection.project(row);
- }
-
- @Override
- public void close() {
- itr.close();
- }
- }
-
- // -------------------------------------------------------------------------
- // ImageManager - caches full-schema row images keyed by record key
- // -------------------------------------------------------------------------
-
- /**
- * Manages serialized before/after image snapshots for a file group, cached
by instant time.
- * At most two versions (before and after) are kept in memory; older entries
are spilled to disk.
- */
- private static class ImageManager implements AutoCloseable {
- private final HoodieWriteConfig writeConfig;
- private final RowDataSerializer serializer;
- private final Function<MergeOnReadInputSplit, ClosableIterator<RowData>>
splitIteratorFunc;
- private final Map<String, ExternalSpillableMap<String, byte[]>> cache;
-
- ImageManager(
- RowType rowType,
- HoodieWriteConfig writeConfig,
- Function<MergeOnReadInputSplit, ClosableIterator<RowData>>
splitIteratorFunc) {
- this.serializer = new RowDataSerializer(rowType);
- this.writeConfig = writeConfig;
- this.splitIteratorFunc = splitIteratorFunc;
- this.cache = new TreeMap<>();
- }
-
- ExternalSpillableMap<String, byte[]> getOrLoadImages(
- long maxCompactionMemoryInBytes, FileSlice fileSlice) throws
IOException {
- final String instant = fileSlice.getBaseInstantTime();
- if (cache.containsKey(instant)) {
- return cache.get(instant);
- }
- if (cache.size() > 1) {
- String oldest = cache.keySet().iterator().next();
- cache.remove(oldest).close();
- }
- ExternalSpillableMap<String, byte[]> images =
loadImageRecords(maxCompactionMemoryInBytes, fileSlice);
- cache.put(instant, images);
- return images;
- }
-
- private ExternalSpillableMap<String, byte[]> loadImageRecords(
- long maxCompactionMemoryInBytes, FileSlice fileSlice) throws
IOException {
- MergeOnReadInputSplit inputSplit = CdcInputFormat.fileSlice2Split(
- writeConfig.getBasePath(), fileSlice, maxCompactionMemoryInBytes);
- ExternalSpillableMap<String, byte[]> imageRecordsMap =
- FormatUtils.spillableMap(writeConfig, maxCompactionMemoryInBytes,
getClass().getSimpleName());
- try (ClosableIterator<RowData> itr =
splitIteratorFunc.apply(inputSplit)) {
- while (itr.hasNext()) {
- RowData row = itr.next();
- String recordKey =
row.getString(HOODIE_RECORD_KEY_COL_POS).toString();
- ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
- serializer.serialize(row, new BytesArrayOutputView(baos));
- imageRecordsMap.put(recordKey, baos.toByteArray());
- }
- }
- return imageRecordsMap;
- }
-
- RowData getImageRecord(
- String recordKey, ExternalSpillableMap<String, byte[]> imageCache,
RowKind rowKind) {
- byte[] bytes = imageCache.get(recordKey);
- ValidationUtils.checkState(bytes != null,
- "Key " + recordKey + " does not exist in current file group image");
- try {
- RowData row = serializer.deserialize(new BytesArrayInputView(bytes));
- row.setRowKind(rowKind);
- return row;
- } catch (IOException e) {
- throw new HoodieException("Failed to deserialize image record for key:
" + recordKey, e);
- }
- }
-
- void updateImageRecord(
- String recordKey, ExternalSpillableMap<String, byte[]> imageCache,
RowData row) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
- try {
- serializer.serialize(row, new BytesArrayOutputView(baos));
- } catch (IOException e) {
- throw new HoodieException("Failed to serialize image record for key: "
+ recordKey, e);
- }
- imageCache.put(recordKey, baos.toByteArray());
- }
-
- RowData removeImageRecord(
- String recordKey, ExternalSpillableMap<String, byte[]> imageCache) {
- byte[] bytes = imageCache.remove(recordKey);
- if (bytes == null) {
- return null;
- }
- try {
- return serializer.deserialize(new BytesArrayInputView(bytes));
- } catch (IOException e) {
- throw new HoodieException("Failed to deserialize image record for key:
" + recordKey, e);
- }
- }
-
- @Override
- public void close() {
- cache.values().forEach(ExternalSpillableMap::close);
- cache.clear();
- }
- }
-
- // -------------------------------------------------------------------------
- // I/O view adapters for RowDataSerializer
- // -------------------------------------------------------------------------
-
- private static final class BytesArrayInputView extends DataInputStream
- implements org.apache.flink.core.memory.DataInputView {
- BytesArrayInputView(byte[] data) {
- super(new ByteArrayInputStream(data));
- }
-
- @Override
- public void skipBytesToRead(int numBytes) throws IOException {
- while (numBytes > 0) {
- int skipped = skipBytes(numBytes);
- numBytes -= skipped;
- }
- }
- }
-
- private static final class BytesArrayOutputView extends DataOutputStream
- implements org.apache.flink.core.memory.DataOutputView {
- BytesArrayOutputView(ByteArrayOutputStream baos) {
- super(baos);
- }
-
- @Override
- public void skipBytesToWrite(int numBytes) throws IOException {
- for (int i = 0; i < numBytes; i++) {
- write(0);
- }
- }
-
- @Override
- public void write(org.apache.flink.core.memory.DataInputView source, int
numBytes) throws IOException {
- byte[] buffer = new byte[numBytes];
- source.readFully(buffer);
- write(buffer);
- }
- }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcImageManager.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcImageManager.java
new file mode 100644
index 000000000000..cad323b7fb81
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcImageManager.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cdc;
+
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.format.FormatUtils;
+import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
+
+import lombok.Getter;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Function;
+
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
+
+/**
+ * Manages serialized before/after image snapshots for a CDC file group,
cached by instant time.
+ *
+ * <p>At most two versions (before and after) are kept in memory at once;
older entries are
+ * evicted and spilled to disk via {@link ExternalSpillableMap}.
+ *
+ * <p>Also owns the I/O-view adapters ({@link BytesArrayInputView} / {@link
BytesArrayOutputView})
+ * used for serialising {@link RowData} records into byte arrays.
+ */
+public class CdcImageManager implements AutoCloseable {
+ @Getter
+ private final HoodieWriteConfig writeConfig;
+ private final RowDataSerializer serializer;
+ private final Function<MergeOnReadInputSplit, ClosableIterator<RowData>>
splitIteratorFunc;
+ private final Map<String, ExternalSpillableMap<String, byte[]>> cache;
+
+ public CdcImageManager(
+ RowType rowType,
+ HoodieWriteConfig writeConfig,
+ Function<MergeOnReadInputSplit, ClosableIterator<RowData>>
splitIteratorFunc) {
+ this.serializer = new RowDataSerializer(rowType);
+ this.writeConfig = writeConfig;
+ this.splitIteratorFunc = splitIteratorFunc;
+ this.cache = new TreeMap<>();
+ }
+
+ public ExternalSpillableMap<String, byte[]> getOrLoadImages(
+ long maxCompactionMemoryInBytes,
+ FileSlice fileSlice) throws IOException {
+ final String instant = fileSlice.getBaseInstantTime();
+ if (cache.containsKey(instant)) {
+ return cache.get(instant);
+ }
+ // evict the earliest version when more than two are cached (keep before &
after)
+ if (cache.size() > 1) {
+ String oldest = cache.keySet().iterator().next();
+ cache.remove(oldest).close();
+ }
+ ExternalSpillableMap<String, byte[]> images =
loadImageRecords(maxCompactionMemoryInBytes, fileSlice);
+ cache.put(instant, images);
+ return images;
+ }
+
+ private ExternalSpillableMap<String, byte[]> loadImageRecords(
+ long maxCompactionMemoryInBytes,
+ FileSlice fileSlice) throws IOException {
+ MergeOnReadInputSplit inputSplit = CdcIterators.fileSlice2Split(
+ writeConfig.getBasePath(), fileSlice, maxCompactionMemoryInBytes);
+ ExternalSpillableMap<String, byte[]> imageRecordsMap =
+ FormatUtils.spillableMap(writeConfig, maxCompactionMemoryInBytes,
getClass().getSimpleName());
+ try (ClosableIterator<RowData> itr = splitIteratorFunc.apply(inputSplit)) {
+ while (itr.hasNext()) {
+ RowData row = itr.next();
+ String recordKey = row.getString(HOODIE_RECORD_KEY_COL_POS).toString();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+ serializer.serialize(row, new BytesArrayOutputView(baos));
+ imageRecordsMap.put(recordKey, baos.toByteArray());
+ }
+ }
+ return imageRecordsMap;
+ }
+
+ public RowData getImageRecord(
+ String recordKey,
+ ExternalSpillableMap<String, byte[]> imageCache,
+ RowKind rowKind) {
+ byte[] bytes = imageCache.get(recordKey);
+ ValidationUtils.checkState(bytes != null,
+ "Key " + recordKey + " does not exist in current file group image");
+ try {
+ RowData row = serializer.deserialize(new BytesArrayInputView(bytes));
+ row.setRowKind(rowKind);
+ return row;
+ } catch (IOException e) {
+ throw new HoodieException("Failed to deserialize image record for key: "
+ recordKey, e);
+ }
+ }
+
+ public void updateImageRecord(
+ String recordKey,
+ ExternalSpillableMap<String, byte[]> imageCache,
+ RowData row) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+ try {
+ serializer.serialize(row, new BytesArrayOutputView(baos));
+ } catch (IOException e) {
+ throw new HoodieException("Failed to serialize image record for key: " +
recordKey, e);
+ }
+ imageCache.put(recordKey, baos.toByteArray());
+ }
+
+ public RowData removeImageRecord(
+ String recordKey,
+ ExternalSpillableMap<String, byte[]> imageCache) {
+ byte[] bytes = imageCache.remove(recordKey);
+ if (bytes == null) {
+ return null;
+ }
+ try {
+ return serializer.deserialize(new BytesArrayInputView(bytes));
+ } catch (IOException e) {
+ throw new HoodieException("Failed to deserialize image record for key: "
+ recordKey, e);
+ }
+ }
+
+ @Override
+ public void close() {
+ cache.values().forEach(ExternalSpillableMap::close);
+ cache.clear();
+ }
+
+ // -------------------------------------------------------------------------
+ // I/O view adapters for RowDataSerializer
+ // -------------------------------------------------------------------------
+
+ static final class BytesArrayInputView extends DataInputStream implements
DataInputView {
+ BytesArrayInputView(byte[] data) {
+ super(new ByteArrayInputStream(data));
+ }
+
+ @Override
+ public void skipBytesToRead(int numBytes) throws IOException {
+ while (numBytes > 0) {
+ int skipped = skipBytes(numBytes);
+ numBytes -= skipped;
+ }
+ }
+ }
+
+ static final class BytesArrayOutputView extends DataOutputStream implements
DataOutputView {
+ BytesArrayOutputView(ByteArrayOutputStream baos) {
+ super(baos);
+ }
+
+ @Override
+ public void skipBytesToWrite(int numBytes) throws IOException {
+ for (int i = 0; i < numBytes; i++) {
+ write(0);
+ }
+ }
+
+ @Override
+ public void write(DataInputView source, int numBytes) throws IOException {
+ byte[] buffer = new byte[numBytes];
+ source.readFully(buffer);
+ write(buffer);
+ }
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
index 5aa29e3d99dc..27bb2b3a56e8 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -18,92 +18,39 @@
package org.apache.hudi.table.format.cdc;
-import org.apache.hudi.client.model.HoodieFlinkRecord;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaCache;
-import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
-import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator;
-import org.apache.hudi.common.table.read.BufferedRecord;
-import org.apache.hudi.common.table.read.BufferedRecordMerger;
-import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
-import org.apache.hudi.common.table.read.BufferedRecords;
-import org.apache.hudi.common.table.read.DeleteContext;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
-import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.ConfigUtils;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.source.ExpressionPredicates.Predicate;
-import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.HoodieStorageUtils;
-import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.table.format.FlinkReaderContextFactory;
-import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.table.format.InternalSchemaManager;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadTableState;
-import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.FlinkWriteClients;
-import org.apache.hudi.util.RowDataProjection;
-import org.apache.hudi.util.StreamerUtil;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.RowKind;
import org.apache.hadoop.fs.Path;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
-import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
/**
* The base InputFormat class to read Hoodie data set as change logs.
*/
-@Slf4j
public class CdcInputFormat extends MergeOnReadInputFormat {
private static final long serialVersionUID = 1L;
@@ -121,17 +68,20 @@ public class CdcInputFormat extends MergeOnReadInputFormat
{
protected ClosableIterator<RowData> initIterator(MergeOnReadInputSplit
split) throws IOException {
if (split instanceof CdcInputSplit) {
HoodieCDCSupplementalLoggingMode mode =
OptionsResolver.getCDCSupplementalLoggingMode(conf);
- ImageManager manager = new ImageManager(conf, tableState.getRowType(),
this::getFileSliceIterator);
+ CdcImageManager manager = new CdcImageManager(
+ tableState.getRowType(),
+ FlinkWriteClients.getHoodieClientConfig(conf),
+ this::getFileSliceIterator);
Function<HoodieCDCFileSplit, ClosableIterator<RowData>>
recordIteratorFunc =
- cdcFileSplit -> getRecordIteratorV2(split.getTablePath(),
split.getMaxCompactionMemoryInBytes(), cdcFileSplit, mode, manager);
- return new CdcFileSplitsIterator((CdcInputSplit) split, manager,
recordIteratorFunc);
+ cdcFileSplit -> getRecordIteratorSafe(split.getTablePath(),
split.getMaxCompactionMemoryInBytes(), cdcFileSplit, mode, manager);
+ return new CdcIterators.CdcFileSplitsIterator(((CdcInputSplit)
split).getChanges(), manager, recordIteratorFunc);
} else {
return super.initIterator(split);
}
}
/**
- * Returns the builder for {@link MergeOnReadInputFormat}.
+ * Returns the builder for {@link CdcInputFormat}.
*/
public static Builder builder() {
return new Builder();
@@ -139,22 +89,21 @@ public class CdcInputFormat extends MergeOnReadInputFormat
{
private ClosableIterator<RowData> getFileSliceIterator(MergeOnReadInputSplit
split) {
try {
- // get full schema iterator.
final HoodieSchema schema = HoodieSchemaCache.intern(
HoodieSchema.parse(tableState.getTableSchema()));
- // before/after images have assumption of snapshot scan, so `emitDelete`
is set as false
+ // before/after images use snapshot scan semantics, so emitDelete is
false
return getSplitRowIterator(split, schema, schema,
FlinkOptions.REALTIME_PAYLOAD_COMBINE, false);
} catch (IOException e) {
throw new HoodieException("Failed to create iterator for split: " +
split, e);
}
}
- private ClosableIterator<RowData> getRecordIteratorV2(
+ private ClosableIterator<RowData> getRecordIteratorSafe(
String tablePath,
long maxCompactionMemoryInBytes,
HoodieCDCFileSplit fileSplit,
HoodieCDCSupplementalLoggingMode mode,
- ImageManager imageManager) {
+ CdcImageManager imageManager) {
try {
return getRecordIterator(tablePath, maxCompactionMemoryInBytes,
fileSplit, mode, imageManager);
} catch (IOException e) {
@@ -167,41 +116,56 @@ public class CdcInputFormat extends
MergeOnReadInputFormat {
long maxCompactionMemoryInBytes,
HoodieCDCFileSplit fileSplit,
HoodieCDCSupplementalLoggingMode mode,
- ImageManager imageManager) throws IOException {
+ CdcImageManager imageManager) throws IOException {
switch (fileSplit.getCdcInferCase()) {
case BASE_FILE_INSERT:
ValidationUtils.checkState(fileSplit.getCdcFiles() != null &&
fileSplit.getCdcFiles().size() == 1,
"CDC file path should exist and be singleton");
String path = new Path(tablePath,
fileSplit.getCdcFiles().get(0)).toString();
- return new AddBaseFileIterator(getBaseFileIterator(path));
+ return new CdcIterators.AddBaseFileIterator(getBaseFileIterator(path));
case BASE_FILE_DELETE:
ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
"Before file slice should exist");
FileSlice fileSlice = fileSplit.getBeforeFileSlice().get();
- MergeOnReadInputSplit inputSplit = fileSlice2Split(tablePath,
fileSlice, maxCompactionMemoryInBytes);
- return new RemoveBaseFileIterator(tableState,
getFileSliceIterator(inputSplit));
- case AS_IS:
- HoodieSchema dataSchema =
HoodieSchemaUtils.removeMetadataFields(HoodieSchema.parse(tableState.getTableSchema()));
+ MergeOnReadInputSplit inputSplit =
CdcIterators.fileSlice2Split(tablePath, fileSlice, maxCompactionMemoryInBytes);
+ return new CdcIterators.RemoveBaseFileIterator(
+ tableState.getRequiredRowType(),
tableState.getRequiredPositions(), getFileSliceIterator(inputSplit));
+ case AS_IS: {
+ HoodieSchema tblSchema =
HoodieSchema.parse(tableState.getTableSchema());
+ HoodieSchema reqSchema =
HoodieSchema.parse(tableState.getRequiredSchema());
+ HoodieSchema dataSchema =
HoodieSchemaUtils.removeMetadataFields(tblSchema);
HoodieSchema cdcSchema =
HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema);
switch (mode) {
case DATA_BEFORE_AFTER:
- return new BeforeAfterImageIterator(tablePath, tableState,
hadoopConf, cdcSchema, fileSplit);
+ return new CdcIterators.BeforeAfterImageIterator(
+ hadoopConf, tablePath, tblSchema, reqSchema,
tableState.getRequiredRowType(), cdcSchema, fileSplit);
case DATA_BEFORE:
- return new BeforeImageIterator(conf, hadoopConf, tablePath,
tableState, cdcSchema, fileSplit, imageManager);
+ return new CdcIterators.BeforeImageIterator(
+ hadoopConf, tablePath, tblSchema, reqSchema,
tableState.getRequiredRowType(),
+ tableState.getRequiredPositions(), maxCompactionMemoryInBytes,
cdcSchema, fileSplit, imageManager);
case OP_KEY_ONLY:
- return new RecordKeyImageIterator(conf, hadoopConf, tablePath,
tableState, cdcSchema, fileSplit, imageManager);
+ return new CdcIterators.RecordKeyImageIterator(
+ hadoopConf, tablePath, tblSchema, reqSchema,
tableState.getRequiredRowType(),
+ tableState.getRequiredPositions(), maxCompactionMemoryInBytes,
cdcSchema, fileSplit, imageManager);
default:
- throw new AssertionError("Unexpected mode" + mode);
+ throw new AssertionError("Unexpected mode: " + mode);
}
+ }
case LOG_FILE:
ValidationUtils.checkState(fileSplit.getCdcFiles() != null &&
fileSplit.getCdcFiles().size() == 1,
"CDC file path should exist and be singleton");
String logFilepath = new Path(tablePath,
fileSplit.getCdcFiles().get(0)).toString();
- MergeOnReadInputSplit split = singleLogFile2Split(tablePath,
logFilepath, maxCompactionMemoryInBytes);
+ MergeOnReadInputSplit split =
CdcIterators.singleLogFile2Split(tablePath, logFilepath,
maxCompactionMemoryInBytes);
ClosableIterator<HoodieRecord<RowData>> recordIterator =
getSplitRecordIterator(split);
- return new DataLogFileIterator(maxCompactionMemoryInBytes,
imageManager, fileSplit, tableState, recordIterator, metaClient);
+ return new CdcIterators.DataLogFileIterator(
+ maxCompactionMemoryInBytes, imageManager, fileSplit,
+ HoodieSchema.parse(tableState.getTableSchema()),
+ tableState.getRequiredRowType(), tableState.getRequiredPositions(),
+ recordIterator, metaClient, imageManager.getWriteConfig());
case REPLACE_COMMIT:
- return new ReplaceCommitIterator(conf, tablePath, tableState,
fileSplit, this::getFileSliceIterator);
+ return new CdcIterators.ReplaceCommitIterator(
+ tablePath, tableState.getRequiredRowType(),
tableState.getRequiredPositions(),
+ maxCompactionMemoryInBytes, fileSplit, this::getFileSliceIterator);
default:
throw new AssertionError("Unexpected cdc file split infer case: " +
fileSplit.getCdcInferCase());
}
@@ -222,656 +186,6 @@ public class CdcInputFormat extends
MergeOnReadInputFormat {
return fileGroupReader.getClosableHoodieRecordIterator();
}
- // -------------------------------------------------------------------------
- // Inner Class
- // -------------------------------------------------------------------------
- static class CdcFileSplitsIterator implements ClosableIterator<RowData> {
- private ImageManager imageManager; // keep a reference to release resource
- private final Iterator<HoodieCDCFileSplit> fileSplitIterator;
- private final Function<HoodieCDCFileSplit, ClosableIterator<RowData>>
recordIteratorFunc;
- private ClosableIterator<RowData> recordIterator;
-
- CdcFileSplitsIterator(
- CdcInputSplit inputSplit,
- ImageManager imageManager,
- Function<HoodieCDCFileSplit, ClosableIterator<RowData>>
recordIteratorFunc) {
- this.fileSplitIterator =
Arrays.asList(inputSplit.getChanges()).iterator();
- this.imageManager = imageManager;
- this.recordIteratorFunc = recordIteratorFunc;
- }
-
- @Override
- public boolean hasNext() {
- if (recordIterator != null) {
- if (recordIterator.hasNext()) {
- return true;
- } else {
- recordIterator.close(); // release resource
- recordIterator = null;
- }
- }
- if (fileSplitIterator.hasNext()) {
- HoodieCDCFileSplit fileSplit = fileSplitIterator.next();
- recordIterator = recordIteratorFunc.apply(fileSplit);
- return recordIterator.hasNext();
- }
- return false;
- }
-
- @Override
- public RowData next() {
- return recordIterator.next();
- }
-
- @Override
- public void close() {
- if (recordIterator != null) {
- recordIterator.close();
- }
- if (imageManager != null) {
- imageManager.close();
- imageManager = null;
- }
- }
- }
-
- static class AddBaseFileIterator implements ClosableIterator<RowData> {
- // base file record iterator
- private ClosableIterator<RowData> nested;
-
- private RowData currentRecord;
-
- AddBaseFileIterator(ClosableIterator<RowData> nested) {
- this.nested = nested;
- }
-
- @Override
- public boolean hasNext() {
- if (this.nested.hasNext()) {
- currentRecord = this.nested.next();
- currentRecord.setRowKind(RowKind.INSERT);
- return true;
- }
- return false;
- }
-
- @Override
- public RowData next() {
- return currentRecord;
- }
-
- @Override
- public void close() {
- if (this.nested != null) {
- this.nested.close();
- this.nested = null;
- }
- }
- }
-
- static class RemoveBaseFileIterator implements ClosableIterator<RowData> {
- private ClosableIterator<RowData> nested;
- private final RowDataProjection projection;
-
- RemoveBaseFileIterator(MergeOnReadTableState tableState,
ClosableIterator<RowData> iterator) {
- this.nested = iterator;
- this.projection =
RowDataProjection.instance(tableState.getRequiredRowType(),
tableState.getRequiredPositions());
- }
-
- @Override
- public boolean hasNext() {
- return nested.hasNext();
- }
-
- @Override
- public RowData next() {
- RowData row = nested.next();
- row.setRowKind(RowKind.DELETE);
- return this.projection.project(row);
- }
-
- @Override
- public void close() {
- if (this.nested != null) {
- this.nested.close();
- this.nested = null;
- }
- }
- }
-
- // accounting to HoodieCDCInferenceCase.LOG_FILE
- static class DataLogFileIterator implements ClosableIterator<RowData> {
- private final HoodieSchema tableSchema;
- private final long maxCompactionMemoryInBytes;
- private final ImageManager imageManager;
- private final RowDataProjection projection;
- private final BufferedRecordMerger recordMerger;
- private final ClosableIterator<HoodieRecord<RowData>> logRecordIterator;
- private final DeleteContext deleteContext;
-
- private ExternalSpillableMap<String, byte[]> beforeImages;
- private RowData currentImage;
- private RowData sideImage;
- private HoodieReaderContext<RowData> readerContext;
- private String[] orderingFields;
- private TypedProperties props;
-
- DataLogFileIterator(
- long maxCompactionMemoryInBytes,
- ImageManager imageManager,
- HoodieCDCFileSplit cdcFileSplit,
- MergeOnReadTableState tableState,
- ClosableIterator<HoodieRecord<RowData>> logRecordIterator,
- HoodieTableMetaClient metaClient) throws IOException {
- this.tableSchema = HoodieSchema.parse(tableState.getTableSchema());
- this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
- this.imageManager = imageManager;
- this.projection =
tableState.getRequiredRowType().equals(tableState.getRowType())
- ? null
- : RowDataProjection.instance(tableState.getRequiredRowType(),
tableState.getRequiredPositions());
- HoodieWriteConfig writeConfig = this.imageManager.writeConfig;
- this.props = writeConfig.getProps();
- this.readerContext = new
FlinkReaderContextFactory(metaClient).getContext();
- readerContext.initRecordMerger(props);
- this.orderingFields = ConfigUtils.getOrderingFields(props);
- this.recordMerger = BufferedRecordMergerFactory.create(
- readerContext,
- readerContext.getMergeMode(),
- false,
- Option.of(imageManager.writeConfig.getRecordMerger()),
- tableSchema,
-
Option.ofNullable(Pair.of(metaClient.getTableConfig().getPayloadClass(),
writeConfig.getPayloadClass())),
- props,
- metaClient.getTableConfig().getPartialUpdateMode());
- this.logRecordIterator = logRecordIterator;
- initImages(cdcFileSplit);
- this.deleteContext = new DeleteContext(props,
tableSchema).withReaderSchema(tableSchema);
- }
-
- private void initImages(HoodieCDCFileSplit fileSplit) throws IOException {
- // init before images
- if (fileSplit.getBeforeFileSlice().isPresent() &&
!fileSplit.getBeforeFileSlice().get().isEmpty()) {
- this.beforeImages = this.imageManager.getOrLoadImages(
- maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
- } else {
- // still initializes an empty map
- this.beforeImages =
FormatUtils.spillableMap(this.imageManager.writeConfig,
maxCompactionMemoryInBytes, getClass().getSimpleName());
- }
- }
-
- @Override
- public boolean hasNext() {
- if (this.sideImage != null) {
- this.currentImage = this.sideImage;
- this.sideImage = null;
- return true;
- }
- while (logRecordIterator.hasNext()) {
- HoodieRecord<RowData> record = logRecordIterator.next();
- RowData existed =
imageManager.removeImageRecord(record.getRecordKey(), beforeImages);
- if (isDelete(record)) {
- // it's a deleted record.
- if (existed != null) {
- // there is a real record deleted.
- existed.setRowKind(RowKind.DELETE);
- this.currentImage = existed;
- return true;
- }
- } else {
- if (existed == null) {
- // a new record is inserted.
- RowData newRow = record.getData();
- newRow.setRowKind(RowKind.INSERT);
- this.currentImage = newRow;
- return true;
- } else {
- // an existed record is updated, assuming new record and existing
record share the same hoodie key
- HoodieOperation operation =
HoodieOperation.fromValue(existed.getRowKind().toByteValue());
- HoodieRecord<RowData> historyRecord = new
HoodieFlinkRecord(record.getKey(), operation, existed);
- HoodieRecord<RowData> merged = mergeRowWithLog(historyRecord,
record).get();
- if (merged.getData() != existed) {
- // update happens
- existed.setRowKind(RowKind.UPDATE_BEFORE);
- this.currentImage = existed;
- RowData mergedRow = merged.getData();
- mergedRow.setRowKind(RowKind.UPDATE_AFTER);
- this.imageManager.updateImageRecord(record.getRecordKey(),
beforeImages, mergedRow);
- this.sideImage = mergedRow;
-
- return true;
- }
- }
- }
- }
- return false;
- }
-
- @Override
- public RowData next() {
- return this.projection != null ?
this.projection.project(this.currentImage) : this.currentImage;
- }
-
- @Override
- public void close() {
- this.logRecordIterator.close();
- this.imageManager.close();
- }
-
- @SuppressWarnings("unchecked")
- private Option<HoodieRecord<RowData>>
mergeRowWithLog(HoodieRecord<RowData> historyRecord, HoodieRecord<RowData>
newRecord) {
- try {
- BufferedRecord<RowData> historyBufferedRecord =
BufferedRecords.fromHoodieRecord(historyRecord, tableSchema,
readerContext.getRecordContext(), props, orderingFields, deleteContext);
- BufferedRecord<RowData> newBufferedRecord =
BufferedRecords.fromHoodieRecord(newRecord, tableSchema,
readerContext.getRecordContext(), props, orderingFields, deleteContext);
- BufferedRecord<RowData> mergedRecord =
recordMerger.finalMerge(historyBufferedRecord, newBufferedRecord);
- return
Option.ofNullable(readerContext.getRecordContext().constructHoodieRecord(mergedRecord,
historyRecord.getPartitionPath()));
- } catch (IOException e) {
- throw new HoodieIOException("Merge base and delta payloads exception",
e);
- }
- }
-
- private boolean isDelete(HoodieRecord<RowData> record) {
- return record.isDelete(deleteContext, CollectionUtils.emptyProps());
- }
- }
-
- abstract static class BaseImageIterator implements ClosableIterator<RowData>
{
- private final HoodieSchema requiredSchema;
- private final int[] requiredPos;
- private final GenericRecordBuilder recordBuilder;
- private final AvroToRowDataConverters.AvroToRowDataConverter
avroToRowDataConverter;
-
- // the changelog records iterator
- private HoodieCDCLogRecordIterator cdcItr;
-
- private GenericRecord cdcRecord;
-
- private RowData sideImage;
-
- private RowData currentImage;
-
- BaseImageIterator(
- org.apache.hadoop.conf.Configuration hadoopConf,
- String tablePath,
- MergeOnReadTableState tableState,
- HoodieSchema cdcSchema,
- HoodieCDCFileSplit fileSplit) {
- this.requiredSchema = HoodieSchema.parse(tableState.getRequiredSchema());
- this.requiredPos = getRequiredPos(tableState.getTableSchema(),
this.requiredSchema);
- this.recordBuilder = new
GenericRecordBuilder(requiredSchema.getAvroSchema());
- this.avroToRowDataConverter =
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
- StoragePath hadoopTablePath = new StoragePath(tablePath);
- HoodieStorage storage = HoodieStorageUtils.getStorage(tablePath,
HadoopFSUtils.getStorageConf(hadoopConf));
- HoodieLogFile[] cdcLogFiles =
fileSplit.getCdcFiles().stream().map(cdcFile -> {
- try {
- return new HoodieLogFile(
- storage.getPathInfo(new StoragePath(hadoopTablePath, cdcFile)));
- } catch (IOException e) {
- throw new HoodieIOException("Fail to call getFileStatus", e);
- }
- }).toArray(HoodieLogFile[]::new);
- this.cdcItr = new HoodieCDCLogRecordIterator(storage, cdcLogFiles,
cdcSchema);
- }
-
- private int[] getRequiredPos(String tableSchema, HoodieSchema required) {
- HoodieSchema dataSchema =
HoodieSchemaUtils.removeMetadataFields(HoodieSchema.parse(tableSchema));
- List<String> fields =
dataSchema.getFields().stream().map(HoodieSchemaField::name).collect(Collectors.toList());
- return required.getFields().stream()
- .map(f -> fields.indexOf(f.name()))
- .mapToInt(i -> i)
- .toArray();
- }
-
- @Override
- public boolean hasNext() {
- if (this.sideImage != null) {
- currentImage = this.sideImage;
- this.sideImage = null;
- return true;
- } else if (this.cdcItr.hasNext()) {
- cdcRecord = (GenericRecord) this.cdcItr.next();
- String op = String.valueOf(cdcRecord.get(0));
- resolveImage(op);
- return true;
- }
- return false;
- }
-
- protected abstract RowData getAfterImage(RowKind rowKind, GenericRecord
cdcRecord);
-
- protected abstract RowData getBeforeImage(RowKind rowKind, GenericRecord
cdcRecord);
-
- @Override
- public RowData next() {
- return currentImage;
- }
-
- @Override
- public void close() {
- if (this.cdcItr != null) {
- this.cdcItr.close();
- this.cdcItr = null;
- }
- }
-
- private void resolveImage(String op) {
- switch (op) {
- case "i":
- currentImage = getAfterImage(RowKind.INSERT, cdcRecord);
- break;
- case "u":
- currentImage = getBeforeImage(RowKind.UPDATE_BEFORE, cdcRecord);
- sideImage = getAfterImage(RowKind.UPDATE_AFTER, cdcRecord);
- break;
- case "d":
- currentImage = getBeforeImage(RowKind.DELETE, cdcRecord);
- break;
- default:
- throw new AssertionError("Unexpected");
- }
- }
-
- protected RowData resolveAvro(RowKind rowKind, GenericRecord avroRecord) {
- GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
- avroRecord,
- requiredSchema,
- requiredPos,
- recordBuilder);
- RowData resolved = (RowData)
avroToRowDataConverter.convert(requiredAvroRecord);
- resolved.setRowKind(rowKind);
- return resolved;
- }
- }
-
- // op, ts, before_image, after_image
- static class BeforeAfterImageIterator extends BaseImageIterator {
- BeforeAfterImageIterator(
- String tablePath,
- MergeOnReadTableState tableState,
- org.apache.hadoop.conf.Configuration hadoopConf,
- HoodieSchema cdcSchema,
- HoodieCDCFileSplit fileSplit) {
- super(hadoopConf, tablePath, tableState, cdcSchema, fileSplit);
- }
-
- @Override
- protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
- return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(3));
- }
-
- @Override
- protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord)
{
- return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(2));
- }
- }
-
- // op, key, before_image
- static class BeforeImageIterator extends BaseImageIterator {
- protected ExternalSpillableMap<String, byte[]> afterImages;
-
- protected final long maxCompactionMemoryInBytes;
-
- protected final RowDataProjection projection;
- protected final ImageManager imageManager;
-
- BeforeImageIterator(
- Configuration flinkConf,
- org.apache.hadoop.conf.Configuration hadoopConf,
- String tablePath,
- MergeOnReadTableState tableState,
- HoodieSchema cdcSchema,
- HoodieCDCFileSplit fileSplit,
- ImageManager imageManager) throws IOException {
- super(hadoopConf, tablePath, tableState, cdcSchema, fileSplit);
- this.maxCompactionMemoryInBytes =
StreamerUtil.getMaxCompactionMemoryInBytes(flinkConf);
- this.projection =
RowDataProjection.instance(tableState.getRequiredRowType(),
tableState.getRequiredPositions());
- this.imageManager = imageManager;
- initImages(fileSplit);
- }
-
- protected void initImages(
- HoodieCDCFileSplit fileSplit) throws IOException {
- ValidationUtils.checkState(fileSplit.getAfterFileSlice().isPresent(),
- "Current file slice does not exist for instant: " +
fileSplit.getInstant());
- this.afterImages = this.imageManager.getOrLoadImages(
- maxCompactionMemoryInBytes, fileSplit.getAfterFileSlice().get());
- }
-
- @Override
- protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
- String recordKey = cdcRecord.get(1).toString();
- RowData row = imageManager.getImageRecord(recordKey, this.afterImages,
rowKind);
- row.setRowKind(rowKind);
- return this.projection.project(row);
- }
-
- @Override
- protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord)
{
- return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(2));
- }
- }
-
- // op, key
- static class RecordKeyImageIterator extends BeforeImageIterator {
- protected ExternalSpillableMap<String, byte[]> beforeImages;
-
- RecordKeyImageIterator(
- Configuration flinkConf,
- org.apache.hadoop.conf.Configuration hadoopConf,
- String tablePath,
- MergeOnReadTableState tableState,
- HoodieSchema cdcSchema,
- HoodieCDCFileSplit fileSplit,
- ImageManager imageManager) throws IOException {
- super(flinkConf, hadoopConf, tablePath, tableState, cdcSchema,
fileSplit, imageManager);
- }
-
- protected void initImages(HoodieCDCFileSplit fileSplit) throws IOException
{
- // init after images
- super.initImages(fileSplit);
- // init before images
- ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
- "Before file slice does not exist for instant: " +
fileSplit.getInstant());
- this.beforeImages = this.imageManager.getOrLoadImages(
- maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
- }
-
- @Override
- protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord)
{
- String recordKey = cdcRecord.get(1).toString();
- RowData row = this.imageManager.getImageRecord(recordKey,
this.beforeImages, rowKind);
- row.setRowKind(rowKind);
- return this.projection.project(row);
- }
- }
-
- static class ReplaceCommitIterator implements ClosableIterator<RowData> {
- private final ClosableIterator<RowData> itr;
- private final RowDataProjection projection;
-
- ReplaceCommitIterator(
- Configuration flinkConf,
- String tablePath,
- MergeOnReadTableState tableState,
- HoodieCDCFileSplit fileSplit,
- Function<MergeOnReadInputSplit, ClosableIterator<RowData>>
splitIteratorFunc) {
- this.itr = initIterator(tablePath,
StreamerUtil.getMaxCompactionMemoryInBytes(flinkConf), fileSplit,
splitIteratorFunc);
- this.projection =
RowDataProjection.instance(tableState.getRequiredRowType(),
tableState.getRequiredPositions());
- }
-
- private ClosableIterator<RowData> initIterator(
- String tablePath,
- long maxCompactionMemoryInBytes,
- HoodieCDCFileSplit fileSplit,
- Function<MergeOnReadInputSplit, ClosableIterator<RowData>>
splitIteratorFunc) {
- // init before images
-
- // the before file slice must exist,
- // see HoodieCDCExtractor#extractCDCFileSplits for details
- ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
- "Before file slice does not exist for instant: " +
fileSplit.getInstant());
- MergeOnReadInputSplit inputSplit = CdcInputFormat.fileSlice2Split(
- tablePath, fileSplit.getBeforeFileSlice().get(),
maxCompactionMemoryInBytes);
- return splitIteratorFunc.apply(inputSplit);
- }
-
- @Override
- public boolean hasNext() {
- return this.itr.hasNext();
- }
-
- @Override
- public RowData next() {
- RowData row = this.itr.next();
- row.setRowKind(RowKind.DELETE);
- return this.projection.project(row);
- }
-
- @Override
- public void close() {
- this.itr.close();
- }
- }
-
- public static final class BytesArrayInputView extends DataInputStream
implements DataInputView {
- public BytesArrayInputView(byte[] data) {
- super(new ByteArrayInputStream(data));
- }
-
- public void skipBytesToRead(int numBytes) throws IOException {
- while (numBytes > 0) {
- int skipped = this.skipBytes(numBytes);
- numBytes -= skipped;
- }
- }
- }
-
- public static final class BytesArrayOutputView extends DataOutputStream
implements DataOutputView {
- public BytesArrayOutputView(ByteArrayOutputStream baos) {
- super(baos);
- }
-
- public void skipBytesToWrite(int numBytes) throws IOException {
- for (int i = 0; i < numBytes; ++i) {
- this.write(0);
- }
- }
-
- public void write(DataInputView source, int numBytes) throws IOException {
- byte[] buffer = new byte[numBytes];
- source.readFully(buffer);
- this.write(buffer);
- }
- }
-
- /**
- * A before/after image manager
- * that caches the image records by versions(file slices).
- */
- private static class ImageManager implements AutoCloseable {
- private final HoodieWriteConfig writeConfig;
-
- private final RowDataSerializer serializer;
- private final Function<MergeOnReadInputSplit, ClosableIterator<RowData>>
splitIteratorFunc;
-
- private final Map<String, ExternalSpillableMap<String, byte[]>> cache;
-
- public ImageManager(
- Configuration flinkConf,
- RowType rowType,
- Function<MergeOnReadInputSplit, ClosableIterator<RowData>>
splitIteratorFunc) {
- this.serializer = new RowDataSerializer(rowType);
- this.splitIteratorFunc = splitIteratorFunc;
- this.cache = new TreeMap<>();
- this.writeConfig = FlinkWriteClients.getHoodieClientConfig(flinkConf);
- }
-
- public ExternalSpillableMap<String, byte[]> getOrLoadImages(
- long maxCompactionMemoryInBytes,
- FileSlice fileSlice) throws IOException {
- final String instant = fileSlice.getBaseInstantTime();
- if (this.cache.containsKey(instant)) {
- return cache.get(instant);
- }
- // clean the earliest file slice first
- if (this.cache.size() > 1) {
- // keep at most 2 versions: before & after
- String instantToClean = this.cache.keySet().iterator().next();
- this.cache.remove(instantToClean).close();
- }
- ExternalSpillableMap<String, byte[]> images =
loadImageRecords(maxCompactionMemoryInBytes, fileSlice);
- this.cache.put(instant, images);
- return images;
- }
-
- private ExternalSpillableMap<String, byte[]> loadImageRecords(
- long maxCompactionMemoryInBytes,
- FileSlice fileSlice) throws IOException {
- MergeOnReadInputSplit inputSplit =
CdcInputFormat.fileSlice2Split(writeConfig.getBasePath(), fileSlice,
maxCompactionMemoryInBytes);
- // initialize the image records map
- ExternalSpillableMap<String, byte[]> imageRecordsMap =
- FormatUtils.spillableMap(writeConfig, maxCompactionMemoryInBytes,
getClass().getSimpleName());
- try (ClosableIterator<RowData> itr =
splitIteratorFunc.apply(inputSplit)) {
- while (itr.hasNext()) {
- RowData row = itr.next();
- String recordKey =
row.getString(HOODIE_RECORD_KEY_COL_POS).toString();
- ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
- serializer.serialize(row, new BytesArrayOutputView(baos));
- imageRecordsMap.put(recordKey, baos.toByteArray());
- }
- }
- return imageRecordsMap;
- }
-
- public RowData getImageRecord(
- String recordKey,
- ExternalSpillableMap<String, byte[]> cache,
- RowKind rowKind) {
- byte[] bytes = cache.get(recordKey);
- ValidationUtils.checkState(bytes != null,
- "Key " + recordKey + " does not exist in current file group");
- try {
- RowData row = serializer.deserialize(new BytesArrayInputView(bytes));
- row.setRowKind(rowKind);
- return row;
- } catch (IOException e) {
- throw new HoodieException("Deserialize bytes into row data exception",
e);
- }
- }
-
- public void updateImageRecord(
- String recordKey,
- ExternalSpillableMap<String, byte[]> cache,
- RowData row) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
- try {
- serializer.serialize(row, new BytesArrayOutputView(baos));
- } catch (IOException e) {
- throw new HoodieException("Serialize row data into bytes exception",
e);
- }
- cache.put(recordKey, baos.toByteArray());
- }
-
- public RowData removeImageRecord(
- String recordKey,
- ExternalSpillableMap<String, byte[]> cache) {
- byte[] bytes = cache.remove(recordKey);
- if (bytes == null) {
- return null;
- }
- try {
- return serializer.deserialize(new BytesArrayInputView(bytes));
- } catch (IOException e) {
- throw new HoodieException("Deserialize bytes into row data exception",
e);
- }
- }
-
- @Override
- public void close() {
- this.cache.values().forEach(ExternalSpillableMap::close);
- this.cache.clear();
- }
- }
-
/**
* Builder for {@link CdcInputFormat}.
*/
@@ -911,30 +225,4 @@ public class CdcInputFormat extends MergeOnReadInputFormat
{
return new CdcInputFormat(conf, tableState, fieldTypes, predicates,
limit, emitDelete);
}
}
-
- // -------------------------------------------------------------------------
- // Utilities
- // -------------------------------------------------------------------------
- public static MergeOnReadInputSplit fileSlice2Split(
- String tablePath,
- FileSlice fileSlice,
- long maxCompactionMemoryInBytes) {
- Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
- .sorted(HoodieLogFile.getLogFileComparator())
- .map(logFile -> logFile.getPath().toString())
- // filter out the cdc logs
- .filter(path -> !path.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
- .collect(Collectors.toList()));
- String basePath =
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
- return new MergeOnReadInputSplit(0, basePath, logPaths,
fileSlice.getLatestInstantTime(),
- tablePath, maxCompactionMemoryInBytes,
FlinkOptions.REALTIME_PAYLOAD_COMBINE, null,
- fileSlice.getFileId(), fileSlice.getPartitionPath());
- }
-
- public static MergeOnReadInputSplit singleLogFile2Split(String tablePath,
String filePath, long maxCompactionMemoryInBytes) {
- return new MergeOnReadInputSplit(0, null,
Option.of(Collections.singletonList(filePath)),
- FSUtils.getDeltaCommitTimeFromLogPath(new StoragePath(filePath)),
tablePath, maxCompactionMemoryInBytes,
- FlinkOptions.REALTIME_PAYLOAD_COMBINE, null,
FSUtils.getFileIdFromLogPath(new StoragePath(filePath)),
- FSUtils.getRelativePartitionPath(new StoragePath(tablePath), new
StoragePath(filePath).getParent()));
- }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java
new file mode 100644
index 000000000000..5f4781494217
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java
@@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cdc;
+
+import org.apache.hudi.client.model.HoodieFlinkRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
+import org.apache.hudi.common.table.read.BufferedRecords;
+import org.apache.hudi.common.table.read.DeleteContext;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.format.FlinkReaderContextFactory;
+import org.apache.hudi.table.format.FormatUtils;
+import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.HoodieSchemaConverter;
+import org.apache.hudi.util.RowDataProjection;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
+
+/**
+ * Shared iterator implementations for CDC record reading, used by both
+ * {@link CdcInputFormat} and the Source V2 CDC split reader.
+ */
+public final class CdcIterators {
+
+ private CdcIterators() {
+ }
+
+ // -------------------------------------------------------------------------
+ // Top-level iterator: fans out over an ordered list of CDC file-splits
+ // -------------------------------------------------------------------------
+
+ /**
+ * Iterates over an ordered sequence of {@link HoodieCDCFileSplit}s,
delegating
+ * per-split record reading to a user-supplied factory function.
+ */
+ public static class CdcFileSplitsIterator implements
ClosableIterator<RowData> {
+ private CdcImageManager imageManager;
+ private final Iterator<HoodieCDCFileSplit> fileSplitIterator;
+ private final Function<HoodieCDCFileSplit, ClosableIterator<RowData>>
recordIteratorFunc;
+ private ClosableIterator<RowData> recordIterator;
+
+ public CdcFileSplitsIterator(
+ HoodieCDCFileSplit[] changes,
+ CdcImageManager imageManager,
+ Function<HoodieCDCFileSplit, ClosableIterator<RowData>>
recordIteratorFunc) {
+ this.fileSplitIterator = Arrays.asList(changes).iterator();
+ this.imageManager = imageManager;
+ this.recordIteratorFunc = recordIteratorFunc;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (recordIterator != null) {
+ if (recordIterator.hasNext()) {
+ return true;
+ } else {
+ recordIterator.close();
+ recordIterator = null;
+ }
+ }
+ if (fileSplitIterator.hasNext()) {
+ recordIterator = recordIteratorFunc.apply(fileSplitIterator.next());
+ return recordIterator.hasNext();
+ }
+ return false;
+ }
+
+ @Override
+ public RowData next() {
+ return recordIterator.next();
+ }
+
+ @Override
+ public void close() {
+ if (recordIterator != null) {
+ recordIterator.close();
+ }
+ if (imageManager != null) {
+ imageManager.close();
+ imageManager = null;
+ }
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // BASE_FILE_INSERT / BASE_FILE_DELETE
+ // -------------------------------------------------------------------------
+
+ /**
+ * Wraps a base-file parquet iterator and marks every record as {@link
RowKind#INSERT}.
+ * Used for the {@code BASE_FILE_INSERT} CDC inference case.
+ */
+ public static class AddBaseFileIterator implements ClosableIterator<RowData>
{
+ private ClosableIterator<RowData> nested;
+ private RowData currentRecord;
+
+ public AddBaseFileIterator(ClosableIterator<RowData> nested) {
+ this.nested = nested;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (nested.hasNext()) {
+ currentRecord = nested.next();
+ currentRecord.setRowKind(RowKind.INSERT);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public RowData next() {
+ return currentRecord;
+ }
+
+ @Override
+ public void close() {
+ if (nested != null) {
+ nested.close();
+ nested = null;
+ }
+ }
+ }
+
+ /**
+ * Wraps a file-slice iterator and marks every record as {@link
RowKind#DELETE}, applying
+ * required-column projection. Used for the {@code BASE_FILE_DELETE} CDC
inference case.
+ */
+ public static class RemoveBaseFileIterator implements
ClosableIterator<RowData> {
+ private ClosableIterator<RowData> nested;
+ private final RowDataProjection projection;
+
+ public RemoveBaseFileIterator(
+ RowType requiredRowType,
+ int[] requiredPositions,
+ ClosableIterator<RowData> iterator) {
+ this.nested = iterator;
+ this.projection = RowDataProjection.instance(requiredRowType,
requiredPositions);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return nested.hasNext();
+ }
+
+ @Override
+ public RowData next() {
+ RowData row = nested.next();
+ row.setRowKind(RowKind.DELETE);
+ return projection.project(row);
+ }
+
+ @Override
+ public void close() {
+ if (nested != null) {
+ nested.close();
+ nested = null;
+ }
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // LOG_FILE
+ // -------------------------------------------------------------------------
+
+ /**
+ * Handles the {@code LOG_FILE} CDC inference case: compares records from
the log file against
+ * before-image snapshots to emit INSERT / UPDATE_BEFORE / UPDATE_AFTER /
DELETE events.
+ */
+ public static class DataLogFileIterator implements ClosableIterator<RowData>
{
+ private final HoodieSchema tableSchema;
+ private final long maxCompactionMemoryInBytes;
+ private final CdcImageManager imageManager;
+ private final RowDataProjection projection;
+ private final BufferedRecordMerger recordMerger;
+ private final ClosableIterator<HoodieRecord<RowData>> logRecordIterator;
+ private final DeleteContext deleteContext;
+ private final HoodieReaderContext<RowData> readerContext;
+ private final String[] orderingFields;
+ private final TypedProperties props;
+
+ private ExternalSpillableMap<String, byte[]> beforeImages;
+ private RowData currentImage;
+ private RowData sideImage;
+
+ public DataLogFileIterator(
+ long maxCompactionMemoryInBytes,
+ CdcImageManager imageManager,
+ HoodieCDCFileSplit cdcFileSplit,
+ HoodieSchema tableSchema,
+ RowType requiredRowType,
+ int[] requiredPositions,
+ ClosableIterator<HoodieRecord<RowData>> logRecordIterator,
+ HoodieTableMetaClient metaClient,
+ HoodieWriteConfig writeConfig) throws IOException {
+ this.tableSchema = tableSchema;
+ this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+ this.imageManager = imageManager;
+ this.projection =
HoodieSchemaConverter.convertToRowType(tableSchema).equals(requiredRowType)
+ ? null : RowDataProjection.instance(requiredRowType,
requiredPositions);
+ this.props = writeConfig.getProps();
+ this.readerContext = new
FlinkReaderContextFactory(metaClient).getContext();
+ readerContext.initRecordMerger(props);
+ this.orderingFields = ConfigUtils.getOrderingFields(props);
+ this.recordMerger = BufferedRecordMergerFactory.create(
+ readerContext,
+ readerContext.getMergeMode(),
+ false,
+ Option.of(writeConfig.getRecordMerger()),
+ tableSchema,
+
Option.ofNullable(Pair.of(metaClient.getTableConfig().getPayloadClass(),
writeConfig.getPayloadClass())),
+ props,
+ metaClient.getTableConfig().getPartialUpdateMode());
+ this.logRecordIterator = logRecordIterator;
+ this.deleteContext = new DeleteContext(props,
tableSchema).withReaderSchema(tableSchema);
+ initImages(cdcFileSplit, writeConfig);
+ }
+
+ private void initImages(HoodieCDCFileSplit fileSplit, HoodieWriteConfig
writeConfig) throws IOException {
+ if (fileSplit.getBeforeFileSlice().isPresent() &&
!fileSplit.getBeforeFileSlice().get().isEmpty()) {
+ this.beforeImages = imageManager.getOrLoadImages(
+ maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
+ } else {
+ this.beforeImages = FormatUtils.spillableMap(writeConfig,
maxCompactionMemoryInBytes, getClass().getSimpleName());
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (sideImage != null) {
+ currentImage = sideImage;
+ sideImage = null;
+ return true;
+ }
+ while (logRecordIterator.hasNext()) {
+ HoodieRecord<RowData> record = logRecordIterator.next();
+ RowData existed =
imageManager.removeImageRecord(record.getRecordKey(), beforeImages);
+ if (isDelete(record)) {
+ if (existed != null) {
+ existed.setRowKind(RowKind.DELETE);
+ currentImage = existed;
+ return true;
+ }
+ } else {
+ if (existed == null) {
+ RowData newRow = record.getData();
+ newRow.setRowKind(RowKind.INSERT);
+ currentImage = newRow;
+ return true;
+ } else {
+ HoodieOperation operation =
HoodieOperation.fromValue(existed.getRowKind().toByteValue());
+ HoodieRecord<RowData> historyRecord = new
HoodieFlinkRecord(record.getKey(), operation, existed);
+ HoodieRecord<RowData> merged = mergeRowWithLog(historyRecord,
record).get();
+ if (merged.getData() != existed) {
+ existed.setRowKind(RowKind.UPDATE_BEFORE);
+ currentImage = existed;
+ RowData mergedRow = merged.getData();
+ mergedRow.setRowKind(RowKind.UPDATE_AFTER);
+ imageManager.updateImageRecord(record.getRecordKey(),
beforeImages, mergedRow);
+ sideImage = mergedRow;
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public RowData next() {
+ return projection != null ? projection.project(currentImage) :
currentImage;
+ }
+
+ @Override
+ public void close() {
+ logRecordIterator.close();
+ imageManager.close();
+ }
+
+ @SuppressWarnings("unchecked")
+ private Option<HoodieRecord<RowData>> mergeRowWithLog(
+ HoodieRecord<RowData> historyRecord, HoodieRecord<RowData> newRecord) {
+ try {
+ BufferedRecord<RowData> histBuf = BufferedRecords.fromHoodieRecord(
+ historyRecord, tableSchema, readerContext.getRecordContext(),
props, orderingFields, deleteContext);
+ BufferedRecord<RowData> newBuf = BufferedRecords.fromHoodieRecord(
+ newRecord, tableSchema, readerContext.getRecordContext(), props,
orderingFields, deleteContext);
+ BufferedRecord<RowData> merged = recordMerger.finalMerge(histBuf,
newBuf);
+ return Option.ofNullable(readerContext.getRecordContext()
+ .constructHoodieRecord(merged, historyRecord.getPartitionPath()));
+ } catch (IOException e) {
+ throw new HoodieIOException("Merge base and delta payloads exception",
e);
+ }
+ }
+
+ private boolean isDelete(HoodieRecord<RowData> record) {
+ return record.isDelete(deleteContext, CollectionUtils.emptyProps());
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // AS_IS — supplemental logging modes
+ // -------------------------------------------------------------------------
+
+  /**
+   * Base iterator for CDC log files stored with supplemental logging (AS_IS inference case).
+   * Reads a {@link HoodieCDCLogRecordIterator} and resolves before/after images using
+   * subclass-specific logic.
+   *
+   * <p>CDC operation {@code "i"} yields one {@link RowKind#INSERT} row, {@code "d"} yields one
+   * {@link RowKind#DELETE} row, and {@code "u"} yields two rows: the {@link RowKind#UPDATE_BEFORE}
+   * image followed by the {@link RowKind#UPDATE_AFTER} image.
+   *
+   * <p>{@link #hasNext()} is idempotent: it buffers at most one pending row, which is handed out
+   * (and released) by {@link #next()}. {@link #next()} may also be called without a preceding
+   * {@link #hasNext()}; it throws {@link java.util.NoSuchElementException} when exhausted.
+   */
+  public abstract static class BaseImageIterator implements ClosableIterator<RowData> {
+    private final HoodieSchema requiredSchema;
+    private final int[] requiredPos;
+    private final GenericRecordBuilder recordBuilder;
+    private final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
+    private HoodieCDCLogRecordIterator cdcItr;
+
+    private GenericRecord cdcRecord;
+    // Second row produced by an update record (the after image); emitted after currentImage.
+    private RowData sideImage;
+    // Row to be returned by the next call to next(); only meaningful while hasCurrent is true.
+    private RowData currentImage;
+    // True when currentImage was fetched by hasNext() and has not been returned by next() yet.
+    private boolean hasCurrent;
+
+    protected BaseImageIterator(
+        org.apache.hadoop.conf.Configuration hadoopConf,
+        String tablePath,
+        HoodieSchema tableSchema,
+        HoodieSchema requiredSchema,
+        RowType requiredRowType,
+        HoodieSchema cdcSchema,
+        HoodieCDCFileSplit fileSplit) {
+      this.requiredSchema = requiredSchema;
+      this.requiredPos = computeRequiredPos(tableSchema, requiredSchema);
+      this.recordBuilder = new GenericRecordBuilder(requiredSchema.getAvroSchema());
+      this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType);
+
+      StoragePath tableStoragePath = new StoragePath(tablePath);
+      HoodieStorage storage = HoodieStorageUtils.getStorage(
+          tablePath, HadoopFSUtils.getStorageConf(hadoopConf));
+      // CDC file names in the split are relative to the table path.
+      HoodieLogFile[] cdcLogFiles = fileSplit.getCdcFiles().stream()
+          .map(cdcFile -> {
+            try {
+              return new HoodieLogFile(storage.getPathInfo(new StoragePath(tableStoragePath, cdcFile)));
+            } catch (IOException e) {
+              throw new HoodieIOException("Failed to get file status for CDC log: " + cdcFile, e);
+            }
+          })
+          .toArray(HoodieLogFile[]::new);
+      this.cdcItr = new HoodieCDCLogRecordIterator(storage, cdcLogFiles, cdcSchema);
+    }
+
+    /**
+     * Maps each field of {@code requiredSchema} to its position in the table schema with the
+     * Hudi metadata fields removed; a field absent from the data schema maps to {@code -1}.
+     */
+    private static int[] computeRequiredPos(HoodieSchema tableSchema, HoodieSchema requiredSchema) {
+      HoodieSchema dataSchema = HoodieSchemaUtils.removeMetadataFields(tableSchema);
+      List<String> fields = dataSchema.getFields().stream()
+          .map(HoodieSchemaField::name)
+          .collect(Collectors.toList());
+      return requiredSchema.getFields().stream()
+          .map(f -> fields.indexOf(f.name()))
+          .mapToInt(i -> i)
+          .toArray();
+    }
+
+    /**
+     * Returns whether another row is available. If no row is buffered, the pending side image
+     * (if any) is promoted, otherwise the next CDC record is read and resolved. Repeated calls
+     * without an intervening {@link #next()} do not advance the iterator. Returns {@code false}
+     * after {@link #close()} once any buffered rows have been consumed.
+     */
+    @Override
+    public boolean hasNext() {
+      if (hasCurrent) {
+        return true;
+      }
+      if (sideImage != null) {
+        currentImage = sideImage;
+        sideImage = null;
+        hasCurrent = true;
+        return true;
+      }
+      if (cdcItr != null && cdcItr.hasNext()) {
+        cdcRecord = (GenericRecord) cdcItr.next();
+        // Field 0 of every supplemental-logging CDC record is the operation code.
+        String op = String.valueOf(cdcRecord.get(0));
+        resolveImage(op);
+        hasCurrent = true;
+        return true;
+      }
+      return false;
+    }
+
+    /** Builds the after image of {@code cdcRecord}, tagged with {@code rowKind}. */
+    protected abstract RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord);
+
+    /** Builds the before image of {@code cdcRecord}, tagged with {@code rowKind}. */
+    protected abstract RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord);
+
+    /**
+     * Returns the next row and releases it from the buffer.
+     *
+     * @throws java.util.NoSuchElementException if no more rows are available
+     */
+    @Override
+    public RowData next() {
+      if (!hasNext()) {
+        throw new java.util.NoSuchElementException("No more CDC images to read");
+      }
+      hasCurrent = false;
+      return currentImage;
+    }
+
+    @Override
+    public void close() {
+      if (cdcItr != null) {
+        cdcItr.close();
+        cdcItr = null;
+      }
+    }
+
+    /**
+     * Sets {@code currentImage} (and, for updates, {@code sideImage}) from {@code cdcRecord}.
+     *
+     * @throws AssertionError if {@code op} is not one of {@code "i"}, {@code "u"}, {@code "d"}
+     */
+    private void resolveImage(String op) {
+      switch (op) {
+        case "i":
+          currentImage = getAfterImage(RowKind.INSERT, cdcRecord);
+          break;
+        case "u":
+          currentImage = getBeforeImage(RowKind.UPDATE_BEFORE, cdcRecord);
+          sideImage = getAfterImage(RowKind.UPDATE_AFTER, cdcRecord);
+          break;
+        case "d":
+          currentImage = getBeforeImage(RowKind.DELETE, cdcRecord);
+          break;
+        default:
+          throw new AssertionError("Unexpected CDC operation: " + op);
+      }
+    }
+
+    /**
+     * Projects {@code avroRecord} onto the required schema, converts it to {@link RowData} and
+     * tags it with {@code rowKind}.
+     */
+    protected RowData resolveAvro(RowKind rowKind, GenericRecord avroRecord) {
+      GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
+          avroRecord, requiredSchema, requiredPos, recordBuilder);
+      RowData resolved = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
+      resolved.setRowKind(rowKind);
+      return resolved;
+    }
+  }
+
+ /**
+ * Reads CDC log files that contain both before and after images ({@code
DATA_BEFORE_AFTER} mode).
+ * CDC record layout: [op, ts, before_image, after_image].
+ */
+ public static class BeforeAfterImageIterator extends BaseImageIterator {
+ public BeforeAfterImageIterator(
+ org.apache.hadoop.conf.Configuration hadoopConf,
+ String tablePath,
+ HoodieSchema tableSchema,
+ HoodieSchema requiredSchema,
+ RowType requiredRowType,
+ HoodieSchema cdcSchema,
+ HoodieCDCFileSplit fileSplit) {
+ super(hadoopConf, tablePath, tableSchema, requiredSchema,
requiredRowType, cdcSchema, fileSplit);
+ }
+
+ @Override
+ protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
+ return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(3));
+ }
+
+ @Override
+ protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord)
{
+ return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(2));
+ }
+ }
+
+ /**
+ * Reads CDC log files containing op + key + before_image ({@code
DATA_BEFORE} mode).
+ * The after-image is loaded from the after file-slice snapshot via {@link
CdcImageManager}.
+ * CDC record layout: [op, key, before_image].
+ */
+ public static class BeforeImageIterator extends BaseImageIterator {
+ protected ExternalSpillableMap<String, byte[]> afterImages;
+ protected final long maxCompactionMemoryInBytes;
+ protected final RowDataProjection projection;
+ protected final CdcImageManager imageManager;
+
+ public BeforeImageIterator(
+ org.apache.hadoop.conf.Configuration hadoopConf,
+ String tablePath,
+ HoodieSchema tableSchema,
+ HoodieSchema requiredSchema,
+ RowType requiredRowType,
+ int[] requiredPositions,
+ long maxCompactionMemoryInBytes,
+ HoodieSchema cdcSchema,
+ HoodieCDCFileSplit fileSplit,
+ CdcImageManager imageManager) throws IOException {
+ super(hadoopConf, tablePath, tableSchema, requiredSchema,
requiredRowType, cdcSchema, fileSplit);
+ this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+ this.projection = RowDataProjection.instance(requiredRowType,
requiredPositions);
+ this.imageManager = imageManager;
+ initImages(fileSplit);
+ }
+
+ protected void initImages(HoodieCDCFileSplit fileSplit) throws IOException
{
+ ValidationUtils.checkState(fileSplit.getAfterFileSlice().isPresent(),
+ "Current file slice does not exist for instant: " +
fileSplit.getInstant());
+ this.afterImages = imageManager.getOrLoadImages(
+ maxCompactionMemoryInBytes, fileSplit.getAfterFileSlice().get());
+ }
+
+ @Override
+ protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
+ String recordKey = cdcRecord.get(1).toString();
+ RowData row = imageManager.getImageRecord(recordKey, afterImages,
rowKind);
+ row.setRowKind(rowKind);
+ return projection.project(row);
+ }
+
+ @Override
+ protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord)
{
+ return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(2));
+ }
+ }
+
+  /**
+   * Reads CDC log files containing only op + key ({@code OP_KEY_ONLY} mode).
+   * Both before and after images are looked up by record key in file-slice snapshots obtained
+   * from {@link CdcImageManager}.
+   *
+   * <p>CDC record layout: [op, key].
+   */
+  public static class RecordKeyImageIterator extends BeforeImageIterator {
+    protected ExternalSpillableMap<String, byte[]> beforeImages;
+
+    /**
+     * @throws IllegalStateException if the split lacks an after or a before file slice
+     * @throws IOException if loading either image snapshot fails
+     */
+    public RecordKeyImageIterator(
+        org.apache.hadoop.conf.Configuration hadoopConf,
+        String tablePath,
+        HoodieSchema tableSchema,
+        HoodieSchema requiredSchema,
+        RowType requiredRowType,
+        int[] requiredPositions,
+        long maxCompactionMemoryInBytes,
+        HoodieSchema cdcSchema,
+        HoodieCDCFileSplit fileSplit,
+        CdcImageManager imageManager) throws IOException {
+      // The super constructor loads the after images through BeforeImageIterator#initImages.
+      super(hadoopConf, tablePath, tableSchema, requiredSchema, requiredRowType,
+          requiredPositions, maxCompactionMemoryInBytes, cdcSchema, fileSplit, imageManager);
+      // Load the before images here instead of overriding initImages(): that hook runs inside the
+      // super constructor, before this class is initialized, so any state it wrote would be at
+      // the mercy of this class's field initializers.
+      ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
+          "Before file slice does not exist for instant: " + fileSplit.getInstant());
+      this.beforeImages = imageManager.getOrLoadImages(
+          maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
+    }
+
+    @Override
+    protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) {
+      // Field 1 of an OP_KEY_ONLY CDC record is the record key.
+      String recordKey = cdcRecord.get(1).toString();
+      RowData row = imageManager.getImageRecord(recordKey, beforeImages, rowKind);
+      row.setRowKind(rowKind);
+      return projection.project(row);
+    }
+  }
+
+ // -------------------------------------------------------------------------
+ // REPLACE_COMMIT
+ // -------------------------------------------------------------------------
+
+ /**
+ * Handles the {@code REPLACE_COMMIT} CDC inference case: emits all records
from the
+ * before-slice as {@link RowKind#DELETE}.
+ */
+ public static class ReplaceCommitIterator implements
ClosableIterator<RowData> {
+ private final ClosableIterator<RowData> itr;
+ private final RowDataProjection projection;
+
+ public ReplaceCommitIterator(
+ String tablePath,
+ RowType requiredRowType,
+ int[] requiredPositions,
+ long maxCompactionMemoryInBytes,
+ HoodieCDCFileSplit fileSplit,
+ Function<MergeOnReadInputSplit, ClosableIterator<RowData>>
splitIteratorFunc) {
+ ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
+ "Before file slice does not exist for instant: " +
fileSplit.getInstant());
+ MergeOnReadInputSplit inputSplit = fileSlice2Split(
+ tablePath, fileSplit.getBeforeFileSlice().get(),
maxCompactionMemoryInBytes);
+ this.itr = splitIteratorFunc.apply(inputSplit);
+ this.projection = RowDataProjection.instance(requiredRowType,
requiredPositions);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return itr.hasNext();
+ }
+
+ @Override
+ public RowData next() {
+ RowData row = itr.next();
+ row.setRowKind(RowKind.DELETE);
+ return projection.project(row);
+ }
+
+ @Override
+ public void close() {
+ itr.close();
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+  /**
+   * Builds a merge-on-read input split covering {@code fileSlice}: its base file (if any) plus
+   * its log files in log-file order, with CDC log files excluded.
+   */
+  public static MergeOnReadInputSplit fileSlice2Split(
+      String tablePath,
+      FileSlice fileSlice,
+      long maxCompactionMemoryInBytes) {
+    List<String> dataLogPaths = fileSlice.getLogFiles()
+        .sorted(HoodieLogFile.getLogFileComparator())
+        .map(logFile -> logFile.getPath().toString())
+        // CDC log files are not part of the data snapshot.
+        .filter(path -> !path.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
+        .collect(Collectors.toList());
+    String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
+    return new MergeOnReadInputSplit(
+        0,
+        basePath,
+        Option.ofNullable(dataLogPaths),
+        fileSlice.getLatestInstantTime(),
+        tablePath,
+        maxCompactionMemoryInBytes,
+        FlinkOptions.REALTIME_PAYLOAD_COMBINE,
+        null,
+        fileSlice.getFileId(),
+        fileSlice.getPartitionPath());
+  }
+
+ public static MergeOnReadInputSplit singleLogFile2Split(String tablePath,
String filePath, long maxCompactionMemoryInBytes) {
+ return new MergeOnReadInputSplit(0, null,
Option.of(Collections.singletonList(filePath)),
+ FSUtils.getDeltaCommitTimeFromLogPath(new StoragePath(filePath)),
tablePath, maxCompactionMemoryInBytes,
+ FlinkOptions.REALTIME_PAYLOAD_COMBINE, null,
FSUtils.getFileIdFromLogPath(new StoragePath(filePath)),
+ FSUtils.getRelativePartitionPath(new StoragePath(tablePath), new
StoragePath(filePath).getParent()));
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
index d0de3f930d37..b0d0579da084 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
@@ -237,13 +237,7 @@ public class TestHoodieCdcSplitReaderFunction {
1, tempDir.getAbsolutePath(), 128 * 1024 * 1024L, "file-cdc",
EMPTY_PARTITION_PATH, changes, "read_optimized", "20230101000000000");
- // The call should NOT throw IllegalArgumentException (type guard passes).
- // It will throw some other exception when trying to do real I/O.
- Exception ex = assertThrows(Exception.class, () ->
function.read(cdcSplit));
- assertNotNull(ex);
- // Must not be an IllegalArgumentException (which the type guard throws)
- if (ex instanceof IllegalArgumentException) {
- throw new AssertionError("read() should not throw
IllegalArgumentException for a CdcSourceSplit", ex);
- }
+ // Should not throw exception
+ function.read(cdcSplit);
}
}