KevinyhZou commented on code in PR #7189:
URL: https://github.com/apache/hudi/pull/7189#discussion_r1020904405
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java:
##########
@@ -450,4 +473,159 @@ public static class DummySink implements
SinkFunction<Object> {
private static final long serialVersionUID = 1L;
public static DummySink INSTANCE = new DummySink();
}
+
+ public static DataStreamSink<Object> successFileSink(DataStream<Object>
dataStream, Configuration conf) {
+ return dataStream.addSink(new PartitionSuccessFileWriteSink(conf))
+ .setParallelism(1)
+ .name("success_file_sink");
+ }
+
+ /**
+ * Sink that writes a success file to the partition path when the write is finished.
+ **/
+ public static class PartitionSuccessFileWriteSink extends
RichSinkFunction<Object> implements CheckpointedFunction, CheckpointListener {
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG =
LoggerFactory.getLogger(PartitionSuccessFileWriteSink.class);
+ // Success file name.
+ private static final String SUCCESS_FILE_NAME = "_SUCCESS";
+ // The name of active partitions state.
+ private static final String ACTIVE_PARTITION_STATE_NAME =
"active-partition-state";
+ // The name of finished partition state.
+ private static final String FINISHED_PARTITION_STATE_NAME =
"finished-partition-state";
+ // The global configuration of flink job.
+ private Configuration conf;
+ // The configured file system handle.
+ private FileSystem fileSystem;
+ // The extractor for partition time.
+ private PartitionTimeExtractorAdapter partitionTimeExtractor;
+ // The extractor for extracting the partition values from the partition path.
+ private PartitionValueExtractor partitionValueExtractor;
+ // The table base path.
+ private String tablePath;
+ // The partition keys.
+ private List<String> partitionKeys;
+ // The partitions currently being written.
+ private Set<String> activePartitions;
+ // The partitions whose writes have finished.
+ private Set<String> finishedPartitions;
+ // The operator state to store active partitions.
+ private ListState<String> activePartitionsState;
+ // The operator state to store finished partitions.
+ private ListState<String> finishedPartitionsState;
+ // The configured time delay to write success file.
+ private Duration partitionSuccessFileDelay;
+ // The checkpoint lock
+ private Object checkpointLock;
+
+ PartitionSuccessFileWriteSink(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public void open(Configuration config) throws Exception {
+ activePartitions = new TreeSet<>();
+ finishedPartitions = new TreeSet<>();
+ tablePath = conf.get(FlinkOptions.PATH);
+ fileSystem = FileSystem.get(URI.create(tablePath));
+ checkpointLock = new Object();
+ String[] partitionFields =
conf.get(FlinkOptions.PARTITION_PATH_FIELD).split(",");
+ partitionKeys = Arrays.asList(partitionFields);
+ partitionSuccessFileDelay =
conf.get(FlinkOptions.PARTITION_WRITE_SUCCESS_FILE_DELAY);
+ String partitionValueExtractorClzName =
conf.get(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME);
+ String partitionTimestampExtractPattern =
conf.get(FlinkOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN);
+ String partitionTimestampFormatPattern =
conf.get(FlinkOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_FORMATTER);
+ try {
+ Class<?> partitionValueExtractorClz =
Class.forName(partitionValueExtractorClzName);
+ partitionValueExtractor = (PartitionValueExtractor)
partitionValueExtractorClz.newInstance();
+ } catch (ClassNotFoundException e) {
+ LOG.error("class not found for: {}", partitionValueExtractorClzName,
e);
+ throw e;
+ }
+ partitionTimeExtractor = new
PartitionTimeExtractorAdapter(partitionTimestampExtractPattern,
partitionTimestampFormatPattern);
+ }
+
+ // Extract the partition time values from the partition path, and convert them
to a timestamp.
+ private Long convertTimestampByPartitionPath(String partitionPath) {
+ List<String> partitionValues =
partitionValueExtractor.extractPartitionValuesInPath(partitionPath);
+ LocalDateTime localDateTime =
partitionTimeExtractor.extract(partitionKeys, partitionValues);
+ return PartitionTimeExtractorAdapter.toMills(localDateTime);
+ }
+
+ @Override
+ public void invoke(Object value, SinkFunction.Context context) throws
Exception {
+ String partition = (String) value;
+ activePartitions.add(partition);
+ long watermark = context.currentWatermark();
+ Iterator<String> it = activePartitions.iterator();
+ while (it.hasNext()) {
+ String partitionPath = it.next();
+ // Convert the partition path to timestamp if the table is partitioned
by time field, like day, hour
+ Long partitionTimestamp =
convertTimestampByPartitionPath(partitionPath);
+ // If the watermark is greater than the partition timestamp plus the
delay time, it represents the
+ // minimum timestamp in the streaming data is beyond the partition max
timestamp, so add the partition
+ // path to the finished partitions set and remove it from active
partitions set.
+ if (partitionTimestamp + partitionSuccessFileDelay.toMillis() <
watermark) {
+ synchronized (checkpointLock) {
+ finishedPartitions.add(partitionPath);
+ it.remove();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext functionSnapshotContext)
throws Exception {
+ synchronized (checkpointLock) {
+ // Save the partition path of active & finished partition path to
state.
+ Preconditions.checkNotNull(activePartitions);
+ Preconditions.checkNotNull(finishedPartitions);
+ activePartitionsState.update(new ArrayList<>(activePartitions));
+ finishedPartitionsState.update(new ArrayList<>(finishedPartitions));
+ }
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws
Exception {
+ ListStateDescriptor<String> activePartitionStateDesc =
+ new ListStateDescriptor<>(ACTIVE_PARTITION_STATE_NAME,
String.class);
+ activePartitionsState =
context.getOperatorStateStore().getListState(activePartitionStateDesc);
+ ListStateDescriptor<String> finishedPartitionStateDesc =
+ new ListStateDescriptor<>(FINISHED_PARTITION_STATE_NAME,
String.class);
+ finishedPartitionsState =
context.getOperatorStateStore().getListState(finishedPartitionStateDesc);
+ if (context.isRestored()) {
+ activePartitions = new TreeSet<>();
+ finishedPartitions = new TreeSet<>();
+ for (String p : activePartitionsState.get()) {
+ activePartitions.add(p);
+ }
+ for (String p : finishedPartitionsState.get()) {
+ finishedPartitions.add(p);
+ }
+ } else {
+ activePartitions = new TreeSet<>();
+ finishedPartitions = new TreeSet<>();
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long l) throws Exception {
+ Iterator<String> it = finishedPartitions.iterator();
+ // Iterate over the finished partitions set, and write a success file to the
path of each partition.
+ synchronized (checkpointLock) {
Review Comment:
OK, the checkpoint lock was removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]