KevinyhZou commented on code in PR #7189:
URL: https://github.com/apache/hudi/pull/7189#discussion_r1020904050


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java:
##########
@@ -450,4 +473,159 @@ public static class DummySink implements 
SinkFunction<Object> {
     private static final long serialVersionUID = 1L;
     public static DummySink INSTANCE = new DummySink();
   }
+
+  public static DataStreamSink<Object> successFileSink(DataStream<Object> 
dataStream, Configuration conf) {
+    return dataStream.addSink(new PartitionSuccessFileWriteSink(conf))
+            .setParallelism(1)
+            .name("success_file_sink");
+  }
+
+  /**
+   * Sink that write a success file to partition path when it write finished.
+   **/
+  public static class PartitionSuccessFileWriteSink extends 
RichSinkFunction<Object> implements CheckpointedFunction, CheckpointListener {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = 
LoggerFactory.getLogger(PartitionSuccessFileWriteSink.class);
+    // Success file name.
+    private static final String SUCCESS_FILE_NAME = "_SUCCESS";
+    // The name of active partitions state.
+    private static final String ACTIVE_PARTITION_STATE_NAME = 
"active-partition-state";
+    // The name of finished partition state.
+    private static final String FINISHED_PARTITION_STATE_NAME = 
"finished-partition-state";
+    // The global configuration of flink job.
+    private Configuration conf;
+    // The configured file system handle.
+    private FileSystem fileSystem;
+    // The extractor for partition time.
+    private PartitionTimeExtractorAdapter partitionTimeExtractor;
+    // The extractor for extract partition value from partition path.
+    private PartitionValueExtractor partitionValueExtractor;
+    // The table base path.
+    private String tablePath;
+    // The partition keys.
+    private List<String> partitionKeys;
+    // The partitions on writing currently.
+    private Set<String> activePartitions;
+    // The partitions write finished.
+    private Set<String> finishedPartitions;
+    // The operator state to store active partitions.
+    private ListState<String> activePartitionsState;
+    // The operator state to store finished partitions.
+    private ListState<String> finishedPartitionsState;
+    // The configured time delay to write success file.
+    private Duration partitionSuccessFileDelay;
+    // The checkpoint lock
+    private Object checkpointLock;
+
+    PartitionSuccessFileWriteSink(Configuration conf) {
+      this.conf = conf;
+    }
+
+    @Override
+    public void open(Configuration config) throws Exception {
+      activePartitions = new TreeSet<>();
+      finishedPartitions = new TreeSet<>();
+      tablePath = conf.get(FlinkOptions.PATH);
+      fileSystem = FileSystem.get(URI.create(tablePath));
+      checkpointLock = new Object();
+      String[] partitionFields = 
conf.get(FlinkOptions.PARTITION_PATH_FIELD).split(",");
+      partitionKeys = Arrays.asList(partitionFields);
+      partitionSuccessFileDelay = 
conf.get(FlinkOptions.PARTITION_WRITE_SUCCESS_FILE_DELAY);
+      String partitionValueExtractorClzName = 
conf.get(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME);
+      String partitionTimestampExtractPattern = 
conf.get(FlinkOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN);
+      String partitionTimestampFormatPattern = 
conf.get(FlinkOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_FORMATTER);
+      try {
+        Class<?> partitionValueExtractorClz = 
Class.forName(partitionValueExtractorClzName);
+        partitionValueExtractor = (PartitionValueExtractor) 
partitionValueExtractorClz.newInstance();
+      } catch (ClassNotFoundException e) {
+        LOG.error("class not found for: {}", partitionValueExtractorClzName, 
e);
+        throw e;
+      }
+      partitionTimeExtractor = new 
PartitionTimeExtractorAdapter(partitionTimestampExtractPattern, 
partitionTimestampFormatPattern);
+    }
+
+    // Extract the partition time value from partition path, and convert them 
to timestamp.
+    private Long convertTimestampByPartitionPath(String partitionPath) {
+      List<String> partitionValues = 
partitionValueExtractor.extractPartitionValuesInPath(partitionPath);
+      LocalDateTime localDateTime = 
partitionTimeExtractor.extract(partitionKeys, partitionValues);
+      return PartitionTimeExtractorAdapter.toMills(localDateTime);
+    }
+
+    @Override
+    public void invoke(Object value, SinkFunction.Context context) throws 
Exception {
+      String partition = (String) value;
+      activePartitions.add(partition);
+      long watermark = context.currentWatermark();
+      Iterator<String> it = activePartitions.iterator();
+      while (it.hasNext()) {
+        String partitionPath = it.next();
+        // Convert the partition path to timestamp if the table is partitioned 
by time field, like day, hour
+        Long partitionTimestamp = 
convertTimestampByPartitionPath(partitionPath);
+        // If the watermark is greater than the partition timestamp plus the 
delay time, it represents the
+        // minimum timestamp in the streaming data is beyond the partition max 
timestamp, so add the partition
+        // path to the finished partitions set and remove it from active 
partitions set.
+        if (partitionTimestamp + partitionSuccessFileDelay.toMillis() < 
watermark) {
+          synchronized (checkpointLock) {
+            finishedPartitions.add(partitionPath);
+            it.remove();
+          }
+        }
+      }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) 
throws Exception {
+      synchronized (checkpointLock) {

Review Comment:
   The streaming task and the checkpoint timer may concurrently modify the 
finished partition set, so I added a lock to provide thread safety. But 
according to the mailbox feature of Flink versions above 1.10, the checkpoint 
lock is no longer needed, so I have removed it. Thanks for your review.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to