This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bc18c39  [FLINK-1923] Exactly-once write for flink writer (#3002)
bc18c39 is described below

commit bc18c39835d6775b063ae072aea4ba43177d66b1
Author: yuzhaojing <[email protected]>
AuthorDate: Fri May 28 14:58:21 2021 +0800

    [FLINK-1923] Exactly-once write for flink writer (#3002)
    
    Co-authored-by: 喻兆靖 <[email protected]>
---
 .../apache/hudi/configuration/FlinkOptions.java    | 16 ++++++
 .../org/apache/hudi/sink/StreamWriteFunction.java  | 58 +++++++++++++++++++++-
 .../sink/partitioner/BucketAssignFunction.java     |  3 +-
 .../org/apache/hudi/table/HoodieTableSink.java     |  3 ++
 .../java/org/apache/hudi/util/StreamerUtil.java    |  9 ----
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 51 +++++++++++++++++++
 .../sink/utils/StreamWriteFunctionWrapper.java     |  4 ++
 7 files changed, 133 insertions(+), 11 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 8526392..4c5d309 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -305,6 +305,22 @@ public class FlinkOptions {
       .defaultValue(100) // default 100 MB
       .withDescription("Max memory in MB for merge, default 100MB");
 
+  public static final ConfigOption<Boolean> WRITE_EXACTLY_ONCE_ENABLED = ConfigOptions
+      .key("write.exactly_once.enabled")
+      .booleanType()
+      .defaultValue(false) // default at least once
+      .withDescription("Whether write in exactly_once semantics, if true,\n"
+          + "the write task would block flushing after it finishes a checkpoint\n"
+          + "until it receives the checkpoint success event, default false");
+
+  // this is only for internal use
+  public static final ConfigOption<Long> WRITE_COMMIT_ACK_TIMEOUT = ConfigOptions
+      .key("write.commit.ack.timeout")
+      .longType()
+      .defaultValue(-1L) // default at least once
+      .withDescription("Timeout limit for a writer task after it finishes a checkpoint and\n"
+          + "waits for the instant commit success, only for internal use");
+
   // ------------------------------------------------------------------------
   //  Compaction Options
   // ------------------------------------------------------------------------
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index a1b3346..7679f2c 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.ObjectSizeCalculator;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
 import org.apache.hudi.table.action.commit.FlinkWriteHelper;
@@ -54,7 +55,9 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
@@ -146,6 +149,27 @@ public class StreamWriteFunction<K, I, O>
   private transient TotalSizeTracer tracer;
 
   /**
+   * Whether write in exactly-once semantics.
+   */
+  private boolean exactlyOnce;
+
+  /**
+   * Flag saying whether the write task is waiting for the checkpoint success notification
+   * after it finished a checkpoint.
+   *
+   * <p>The flag is needed because the write task does not block during the waiting time interval,
+   * some data buckets still flush out with old instant time. There are two cases that the flush may produce
+   * corrupted files if the old instant is committed successfully:
+   * 1) the write handle was writing data but interrupted, left a corrupted parquet file;
+   * 2) the write handle finished the write but was not closed, left an empty parquet file.
+   *
+   * <p>To solve, when this flag was set to true, we block the data flushing thus the #processElement method,
+   * the flag was reset to false if the task receives the checkpoint success event or the latest inflight instant
+   * time changed(the last instant committed successfully).
+   */
+  private volatile boolean confirming = false;
+
+  /**
    * Constructs a StreamingSinkFunction.
    *
    * @param config The config options
@@ -162,6 +186,7 @@ public class StreamWriteFunction<K, I, O>
         WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
         HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
     this.tracer = new TotalSizeTracer(this.config);
+    this.exactlyOnce = config.getBoolean(FlinkOptions.WRITE_EXACTLY_ONCE_ENABLED);
     initBuffer();
     initWriteFunction();
   }
@@ -225,6 +250,11 @@ public class StreamWriteFunction<K, I, O>
     return writeClient;
   }
 
+  @VisibleForTesting
+  public boolean isConfirming() {
+    return this.confirming;
+  }
+
   public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
     this.eventGateway = operatorEventGateway;
   }
@@ -458,7 +488,7 @@ public class StreamWriteFunction<K, I, O>
 
   @SuppressWarnings("unchecked, rawtypes")
   private void flushBucket(DataBucket bucket) {
-    final String instant = this.writeClient.getLastPendingInstant(this.actionType);
+    String instant = this.writeClient.getLastPendingInstant(this.actionType);
 
     if (instant == null) {
       // in case there are empty checkpoints that has no input data
@@ -466,6 +496,31 @@ public class StreamWriteFunction<K, I, O>
       return;
     }
 
+    // if exactly-once semantics turns on,
+    // waits for the checkpoint notification until the checkpoint timeout threshold hits.
+    if (exactlyOnce && confirming) {
+      long waitingTime = 0L;
+      long ckpTimeout = config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT);
+      long interval = 500L;
+      while (Objects.equals(instant, this.currentInstant)) {
+        // sleep for a while
+        try {
+          if (waitingTime > ckpTimeout) {
+            throw new HoodieException("Timeout(" + waitingTime + "ms) while waiting for instant " + instant + " to commit");
+          }
+          TimeUnit.MILLISECONDS.sleep(interval);
+          waitingTime += interval;
+        } catch (InterruptedException e) {
+          throw new HoodieException("Error while waiting for instant " + instant + " to commit", e);
+        }
+        // refresh the inflight instant
+        instant = this.writeClient.getLastPendingInstant(this.actionType);
+      }
+      // the inflight instant changed, which means the last instant was committed
+      // successfully.
+      confirming = false;
+    }
+
     List<HoodieRecord> records = bucket.writeBuffer();
     ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
     if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
@@ -522,5 +577,6 @@ public class StreamWriteFunction<K, I, O>
     this.eventGateway.sendEventToCoordinator(event);
     this.buckets.clear();
     this.tracer.reset();
+    this.confirming = true;
   }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 4c51275..54a4a62 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -58,6 +58,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.Objects;
 
 /**
  * The function to build the write profile incrementally for records within a checkpoint,
@@ -197,7 +198,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
     if (isChangingRecords && indexState.contains(recordKey)) {
       // Set up the instant time as "U" to mark the bucket as an update bucket.
       HoodieRecordGlobalLocation oldLoc = this.indexState.get(recordKey);
-      if (!StreamerUtil.equal(oldLoc.getPartitionPath(), partitionPath)) {
+      if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) {
         if (globalIndex) {
           // if partition path changes, emit a delete record for old partition path,
           // then update the index state using location with new partition path.
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index c3427f3..b4109b3 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -68,6 +68,9 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
       // Read from kafka source
       RowType rowType = (RowType) schema.toRowDataType().notNull().getLogicalType();
       int numWriteTasks = conf.getInteger(FlinkOptions.WRITE_TASKS);
+      long ckpTimeout = dataStream.getExecutionEnvironment()
+          .getCheckpointConfig().getCheckpointTimeout();
+      conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
       StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);
 
       DataStream<Object> pipeline = dataStream
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 6f69101..c33d635 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -58,8 +58,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.StringReader;
@@ -336,11 +334,4 @@ public class StreamerUtil {
     long oldTime = Long.parseLong(oldInstant);
     return String.valueOf(oldTime + milliseconds);
   }
-
-  /**
-   * Copied from Objects#equal.
-   */
-  public static boolean equal(@Nullable Object a, @Nullable Object b) {
-    return a == b || (a != null && a.equals(b));
-  }
 }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index d2d04ee..247e17e 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
 import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
 import org.apache.hudi.utils.TestConfigurations;
@@ -52,6 +53,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -635,6 +637,55 @@ public class TestWriteCopyOnWrite {
     checkWrittenData(tempFile, EXPECTED2);
   }
 
+  @Test
+  public void testWriteExactlyOnce() throws Exception {
+    // reset the config option
+    conf.setBoolean(FlinkOptions.WRITE_EXACTLY_ONCE_ENABLED, true);
+    conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 3);
+    conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size
+    funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
+
+    // open the function and ingest data
+
+    funcWrapper.openFunction();
+    for (RowData rowData : TestData.DATA_SET_INSERT) {
+      funcWrapper.invoke(rowData);
+    }
+
+    // no checkpoint, so the coordinator does not accept any events
+    assertTrue(
+        funcWrapper.getEventBuffer().length == 1
+            && funcWrapper.getEventBuffer()[0] == null, "The coordinator events buffer expect to be empty");
+
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(1);
+    assertTrue(funcWrapper.isConforming(), "The write function should be waiting for the instant to commit");
+
+    for (int i = 0; i < 2; i++) {
+      final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first
+      assertThat("The operator expect to send an event", event, instanceOf(BatchWriteSuccessEvent.class));
+      funcWrapper.getCoordinator().handleEventFromOperator(0, event);
+    }
+
+    funcWrapper.checkpointComplete(1);
+
+    for (RowData rowData : TestData.DATA_SET_INSERT) {
+      funcWrapper.invoke(rowData);
+    }
+
+    assertFalse(funcWrapper.isConforming(), "The write function should finish waiting for the instant to commit");
+
+    // checkpoint for the next round, when there is eager flush but the write
+    // task is waiting for the instant commit ack, should throw for timeout.
+    funcWrapper.checkpointFunction(2);
+
+    assertThrows(HoodieException.class, () -> {
+      for (RowData rowData : TestData.DATA_SET_INSERT) {
+        funcWrapper.invoke(rowData);
+      }
+    }, "Timeout(500ms) while waiting for instant");
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index a4b6c16..9f8852f 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -197,4 +197,8 @@ public class StreamWriteFunctionWrapper<I> {
   public boolean isKeyInState(HoodieKey hoodieKey) {
     return this.bucketAssignerFunction.isKeyInState(hoodieKey);
   }
+
+  public boolean isConforming() {
+    return this.writeFunction.isConfirming();
+  }
 }

Reply via email to