This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1879efa45d [HUDI-4686] Flip option 'write.ignore.failed' to default false (#6467)
1879efa45d is described below
commit 1879efa45d556c87d2cda56fa16d5de535481c06
Author: Danny Chan <[email protected]>
AuthorDate: Tue Aug 23 18:54:57 2022 +0800
[HUDI-4686] Flip option 'write.ignore.failed' to default false (#6467)
Also fix the flaky test
---
.../apache/hudi/configuration/FlinkOptions.java | 6 +--
.../apache/hudi/sink/ITTestDataStreamWrite.java | 50 +++++++---------------
2 files changed, 18 insertions(+), 38 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 65c27084bf..5c8a89380d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -99,7 +99,7 @@ public class FlinkOptions extends HoodieConfig {
public static final String NO_PRE_COMBINE = "no_precombine";
public static final ConfigOption<String> PRECOMBINE_FIELD = ConfigOptions
- .key("payload.ordering.field")
+ .key("precombine.field")
.stringType()
.defaultValue("ts")
.withFallbackKeys("write.precombine.field")
@@ -330,9 +330,9 @@ public class FlinkOptions extends HoodieConfig {
public static final ConfigOption<Boolean> IGNORE_FAILED = ConfigOptions
.key("write.ignore.failed")
.booleanType()
- .defaultValue(true)
+ .defaultValue(false)
.withDescription("Flag to indicate whether to ignore any non exception
error (e.g. writestatus error). within a checkpoint batch.\n"
- + "By default true (in favor of streaming progressing over data
integrity)");
+ + "By default false");
public static final ConfigOption<String> RECORD_KEY_FIELD = ConfigOptions
.key(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 4862cda07a..18b2af3efe 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -157,12 +157,11 @@ public class ITTestDataStreamWrite extends TestLogger {
}
@Test
- public void testWriteMergeOnReadWithClustering() throws Exception {
+ public void testWriteCopyOnWriteWithClustering() throws Exception {
Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
conf.setString(FlinkOptions.OPERATION, "insert");
- conf.setString(FlinkOptions.TABLE_TYPE,
HoodieTableType.COPY_ON_WRITE.name());
testWriteToHoodieWithCluster(conf, "cow_write_with_cluster", 1, EXPECTED);
}
@@ -281,36 +280,20 @@ public class ITTestDataStreamWrite extends TestLogger {
String sourcePath = Objects.requireNonNull(Thread.currentThread()
.getContextClassLoader().getResource("test_source.data")).toString();
- boolean isMor =
conf.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.MERGE_ON_READ.name());
-
- DataStream<RowData> dataStream;
- if (isMor) {
- TextInputFormat format = new TextInputFormat(new Path(sourcePath));
- format.setFilesFilter(FilePathFilter.createDefaultFilter());
- TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
- format.setCharsetName("UTF-8");
-
- dataStream = execEnv
- // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
- .readFile(format, sourcePath,
FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
- .map(record ->
deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
- .setParallelism(1);
- } else {
- dataStream = execEnv
- // use continuous file source to trigger checkpoint
- .addSource(new ContinuousFileSource.BoundedSourceFunction(new
Path(sourcePath), checkpoints))
- .name("continuous_file_source")
- .setParallelism(1)
- .map(record ->
deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
- .setParallelism(4);
- }
+ DataStream<RowData> dataStream = execEnv
+ // use continuous file source to trigger checkpoint
+ .addSource(new ContinuousFileSource.BoundedSourceFunction(new
Path(sourcePath), checkpoints))
+ .name("continuous_file_source")
+ .setParallelism(1)
+ .map(record ->
deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+ .setParallelism(4);
OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream,
true);
execEnv.addOperator(pipeline.getTransformation());
Pipelines.cluster(conf, rowType, pipeline);
- execEnv.execute(jobName);
+ execute(execEnv, false, jobName);
TestData.checkWrittenDataCOW(tempFile, expected);
}
@@ -384,8 +367,6 @@ public class ITTestDataStreamWrite extends TestLogger {
execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
- options.put(FlinkOptions.TABLE_TYPE.key(),
HoodieTableType.MERGE_ON_READ.name());
- options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
options.put(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(),
Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema.avsc")).toString());
Configuration conf = Configuration.fromMap(options);
// Read from file source
@@ -405,16 +386,15 @@ public class ITTestDataStreamWrite extends TestLogger {
TextInputFormat format = new TextInputFormat(new Path(sourcePath));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
- TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
format.setCharsetName("UTF-8");
DataStream dataStream = execEnv
- // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
- .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY,
1000, typeInfo)
+ // use continuous file source to trigger checkpoint
+ .addSource(new ContinuousFileSource.BoundedSourceFunction(new
Path(sourcePath), 2))
+ .name("continuous_file_source")
+ .setParallelism(1)
.map(record ->
deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
- .setParallelism(1);
-
-
+ .setParallelism(4);
//sink to hoodie table use low-level sink api.
HoodiePipeline.Builder builder = HoodiePipeline.builder("test_sink")
@@ -429,7 +409,7 @@ public class ITTestDataStreamWrite extends TestLogger {
builder.sink(dataStream, false);
- execute(execEnv, true, "Api_Sink_Test");
+ execute(execEnv, false, "Api_Sink_Test");
TestData.checkWrittenDataCOW(tempFile, EXPECTED);
}
}