luoyuxia commented on a change in pull request #16027:
URL: https://github.com/apache/flink/pull/16027#discussion_r643631928
##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
##########
@@ -26,29 +27,52 @@
import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
+import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND;
+
/** Writer for emitting {@link PartitionCommitInfo} to downstream. */
public class StreamingFileWriter<IN> extends AbstractStreamingWriter<IN, PartitionCommitInfo> {
private static final long serialVersionUID = 2L;
+ private final List<String> partitionKeys;
+ private final Configuration conf;
+
private transient Set<String> currentNewPartitions;
private transient TreeMap<Long, Set<String>> newPartitions;
private transient Set<String> committablePartitions;
+ private transient PartitionCommitTrigger trigger;
+
public StreamingFileWriter(
long bucketCheckInterval,
StreamingFileSink.BucketsBuilder<
IN, String, ? extends StreamingFileSink.BucketsBuilder<IN, String, ?>>
- bucketsBuilder) {
+ bucketsBuilder,
+ List<String> partitionKeys,
+ Configuration conf) {
super(bucketCheckInterval, bucketsBuilder);
+ this.partitionKeys = partitionKeys;
+ this.conf = conf;
}
@Override
public void initializeState(StateInitializationContext context) throws Exception {
+ if (isPartitionCommitTriggerEnabled()) {
+ trigger =
+ PartitionCommitTrigger.create(
+ context.isRestored(),
+ context.getOperatorStateStore(),
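The hunk calls `isPartitionCommitTriggerEnabled()` without showing its body. Given the new `partitionKeys` and `conf` fields and the `SINK_PARTITION_COMMIT_POLICY_KIND` import, one plausible reading is that the trigger is only needed for partitioned sinks that configure a commit policy. The sketch below is hypothetical and not the PR's code: `CommitTriggerGate` is an invented name, a plain `Map` stands in for Flink's `Configuration`, and the key string is assumed to be the one behind `SINK_PARTITION_COMMIT_POLICY_KIND`.

```java
import java.util.List;
import java.util.Map;

/**
 * Hypothetical stand-in for the enablement check. A plain Map replaces
 * Flink's Configuration so the sketch compiles without Flink on the classpath.
 */
final class CommitTriggerGate {
    // Assumed key of SINK_PARTITION_COMMIT_POLICY_KIND.
    static final String POLICY_KIND_KEY = "sink.partition-commit.policy.kind";

    private CommitTriggerGate() {}

    /** Enabled only when the sink is partitioned and a commit policy is configured. */
    static boolean isPartitionCommitTriggerEnabled(
            List<String> partitionKeys, Map<String, String> conf) {
        return !partitionKeys.isEmpty() && conf.containsKey(POLICY_KIND_KEY);
    }
}
```

Under this reading, an unpartitioned sink or one without a commit policy never creates a trigger, so it never touches the operator state store either.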
Review comment:
@JingsongLi
Yeah, you're right. We don't need a real OperatorStateStore for this
committer.
Maybe we can pass "restored=false" to PartitionCommitTrigger.create so that
PartitionCommitTrigger does not restore from the last checkpoint's state.
Or we could add an extra parameter, such as 'isStateful', to
PartitionCommitTrigger to tell it whether it should save state at all.
Or we could never snapshot this committer in StreamingFileWriter. But then we
would need to add an extra method, *addWaterMark*, to PartitionCommitTrigger,
because PartitionTimeCommitTrigger has to maintain the watermark. Today the
watermark is added during snapshot; if we no longer snapshot this committer,
we need another way to add it.
Or there may be some other, better approach.
Any suggestions?
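The last concrete option above (never snapshotting, and feeding the watermark through *addWaterMark* instead) could look roughly like the following minimal sketch, assuming a partition-time trigger that keeps its pending partitions and watermark purely in memory. `InMemoryPartitionTimeTrigger`, `addPartition`, and `committablePartitions` are hypothetical names, `addWatermark` mirrors the proposed *addWaterMark*, and none of this is Flink's actual `PartitionTimeCommitTrigger` API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical in-memory partition-time trigger: it never touches operator
 * state, and the watermark arrives through addWatermark rather than snapshot.
 */
final class InMemoryPartitionTimeTrigger {
    private final long commitDelayMillis;
    // Pending partition -> partition time in epoch millis; TreeMap keeps output ordered.
    private final Map<String, Long> pending = new TreeMap<>();
    private long currentWatermark = Long.MIN_VALUE;

    InMemoryPartitionTimeTrigger(long commitDelayMillis) {
        this.commitDelayMillis = commitDelayMillis;
    }

    void addPartition(String partition, long partitionTimeMillis) {
        pending.put(partition, partitionTimeMillis);
    }

    /** Watermarks only move forward; older values are ignored. */
    void addWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    /** Removes and returns partitions whose time plus delay the watermark has passed. */
    List<String> committablePartitions() {
        List<String> ready = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + commitDelayMillis <= currentWatermark) {
                ready.add(entry.getKey());
                it.remove();
            }
        }
        return ready;
    }
}
```

Because nothing is snapshotted, such a trigger starts empty after a failover; that trade-off only works if, as agreed above, this committer does not need to restore state.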
--
This is an automated message from the Apache Git Service.