cshuo commented on code in PR #12967:
URL: https://github.com/apache/hudi/pull/12967#discussion_r2002958411


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java:
##########
@@ -213,6 +213,7 @@ public static HoodieWriteConfig getHoodieClientConfig(
                 .parquetBlockSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_BLOCK_SIZE) * 1024 * 1024)
                 .parquetPageSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_PAGE_SIZE) * 1024 * 1024)
                 .parquetMaxFileSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE) * 1024 * 1024L)
+                .withWriteUtcTimezone(conf.get(FlinkOptions.WRITE_UTC_TIMEZONE))

Review Comment:
   Firstly, it's a storage conf that controls how TIMESTAMP values are written to the parquet file. If it is not set into the storage conf, many changes to the write path would be needed just to pass the UTC option into the underlying writer.
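   A minimal sketch of the pattern described above, with hypothetical names (`StorageConf`, `ParquetTimestampWriter`, and the key `hoodie.parquet.write.utc-timezone` are inventions for illustration, not the actual Hudi API): the lowest-level writer reads the flag from one shared storage config, so none of the intermediate write-path layers need a new constructor argument.

```java
import java.util.Properties;

// Hypothetical shared config bag; every layer sees the same properties,
// so passing a new option does not change any intermediate signature.
class StorageConf {
  private final Properties props = new Properties();

  StorageConf set(String key, String value) {
    props.setProperty(key, value);
    return this;
  }

  boolean getBoolean(String key, boolean defaultValue) {
    String v = props.getProperty(key);
    return v == null ? defaultValue : Boolean.parseBoolean(v);
  }
}

// The parquet-level writer pulls the flag from the conf itself; the layers
// between the Flink sink and this writer never see the option.
class ParquetTimestampWriter {
  private final boolean writeUtc;

  ParquetTimestampWriter(StorageConf conf) {
    // hypothetical key, standing in for the UTC-timezone option
    this.writeUtc = conf.getBoolean("hoodie.parquet.write.utc-timezone", true);
  }

  boolean usesUtc() {
    return writeUtc;
  }
}
```

   In the actual PR the flag travels via `withWriteUtcTimezone(...)` on the write config builder instead of this invented key.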



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java:
##########
@@ -70,6 +76,32 @@ public static boolean isAppendMode(Configuration conf) {
     return isInsertOperation(conf) && ((isCowTable(conf) && !conf.getBoolean(FlinkOptions.INSERT_CLUSTER)) || isMorTable(conf));
   }
 
+  /**
+   * Returns whether the RowData append is enabled with the given configuration {@code conf}.
+   * <p>
+   * todo:
+   * <p> support RowData append for COW, see HUDI-9149
+   * <p> support RowData append for operations besides UPSERT, see HUDI-9149
+   * <p> support RowData append for nested complex DataType fields, see HUDI-9146. Currently the RowData
+   * reader for MOR tables is not supported yet: Flink uses the Avro parquet reader to read parquet data
+   * blocks in log files written by the RowData writer, and there are some discrepancies between the Avro
+   * parquet support and the RowData parquet support regarding nested complex types, leading to failures
+   * during reading. Once the RowData reader is supported, this restriction can be lifted.
+   */
+  public static boolean supportRowDataAppend(Configuration conf, RowType rowType) {
+    return conf.get(FlinkOptions.INSERT_ROWDATA_MODE_ENABLED)
+        && HoodieTableType.valueOf(conf.get(FlinkOptions.TABLE_TYPE)) == HoodieTableType.MERGE_ON_READ
+        && (WriteOperationType.valueOf(conf.get(FlinkOptions.OPERATION).toUpperCase()) == WriteOperationType.UPSERT
+            || WriteOperationType.valueOf(conf.get(FlinkOptions.OPERATION).toUpperCase()) == WriteOperationType.DELETE)
+        && !DataTypeUtils.containsNestedComplexType(rowType);
+  }
+
+  @VisibleForTesting
+  public static boolean supportRowDataAppend(Configuration conf) {

Review Comment:
   ok



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java:
##########
@@ -167,4 +169,73 @@ public static DataType ensureColumnsAsNonNullable(DataType dataType, @Nullable L
     return DataTypes.ROW(fields.stream().toArray(DataTypes.Field[]::new)).notNull();
   }
 
+  /**
+   * Adds the Hoodie metadata fields to the given row type.
+   */
+  public static RowType addMetadataFields(
+      RowType rowType,
+      boolean withOperationField) {
+    return addMetadataFields(rowType, true, true, withOperationField);
+  }
+
+  /**
+   * Adds the Hoodie metadata fields to the given row type.
+   */
+  public static RowType addMetadataFields(
+      RowType rowType,
+      boolean isPopulateMetaFields,
+      boolean includeInstant,

Review Comment:
   Introduced for a previous change; it is not needed anymore and will be removed.



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestRowDataWriteMergeOnRead.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink;
+
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Test cases for delta stream write using RowData stream write function.
+ */
+public class TestRowDataWriteMergeOnRead extends TestWriteMergeOnRead {
+  @Override
+  protected void setUp(Configuration conf) {
+    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);

Review Comment:
   ok



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java:
##########
@@ -382,7 +386,8 @@ public void testInsertAsyncClustering() throws Exception {
   @Test
   public void testInsertWithSmallBufferSize() throws Exception {
     // reset the config option
-    conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200 + getBatchSize());
+    conf.set(FlinkOptions.WRITE_MEMORY_SEGMENT_PAGE_SIZE, 64);

Review Comment:
   It's tuned to match the buffer size previously used for testing the mini-batch flush.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketRowDataStreamWriteFunction.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bucket;
+
+import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.hash.BucketIndexUtil;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.sink.RowDataStreamWriteFunction;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A stream write function with simple bucket hash index, which writes Flink RowData without Avro conversion.
+ *
+ * <p>The task holds a fresh local index: a {(partition + bucket number) &rarr; fileId} mapping, which
+ * is used to decide whether an incoming record is an UPDATE or an INSERT.
+ * The index is local because different partition paths have separate items in the index.
+ */
+public class BucketRowDataStreamWriteFunction extends RowDataStreamWriteFunction {

Review Comment:
   The difference is that it extends `RowDataStreamWriteFunction`; besides that, it's the same as `BucketStreamWriteFunction`, and we'll remove `BucketStreamWriteFunction` once all writing paths support RowData writing.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/RowDataStreamWriteFunction.java:
##########
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.client.model.HoodieFlinkRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.HoodieRecordUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.MappingIterator;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metrics.FlinkStreamWriteMetrics;
+import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
+import org.apache.hudi.sink.buffer.RowDataBucket;
+import org.apache.hudi.sink.buffer.TotalSizeTracer;
+import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.exception.MemoryPagesExhaustedException;
+import org.apache.hudi.sink.utils.BufferUtils;
+import org.apache.hudi.table.action.commit.BucketInfo;
+import org.apache.hudi.table.action.commit.BucketType;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.MutableIteratorWrapperIterator;
+import org.apache.hudi.util.PreCombineFieldExtractor;
+import org.apache.hudi.util.RowDataToAvroConverters;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * Sink function to write the data to the underlying filesystem.
+ *
+ * <p><h2>Work Flow</h2>
+ *
+ * <p>The function first buffers the data (RowData) in a binary buffer based on {@code BinaryInMemorySortBuffer}.
+ * It flushes (writes) the record batch when the batch size exceeds the configured size {@link FlinkOptions#WRITE_BATCH_SIZE},
+ * when the memory of the binary buffer is exhausted and no more data can be appended, or when a Flink checkpoint starts.
+ * After a batch has been written successfully, the function notifies its operator coordinator {@link StreamWriteOperatorCoordinator}
+ * to mark a successful write.
+ *
+ * <p><h2>The Semantics</h2>
+ *
+ * <p>The task implements exactly-once semantics by buffering the data between checkpoints. The operator coordinator
+ * starts a new instant on the timeline when a checkpoint triggers. The coordinator checkpoints always
+ * start before its operator, so when this function starts a checkpoint, a REQUESTED instant already exists.
+ *
+ * <p>The function's process thread blocks data buffering after the checkpoint thread finishes flushing the existing data buffer,
+ * until the current checkpoint succeeds and the coordinator starts a new instant. Any error during metadata committing triggers
+ * the job failure; when the job recovers from a failure, the write function re-sends the write metadata to the coordinator to see
+ * if this metadata can be re-committed. Thus, if an unexpected error happens during the instant committing, the coordinator
+ * retries the commit when the job recovers.
+ *
+ * <p><h2>Fault Tolerance</h2>
+ *
+ * <p>The operator coordinator checks and commits the last instant, then starts a new one after a checkpoint finishes successfully.
+ * It rolls back any inflight instant before it starts a new instant; this means one hoodie instant only spans one checkpoint.
+ * The write function blocks data buffer flushing for the configured checkpoint timeout
+ * before it throws an exception, and any checkpoint failure would finally trigger the job failure.
+ *
+ * <p>Note: The function task requires the input stream to be shuffled by the file IDs.
+ *
+ * @see StreamWriteOperatorCoordinator
+ */
+public class RowDataStreamWriteFunction extends AbstractStreamWriteFunction<HoodieFlinkInternalRow> {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(RowDataStreamWriteFunction.class);
+
+  /**
+   * Write buffer as buckets for a checkpoint. The key is bucket ID.
+   */
+  private transient Map<String, RowDataBucket> buckets;
+
+  protected transient WriteFunction writeFunction;
+
+  private transient HoodieRecordMerger recordMerger;
+
+  protected final RowType rowType;
+
+  protected final RowDataKeyGen keyGen;
+
+  protected final PreCombineFieldExtractor preCombineFieldExtractor;
+
+  /**
+   * Total size tracer.
+   */
+  private transient TotalSizeTracer tracer;
+
+  /**
+   * Metrics for flink stream write.
+   */
+  protected transient FlinkStreamWriteMetrics writeMetrics;
+
+  protected transient MemorySegmentPool memorySegmentPool;
+
+  /**
+   * Constructs a RowDataStreamWriteFunction.
+   *
+   * @param config  The config options
+   * @param rowType The row type of the input data
+   */
+  public RowDataStreamWriteFunction(Configuration config, RowType rowType) {
+    super(config);
+    this.rowType = rowType;
+    this.keyGen = RowDataKeyGen.instance(config, rowType);
+    this.preCombineFieldExtractor = getPreCombineFieldExtractor(config, rowType);
+  }
+
+  @Override
+  public void open(Configuration parameters) throws IOException {
+    this.tracer = new TotalSizeTracer(this.config);
+    initBuffer();
+    initWriteFunction();
+    initMergeClass();
+    registerMetrics();
+  }
+
+  @Override
+  public void snapshotState() {
+    // Relies on the fact that the coordinator starts the checkpoint first
+    // and checks the validity.
+    // Wait for the buffered data to flush out and request a new instant.
+    flushRemaining(false);
+  }
+
+  @Override
+  public void processElement(HoodieFlinkInternalRow record,
+                             ProcessFunction<HoodieFlinkInternalRow, Object>.Context ctx,
+                             Collector<Object> out) throws Exception {
+    bufferRecord(record);
+  }
+
+  @Override
+  public void close() {
+    if (this.writeClient != null) {
+      this.writeClient.close();
+    }
+  }
+
+  /**
+   * End input action for batch source.
+   */
+  public void endInput() {
+    super.endInput();
+    flushRemaining(true);
+    this.writeClient.cleanHandles();
+    this.writeStatuses.clear();
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  private void initBuffer() {
+    this.buckets = new LinkedHashMap<>();
+    this.memorySegmentPool = MemorySegmentPoolFactory.createMemorySegmentPool(config);
+  }
+
+  private void initWriteFunction() {
+    final String writeOperation = this.config.get(FlinkOptions.OPERATION);
+    switch (WriteOperationType.fromValue(writeOperation)) {
+      case INSERT:
+        this.writeFunction = (records, bucketInfo, instantTime) -> this.writeClient.insert(records, bucketInfo, instantTime);
+        break;
+      case UPSERT:
+      case DELETE: // shares the code path with UPSERT
+        this.writeFunction = (records, bucketInfo, instantTime) -> this.writeClient.upsert(records, bucketInfo, instantTime);
+        break;
+      case INSERT_OVERWRITE:
+        this.writeFunction = (records, bucketInfo, instantTime) -> this.writeClient.insertOverwrite(records, bucketInfo, instantTime);
+        break;
+      case INSERT_OVERWRITE_TABLE:
+        this.writeFunction = (records, bucketInfo, instantTime) -> this.writeClient.insertOverwriteTable(records, bucketInfo, instantTime);
+        break;
+      default:
+        throw new RuntimeException("Unsupported write operation : " + writeOperation);
+    }
+  }
+
+  private void initMergeClass() {
+    recordMerger = HoodieRecordUtils.mergerToPreCombineMode(writeClient.getConfig().getRecordMerger());
+    LOG.info("init hoodie merge with class [{}]", recordMerger.getClass().getName());
+  }
+
+  /**
+   * Returns the bucket ID for the given partition path and file ID.
+   */
+  private String getBucketID(String partitionPath, String fileId) {
+    return StreamerUtil.generateBucketKey(partitionPath, fileId);
+  }
+
+  /**
+   * Creates a data bucket if it does not exist and tries to insert a data row. There are two cases
+   * where the data cannot be inserted successfully:
+   * <p>1. The data bucket does not exist and there are not enough memory pages to create a new binary buffer.
+   * <p>2. The data bucket exists, but requesting new memory pages from the memory pool fails.
+   */
+  private boolean createBucketAndWriteRow(String bucketID, HoodieFlinkInternalRow record) throws IOException {
+    try {
+      RowDataBucket bucket = this.buckets.computeIfAbsent(bucketID,
+          k -> new RowDataBucket(
+              bucketID,
+              BufferUtils.createBuffer(rowType, memorySegmentPool),
+              getBucketInfo(record),
+              this.config.get(FlinkOptions.WRITE_BATCH_SIZE)));
+
+      return bucket.writeRow(record.getRowData());
+    } catch (MemoryPagesExhaustedException e) {
+      LOG.info("There are not enough free pages in the memory pool to create a buffer, need to flush first.");
+      return false;
+    }
+  }
+
+  /**
+   * Buffers the given record.
+   *
+   * <p>Flushes the data bucket first if the bucket records size is greater than
+   * the configured value {@link FlinkOptions#WRITE_BATCH_SIZE}.
+   *
+   * <p>Flushes the max size data bucket if the total buffer size exceeds the configured
+   * threshold {@link FlinkOptions#WRITE_TASK_MAX_SIZE}.
+   *
+   * @param record HoodieFlinkInternalRow
+   */
+  protected void bufferRecord(HoodieFlinkInternalRow record) throws IOException {
+    writeMetrics.markRecordIn();
+    // set the operation type into the row kind of the row.
+    record.getRowData().setRowKind(
+        RowKind.fromByteValue(HoodieOperation.fromName(record.getOperationType()).getValue()));
+    final String bucketID = getBucketID(record.getPartitionPath(), record.getFileId());
+
+    boolean success = createBucketAndWriteRow(bucketID, record);
+    // 1. flush a bucket because the memory pool is full.
+    if (!success) {
+      RowDataBucket bucketToFlush = this.buckets.values().stream()
+          .max(Comparator.comparingLong(RowDataBucket::getBufferSize))
+          .orElseThrow(NoSuchElementException::new);
+      if (flushBucket(bucketToFlush)) {
+        this.tracer.countDown(bucketToFlush.getBufferSize());
+        disposeBucket(bucketToFlush);
+      } else {
+        LOG.warn("The buffer size hits the threshold {}, but flushing the max size data bucket still failed!", this.tracer.maxBufferSize);
+      }
+      // try to write the row again
+      success = createBucketAndWriteRow(bucketID, record);
+      if (!success) {
+        throw new RuntimeException("Buffer is too small to hold a single record.");
+      }
+    }
+    RowDataBucket bucket = this.buckets.get(bucketID);
+    this.tracer.trace(bucket.getLastRecordSize());
+    // 2. flush the bucket because the bucket is full.
+    if (bucket.isFull()) {
+      if (flushBucket(bucket)) {
+        this.tracer.countDown(bucket.getBufferSize());
+        disposeBucket(bucket);
+      }
+    }
+    // update buffer metrics after tracing the buffer size
+    writeMetrics.setWriteBufferedSize(this.tracer.bufferSize);
+  }
+
+  /**
+   * A RowData bucket cannot be used after being disposed.
+   */
+  private void disposeBucket(RowDataBucket rowDataBucket) {
+    rowDataBucket.dispose();
+    this.buckets.remove(rowDataBucket.getBucketId());
+  }
+
+  private static BucketInfo getBucketInfo(HoodieFlinkInternalRow internalRow) {
+    BucketType bucketType;
+    switch (internalRow.getInstantTime()) {
+      case "I":
+        bucketType = BucketType.INSERT;
+        break;
+      case "U":
+        bucketType = BucketType.UPDATE;
+        break;
+      default:
+        throw new HoodieException("Unexpected bucket type: " + internalRow.getInstantTime());
+    }
+    return new BucketInfo(bucketType, internalRow.getFileId(), internalRow.getPartitionPath());
+  }
+
+  private boolean hasData() {
+    return !this.buckets.isEmpty()
+        && this.buckets.values().stream().anyMatch(bucket -> !bucket.isEmpty());
+  }
+
+  private boolean flushBucket(RowDataBucket bucket) {
+    String instant = instantToWrite(true);
+
+    if (instant == null) {
+      // in case there are empty checkpoints that have no input data
+      LOG.info("No inflight instant when flushing data, skip.");
+      return false;
+    }
+
+    ValidationUtils.checkState(!bucket.isEmpty(), "Data bucket to flush has no buffering records");
+    final List<WriteStatus> writeStatus = writeRecords(instant, bucket);
+    final WriteMetadataEvent event = WriteMetadataEvent.builder()
+        .taskID(taskID)
+        .instantTime(instant) // the write instant may shift but the event still uses the currentInstant.
+        .writeStatus(writeStatus)
+        .lastBatch(false)
+        .endInput(false)
+        .build();
+
+    this.eventGateway.sendEventToCoordinator(event);
+    writeStatuses.addAll(writeStatus);
+    return true;
+  }
+
+  private void flushRemaining(boolean endInput) {
+    writeMetrics.startDataFlush();
+    this.currentInstant = instantToWrite(hasData());
+    if (this.currentInstant == null) {
+      // in case there are empty checkpoints that have no input data
+      throw new HoodieException("No inflight instant when flushing data!");
+    }
+    final List<WriteStatus> writeStatus;
+    if (!buckets.isEmpty()) {
+      writeStatus = new ArrayList<>();
+      this.buckets.values()
+          // The records are partitioned by the bucket ID and each batch sent to
+          // the writer belongs to one bucket.
+          .forEach(bucket -> {
+            if (!bucket.isEmpty()) {
+              writeStatus.addAll(writeRecords(currentInstant, bucket));
+              bucket.dispose();
+            }
+          });
+    } else {
+      LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant);
+      writeStatus = Collections.emptyList();
+    }
+    final WriteMetadataEvent event = WriteMetadataEvent.builder()
+        .taskID(taskID)
+        .instantTime(currentInstant)
+        .writeStatus(writeStatus)
+        .lastBatch(true)
+        .endInput(endInput)
+        .build();
+
+    this.eventGateway.sendEventToCoordinator(event);
+    this.buckets.clear();
+    this.tracer.reset();
+    this.writeClient.cleanHandles();
+    this.writeStatuses.addAll(writeStatus);
+    // blocks flushing until the coordinator starts a new instant
+    this.confirming = true;
+
+    writeMetrics.endDataFlush();
+    writeMetrics.resetAfterCommit();
+  }
+
+  private void registerMetrics() {
+    MetricGroup metrics = getRuntimeContext().getMetricGroup();
+    writeMetrics = new FlinkStreamWriteMetrics(metrics);
+    writeMetrics.registerMetrics();
+  }
+
+  protected List<WriteStatus> writeRecords(
+      String instant,
+      RowDataBucket rowDataBucket) {
+    writeMetrics.startFileFlush();
+
+    Iterator<BinaryRowData> rowItr =
+        new MutableIteratorWrapperIterator<>(
+            rowDataBucket.getDataIterator(), () -> new BinaryRowData(rowType.getFieldCount()));
+    Iterator<HoodieRecord> recordItr = new MappingIterator<>(
+        rowItr, rowData -> convertToRecord(rowData, rowDataBucket.getBucketInfo()));
+
+    List<WriteStatus> statuses = writeFunction.write(deduplicateRecordsIfNeeded(recordItr), rowDataBucket.getBucketInfo(), instant);
+    writeMetrics.endFileFlush();
+    writeMetrics.increaseNumOfFilesWritten();
+    return statuses;
+  }
+
+  protected HoodieFlinkRecord convertToRecord(RowData dataRow, BucketInfo bucketInfo) {
+    String key = keyGen.getRecordKey(dataRow);
+    Comparable<?> preCombineValue = preCombineFieldExtractor.getPreCombineField(dataRow);
+    HoodieOperation operation = HoodieOperation.fromValue(dataRow.getRowKind().toByteValue());
+    HoodieKey hoodieKey = new HoodieKey(key, bucketInfo.getPartitionPath());
+    return new HoodieFlinkRecord(hoodieKey, operation, preCombineValue, dataRow);
+  }
+
+  protected Iterator<HoodieRecord> deduplicateRecordsIfNeeded(Iterator<HoodieRecord> records) {
+    if (config.get(FlinkOptions.PRE_COMBINE)) {
+      // todo: sort by record key, lazy merge during iterating, default for COW.
+      return records;
+    } else {
+      return records;
+    }
+  }
+
+  private PreCombineFieldExtractor getPreCombineFieldExtractor(Configuration conf, RowType rowType) {
+    String preCombineField = OptionsResolver.getPreCombineField(conf);
+    if (StringUtils.isNullOrEmpty(preCombineField)) {
+      // return a dummy extractor.
+      return rowData -> HoodieRecord.DEFAULT_ORDERING_VALUE;
+    }
+    int preCombineFieldIdx = rowType.getFieldNames().indexOf(preCombineField);
+    LogicalType fieldType = rowType.getChildren().get(preCombineFieldIdx);
+    RowData.FieldGetter preCombineFieldGetter = RowData.createFieldGetter(fieldType, preCombineFieldIdx);
+
+    // currently the log reader for Flink is the Avro reader, and it merges records based on the ordering value
+    // in AVRO format, so here we align the data format with the reader.
+    // TODO refactor this after the RowData reader is supported, HUDI-9146.
+    RowDataToAvroConverters.RowDataToAvroConverter fieldConverter =
+        RowDataToAvroConverters.createConverter(fieldType, conf.get(FlinkOptions.WRITE_UTC_TIMEZONE));
+    Schema fieldSchema = AvroSchemaConverter.convertToSchema(fieldType, preCombineField);

Review Comment:
   The orderingValue will be stored in the HoodieDeleteBlock for delete records, and during reading it will be used for merging, so we should deal with the value type carefully. For now, we just follow the previous convention.
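   A minimal illustration of why the value type matters (generic Java, not Hudi code): merging by ordering value compares raw `Comparable`s, and that only works when the writer-stored value and the reader-reconstructed value have the same concrete type. Comparing mismatched boxed types (e.g. `Integer` vs `Long`) throws `ClassCastException` at merge time.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Generic sketch: pick the winning value per key by ordering value.
// The raw Comparable comparison assumes both sides have the same concrete
// type, which is why the ordering value written to the delete block must
// match the type the (Avro-based) reader reconstructs.
@SuppressWarnings({"unchecked", "rawtypes"})
final class OrderingValueMerge {
  // Later value wins on ties, mirroring "last writer wins" pre-combine.
  static Comparable merge(Comparable existing, Comparable incoming) {
    return incoming.compareTo(existing) >= 0 ? incoming : existing;
  }

  // Fold one (key, orderingValue) pair into the per-key winners map.
  static Map<String, Comparable> combine(Map<String, Comparable> winners, String key, Comparable value) {
    winners.merge(key, value, (oldV, newV) -> merge(oldV, newV));
    return winners;
  }
}
```

   The tie-breaking direction (incoming wins on equal ordering values) is a choice made for the sketch, not something the PR specifies.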



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to