This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0acdfe9519 [INLONG-10195][Sort] Hudi connector support audit ID
(#10231)
0acdfe9519 is described below
commit 0acdfe9519a18048171b3ed8e0dcd71f0527b0b8
Author: XiaoYou201 <[email protected]>
AuthorDate: Wed May 22 09:47:12 2024 +0800
[INLONG-10195][Sort] Hudi connector support audit ID (#10231)
---
.../sort-flink-v1.15/sort-connectors/hudi/pom.xml | 1 -
.../inlong/sort/hudi/sink/StreamWriteFunction.java | 495 +++++++++++++++++++++
.../inlong/sort/hudi/sink/StreamWriteOperator.java | 43 ++
.../sort/hudi/sink/append/AppendWriteFunction.java | 156 +++++++
.../sort/hudi/sink/append/AppendWriteOperator.java | 44 ++
.../sink/bucket/BucketStreamWriteFunction.java | 185 ++++++++
.../sink/bucket/BucketStreamWriteOperator.java | 40 ++
.../hudi/sink/bulk/BulkInsertWriteFunction.java | 229 ++++++++++
.../hudi/sink/bulk/BulkInsertWriteOperator.java | 55 +++
.../inlong/sort/hudi/sink/utils/Pipelines.java | 469 +++++++++++++++++++
.../inlong/sort/hudi/table/HoodieTableFactory.java | 355 +++++++++++++++
.../inlong/sort/hudi/table/HoodieTableSink.java | 150 +++++++
.../hudi/table/sink/HudiTableInlongFactory.java | 55 ---
.../org.apache.flink.table.factories.Factory | 2 +-
licenses/inlong-sort-connectors/LICENSE | 13 +
15 files changed, 2235 insertions(+), 57 deletions(-)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/pom.xml
index ca3dc60e6d..075b6da594 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/pom.xml
@@ -79,7 +79,6 @@
<configuration>
<artifactSet>
<includes>
- <include>org.apache.hudi:*</include>
<include>org.apache.hive:hive-exec</include>
<include>org.apache.hadoop:*</include>
<include>com.fasterxml.woodstox:*</include>
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/StreamWriteFunction.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/StreamWriteFunction.java
new file mode 100644
index 0000000000..33551f6de5
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/StreamWriteFunction.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.sink;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.ObjectSizeCalculator;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.table.action.commit.FlinkWriteHelper;
+import org.apache.hudi.util.StreamerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+/**
+ * Sink function to write the data to the underneath filesystem.
+ *
+ * <p><h2>Work Flow</h2>
+ *
+ * <p>The function first buffers the data as a batch of {@link HoodieRecord}s.
+ * It flushes (writes) a record batch when the batch size exceeds the configured size
+ * {@link FlinkOptions#WRITE_BATCH_SIZE}, when the total buffer size exceeds the configured size
+ * {@link FlinkOptions#WRITE_TASK_MAX_SIZE}, or when a Flink checkpoint starts. After a batch has been
+ * written successfully, the function notifies its operator coordinator
+ * {@link StreamWriteOperatorCoordinator} to mark a successful write.
+ *
+ * <p><h2>The Semantics</h2>
+ *
+ * <p>The task implements exactly-once semantics by buffering the data between checkpoints. The operator
+ * coordinator starts a new instant on the timeline when a checkpoint triggers; the coordinator checkpoints
+ * always start before its operator, so when this function starts a checkpoint, a REQUESTED instant already
+ * exists.
+ *
+ * <p>The function process thread blocks data buffering after the checkpoint thread finishes flushing the
+ * existing data buffer until the current checkpoint succeeds and the coordinator starts a new instant.
+ * Any error during the metadata committing triggers a job failure. When the job recovers from a failure,
+ * the write function re-sends the write metadata to the coordinator to see if this metadata can be
+ * re-committed; thus if an unexpected error happens during the instant committing, the coordinator retries
+ * the commit when the job recovers.
+ *
+ * <p><h2>Fault Tolerance</h2>
+ *
+ * <p>The operator coordinator checks and commits the last instant, then starts a new one after a checkpoint
+ * finished successfully. It rolls back any inflight instant before it starts a new instant, which means one
+ * hoodie instant only spans one checkpoint. The write function blocks data buffer flushing for the
+ * configured checkpoint timeout before it throws an exception; any checkpoint failure finally triggers the
+ * job failure.
+ *
+ * <p><h2>Metrics</h2>
+ *
+ * <p>When a non-null {@link MetricOption} is supplied, every element is reported to InLong's
+ * {@link SinkMetricData} right after it has been buffered.
+ *
+ * <p>Note: The function task requires the input stream be shuffled by the file IDs.
+ *
+ * @param <I> Type of the input record
+ * @see StreamWriteOperatorCoordinator
+ *
+ * Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3
+ */
+public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(StreamWriteFunction.class);
+
+    /**
+     * Write buffer as buckets for a checkpoint. The key is bucket ID.
+     */
+    private transient Map<String, DataBucket> buckets;
+
+    /**
+     * Write client call selected from {@link FlinkOptions#OPERATION} in {@link #initWriteFunction()}.
+     */
+    private transient BiFunction<List<HoodieRecord>, String, List<WriteStatus>> writeFunction;
+
+    /**
+     * Total size tracer.
+     */
+    private transient TotalSizeTracer tracer;
+
+    /**
+     * InLong metric options; {@code null} disables metric reporting.
+     */
+    private final MetricOption metricOption;
+
+    /**
+     * Metric reporter built per task in {@link #open(Configuration)} from the runtime metric group.
+     * It is runtime-only state, hence {@code transient}.
+     */
+    private transient SinkMetricData sinkMetricData;
+
+    /**
+     * Constructs a StreamingSinkFunction.
+     *
+     * @param config       The config options
+     * @param metricOption The InLong metric option, or {@code null} to disable metrics
+     */
+    public StreamWriteFunction(Configuration config, MetricOption metricOption) {
+        super(config);
+        this.metricOption = metricOption;
+    }
+
+    @Override
+    public void open(Configuration parameters) {
+        this.tracer = new TotalSizeTracer(this.config);
+        initBuffer();
+        initWriteFunction();
+        if (metricOption != null) {
+            this.sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
+        }
+    }
+
+    @Override
+    public void snapshotState() {
+        // Based on the fact that the coordinator starts the checkpoint first,
+        // it would check the validity.
+        // wait for the buffer data flush out and request a new instant
+        flushRemaining(false);
+    }
+
+    @Override
+    public void processElement(I value, ProcessFunction<I, Object>.Context ctx, Collector<Object> out)
+            throws Exception {
+        bufferRecord((HoodieRecord<?>) value);
+        // report only after the record was accepted into the buffer
+        if (sinkMetricData != null) {
+            sinkMetricData.invokeWithEstimate(value);
+        }
+    }
+
+    @Override
+    public void close() {
+        if (this.writeClient != null) {
+            this.writeClient.close();
+        }
+    }
+
+    /**
+     * End input action for batch source.
+     */
+    @Override
+    public void endInput() {
+        super.endInput();
+        flushRemaining(true);
+        this.writeClient.cleanHandles();
+        this.writeStatuses.clear();
+    }
+
+    // -------------------------------------------------------------------------
+    // Utilities
+    // -------------------------------------------------------------------------
+
+    private void initBuffer() {
+        this.buckets = new LinkedHashMap<>();
+    }
+
+    /**
+     * Binds {@link #writeFunction} to the write client call matching {@link FlinkOptions#OPERATION}.
+     *
+     * @throws RuntimeException if the configured operation is not supported by this function
+     */
+    private void initWriteFunction() {
+        final String writeOperation = this.config.get(FlinkOptions.OPERATION);
+        switch (WriteOperationType.fromValue(writeOperation)) {
+            case INSERT:
+                this.writeFunction = (records, instantTime) -> this.writeClient.insert(records, instantTime);
+                break;
+            case UPSERT:
+                this.writeFunction = (records, instantTime) -> this.writeClient.upsert(records, instantTime);
+                break;
+            case INSERT_OVERWRITE:
+                this.writeFunction =
+                        (records, instantTime) -> this.writeClient.insertOverwrite(records, instantTime);
+                break;
+            case INSERT_OVERWRITE_TABLE:
+                this.writeFunction =
+                        (records, instantTime) -> this.writeClient.insertOverwriteTable(records, instantTime);
+                break;
+            default:
+                throw new RuntimeException("Unsupported write operation : " + writeOperation);
+        }
+    }
+
+    /**
+     * Represents a data item in the buffer, this is needed to reduce the
+     * memory footprint.
+     *
+     * <p>A {@link HoodieRecord} was firstly transformed into a {@link DataItem}
+     * for buffering, it then transforms back to the {@link HoodieRecord} before flushing.
+     */
+    private static class DataItem {
+
+        private final String key; // record key
+        private final String instant; // 'U' or 'I'
+        private final HoodieRecordPayload<?> data; // record payload
+        private final HoodieOperation operation; // operation
+
+        private DataItem(String key, String instant, HoodieRecordPayload<?> data, HoodieOperation operation) {
+            this.key = key;
+            this.instant = instant;
+            this.data = data;
+            this.operation = operation;
+        }
+
+        public static DataItem fromHoodieRecord(HoodieRecord<?> record) {
+            return new DataItem(
+                    record.getRecordKey(),
+                    record.getCurrentLocation().getInstantTime(),
+                    ((HoodieAvroRecord) record).getData(),
+                    record.getOperation());
+        }
+
+        public HoodieRecord<?> toHoodieRecord(String partitionPath) {
+            HoodieKey hoodieKey = new HoodieKey(this.key, partitionPath);
+            HoodieRecord<?> record = new HoodieAvroRecord<>(hoodieKey, data, operation);
+            HoodieRecordLocation loc = new HoodieRecordLocation(instant, null);
+            record.setCurrentLocation(loc);
+            return record;
+        }
+    }
+
+    /**
+     * Data bucket: the buffered items of one (partition path, file ID) pair.
+     */
+    private static class DataBucket {
+
+        private final List<DataItem> records;
+        private final BufferSizeDetector detector;
+        private final String partitionPath;
+        private final String fileID;
+
+        private DataBucket(double batchSize, HoodieRecord<?> hoodieRecord) {
+            this.records = new ArrayList<>();
+            this.detector = new BufferSizeDetector(batchSize);
+            this.partitionPath = hoodieRecord.getPartitionPath();
+            this.fileID = hoodieRecord.getCurrentLocation().getFileId();
+        }
+
+        /**
+         * Prepare the write data buffer: patch up all the records with correct partition path.
+         */
+        public List<HoodieRecord> writeBuffer() {
+            // rewrite all the records with new record key
+            return records.stream()
+                    .map(record -> record.toHoodieRecord(partitionPath))
+                    .collect(Collectors.toList());
+        }
+
+        /**
+         * Sets up before flush: patch up the first record with correct partition path and fileID.
+         *
+         * <p>Note: the method may modify the given records {@code records}.
+         */
+        public void preWrite(List<HoodieRecord> records) {
+            // rewrite the first record with expected fileID
+            HoodieRecord<?> first = records.get(0);
+            HoodieRecord<?> record =
+                    new HoodieAvroRecord<>(first.getKey(), (HoodieRecordPayload) first.getData(),
+                            first.getOperation());
+            HoodieRecordLocation newLoc =
+                    new HoodieRecordLocation(first.getCurrentLocation().getInstantTime(), fileID);
+            record.setCurrentLocation(newLoc);
+
+            records.set(0, record);
+        }
+
+        public void reset() {
+            this.records.clear();
+            this.detector.reset();
+        }
+    }
+
+    /**
+     * Tool to detect if to flush out the existing buffer.
+     * The size of a record is re-measured for roughly 1% of the records (1 in {@code DENOMINATOR});
+     * in between, the last measured size is reused as the estimate.
+     */
+    private static class BufferSizeDetector {
+
+        private final Random random = new Random(47);
+        private static final int DENOMINATOR = 100;
+
+        private final double batchSizeBytes;
+
+        private long lastRecordSize = -1L;
+        private long totalSize = 0L;
+
+        BufferSizeDetector(double batchSizeMb) {
+            this.batchSizeBytes = batchSizeMb * 1024 * 1024;
+        }
+
+        /**
+         * Adds the (possibly estimated) size of {@code record} to the running total.
+         *
+         * @return true if the running total exceeds the batch size
+         */
+        boolean detect(Object record) {
+            if (lastRecordSize == -1 || sampling()) {
+                lastRecordSize = ObjectSizeCalculator.getObjectSize(record);
+            }
+            totalSize += lastRecordSize;
+            return totalSize > this.batchSizeBytes;
+        }
+
+        boolean sampling() {
+            // 1% sampling rate: true with probability 1 / DENOMINATOR
+            return random.nextInt(DENOMINATOR) == 1;
+        }
+
+        void reset() {
+            this.lastRecordSize = -1L;
+            this.totalSize = 0L;
+        }
+    }
+
+    /**
+     * Tool to trace the total buffer size. It computes the maximum buffer size;
+     * if the current buffer size is greater than the maximum buffer size, a data bucket
+     * flush triggers.
+     */
+    private static class TotalSizeTracer {
+
+        private long bufferSize = 0L;
+        private final double maxBufferSize;
+
+        TotalSizeTracer(Configuration conf) {
+            long mergeReaderMem = 100; // constant 100MB
+            long mergeMapMaxMem = conf.getInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY);
+            this.maxBufferSize =
+                    (conf.getDouble(FlinkOptions.WRITE_TASK_MAX_SIZE) - mergeReaderMem - mergeMapMaxMem) * 1024
+                            * 1024;
+            final String errMsg = String.format(
+                    "'%s' should be at least greater than '%s' plus merge reader memory(constant 100MB now)",
+                    FlinkOptions.WRITE_TASK_MAX_SIZE.key(), FlinkOptions.WRITE_MERGE_MAX_MEMORY.key());
+            ValidationUtils.checkState(this.maxBufferSize > 0, errMsg);
+        }
+
+        /**
+         * Trace the given record size {@code recordSize}.
+         *
+         * @param recordSize The record size
+         * @return true if the buffer size exceeds the maximum buffer size
+         */
+        boolean trace(long recordSize) {
+            this.bufferSize += recordSize;
+            return this.bufferSize > this.maxBufferSize;
+        }
+
+        void countDown(long size) {
+            this.bufferSize -= size;
+        }
+
+        public void reset() {
+            this.bufferSize = 0;
+        }
+    }
+
+    /**
+     * Returns the bucket ID with the given value {@code value}.
+     */
+    private String getBucketID(HoodieRecord<?> record) {
+        final String fileId = record.getCurrentLocation().getFileId();
+        return StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId);
+    }
+
+    /**
+     * Buffers the given record.
+     *
+     * <p>Flush the data bucket first if the bucket records size is greater than
+     * the configured value {@link FlinkOptions#WRITE_BATCH_SIZE}.
+     *
+     * <p>Flush the max size data bucket if the total buffer size exceeds the configured
+     * threshold {@link FlinkOptions#WRITE_TASK_MAX_SIZE}.
+     *
+     * @param value HoodieRecord
+     */
+    protected void bufferRecord(HoodieRecord<?> value) {
+        final String bucketID = getBucketID(value);
+
+        DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
+                k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value));
+        final DataItem item = DataItem.fromHoodieRecord(value);
+
+        bucket.records.add(item);
+
+        boolean flushBucket = bucket.detector.detect(item);
+        boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
+        if (flushBucket) {
+            if (flushBucket(bucket)) {
+                this.tracer.countDown(bucket.detector.totalSize);
+                bucket.reset();
+            }
+        } else if (flushBuffer) {
+            // find the max size bucket and flush it out
+            DataBucket bucketToFlush = this.buckets.values().stream()
+                    .max(Comparator.comparingLong(b -> b.detector.totalSize))
+                    .orElseThrow(NoSuchElementException::new);
+            if (flushBucket(bucketToFlush)) {
+                this.tracer.countDown(bucketToFlush.detector.totalSize);
+                bucketToFlush.reset();
+            } else {
+                LOG.warn("The buffer size hits the threshold {}, but still flush the max size data bucket failed!",
+                        this.tracer.maxBufferSize);
+            }
+        }
+    }
+
+    /**
+     * Returns true if at least one bucket holds buffered records.
+     */
+    private boolean hasData() {
+        return this.buckets.values().stream().anyMatch(bucket -> !bucket.records.isEmpty());
+    }
+
+    /**
+     * Writes one bucket under the current inflight instant and reports the statuses to the coordinator.
+     *
+     * @return false if there is no inflight instant (nothing is written), true otherwise
+     */
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private boolean flushBucket(DataBucket bucket) {
+        String instant = instantToWrite(true);
+
+        if (instant == null) {
+            // in case there are empty checkpoints that has no input data
+            LOG.info("No inflight instant when flushing data, skip.");
+            return false;
+        }
+
+        List<HoodieRecord> records = bucket.writeBuffer();
+        ValidationUtils.checkState(!records.isEmpty(), "Data bucket to flush has no buffering records");
+        if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
+            records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
+        }
+        bucket.preWrite(records);
+        final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
+        records.clear();
+        final WriteMetadataEvent event = WriteMetadataEvent.builder()
+                .taskID(taskID)
+                .instantTime(instant) // the write instant may shift but the event still use the currentInstant.
+                .writeStatus(writeStatus)
+                .lastBatch(false)
+                .endInput(false)
+                .build();
+
+        this.eventGateway.sendEventToCoordinator(event);
+        writeStatuses.addAll(writeStatus);
+        return true;
+    }
+
+    /**
+     * Flushes every buffered bucket under the current instant, sends the last-batch event to the
+     * coordinator and blocks further flushing until the coordinator confirms a new instant.
+     *
+     * @param endInput whether this flush is caused by the end of a bounded input
+     * @throws HoodieException if there is no inflight instant to write to
+     */
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private void flushRemaining(boolean endInput) {
+        this.currentInstant = instantToWrite(hasData());
+        if (this.currentInstant == null) {
+            // in case there are empty checkpoints that has no input data
+            throw new HoodieException("No inflight instant when flushing data!");
+        }
+        final List<WriteStatus> writeStatus;
+        if (!buckets.isEmpty()) {
+            writeStatus = new ArrayList<>();
+            this.buckets.values()
+                    // The records are partitioned by the bucket ID and each batch sent to
+                    // the writer belongs to one bucket.
+                    .forEach(bucket -> {
+                        List<HoodieRecord> records = bucket.writeBuffer();
+                        if (!records.isEmpty()) {
+                            if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
+                                records = FlinkWriteHelper.newInstance()
+                                        .deduplicateRecords(records, (HoodieIndex) null, -1);
+                            }
+                            bucket.preWrite(records);
+                            writeStatus.addAll(writeFunction.apply(records, currentInstant));
+                            records.clear();
+                            bucket.reset();
+                        }
+                    });
+        } else {
+            LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant);
+            writeStatus = Collections.emptyList();
+        }
+        final WriteMetadataEvent event = WriteMetadataEvent.builder()
+                .taskID(taskID)
+                .instantTime(currentInstant)
+                .writeStatus(writeStatus)
+                .lastBatch(true)
+                .endInput(endInput)
+                .build();
+
+        this.eventGateway.sendEventToCoordinator(event);
+        this.buckets.clear();
+        this.tracer.reset();
+        this.writeClient.cleanHandles();
+        this.writeStatuses.addAll(writeStatus);
+        // blocks flushing until the coordinator starts a new instant
+        this.confirming = true;
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/StreamWriteOperator.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/StreamWriteOperator.java
new file mode 100644
index 0000000000..a25caf9b35
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/StreamWriteOperator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.sink;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.hudi.sink.common.AbstractWriteOperator;
+import org.apache.hudi.sink.common.WriteOperatorFactory;
+
+/**
+ * Operator for {@link StreamSink} that drives the InLong {@link StreamWriteFunction}, so that
+ * the metric option handed to this operator reaches the write function.
+ *
+ * <p>Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3
+ *
+ * @param <I> The input type
+ */
+public class StreamWriteOperator<I> extends AbstractWriteOperator<I> {
+
+    /**
+     * Wraps a newly created {@link StreamWriteFunction}.
+     *
+     * @param conf         the Hudi Flink write configuration
+     * @param metricOption the InLong metric option passed through to the write function;
+     *                     {@code null} disables metric reporting there
+     */
+    public StreamWriteOperator(Configuration conf, MetricOption metricOption) {
+        super(new StreamWriteFunction<>(conf, metricOption));
+    }
+
+    /**
+     * Creates the Flink operator factory for a new {@link StreamWriteOperator}.
+     *
+     * @param conf         the Hudi Flink write configuration
+     * @param metricOption the InLong metric option, may be {@code null}
+     * @param <I>          the input type
+     * @return the operator factory
+     */
+    public static <I> WriteOperatorFactory<I> getFactory(Configuration conf, MetricOption metricOption) {
+        final StreamWriteOperator<I> writeOperator = new StreamWriteOperator<>(conf, metricOption);
+        return WriteOperatorFactory.instance(conf, writeOperator);
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteFunction.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteFunction.java
new file mode 100644
index 0000000000..817e727f62
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteFunction.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.sink.append;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Sink function to write the data to the underneath filesystem.
+ *
+ * <p>The function writes base files directly for each checkpoint;
+ * a file may roll over when its size hits the configured threshold.
+ *
+ * <p>When a non-null {@link MetricOption} is supplied, every element is reported to InLong's
+ * {@link SinkMetricData} after it has been handed to the writer helper.
+ *
+ * @param <I> Type of the input record
+ * @see StreamWriteOperatorCoordinator
+ *
+ * Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3
+ */
+public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AppendWriteFunction.class);
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Helper class for log mode. Created lazily on the first element of an instant and
+     * discarded after every flush.
+     */
+    private transient BulkInsertWriterHelper writerHelper;
+
+    /**
+     * Table row type.
+     */
+    private final RowType rowType;
+
+    /**
+     * InLong metric options; {@code null} disables metric reporting.
+     */
+    private final MetricOption metricOption;
+
+    /**
+     * Metric reporter built per task in {@link #open(Configuration)} from the runtime metric group.
+     * It is runtime-only state, hence {@code transient}.
+     */
+    private transient SinkMetricData sinkMetricData;
+
+    /**
+     * Constructs an AppendWriteFunction.
+     *
+     * @param config       The config options
+     * @param rowType      The table row type
+     * @param metricOption The InLong metric option, or {@code null} to disable metrics
+     */
+    public AppendWriteFunction(Configuration config, RowType rowType, MetricOption metricOption) {
+        super(config);
+        this.rowType = rowType;
+        this.metricOption = metricOption;
+    }
+
+    @Override
+    public void open(Configuration parameters) {
+        if (metricOption != null) {
+            this.sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
+        }
+    }
+
+    @Override
+    public void snapshotState() {
+        // Based on the fact that the coordinator starts the checkpoint first,
+        // it would check the validity.
+        // wait for the buffer data flush out and request a new instant
+        flushData(false);
+    }
+
+    @Override
+    public void processElement(I value, Context ctx, Collector<Object> out) throws Exception {
+        if (this.writerHelper == null) {
+            initWriterHelper();
+        }
+        this.writerHelper.write((RowData) value);
+        // report only after the row was handed to the writer
+        if (sinkMetricData != null) {
+            sinkMetricData.invokeWithEstimate(value);
+        }
+    }
+
+    /**
+     * End input action for batch source.
+     */
+    @Override
+    public void endInput() {
+        super.endInput();
+        flushData(true);
+        this.writeStatuses.clear();
+    }
+
+    // -------------------------------------------------------------------------
+    // Utilities
+    // -------------------------------------------------------------------------
+
+    /**
+     * Creates the writer helper bound to the current inflight instant.
+     *
+     * @throws HoodieException if there is no inflight instant
+     */
+    private void initWriterHelper() {
+        final String instant = instantToWrite(true);
+        if (instant == null) {
+            // in case there are empty checkpoints that has no input data
+            throw new HoodieException("No inflight instant when flushing data!");
+        }
+        this.writerHelper =
+                new BulkInsertWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
+                        instant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(),
+                        getRuntimeContext().getAttemptNumber(),
+                        this.rowType);
+    }
+
+    /**
+     * Collects the write statuses of the current writer helper (if any), sends the last-batch event to
+     * the coordinator and blocks further flushing until the coordinator confirms a new instant.
+     *
+     * @param endInput whether this flush is caused by the end of a bounded input
+     */
+    private void flushData(boolean endInput) {
+        final List<WriteStatus> writeStatus;
+        if (this.writerHelper != null) {
+            writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
+            this.currentInstant = this.writerHelper.getInstantTime();
+        } else {
+            writeStatus = Collections.emptyList();
+            this.currentInstant = instantToWrite(false);
+            LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, this.currentInstant);
+        }
+        final WriteMetadataEvent event = WriteMetadataEvent.builder()
+                .taskID(taskID)
+                .instantTime(this.currentInstant)
+                .writeStatus(writeStatus)
+                .lastBatch(true)
+                .endInput(endInput)
+                .build();
+        this.eventGateway.sendEventToCoordinator(event);
+        // nullify the write helper for next ckp
+        this.writerHelper = null;
+        this.writeStatuses.addAll(writeStatus);
+        // blocks flushing until the coordinator starts a new instant
+        this.confirming = true;
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteOperator.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteOperator.java
new file mode 100644
index 0000000000..41fa03e793
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteOperator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.sink.append;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hudi.sink.common.AbstractWriteOperator;
+import org.apache.hudi.sink.common.WriteOperatorFactory;
+
+/**
+ * Operator for {@link AppendWriteFunction}; it forwards the table row type and the InLong
+ * metric option to the wrapped write function.
+ *
+ * <p>Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3
+ *
+ * @param <I> The input type
+ */
+public class AppendWriteOperator<I> extends AbstractWriteOperator<I> {
+
+    /**
+     * Wraps a newly created {@link AppendWriteFunction}.
+     *
+     * @param conf         the Hudi Flink write configuration
+     * @param rowType      the table row type
+     * @param metricOption the InLong metric option, may be {@code null}
+     */
+    public AppendWriteOperator(Configuration conf, RowType rowType, MetricOption metricOption) {
+        super(new AppendWriteFunction<>(conf, rowType, metricOption));
+    }
+
+    /**
+     * Creates the Flink operator factory for a new {@link AppendWriteOperator}.
+     *
+     * @param conf         the Hudi Flink write configuration
+     * @param rowType      the table row type
+     * @param metricOption the InLong metric option, may be {@code null}
+     * @param <I>          the input type
+     * @return the operator factory
+     */
+    public static <I> WriteOperatorFactory<I> getFactory(Configuration conf, RowType rowType,
+            MetricOption metricOption) {
+        final AppendWriteOperator<I> writeOperator = new AppendWriteOperator<>(conf, rowType, metricOption);
+        return WriteOperatorFactory.instance(conf, writeOperator);
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bucket/BucketStreamWriteFunction.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bucket/BucketStreamWriteFunction.java
new file mode 100644
index 0000000000..3bafbb806f
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.sink.bucket;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.sink.StreamWriteFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A stream write function with bucket hash index.
+ *
+ * <p>The task holds a fresh new local index: {(partition + bucket number)
&rarr fileId} mapping, this index
+ * is used for deciding whether the incoming records in an UPDATE or INSERT.
+ * The index is local because different partition paths have separate items in
the index.
+ *
+ * @param <I> the input type
+ */
+public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(BucketStreamWriteFunction.class);
+
+ private int parallelism;
+
+ private int bucketNum;
+
+ private String indexKeyFields;
+
+ /**
+ * BucketID to file group mapping in each partition.
+ * Map(partition -> Map(bucketId, fileID)).
+ */
+ private Map<String, Map<Integer, String>> bucketIndex;
+
+ /**
+ * Incremental bucket index of the current checkpoint interval,
+ * it is needed because the bucket type('I' or 'U') should be decided
based on the committed files view,
+ * all the records in one bucket should have the same bucket type.
+ */
+ private Set<String> incBucketIndex;
+ private final MetricOption metricOption;
+ private SinkMetricData sinkMetricData;
+
+ /**
+ * Constructs a BucketStreamWriteFunction.
+ *
+ * @param config The config options
+ */
+ public BucketStreamWriteFunction(Configuration config, MetricOption
metricOption) {
+ super(config);
+ this.metricOption = metricOption;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws IOException {
+ super.open(parameters);
+ this.bucketNum =
config.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
+ this.indexKeyFields = config.getString(FlinkOptions.INDEX_KEY_FIELD);
+ this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+ this.parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
+ this.bucketIndex = new HashMap<>();
+ this.incBucketIndex = new HashSet<>();
+ if (metricOption != null) {
+ this.sinkMetricData = new SinkMetricData(metricOption,
getRuntimeContext().getMetricGroup());
+ }
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws
Exception {
+ super.initializeState(context);
+ }
+
+ @Override
+ public void snapshotState() {
+ super.snapshotState();
+ this.incBucketIndex.clear();
+ }
+
+ @Override
+ public void processElement(I i, ProcessFunction<I, Object>.Context
context, Collector<Object> collector)
+ throws Exception {
+ HoodieRecord<?> record = (HoodieRecord<?>) i;
+ final HoodieKey hoodieKey = record.getKey();
+ final String partition = hoodieKey.getPartitionPath();
+ final HoodieRecordLocation location;
+
+ bootstrapIndexIfNeed(partition);
+ Map<Integer, String> bucketToFileId =
bucketIndex.computeIfAbsent(partition, p -> new HashMap<>());
+ final int bucketNum = BucketIdentifier.getBucketId(hoodieKey,
indexKeyFields, this.bucketNum);
+ final String bucketId = partition + "/" + bucketNum;
+
+ if (incBucketIndex.contains(bucketId)) {
+ location = new HoodieRecordLocation("I",
bucketToFileId.get(bucketNum));
+ } else if (bucketToFileId.containsKey(bucketNum)) {
+ location = new HoodieRecordLocation("U",
bucketToFileId.get(bucketNum));
+ } else {
+ String newFileId =
BucketIdentifier.newBucketFileIdPrefix(bucketNum);
+ location = new HoodieRecordLocation("I", newFileId);
+ bucketToFileId.put(bucketNum, newFileId);
+ incBucketIndex.add(bucketId);
+ }
+ record.unseal();
+ record.setCurrentLocation(location);
+ record.seal();
+ bufferRecord(record);
+ if (sinkMetricData != null) {
+ sinkMetricData.invokeWithEstimate(record);
+ }
+ }
+
+ /**
+ * Determine whether the current fileID belongs to the current task.
+ * (partition + curBucket) % numPartitions == this taskID belongs to this
task.
+ */
+ public boolean isBucketToLoad(int bucketNumber, String partition) {
+ final int partitionIndex = (partition.hashCode() & Integer.MAX_VALUE)
% parallelism;
+ int globalIndex = partitionIndex + bucketNumber;
+ return BucketIdentifier.mod(globalIndex, parallelism) == taskID;
+ }
+
+ /**
+ * Get partition_bucket -> fileID mapping from the existing hudi table.
+ * This is a required operation for each restart to avoid having duplicate
file ids for one bucket.
+ */
+ private void bootstrapIndexIfNeed(String partition) {
+ if (bucketIndex.containsKey(partition)) {
+ return;
+ }
+ LOG.info(String.format("Loading Hoodie Table %s, with path %s",
this.metaClient.getTableConfig().getTableName(),
+ this.metaClient.getBasePath() + "/" + partition));
+
+ // Load existing fileID belongs to this task
+ Map<Integer, String> bucketToFileIDMap = new HashMap<>();
+
this.writeClient.getHoodieTable().getHoodieView().getLatestFileSlices(partition).forEach(fileSlice
-> {
+ String fileId = fileSlice.getFileId();
+ int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileId);
+ if (isBucketToLoad(bucketNumber, partition)) {
+ LOG.info(String.format("Should load this partition bucket %s
with fileId %s", bucketNumber, fileId));
+ // Validate that one bucketId has only ONE fileId
+ if (bucketToFileIDMap.containsKey(bucketNumber)) {
+ throw new RuntimeException(String.format("Duplicate fileId
%s from bucket %s of partition %s found "
+ + "during the BucketStreamWriteFunction index
bootstrap.", fileId, bucketNumber,
+ partition));
+ } else {
+ LOG.info(String.format("Adding fileId %s to the bucket %s
of partition %s.", fileId, bucketNumber,
+ partition));
+ bucketToFileIDMap.put(bucketNumber, fileId);
+ }
+ }
+ });
+ bucketIndex.put(partition, bucketToFileIDMap);
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bucket/BucketStreamWriteOperator.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bucket/BucketStreamWriteOperator.java
new file mode 100644
index 0000000000..40b1a29423
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bucket/BucketStreamWriteOperator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.sink.bucket;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.hudi.sink.common.AbstractWriteOperator;
+import org.apache.hudi.sink.common.WriteOperatorFactory;
+
+/**
+ * Operator for {@link BucketStreamWriteFunction}.
+ *
+ * @param <I> The input type
+ */
+public class BucketStreamWriteOperator<I> extends AbstractWriteOperator<I> {
+
+ public BucketStreamWriteOperator(Configuration conf, MetricOption
metricOption) {
+ super(new BucketStreamWriteFunction<>(conf, metricOption));
+ }
+
+ public static <I> WriteOperatorFactory<I> getFactory(Configuration conf,
MetricOption metricOption) {
+ return WriteOperatorFactory.instance(conf, new
BucketStreamWriteOperator<>(conf, metricOption));
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bulk/BulkInsertWriteFunction.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bulk/BulkInsertWriteFunction.java
new file mode 100644
index 0000000000..44c8966e30
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bulk/BulkInsertWriteFunction.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.sink.bulk;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.sink.bulk.WriterHelpers;
+import org.apache.hudi.sink.common.AbstractWriteFunction;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.meta.CkpMetadata;
+import org.apache.hudi.sink.utils.TimeWait;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Sink function to write the data to the underneath filesystem.
+ *
+ * <p>The function should only be used in operation type {@link
WriteOperationType#BULK_INSERT}.
+ *
+ * <p>Note: The function task requires the input stream be shuffled by
partition path.
+ *
+ * @param <I> Type of the input record
+ * @see StreamWriteOperatorCoordinator
+ *
+ * Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3
+ */
+public class BulkInsertWriteFunction<I>
+ extends
+ AbstractWriteFunction<I> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG =
LoggerFactory.getLogger(BulkInsertWriteFunction.class);
+
+ /**
+ * Helper class for bulk insert mode.
+ */
+ private transient BulkInsertWriterHelper writerHelper;
+
+ /**
+ * Config options.
+ */
+ private final Configuration config;
+
+ /**
+ * Table row type.
+ */
+ private final RowType rowType;
+
+ /**
+ * Id of current subtask.
+ */
+ private int taskID;
+
+ /**
+ * Write Client.
+ */
+ private transient HoodieFlinkWriteClient writeClient;
+
+ /**
+ * The initial inflight instant when start up.
+ */
+ private volatile String initInstant;
+
+ /**
+ * Gateway to send operator events to the operator coordinator.
+ */
+ private transient OperatorEventGateway eventGateway;
+
+ /**
+ * Checkpoint metadata.
+ */
+ private CkpMetadata ckpMetadata;
+
+ private final MetricOption metricOption;
+
+ private SinkMetricData sinkMetricData;
+ /**
+ * Constructs a StreamingSinkFunction.
+ *
+ * @param config The config options
+ */
+ public BulkInsertWriteFunction(Configuration config, RowType rowType,
MetricOption metricOption) {
+ this.config = config;
+ this.rowType = rowType;
+ this.metricOption = metricOption;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+ this.writeClient = FlinkWriteClients.createWriteClient(this.config,
getRuntimeContext());
+ this.ckpMetadata = CkpMetadata.getInstance(config);
+ this.initInstant = lastPendingInstant();
+ if (metricOption != null) {
+ this.sinkMetricData = new SinkMetricData(metricOption,
getRuntimeContext().getMetricGroup());
+ }
+ sendBootstrapEvent();
+ initWriterHelper();
+ }
+
+ @Override
+ public void processElement(I value, Context ctx, Collector<Object> out)
throws IOException {
+ this.writerHelper.write((RowData) value);
+ if (sinkMetricData != null) {
+ sinkMetricData.invokeWithEstimate(value);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (this.writeClient != null) {
+ this.writeClient.close();
+ }
+ }
+
+ /**
+ * End input action for batch source.
+ */
+ public void endInput() {
+ final List<WriteStatus> writeStatus =
this.writerHelper.getWriteStatuses(this.taskID);
+
+ final WriteMetadataEvent event = WriteMetadataEvent.builder()
+ .taskID(taskID)
+ .instantTime(this.writerHelper.getInstantTime())
+ .writeStatus(writeStatus)
+ .lastBatch(true)
+ .endInput(true)
+ .build();
+ this.eventGateway.sendEventToCoordinator(event);
+ }
+
+ @Override
+ public void handleOperatorEvent(OperatorEvent event) {
+ // no operation
+ }
+
+ //
-------------------------------------------------------------------------
+ // Getter/Setter
+ //
-------------------------------------------------------------------------
+
+ public void setOperatorEventGateway(OperatorEventGateway
operatorEventGateway) {
+ this.eventGateway = operatorEventGateway;
+ }
+
+ //
-------------------------------------------------------------------------
+ // Utilities
+ //
-------------------------------------------------------------------------
+
+ private void initWriterHelper() {
+ String instant = instantToWrite();
+ this.writerHelper = WriterHelpers.getWriterHelper(this.config,
this.writeClient.getHoodieTable(),
+ this.writeClient.getConfig(),
+ instant, this.taskID,
getRuntimeContext().getNumberOfParallelSubtasks(),
+ getRuntimeContext().getAttemptNumber(),
+ this.rowType);
+ }
+
+ private void sendBootstrapEvent() {
+ WriteMetadataEvent event = WriteMetadataEvent.builder()
+ .taskID(taskID)
+ .writeStatus(Collections.emptyList())
+ .instantTime("")
+ .bootstrap(true)
+ .build();
+ this.eventGateway.sendEventToCoordinator(event);
+ LOG.info("Send bootstrap write metadata event to coordinator,
task[{}].", taskID);
+ }
+
+ /**
+ * Returns the last pending instant time.
+ */
+ protected String lastPendingInstant() {
+ return this.ckpMetadata.lastPendingInstant();
+ }
+
+ private String instantToWrite() {
+ String instant = lastPendingInstant();
+ // if exactly-once semantics turns on,
+ // waits for the checkpoint notification until the checkpoint timeout
threshold hits.
+ TimeWait timeWait = TimeWait.builder()
+ .timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))
+ .action("instant initialize")
+ .build();
+ while (instant == null || instant.equals(this.initInstant)) {
+ // wait condition:
+ // 1. there is no inflight instant
+ // 2. the inflight instant does not change
+ // sleep for a while
+ timeWait.waitFor();
+ // refresh the inflight instant
+ instant = lastPendingInstant();
+ }
+ return instant;
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bulk/BulkInsertWriteOperator.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bulk/BulkInsertWriteOperator.java
new file mode 100644
index 0000000000..55d2290666
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bulk/BulkInsertWriteOperator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.sink.bulk;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hudi.sink.common.AbstractWriteOperator;
+import org.apache.hudi.sink.common.WriteOperatorFactory;
+
+/**
+ * Operator for bulk insert mode sink.
+ *
+ * @param <I> The input type
+ *
+ * Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3
+ */
+public class BulkInsertWriteOperator<I>
+ extends
+ AbstractWriteOperator<I>
+ implements
+ BoundedOneInput {
+
+ public BulkInsertWriteOperator(Configuration conf, RowType rowType,
MetricOption metricOption) {
+ super(new BulkInsertWriteFunction<>(conf, rowType, metricOption));
+ }
+
+ @Override
+ public void handleOperatorEvent(OperatorEvent event) {
+ // no operation
+ }
+
+ public static <I> WriteOperatorFactory<I> getFactory(Configuration conf,
RowType rowType,
+ MetricOption metricOption) {
+ return WriteOperatorFactory.instance(conf, new
BulkInsertWriteOperator<>(conf, rowType, metricOption));
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/utils/Pipelines.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/utils/Pipelines.java
new file mode 100644
index 0000000000..ea92e9361e
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/utils/Pipelines.java
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.sink.utils;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.hudi.sink.StreamWriteOperator;
+import org.apache.inlong.sort.hudi.sink.append.AppendWriteOperator;
+import org.apache.inlong.sort.hudi.sink.bucket.BucketStreamWriteOperator;
+import org.apache.inlong.sort.hudi.sink.bulk.BulkInsertWriteOperator;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.sink.CleanFunction;
+import org.apache.hudi.sink.bootstrap.BootstrapOperator;
+import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator;
+import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
+import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
+import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
+import org.apache.hudi.sink.clustering.ClusteringCommitSink;
+import org.apache.hudi.sink.clustering.ClusteringOperator;
+import org.apache.hudi.sink.clustering.ClusteringPlanEvent;
+import org.apache.hudi.sink.clustering.ClusteringPlanOperator;
+import org.apache.hudi.sink.common.WriteOperatorFactory;
+import org.apache.hudi.sink.compact.CompactOperator;
+import org.apache.hudi.sink.compact.CompactionCommitEvent;
+import org.apache.hudi.sink.compact.CompactionCommitSink;
+import org.apache.hudi.sink.compact.CompactionPlanEvent;
+import org.apache.hudi.sink.compact.CompactionPlanOperator;
+import org.apache.hudi.sink.partitioner.BucketAssignFunction;
+import org.apache.hudi.sink.partitioner.BucketIndexPartitioner;
+import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
+import org.apache.hudi.table.format.FilePathUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Utilities to generate all kinds of sub-pipelines.
+ * <p>
+ * Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3
+ */
+public class Pipelines {
+
+ /**
+ * Bulk insert the input dataset at once.
+ *
+ * <p>By default, the input dataset would shuffle by the partition path
first then
+ * sort by the partition path before passing around to the write function.
+ * The whole pipeline looks like the following:
+ *
+ * <pre>
+ * | input1 | ===\ /=== |sorter| === | task1 | (p1, p2)
+ * shuffle
+ * | input2 | ===/ \=== |sorter| === | task2 | (p3, p4)
+ *
+ * Note: Both input1 and input2's dataset come from partitions: p1,
p2, p3, p4
+ * </pre>
+ *
+ * <p>The write task switches to new file handle each time it receives a
record
+ * from the different partition path, the shuffle and sort would reduce
small files.
+ *
+ * <p>The bulk insert should be run in batch execution mode.
+ *
+ * @param conf The configuration
+ * @param rowType The input row type
+ * @param dataStream The input data stream
+ * @return the bulk insert data stream sink
+ */
+ public static DataStreamSink<Object> bulkInsert(Configuration conf,
RowType rowType, DataStream<RowData> dataStream,
+ MetricOption metricOption) {
+ WriteOperatorFactory<RowData> operatorFactory =
BulkInsertWriteOperator.getFactory(conf, rowType, metricOption);
+ if (OptionsResolver.isBucketIndexType(conf)) {
+ String indexKeys = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
+ int numBuckets =
conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
+
+ BucketIndexPartitioner<HoodieKey> partitioner = new
BucketIndexPartitioner<>(numBuckets, indexKeys);
+ RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);
+ RowType rowTypeWithFileId =
BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
+ InternalTypeInfo<RowData> typeInfo =
InternalTypeInfo.of(rowTypeWithFileId);
+
+ Map<String, String> bucketIdToFileId = new HashMap<>();
+ dataStream = dataStream.partitionCustom(partitioner,
keyGen::getHoodieKey)
+ .map(record ->
BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record,
+ indexKeys, numBuckets), typeInfo)
+
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism
as write task to
+
// avoid shuffle
+ if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
+ SortOperatorGen sortOperatorGen =
BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
+ dataStream = dataStream.transform("file_sorter", typeInfo,
sortOperatorGen.createSortOperator())
+
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism
as write task to
+
// avoid shuffle
+
ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
+ conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) *
1024L * 1024L);
+ }
+ return dataStream
+ .transform(opName("bucket_bulk_insert", conf),
TypeInformation.of(Object.class), operatorFactory)
+ .uid(opUID("bucket_bulk_insert", conf))
+ .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
+ .addSink(DummySink.INSTANCE)
+ .name("dummy");
+ }
+
+ final String[] partitionFields =
FilePathUtils.extractPartitionKeys(conf);
+ if (partitionFields.length > 0) {
+ RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf,
rowType);
+ if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT))
{
+
+ // shuffle by partition keys
+ // use #partitionCustom instead of #keyBy to avoid duplicate
sort operations,
+ // see BatchExecutionUtils#applyBatchExecutionSettings for
details.
+ Partitioner<String> partitioner =
+ (key, channels) ->
KeyGroupRangeAssignment.assignKeyToParallelOperator(key,
+
KeyGroupRangeAssignment.computeDefaultMaxParallelism(
+
conf.getInteger(FlinkOptions.WRITE_TASKS)),
+ channels);
+ dataStream = dataStream.partitionCustom(partitioner,
rowDataKeyGen::getPartitionPath);
+ }
+ if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
+ SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType,
partitionFields);
+ // sort by partition keys
+ dataStream = dataStream
+ .transform("partition_key_sorter",
+ InternalTypeInfo.of(rowType),
+ sortOperatorGen.createSortOperator())
+
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+
ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
+ conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) *
1024L * 1024L);
+ }
+ }
+ return dataStream
+ .transform(opName("hoodie_bulk_insert_write", conf),
+ TypeInformation.of(Object.class),
+ operatorFactory)
+ // follow the parallelism of upstream operators to avoid
shuffle
+ .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
+ .addSink(DummySink.INSTANCE)
+ .name("dummy");
+ }
+
+ /**
+ * Insert the dataset with append mode(no upsert or deduplication).
+ *
+ * <p>The input dataset would be rebalanced among the write tasks:
+ *
+ * <pre>
+ * | input1 | ===\ /=== | task1 | (p1, p2, p3, p4)
+ * shuffle
+ * | input2 | ===/ \=== | task2 | (p1, p2, p3, p4)
+ *
+ * Note: Both input1 and input2's dataset come from partitions: p1,
p2, p3, p4
+ * </pre>
+ *
+ * <p>The write task switches to new file handle each time it receives a
record
+ * from the different partition path, so there may be many small files.
+ *
+ * @param conf The configuration
+ * @param rowType The input row type
+ * @param dataStream The input data stream
+ * @param bounded Whether the input stream is bounded
+ * @return the appending data stream sink
+ */
+ public static DataStream<Object> append(
+ Configuration conf,
+ RowType rowType,
+ DataStream<RowData> dataStream,
+ boolean bounded,
+ MetricOption metricOption) {
+ if (!bounded) {
+ // In principle, the config should be immutable, but the
boundedness
+ // is only visible when creating the sink pipeline.
+ conf.setBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT, false);
+ }
+ WriteOperatorFactory<RowData> operatorFactory =
AppendWriteOperator.getFactory(conf, rowType, metricOption);
+
+ return dataStream
+ .transform(opName("hoodie_append_write", conf),
TypeInformation.of(Object.class), operatorFactory)
+ .uid(opUID("hoodie_stream_write", conf))
+ .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+ }
+
+ /**
+ * Constructs bootstrap pipeline as streaming.
+ * The bootstrap operator loads the existing data index (primary key to
file id mapping),
+ * then sends the indexing data set to subsequent operator(usually the
bucket assign operator).
+ */
+ public static DataStream<HoodieRecord> bootstrap(
+ Configuration conf,
+ RowType rowType,
+ DataStream<RowData> dataStream) {
+ return bootstrap(conf, rowType, dataStream, false, false);
+ }
+
+ /**
+ * Constructs bootstrap pipeline.
+ * The bootstrap operator loads the existing data index (primary key to
file id mapping),
+ * then send the indexing data set to subsequent operator(usually the
bucket assign operator).
+ *
+ * @param conf The configuration
+ * @param rowType The row type
+ * @param dataStream The data stream
+ * @param bounded Whether the source is bounded
+ * @param overwrite Whether it is insert overwrite
+ */
+ public static DataStream<HoodieRecord> bootstrap(
+ Configuration conf,
+ RowType rowType,
+ DataStream<RowData> dataStream,
+ boolean bounded,
+ boolean overwrite) {
+ final boolean globalIndex =
conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);
+ if (overwrite || OptionsResolver.isBucketIndexType(conf)) {
+ return rowDataToHoodieRecord(conf, rowType, dataStream);
+ } else if (bounded && !globalIndex &&
OptionsResolver.isPartitionedTable(conf)) {
+ return boundedBootstrap(conf, rowType, dataStream);
+ } else {
+ return streamBootstrap(conf, rowType, dataStream, bounded);
+ }
+ }
+
+ private static DataStream<HoodieRecord> streamBootstrap(
+ Configuration conf,
+ RowType rowType,
+ DataStream<RowData> dataStream,
+ boolean bounded) {
+ DataStream<HoodieRecord> dataStream1 = rowDataToHoodieRecord(conf,
rowType, dataStream);
+
+ if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED) || bounded) {
+ dataStream1 = dataStream1
+ .transform(
+ "index_bootstrap",
+ TypeInformation.of(HoodieRecord.class),
+ new BootstrapOperator<>(conf))
+ .setParallelism(
+
conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream1.getParallelism()))
+ .uid(opUID("index_bootstrap", conf));
+ }
+
+ return dataStream1;
+ }
+
+ /**
+ * Constructs bootstrap pipeline for batch execution mode.
+ * The indexing data set is loaded before the actual data write
+ * in order to support batch UPSERT.
+ */
+ private static DataStream<HoodieRecord> boundedBootstrap(
+ Configuration conf,
+ RowType rowType,
+ DataStream<RowData> dataStream) {
+ final RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf,
rowType);
+ // shuffle by partition keys
+ dataStream = dataStream
+ .keyBy(rowDataKeyGen::getPartitionPath);
+
+ return rowDataToHoodieRecord(conf, rowType, dataStream)
+ .transform(
+ "batch_index_bootstrap",
+ TypeInformation.of(HoodieRecord.class),
+ new BatchBootstrapOperator<>(conf))
+ .setParallelism(
+
conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream.getParallelism()))
+ .uid(opUID("batch_index_bootstrap", conf));
+ }
+
+ /**
+ * Transforms the row data to hoodie records.
+ */
+ public static DataStream<HoodieRecord> rowDataToHoodieRecord(Configuration
conf, RowType rowType,
+ DataStream<RowData> dataStream) {
+ return dataStream.map(RowDataToHoodieFunctions.create(rowType, conf),
TypeInformation.of(HoodieRecord.class))
+
.setParallelism(dataStream.getParallelism()).name("row_data_to_hoodie_record");
+ }
+
+ /**
+ * The streaming write pipeline.
+ *
+ * <p>The input dataset shuffles by the primary key first then
+ * shuffles by the file group ID before passing around to the write
function.
+ * The whole pipeline looks like the following:
+ *
+ * <pre>
+ * | input1 | ===\ /=== | bucket assigner | ===\ /=== | task1
|
+ * shuffle(by PK) shuffle(by bucket
ID)
+ * | input2 | ===/ \=== | bucket assigner | ===/ \=== | task2
|
+ *
+ * Note: a file group must be handled by one write task to avoid
write conflict.
+ * </pre>
+ *
+ * <p>The bucket assigner assigns the inputs to suitable file groups, the
write task caches
+ * and flushes the data set to disk.
+ *
+ * @param conf The configuration
+ * @param dataStream The input data stream
+ * @return the stream write data stream pipeline
+ */
+ public static DataStream<Object> hoodieStreamWrite(Configuration conf,
DataStream<HoodieRecord> dataStream,
+ MetricOption metricOption) {
+ if (OptionsResolver.isBucketIndexType(conf)) {
+ WriteOperatorFactory<HoodieRecord> operatorFactory =
+ BucketStreamWriteOperator.getFactory(conf, metricOption);
+ int bucketNum =
conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
+ String indexKeyFields =
conf.getString(FlinkOptions.INDEX_KEY_FIELD);
+ BucketIndexPartitioner<HoodieKey> partitioner = new
BucketIndexPartitioner<>(bucketNum, indexKeyFields);
+ return dataStream.partitionCustom(partitioner,
HoodieRecord::getKey)
+ .transform(opName("bucket_write", conf),
TypeInformation.of(Object.class), operatorFactory)
+ .uid(opUID("bucket_write", conf))
+ .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+ } else {
+ WriteOperatorFactory<HoodieRecord> operatorFactory =
StreamWriteOperator.getFactory(conf, metricOption);
+ return dataStream
+ // Key-by record key, to avoid multiple subtasks write to
a bucket at the same time
+ .keyBy(HoodieRecord::getRecordKey)
+ .transform(
+ "bucket_assigner",
+ TypeInformation.of(HoodieRecord.class),
+ new KeyedProcessOperator<>(new
BucketAssignFunction<>(conf)))
+ .uid(opUID("bucket_assigner", conf))
+
.setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
+ // shuffle by fileId(bucket id)
+ .keyBy(record -> record.getCurrentLocation().getFileId())
+ .transform(opName("stream_write", conf),
TypeInformation.of(Object.class), operatorFactory)
+ .uid(opUID("stream_write", conf))
+ .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+ }
+ }
+
+ /**
+ * The compaction tasks pipeline.
+ *
+ * <p>The compaction plan operator monitors the new compaction plan on the
timeline
+ * then distributes the sub-plans to the compaction tasks. The compaction
task then
+ * handle over the metadata to commit task for compaction transaction
commit.
+ * The whole pipeline looks like the following:
+ *
+ * <pre>
+ * /=== | task1 | ===\
+ * | plan generation | ===> hash | commit |
+ * \=== | task2 | ===/
+ *
+ * Note: both the compaction plan generation task and commission task
are singleton.
+ * </pre>
+ *
+ * @param conf The configuration
+ * @param dataStream The input data stream
+ * @return the compaction pipeline
+ */
+ public static DataStreamSink<CompactionCommitEvent> compact(Configuration
conf, DataStream<Object> dataStream) {
+ return dataStream.transform("compact_plan_generate",
+ TypeInformation.of(CompactionPlanEvent.class),
+ new CompactionPlanOperator(conf))
+ .setParallelism(1) // plan generate must be singleton
+ // make the distribution strategy deterministic to avoid
concurrent modifications
+ // on the same bucket files
+ .keyBy(plan ->
plan.getOperation().getFileGroupId().getFileId())
+ .transform("compact_task",
+ TypeInformation.of(CompactionCommitEvent.class),
+ new CompactOperator(conf))
+ .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
+ .addSink(new CompactionCommitSink(conf))
+ .name("compact_commit")
+ .setParallelism(1); // compaction commit should be singleton
+ }
+
+ /**
+ * The clustering tasks pipeline.
+ *
+ * <p>The clustering plan operator monitors the new clustering plan on the
timeline
+ * then distributes the sub-plans to the clustering tasks. The clustering
task then
+ * handle over the metadata to commit task for clustering transaction
commit.
+ * The whole pipeline looks like the following:
+ *
+ * <pre>
+ * /=== | task1 | ===\
+ * | plan generation | ===> hash | commit |
+ * \=== | task2 | ===/
+ *
+ * Note: both the clustering plan generation task and commission task
are singleton.
+ * </pre>
+ *
+ * @param conf The configuration
+ * @param rowType The input row type
+ * @param dataStream The input data stream
+ * @return the clustering pipeline
+ */
+ public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration
conf, RowType rowType,
+ DataStream<Object> dataStream) {
+ DataStream<ClusteringCommitEvent> clusteringStream =
dataStream.transform("cluster_plan_generate",
+ TypeInformation.of(ClusteringPlanEvent.class),
+ new ClusteringPlanOperator(conf))
+ .setParallelism(1) // plan generate must be singleton
+ .keyBy(plan ->
+ // make the distribution strategy deterministic to avoid
concurrent modifications
+ // on the same bucket files
+ plan.getClusteringGroupInfo().getOperations()
+
.stream().map(ClusteringOperation::getFileId).collect(Collectors.joining()))
+ .transform("clustering_task",
+ TypeInformation.of(ClusteringCommitEvent.class),
+ new ClusteringOperator(conf, rowType))
+
.setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS));
+ if (OptionsResolver.sortClusteringEnabled(conf)) {
+
ExecNodeUtil.setManagedMemoryWeight(clusteringStream.getTransformation(),
+ conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L *
1024L);
+ }
+ return clusteringStream.addSink(new ClusteringCommitSink(conf))
+ .name("clustering_commit")
+ .setParallelism(1); // compaction commit should be singleton
+ }
+
+ public static DataStreamSink<Object> clean(Configuration conf,
DataStream<Object> dataStream) {
+ return dataStream.addSink(new CleanFunction<>(conf))
+ .setParallelism(1)
+ .name("clean_commits");
+ }
+
+ public static DataStreamSink<Object> dummySink(DataStream<Object>
dataStream) {
+ return dataStream.addSink(DummySink.INSTANCE)
+ .setParallelism(1)
+ .name("dummy");
+ }
+
+ public static String opName(String operatorN, Configuration conf) {
+ return operatorN + ": " + conf.getString(FlinkOptions.TABLE_NAME);
+ }
+
+ public static String opUID(String operatorN, Configuration conf) {
+ return "uid_" + operatorN + "_" +
conf.getString(FlinkOptions.TABLE_NAME);
+ }
+
+ /**
+ * Dummy sink that does nothing.
+ */
+ public static class DummySink implements SinkFunction<Object> {
+
+ private static final long serialVersionUID = 1L;
+ public static DummySink INSTANCE = new DummySink();
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HoodieTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HoodieTableFactory.java
new file mode 100644
index 0000000000..ac153ef729
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HoodieTableFactory.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.table;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
+import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
+import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.DataTypeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+
+/**
+ * Hoodie data source/sink factory.
+ * <p>
+ * Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3
+ */
+public class HoodieTableFactory implements DynamicTableSinkFactory {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HoodieTableFactory.class);
+
+ public static final String FACTORY_ID = "hudi-inlong";
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ Configuration conf =
FlinkOptions.fromMap(context.getCatalogTable().getOptions());
+
checkArgument(!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)),
+ "Option [path] should not be empty.");
+ ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
+ sanityCheck(conf, schema);
+ setupConfOptions(conf, context.getObjectIdentifier(),
context.getCatalogTable(), schema);
+
+ String inlongMetric = conf.getOptional(INLONG_METRIC).orElse(null);
+ String auditHostAndPorts = conf.get(INLONG_AUDIT);
+ String auditKeys = conf.get(AUDIT_KEYS);
+
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withAuditAddress(auditHostAndPorts)
+ .withAuditKeys(auditKeys)
+ .build();
+ return new HoodieTableSink(conf, schema, metricOption);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return FACTORY_ID;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Collections.singleton(FlinkOptions.PATH);
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> configOptions = FlinkOptions.optionalOptions();
+ configOptions.add(AUDIT_KEYS);
+ configOptions.add(INLONG_METRIC);
+ configOptions.add(INLONG_AUDIT);
+ return configOptions;
+ }
+
+ //
-------------------------------------------------------------------------
+ // Utilities
+ //
-------------------------------------------------------------------------
+
+ /**
+ * The sanity check.
+ *
+ * @param conf The table options
+ * @param schema The table schema
+ */
+ private void sanityCheck(Configuration conf, ResolvedSchema schema) {
+ List<String> fields = schema.getColumnNames();
+
+ // validate record key in pk absence.
+ if (!schema.getPrimaryKey().isPresent()) {
+ String[] recordKeys =
conf.get(FlinkOptions.RECORD_KEY_FIELD).split(",");
+ if (recordKeys.length == 1
+ &&
FlinkOptions.RECORD_KEY_FIELD.defaultValue().equals(recordKeys[0])
+ && !fields.contains(recordKeys[0])) {
+ throw new HoodieValidationException("Primary key definition is
required, use either PRIMARY KEY syntax "
+ + "or option '" + FlinkOptions.RECORD_KEY_FIELD.key()
+ "' to specify.");
+ }
+
+ Arrays.stream(recordKeys)
+ .filter(field -> !fields.contains(field))
+ .findAny()
+ .ifPresent(f -> {
+ throw new HoodieValidationException("Field '" + f + "'
specified in option "
+ + "'" + FlinkOptions.RECORD_KEY_FIELD.key() +
"' does not exist in the table schema.");
+ });
+ }
+
+ // validate pre_combine key
+ String preCombineField = conf.get(FlinkOptions.PRECOMBINE_FIELD);
+ if (!fields.contains(preCombineField)) {
+ if (OptionsResolver.isDefaultHoodieRecordPayloadClazz(conf)) {
+ throw new HoodieValidationException("Option '" +
FlinkOptions.PRECOMBINE_FIELD.key()
+ + "' is required for payload class: " +
DefaultHoodieRecordPayload.class.getName());
+ }
+ if
(preCombineField.equals(FlinkOptions.PRECOMBINE_FIELD.defaultValue())) {
+ conf.setString(FlinkOptions.PRECOMBINE_FIELD,
FlinkOptions.NO_PRE_COMBINE);
+ } else if (!preCombineField.equals(FlinkOptions.NO_PRE_COMBINE)) {
+ throw new HoodieValidationException("Field " + preCombineField
+ " does not exist in the table schema."
+ + "Please check '" +
FlinkOptions.PRECOMBINE_FIELD.key() + "' option.");
+ }
+ }
+ }
+
+ /**
+ * Sets up the config options based on the table definition, for e.g, the
table name, primary key.
+ *
+ * @param conf The configuration to set up
+ * @param tablePath The table path
+ * @param table The catalog table
+ * @param schema The physical schema
+ */
+ private static void setupConfOptions(
+ Configuration conf,
+ ObjectIdentifier tablePath,
+ CatalogTable table,
+ ResolvedSchema schema) {
+ // table name
+ conf.setString(FlinkOptions.TABLE_NAME.key(),
tablePath.getObjectName());
+ // hoodie key about options
+ setupHoodieKeyOptions(conf, table);
+ // compaction options
+ setupCompactionOptions(conf);
+ // hive options
+ setupHiveOptions(conf, tablePath);
+ // read options
+ setupReadOptions(conf);
+ // write options
+ setupWriteOptions(conf);
+ // infer avro schema from physical DDL schema
+ inferAvroSchema(conf,
schema.toPhysicalRowDataType().notNull().getLogicalType());
+ }
+
+ /**
+ * Sets up the hoodie key options (e.g. record key and partition key) from
the table definition.
+ */
+ private static void setupHoodieKeyOptions(Configuration conf, CatalogTable
table) {
+ List<String> pkColumns = table.getSchema().getPrimaryKey()
+
.map(UniqueConstraint::getColumns).orElse(Collections.emptyList());
+ if (pkColumns.size() > 0) {
+ // the PRIMARY KEY syntax always has higher priority than option
FlinkOptions#RECORD_KEY_FIELD
+ String recordKey = String.join(",", pkColumns);
+ conf.setString(FlinkOptions.RECORD_KEY_FIELD, recordKey);
+ }
+ List<String> partitionKeys = table.getPartitionKeys();
+ if (partitionKeys.size() > 0) {
+ // the PARTITIONED BY syntax always has higher priority than
option FlinkOptions#PARTITION_PATH_FIELD
+ conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",",
partitionKeys));
+ }
+ // set index key for bucket index if not defined
+ if
(conf.getString(FlinkOptions.INDEX_TYPE).equals(HoodieIndex.IndexType.BUCKET.name()))
{
+ if (conf.getString(FlinkOptions.INDEX_KEY_FIELD).isEmpty()) {
+ conf.setString(FlinkOptions.INDEX_KEY_FIELD,
conf.getString(FlinkOptions.RECORD_KEY_FIELD));
+ } else {
+ Set<String> recordKeySet =
+
Arrays.stream(conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","))
+ .collect(Collectors.toSet());
+ Set<String> indexKeySet =
+
Arrays.stream(conf.getString(FlinkOptions.INDEX_KEY_FIELD).split(","))
+ .collect(Collectors.toSet());
+ if (!recordKeySet.containsAll(indexKeySet)) {
+ throw new HoodieValidationException(
+ FlinkOptions.INDEX_KEY_FIELD + " should be a
subset of or equal to the recordKey fields");
+ }
+ }
+ }
+
+ // tweak the key gen class if possible
+ final String[] partitions =
conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",");
+ final String[] pks =
conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
+ if (partitions.length == 1) {
+ final String partitionField = partitions[0];
+ if (partitionField.isEmpty()) {
+ conf.setString(FlinkOptions.KEYGEN_CLASS_NAME,
NonpartitionedAvroKeyGenerator.class.getName());
+ LOG.info("Table option [{}] is reset to {} because this is a
non-partitioned table",
+ FlinkOptions.KEYGEN_CLASS_NAME.key(),
NonpartitionedAvroKeyGenerator.class.getName());
+ return;
+ }
+ DataType partitionFieldType =
table.getSchema().getFieldDataType(partitionField)
+ .orElseThrow(() -> new HoodieValidationException("Field "
+ partitionField + " does not exist"));
+ if (pks.length <= 1 &&
DataTypeUtils.isDatetimeType(partitionFieldType)) {
+ // timestamp based key gen only supports simple primary key
+ setupTimestampKeygenOptions(conf, partitionFieldType);
+ return;
+ }
+ }
+ boolean complexHoodieKey = pks.length > 1 || partitions.length > 1;
+ if (complexHoodieKey && FlinkOptions.isDefaultValueDefined(conf,
FlinkOptions.KEYGEN_CLASS_NAME)) {
+ conf.setString(FlinkOptions.KEYGEN_CLASS_NAME,
ComplexAvroKeyGenerator.class.getName());
+ LOG.info("Table option [{}] is reset to {} because record key or
partition path has two or more fields",
+ FlinkOptions.KEYGEN_CLASS_NAME.key(),
ComplexAvroKeyGenerator.class.getName());
+ }
+ }
+
+ /**
+ * Sets up the keygen options when the partition path is datetime type.
+ *
+ * <p>The UTC timezone is used as default.
+ */
+ public static void setupTimestampKeygenOptions(Configuration conf,
DataType fieldType) {
+ if (conf.contains(FlinkOptions.KEYGEN_CLASS_NAME)) {
+ // the keygen clazz has been set up explicitly, skipping
+ return;
+ }
+
+ conf.setString(FlinkOptions.KEYGEN_CLASS_NAME,
TimestampBasedAvroKeyGenerator.class.getName());
+ LOG.info("Table option [{}] is reset to {} because datetime
partitioning turns on",
+ FlinkOptions.KEYGEN_CLASS_NAME.key(),
TimestampBasedAvroKeyGenerator.class.getName());
+ if (DataTypeUtils.isTimestampType(fieldType)) {
+ int precision =
DataTypeUtils.precision(fieldType.getLogicalType());
+ if (precision == 0) {
+ // seconds
+
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP,
+
TimestampBasedAvroKeyGenerator.TimestampType.UNIX_TIMESTAMP.name());
+ } else if (precision == 3) {
+ // milliseconds
+
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP,
+
TimestampBasedAvroKeyGenerator.TimestampType.EPOCHMILLISECONDS.name());
+ }
+ String outputPartitionFormat =
+
conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(FlinkOptions.PARTITION_FORMAT_HOUR);
+
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP,
outputPartitionFormat);
+ } else {
+
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP,
+
TimestampBasedAvroKeyGenerator.TimestampType.SCALAR.name());
+ conf.setString(KeyGeneratorOptions.Config.INPUT_TIME_UNIT,
TimeUnit.DAYS.toString());
+
+ String outputPartitionFormat =
+
conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(FlinkOptions.PARTITION_FORMAT_DAY);
+
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP,
outputPartitionFormat);
+ // the option is actually useless, it only works for validation
+
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP,
+ FlinkOptions.PARTITION_FORMAT_DAY);
+ }
+
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP,
"UTC");
+ }
+
+ /**
+ * Sets up the compaction options from the table definition.
+ */
+ private static void setupCompactionOptions(Configuration conf) {
+ int commitsToRetain =
conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS);
+ int minCommitsToKeep =
conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS);
+ if (commitsToRetain >= minCommitsToKeep) {
+ LOG.info("Table option [{}] is reset to {} to be greater than
{}={},\n"
+ + "to avoid risk of missing data from few instants in
incremental pull",
+ FlinkOptions.ARCHIVE_MIN_COMMITS.key(), commitsToRetain +
10,
+ FlinkOptions.CLEAN_RETAIN_COMMITS.key(), commitsToRetain);
+ conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, commitsToRetain
+ 10);
+ conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, commitsToRetain
+ 20);
+ }
+ }
+
+ /**
+ * Sets up the hive options from the table definition.
+ */
+ private static void setupHiveOptions(Configuration conf, ObjectIdentifier
tablePath) {
+ if (!conf.contains(FlinkOptions.HIVE_SYNC_DB)) {
+ conf.setString(FlinkOptions.HIVE_SYNC_DB,
tablePath.getDatabaseName());
+ }
+ if (!conf.contains(FlinkOptions.HIVE_SYNC_TABLE)) {
+ conf.setString(FlinkOptions.HIVE_SYNC_TABLE,
tablePath.getObjectName());
+ }
+ }
+
+ /**
+ * Sets up the read options from the table definition.
+ */
+ private static void setupReadOptions(Configuration conf) {
+ if (!conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
+ &&
(conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()
+ ||
conf.getOptional(FlinkOptions.READ_END_COMMIT).isPresent())) {
+ conf.setString(FlinkOptions.QUERY_TYPE,
FlinkOptions.QUERY_TYPE_INCREMENTAL);
+ }
+ }
+
+ /**
+ * Sets up the write options from the table definition.
+ */
+ private static void setupWriteOptions(Configuration conf) {
+ if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.OPERATION)
+ && OptionsResolver.isCowTable(conf)) {
+ conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
+ }
+ }
+
+ /**
+ * Inferences the deserialization Avro schema from the table schema (e.g.
the DDL)
+ * if both options {@link FlinkOptions#SOURCE_AVRO_SCHEMA_PATH} and
+ * {@link FlinkOptions#SOURCE_AVRO_SCHEMA} are not specified.
+ *
+ * @param conf The configuration
+ * @param rowType The specified table row type
+ */
+ private static void inferAvroSchema(Configuration conf, LogicalType
rowType) {
+ if (!conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent()
+ &&
!conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA).isPresent()) {
+ String inferredSchema =
AvroSchemaConverter.convertToSchema(rowType).toString();
+ conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, inferredSchema);
+ }
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HoodieTableSink.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HoodieTableSink.java
new file mode 100644
index 0000000000..173befd988
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HoodieTableSink.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.table;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.hudi.sink.utils.Pipelines;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hudi.adapter.DataStreamSinkProviderAdapter;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsInference;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.util.ChangelogModes;
+
+import java.util.Map;
+
+/**
+ * Hoodie table sink.
+ * <p>
+ * Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3, modified so that an InLong
+ * {@link MetricOption} is handed to the bulk-insert, append and stream-write pipelines.
+ */
+public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
+
+    private final Configuration conf;
+    private final ResolvedSchema schema;
+    private final MetricOption metricOption;
+    private boolean overwrite = false;
+
+    /**
+     * Creates a sink that does not overwrite existing data.
+     *
+     * @param conf         the table configuration (mutated by this sink during planning and translation)
+     * @param schema       the resolved table schema
+     * @param metricOption the InLong metric option passed to the write pipelines
+     */
+    public HoodieTableSink(Configuration conf, ResolvedSchema schema, MetricOption metricOption) {
+        this(conf, schema, false, metricOption);
+    }
+
+    /**
+     * Creates a sink with an explicit overwrite flag.
+     *
+     * @param conf         the table configuration (mutated by this sink during planning and translation)
+     * @param schema       the resolved table schema
+     * @param overwrite    whether the sink was configured for INSERT OVERWRITE
+     * @param metricOption the InLong metric option passed to the write pipelines
+     */
+    public HoodieTableSink(Configuration conf, ResolvedSchema schema, boolean overwrite, MetricOption metricOption) {
+        this.conf = conf;
+        this.schema = schema;
+        this.overwrite = overwrite;
+        this.metricOption = metricOption;
+    }
+
+    /**
+     * Builds the Hudi write topology for the incoming stream.
+     *
+     * <p>Chooses, in order: the bulk-insert pipeline when {@link FlinkOptions#OPERATION} is
+     * {@code bulk_insert}; the append pipeline (followed by clustering or a dummy sink) in append
+     * mode; otherwise bootstrap plus stream write, followed by compaction or cleaning.
+     */
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        return (DataStreamSinkProviderAdapter) dataStream -> {
+
+            // align the commit ack timeout with the job's checkpoint timeout
+            long ckpTimeout = dataStream.getExecutionEnvironment()
+                    .getCheckpointConfig().getCheckpointTimeout();
+            conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
+            // set up default parallelism
+            OptionsInference.setupSinkTasks(conf, dataStream.getExecutionConfig().getParallelism());
+
+            RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType();
+
+            // bulk_insert mode
+            final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
+            if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
+                return Pipelines.bulkInsert(conf, rowType, dataStream, metricOption);
+            }
+
+            // Append mode
+            if (OptionsResolver.isAppendMode(conf)) {
+                DataStream<Object> appendPipeline =
+                        Pipelines.append(conf, rowType, dataStream, context.isBounded(), metricOption);
+                if (OptionsResolver.needsAsyncClustering(conf)) {
+                    return Pipelines.cluster(conf, rowType, appendPipeline);
+                } else {
+                    return Pipelines.dummySink(appendPipeline);
+                }
+            }
+
+            // bootstrap
+            final DataStream<HoodieRecord> hoodieRecordDataStream =
+                    Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite);
+            // write pipeline
+            DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream, metricOption);
+            // compaction
+            if (OptionsResolver.needsAsyncCompaction(conf)) {
+                // use synchronous compaction for bounded source.
+                if (context.isBounded()) {
+                    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+                }
+                return Pipelines.compact(conf, pipeline);
+            } else {
+                return Pipelines.clean(conf, pipeline);
+            }
+        };
+    }
+
+    /**
+     * Accepts all change kinds when {@link FlinkOptions#CHANGELOG_ENABLED} is set, upsert otherwise.
+     */
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
+        if (conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) {
+            return ChangelogModes.FULL;
+        } else {
+            return ChangelogModes.UPSERT;
+        }
+    }
+
+    /**
+     * Returns an independent copy of this sink.
+     */
+    @Override
+    public DynamicTableSink copy() {
+        // DynamicTableSink#copy must deep-copy mutable members: conf is mutated by applyOverwrite,
+        // applyStaticPartition and the runtime provider, so a copy must not share it with this sink.
+        return new HoodieTableSink(new Configuration(this.conf), this.schema, this.overwrite, this.metricOption);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "HoodieTableSink";
+    }
+
+    /**
+     * Narrows an overwrite to INSERT_OVERWRITE when explicit static partitions are given.
+     */
+    @Override
+    public void applyStaticPartition(Map<String, String> partitions) {
+        // #applyOverwrite should have been invoked.
+        if (this.overwrite && !partitions.isEmpty()) {
+            this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT_OVERWRITE.value());
+        }
+    }
+
+    /**
+     * Records the overwrite flag and sets the operation to INSERT_OVERWRITE_TABLE.
+     */
+    @Override
+    public void applyOverwrite(boolean overwrite) {
+        this.overwrite = overwrite;
+        // set up the operation as INSERT_OVERWRITE_TABLE first,
+        // if there are explicit partitions, #applyStaticPartition would overwrite the option.
+        this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT_OVERWRITE_TABLE.value());
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/sink/HudiTableInlongFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/sink/HudiTableInlongFactory.java
deleted file mode 100644
index d2985ac5d4..0000000000
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/sink/HudiTableInlongFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.hudi.table.sink;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.hudi.table.HoodieTableFactory;
-
-import java.util.Set;
-
-import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
-import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
-
-public class HudiTableInlongFactory extends HoodieTableFactory {
-
- public static final String SORT_CONNECTOR_IDENTIFY_HUDI = "hudi-inlong";
-
- public HudiTableInlongFactory() {
- super();
- }
-
- @Override
- public String factoryIdentifier() {
- return SORT_CONNECTOR_IDENTIFY_HUDI;
- }
-
- @Override
- public DynamicTableSink createDynamicTableSink(Context context) {
- return super.createDynamicTableSink(context);
- }
-
- @Override
- public Set<ConfigOption<?>> optionalOptions() {
- Set<ConfigOption<?>> configOptions = super.optionalOptions();
- configOptions.add(INLONG_METRIC);
- configOptions.add(INLONG_AUDIT);
- return configOptions;
- }
-
-}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 709a02e60c..d8a0e68a77 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.inlong.sort.hudi.table.sink.HudiTableInlongFactory
+org.apache.inlong.sort.hudi.table.HoodieTableFactory
diff --git a/licenses/inlong-sort-connectors/LICENSE
b/licenses/inlong-sort-connectors/LICENSE
index e3a995cc65..7be72aace2 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -880,6 +880,19 @@ License :
https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
Source : com.ververica:flink-connector-mysql-cdc:2.3.0 (Please note that the
software have been modified.)
License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
+1.3.26
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteFunction.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteOperator.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bulk/BulkInsertWriteFunction.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bulk/BulkInsertWriteOperator.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/utils/Pipelines.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/StreamWriteFunction.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/StreamWriteOperator.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HoodieTableFactory.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HoodieTableSink.java
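+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bucket/BucketStreamWriteFunction.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bucket/BucketStreamWriteOperator.java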
+Source : org.apache.hudi:hudi-flink1.15-bundle:0.12.3 (Please note that the
software have been modified.)
+License : https://github.com/apache/hudi/blob/master/LICENSE
+
=======================================================================