Alowator commented on code in PR #12967: URL: https://github.com/apache/hudi/pull/12967#discussion_r2000181537
########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/RowDataStreamWriteFunction.java: ########## @@ -0,0 +1,563 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink; + +import org.apache.hudi.client.FlinkTaskContextSupplier; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.model.HoodieFlinkInternalRow; +import org.apache.hudi.client.model.HoodieFlinkRecord; +import org.apache.hudi.common.model.DeleteRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieOperation; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordMerger; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.HoodieRecordUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.MappingIterator; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.OptionsResolver; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.io.v2.HandleRecords; +import org.apache.hudi.metrics.FlinkStreamWriteMetrics; +import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory; +import org.apache.hudi.sink.buffer.RowDataBucket; +import org.apache.hudi.sink.buffer.TotalSizeTracer; +import org.apache.hudi.sink.bulk.RowDataKeyGen; +import org.apache.hudi.sink.common.AbstractStreamWriteFunction; +import org.apache.hudi.sink.event.WriteMetadataEvent; +import org.apache.hudi.sink.exception.MemoryPagesExhaustedException; +import org.apache.hudi.sink.utils.BufferUtils; +import org.apache.hudi.table.action.commit.BucketInfo; +import org.apache.hudi.table.action.commit.BucketType; +import org.apache.hudi.util.MutableIteratorWrapperIterator; +import org.apache.hudi.util.PreCombineFieldExtractor; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.table.runtime.operators.sort.BinaryInMemorySortBuffer; +import org.apache.flink.table.runtime.util.MemorySegmentPool; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Sink function to write the data to the underneath filesystem. + * + * <p><h2>Work Flow</h2> + * + * <p>The function firstly buffers the data (RowData) in a binary buffer based on {@code BinaryInMemorySortBuffer}. + * It flushes(write) the records batch when the batch size exceeds the configured size {@link FlinkOptions#WRITE_BATCH_SIZE} + * or the memory of the binary buffer is exhausted, and could not append any more data or a Flink checkpoint starts. + * After a batch has been written successfully, the function notifies its operator coordinator {@link StreamWriteOperatorCoordinator} + * to mark a successful write. + * + * <p><h2>The Semantics</h2> + * + * <p>The task implements exactly-once semantics by buffering the data between checkpoints. The operator coordinator + * starts a new instant on the timeline when a checkpoint triggers, the coordinator checkpoints always + * start before its operator, so when this function starts a checkpoint, a REQUESTED instant already exists. + * + * <p>The function process thread blocks data buffering after the checkpoint thread finishes flushing the existing data buffer until + * the current checkpoint succeed and the coordinator starts a new instant. Any error triggers the job failure during the metadata committing, + * when the job recovers from a failure, the write function re-send the write metadata to the coordinator to see if these metadata + * can re-commit, thus if unexpected error happens during the instant committing, the coordinator would retry to commit when the job + * recovers. + * + * <p><h2>Fault Tolerance</h2> + * + * <p>The operator coordinator checks and commits the last instant then starts a new one after a checkpoint finished successfully. + * It rolls back any inflight instant before it starts a new instant, this means one hoodie instant only span one checkpoint, + * the write function blocks data buffer flushing for the configured checkpoint timeout + * before it throws exception, any checkpoint failure would finally trigger the job failure. + * + * <p>Note: The function task requires the input stream be shuffled by the file IDs. + * + * @see StreamWriteOperatorCoordinator + */ +public class RowDataStreamWriteFunction extends AbstractStreamWriteFunction<HoodieFlinkInternalRow> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RowDataStreamWriteFunction.class); + + /** + * Write buffer as buckets for a checkpoint. The key is bucket ID. + */ + private transient Map<String, RowDataBucket> buckets; + + protected transient WriteFunction writeFunction; + + private transient HoodieRecordMerger recordMerger; + + protected final RowType rowType; + + protected final RowDataKeyGen keyGen; + + protected final PreCombineFieldExtractor preCombineFieldExtractor; + + /** + * Total size tracer. + */ + private transient TotalSizeTracer tracer; + + /** + * Metrics for flink stream write. + */ + protected transient FlinkStreamWriteMetrics writeMetrics; + + protected transient MemorySegmentPool memorySegmentPool; + + private transient FlinkTaskContextSupplier taskContextSupplier; + + private static final AtomicLong RECORD_COUNTER = new AtomicLong(1); + + /** + * Constructs a StreamingSinkFunction. + * + * @param config The config options + */ + public RowDataStreamWriteFunction(Configuration config, RowType rowType) { + super(config); + this.rowType = rowType; + this.keyGen = RowDataKeyGen.instance(config, rowType); + this.preCombineFieldExtractor = getPreCombineFieldExtractor(config, rowType); + } + + @Override + public void open(Configuration parameters) throws IOException { + this.tracer = new TotalSizeTracer(this.config); + this.taskContextSupplier = new FlinkTaskContextSupplier(getRuntimeContext()); + initBuffer(); + initWriteFunction(); + initMergeClass(); + registerMetrics(); + } + + @Override + public void snapshotState() { + // Based on the fact that the coordinator starts the checkpoint first, + // it would check the validity. + // wait for the buffer data flush out and request a new instant + flushRemaining(false); + } + + @Override + public void processElement(HoodieFlinkInternalRow record, + ProcessFunction<HoodieFlinkInternalRow, Object>.Context ctx, + Collector<Object> out) throws Exception { + bufferRecord(record); + } + + @Override + public void close() { + if (this.writeClient != null) { + this.writeClient.close(); + } + } + + /** + * End input action for batch source. + */ + public void endInput() { + super.endInput(); + flushRemaining(true); + this.writeClient.cleanHandles(); + this.writeStatuses.clear(); + } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + private void initBuffer() { + this.buckets = new LinkedHashMap<>(); + this.memorySegmentPool = MemorySegmentPoolFactory.createMemorySegmentPool(config); + } + + protected int getTaskId() { + return this.taskContextSupplier.getPartitionIdSupplier().get(); + } + + private void initWriteFunction() { + final String writeOperation = this.config.get(FlinkOptions.OPERATION); + switch (WriteOperationType.fromValue(writeOperation)) { + case INSERT: + this.writeFunction = (records, bucketInfo, instantTime) -> this.writeClient.insert(records, bucketInfo, instantTime); + break; + case UPSERT: + case DELETE: // shares the code path with UPSERT + this.writeFunction = (records, bucketInfo, instantTime) -> this.writeClient.upsert(records, bucketInfo, instantTime); + break; + case INSERT_OVERWRITE: + this.writeFunction = (records, bucketInfo, instantTime) -> this.writeClient.insertOverwrite(records, bucketInfo, instantTime); + break; + case INSERT_OVERWRITE_TABLE: + this.writeFunction = (records, bucketInfo, instantTime) -> this.writeClient.insertOverwriteTable(records, bucketInfo, instantTime); + break; + default: + throw new RuntimeException("Unsupported write operation : " + writeOperation); + } + } + + private void initMergeClass() { + recordMerger = HoodieRecordUtils.mergerToPreCombineMode(writeClient.getConfig().getRecordMerger()); + LOG.info("init hoodie merge with class [{}]", recordMerger.getClass().getName()); + } + + /** + * Returns the bucket ID with the given value {@code value}. + */ + private String getBucketID(String partitionPath, String fileId) { + return StreamerUtil.generateBucketKey(partitionPath, fileId); + } + + /** + * Create a data bucket if not exists and trying to insert a data row, there exists two cases that data cannot be + * inserted successfully: + * <p>1. Data Bucket do not exist and there is no enough memory pages to create a new binary buffer. + * <p>2. Data Bucket exists, but fails to request new memory pages from memory pool. + */ + private boolean createBucketAndWriteRow(String bucketID, HoodieFlinkInternalRow record) throws IOException { + try { + RowDataBucket bucket = this.buckets.computeIfAbsent(bucketID, + k -> new RowDataBucket( + bucketID, + createBucketBuffer(false), + createBucketBuffer(true), + getBucketInfo(record), + this.config.get(FlinkOptions.WRITE_BATCH_SIZE))); + + return bucket.writeRow(record.getRowData()); + } catch (MemoryPagesExhaustedException e) { + LOG.info("There is no enough free pages in memory pool to create buffer, need flushing first."); + return false; + } + } + + /** + * Buffers the given record. + * + * <p>Flush the data bucket first if the bucket records size is greater than + * the configured value {@link FlinkOptions#WRITE_BATCH_SIZE}. + * + * <p>Flush the max size data bucket if the total buffer size exceeds the configured + * threshold {@link FlinkOptions#WRITE_TASK_MAX_SIZE}. + * + * @param record HoodieFlinkInternalRow + */ + protected void bufferRecord(HoodieFlinkInternalRow record) throws IOException { + writeMetrics.markRecordIn(); + // set operation type into rowkind of row. + record.getRowData().setRowKind( + RowKind.fromByteValue(HoodieOperation.fromName(record.getOperationType()).getValue())); + final String bucketID = getBucketID(record.getPartitionPath(), record.getFileId()); + + boolean success = createBucketAndWriteRow(bucketID, record); + // 1. flushing bucket for memory pool is full. + if (!success) { + RowDataBucket bucketToFlush = this.buckets.values().stream() + .max(Comparator.comparingLong(b -> b.getBufferSize())) Review Comment: b.getBufferSize() - Lambda can be replaced with method reference ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/BufferUtils.java: ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.utils; + +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.sink.exception.MemoryPagesExhaustedException; + +import org.apache.flink.table.runtime.generated.NormalizedKeyComputer; +import org.apache.flink.table.runtime.generated.RecordComparator; +import org.apache.flink.table.runtime.operators.sort.BinaryInMemorySortBuffer; +import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.table.runtime.util.MemorySegmentPool; +import org.apache.flink.table.types.logical.RowType; + +/** + * Utilities to create binary buffer for writing functions. + */ +public class BufferUtils { + // minimum pages for a BinaryInMemorySortBuffer + private static final int MIN_REQUIRED_BUFFERS = 3; + + public static BinaryInMemorySortBuffer createBuffer(RowType rowType, MemorySegmentPool memorySegmentPool) { + if (memorySegmentPool.freePages() < MIN_REQUIRED_BUFFERS) { + // there is no enough free pages to create a binary buffer, may need flush first. + throw new MemoryPagesExhaustedException("Free pages are not enough to create a BinaryInMemorySortBuffer."); + } + Pair<NormalizedKeyComputer, RecordComparator> sortHelper = getSortHelper(rowType); + return BinaryInMemorySortBuffer.createBuffer( + sortHelper.getLeft(), + new RowDataSerializer(rowType), + new BinaryRowDataSerializer(rowType.getFieldCount()), + sortHelper.getRight(), + memorySegmentPool); + } + + /** + * Generate NormalizedKeyComputer and RecordComparator for creating BinaryInMemorySortBuffer. + */ + private static Pair<NormalizedKeyComputer, RecordComparator> getSortHelper(RowType rowType) { + return Pair.of(new DummyNormalizedKeyComputer(), new DummyRecordComparator()); + } Review Comment: Usage a pair here makes BinaryInMemorySortBuffer.createBuffer call confusing due to usage of sortHelper.getLeft(), sortHelper.getRight(). I would prefer to make different factory methods KeyComputer and RecordComparator. It will be more understandable. ########## hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/HandleRecords.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io.v2; + +import org.apache.hudi.common.model.DeleteRecord; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; + +import java.util.Collections; +import java.util.Iterator; + +/** + * {@code HandleRecords} is a holder containing records iterator for {@code HoodieDataBlock} + * and delete records iterator for {@code HoodieDeleteBlock}. + * + * <p>Insert records and delete records are separated using two iterators for more efficient + * memory utilization, for example, the data bytes in the iterator are reused based Flink managed + * memory pool, and the RowData wrapper is also a singleton reusable object to minimize on-heap + * memory costs, thus being more GC friendly for massive data scenarios. + */ +public class HandleRecords { + private final Iterator<HoodieRecord> recordItr; + private final Option<Iterator<DeleteRecord>> deleteRecordItr; + + public HandleRecords(Iterator<HoodieRecord> recordItr, Iterator<DeleteRecord> deleteItr) { + this.recordItr = recordItr; + this.deleteRecordItr = Option.ofNullable(deleteItr); + } + + public Iterator<HoodieRecord> getRecordItr() { + return this.recordItr; + } + + public Iterator<DeleteRecord> getDeleteRecordItr() { + return this.deleteRecordItr.orElse(Collections.emptyIterator()); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private Iterator<HoodieRecord> recordItr; + private Iterator<DeleteRecord> deleteRecordItr; + + public Builder() { + } + + public Builder withRecordItr(Iterator<HoodieRecord> recordItr) { + this.recordItr = recordItr; + return this; + } + + public Builder withDeleteRecordItr(Iterator<DeleteRecord> deleteRecordItr) { + this.deleteRecordItr = deleteRecordItr; + return this; + } + + public HandleRecords build() { + return new HandleRecords(recordItr, deleteRecordItr); + } + } Review Comment: I think need to choose use builder or make HandleRecords constructor private. I would prefer to use public constructor without builder, just make deleteItr arg @Nullable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
