This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9f2087a8944 [HUDI-6615] Fix the condition of isInputSorted in
BulkInsertWriterHelper (#9314)
9f2087a8944 is described below
commit 9f2087a89443e93079d061fd81bf2f768f9c6953
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Aug 3 08:50:31 2023 +0800
[HUDI-6615] Fix the condition of isInputSorted in BulkInsertWriterHelper
(#9314)
---
.../apache/hudi/configuration/OptionsResolver.java | 8 ++++++++
.../hudi/sink/bulk/BulkInsertWriterHelper.java | 3 ++-
.../java/org/apache/hudi/sink/utils/Pipelines.java | 11 ++---------
.../apache/hudi/streamer/HoodieFlinkStreamer.java | 2 +-
.../org/apache/hudi/table/HoodieTableSink.java | 5 ++---
.../apache/hudi/sink/ITTestDataStreamWrite.java | 2 +-
.../hudi/sink/bucket/ITTestBucketStreamWrite.java | 23 +---------------------
.../bucket/ITTestConsistentBucketStreamWrite.java | 5 ++---
8 files changed, 19 insertions(+), 40 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 8f4b013de04..944e795dc2f 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -76,6 +76,14 @@ public class OptionsResolver {
return operationType == WriteOperationType.INSERT;
}
+ /**
+ * Returns whether the table operation is 'bulk_insert'.
+ */
+ public static boolean isBulkInsertOperation(Configuration conf) {
+    WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
+ return operationType == WriteOperationType.BULK_INSERT;
+ }
+
/**
* Returns whether it is a MERGE_ON_READ table.
*/
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index 56f668e32f0..3c0d4fb7662 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
import org.apache.hudi.table.HoodieTable;
@@ -84,7 +85,7 @@ public class BulkInsertWriterHelper {
this.taskEpochId = taskEpochId;
    this.rowType = preserveHoodieMetadata ? rowType : addMetadataFields(rowType, writeConfig.allowOperationMetadataField()); // patch up with metadata fields
this.preserveHoodieMetadata = preserveHoodieMetadata;
-    this.isInputSorted = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
+    this.isInputSorted = OptionsResolver.isBulkInsertOperation(conf) && conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
this.fileIdPrefix = UUID.randomUUID().toString();
this.keyGen = preserveHoodieMetadata ? null : RowDataKeyGen.instance(conf,
rowType);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 5d945d07aa1..fe51fe435e1 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -202,19 +202,12 @@ public class Pipelines {
* @param conf The configuration
* @param rowType The input row type
* @param dataStream The input data stream
- * @param bounded Whether the input stream is bounded
* @return the appending data stream sink
*/
public static DataStream<Object> append(
Configuration conf,
RowType rowType,
- DataStream<RowData> dataStream,
- boolean bounded) {
- if (!bounded) {
- // In principle, the config should be immutable, but the boundedness
- // is only visible when creating the sink pipeline.
- conf.setBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT, false);
- }
+ DataStream<RowData> dataStream) {
    WriteOperatorFactory<RowData> operatorFactory = AppendWriteOperator.getFactory(conf, rowType);
return dataStream
@@ -469,7 +462,7 @@ public class Pipelines {
}
return clusteringStream.addSink(new ClusteringCommitSink(conf))
.name("clustering_commit")
- .setParallelism(1); // compaction commit should be singleton
+ .setParallelism(1); // clustering commit should be singleton
}
  public static DataStreamSink<Object> clean(Configuration conf, DataStream<Object> dataStream) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index b45f9ca3879..62d22869f64 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -103,7 +103,7 @@ public class HoodieFlinkStreamer {
DataStream<Object> pipeline;
// Append mode
if (OptionsResolver.isAppendMode(conf)) {
- pipeline = Pipelines.append(conf, rowType, dataStream, false);
+ pipeline = Pipelines.append(conf, rowType, dataStream);
if (OptionsResolver.needsAsyncClustering(conf)) {
Pipelines.cluster(conf, rowType, pipeline);
} else if (OptionsResolver.isLazyFailedWritesCleanPolicy(conf)) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 0096fb3476f..ec0db6b1262 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -85,14 +85,13 @@ public class HoodieTableSink implements
    RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType();
// bulk_insert mode
- final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
-    if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
+ if (OptionsResolver.isBulkInsertOperation(conf)) {
return Pipelines.bulkInsert(conf, rowType, dataStream);
}
// Append mode
if (OptionsResolver.isAppendMode(conf)) {
-      DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, context.isBounded());
+      DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream);
if (OptionsResolver.needsAsyncClustering(conf)) {
return Pipelines.cluster(conf, rowType, pipeline);
} else if (OptionsResolver.isLazyFailedWritesCleanPolicy(conf)) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 2aec8e5d5fc..954ca6593c3 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -314,7 +314,7 @@ public class ITTestDataStreamWrite extends TestLogger {
.setParallelism(4);
OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
-    DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, true);
+ DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream);
execEnv.addOperator(pipeline.getTransformation());
Pipelines.cluster(conf, rowType, pipeline);
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
index ecd31cd719e..3d6d0918ef0 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
@@ -27,15 +27,12 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.HoodieIndex.IndexType;
-import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
-import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestSQL;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
@@ -95,26 +92,8 @@ public class ITTestBucketStreamWrite {
}
  private static void doDeleteCommit(String tablePath, boolean isCow) throws Exception {
- // make configuration and setAvroSchema
- FlinkClusteringConfig cfg = new FlinkClusteringConfig();
- cfg.path = tablePath;
- Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
-
// create metaClient
- HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
-
- conf.setString(FlinkOptions.TABLE_TYPE, metaClient.getTableType().name());
-
- // set the table name
-    conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
-
- // set record key field
-    conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
- // set partition field
-    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
-
- // set table schema
- CompactionUtil.setAvroSchema(conf, metaClient);
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(tablePath, new org.apache.hadoop.conf.Configuration());
// should only contain one instant
    HoodieTimeline activeCompletedTimeline = metaClient.getActiveTimeline().filterCompletedInstants();
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
index 4882552f0a7..5309b2225fb 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
@@ -21,11 +21,11 @@ package org.apache.hudi.sink.bucket;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsInference;
+import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.util.AvroSchemaConverter;
@@ -187,8 +187,7 @@ public class ITTestConsistentBucketStreamWrite extends
TestLogger {
OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, dataStream);
// bulk_insert mode
- final String writeOperation = conf.get(FlinkOptions.OPERATION);
-    if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
+ if (OptionsResolver.isBulkInsertOperation(conf)) {
Pipelines.bulkInsert(conf, rowType, dataStream);
} else {
      DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);