This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1879efa45d [HUDI-4686] Flip option 'write.ignore.failed' to default false (#6467)
1879efa45d is described below

commit 1879efa45d556c87d2cda56fa16d5de535481c06
Author: Danny Chan <[email protected]>
AuthorDate: Tue Aug 23 18:54:57 2022 +0800

    [HUDI-4686] Flip option 'write.ignore.failed' to default false (#6467)
    
    Also fix the flaky test
---
 .../apache/hudi/configuration/FlinkOptions.java    |  6 +--
 .../apache/hudi/sink/ITTestDataStreamWrite.java    | 50 +++++++---------------
 2 files changed, 18 insertions(+), 38 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 65c27084bf..5c8a89380d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -99,7 +99,7 @@ public class FlinkOptions extends HoodieConfig {
 
   public static final String NO_PRE_COMBINE = "no_precombine";
   public static final ConfigOption<String> PRECOMBINE_FIELD = ConfigOptions
-      .key("payload.ordering.field")
+      .key("precombine.field")
       .stringType()
       .defaultValue("ts")
       .withFallbackKeys("write.precombine.field")
@@ -330,9 +330,9 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<Boolean> IGNORE_FAILED = ConfigOptions
       .key("write.ignore.failed")
       .booleanType()
-      .defaultValue(true)
+      .defaultValue(false)
      .withDescription("Flag to indicate whether to ignore any non exception error (e.g. writestatus error). within a checkpoint batch.\n"
-          + "By default true (in favor of streaming progressing over data integrity)");
+          + "By default false");
 
   public static final ConfigOption<String> RECORD_KEY_FIELD = ConfigOptions
       .key(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
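
Note for users of this option: with the default flipped to false, jobs that still prefer streaming progress over data integrity have to opt back in explicitly. A minimal sketch (not part of this commit), using the option constant above and Flink's Configuration setter:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    // Sketch only: re-enable the pre-HUDI-4686 behavior so that write-status
    // errors inside a checkpoint batch are ignored instead of failing the job.
    Configuration conf = new Configuration();
    conf.setBoolean(FlinkOptions.IGNORE_FAILED, true);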
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 4862cda07a..18b2af3efe 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -157,12 +157,11 @@ public class ITTestDataStreamWrite extends TestLogger {
   }
 
   @Test
-  public void testWriteMergeOnReadWithClustering() throws Exception {
+  public void testWriteCopyOnWriteWithClustering() throws Exception {
    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
     conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
     conf.setString(FlinkOptions.OPERATION, "insert");
-    conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.COPY_ON_WRITE.name());
 
     testWriteToHoodieWithCluster(conf, "cow_write_with_cluster", 1, EXPECTED);
   }
@@ -281,36 +280,20 @@ public class ITTestDataStreamWrite extends TestLogger {
     String sourcePath = Objects.requireNonNull(Thread.currentThread()
         .getContextClassLoader().getResource("test_source.data")).toString();
 
-    boolean isMor = conf.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.MERGE_ON_READ.name());
-
-    DataStream<RowData> dataStream;
-    if (isMor) {
-      TextInputFormat format = new TextInputFormat(new Path(sourcePath));
-      format.setFilesFilter(FilePathFilter.createDefaultFilter());
-      TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
-      format.setCharsetName("UTF-8");
-
-      dataStream = execEnv
-          // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
-          .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
-          .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
-          .setParallelism(1);
-    } else {
-      dataStream = execEnv
-          // use continuous file source to trigger checkpoint
-          .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), checkpoints))
-          .name("continuous_file_source")
-          .setParallelism(1)
-          .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
-          .setParallelism(4);
-    }
+    DataStream<RowData> dataStream = execEnv
+        // use continuous file source to trigger checkpoint
+        .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), checkpoints))
+        .name("continuous_file_source")
+        .setParallelism(1)
+        .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+        .setParallelism(4);
 
     OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
    DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, true);
     execEnv.addOperator(pipeline.getTransformation());
 
     Pipelines.cluster(conf, rowType, pipeline);
-    execEnv.execute(jobName);
+    execute(execEnv, false, jobName);
 
     TestData.checkWrittenDataCOW(tempFile, expected);
   }
@@ -384,8 +367,6 @@ public class ITTestDataStreamWrite extends TestLogger {
     execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
 
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
-    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
-    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
    options.put(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema.avsc")).toString());
     Configuration conf = Configuration.fromMap(options);
     // Read from file source
@@ -405,16 +386,15 @@ public class ITTestDataStreamWrite extends TestLogger {
 
     TextInputFormat format = new TextInputFormat(new Path(sourcePath));
     format.setFilesFilter(FilePathFilter.createDefaultFilter());
-    TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
     format.setCharsetName("UTF-8");
 
     DataStream dataStream = execEnv
-        // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
-        .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
+        // use continuous file source to trigger checkpoint
+        .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2))
+        .name("continuous_file_source")
+        .setParallelism(1)
        .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
-        .setParallelism(1);
-
-
+        .setParallelism(4);
 
     //sink to hoodie table use low-level sink api.
     HoodiePipeline.Builder builder = HoodiePipeline.builder("test_sink")
@@ -429,7 +409,7 @@ public class ITTestDataStreamWrite extends TestLogger {
 
     builder.sink(dataStream, false);
 
-    execute(execEnv, true, "Api_Sink_Test");
+    execute(execEnv, false, "Api_Sink_Test");
     TestData.checkWrittenDataCOW(tempFile, EXPECTED);
   }
 }
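
For reference, the same opt-in can be expressed through the string-keyed options map style used by the low-level sink API test above; a minimal sketch (not from this commit), with a placeholder table path:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    // Sketch only: after the flipped default, 'write.ignore.failed' must be
    // set to true explicitly if the old behavior is wanted.
    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), "/tmp/hoodie_sink");   // placeholder path
    options.put(FlinkOptions.IGNORE_FAILED.key(), "true");      // opt back into ignoring failed writes
    Configuration conf = Configuration.fromMap(options);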
