luoyuxia commented on a change in pull request #16027:
URL: https://github.com/apache/flink/pull/16027#discussion_r643631928
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
##########
@@ -26,29 +27,52 @@
import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
+import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND;
+
/** Writer for emitting {@link PartitionCommitInfo} to downstream. */
public class StreamingFileWriter<IN> extends AbstractStreamingWriter<IN,
PartitionCommitInfo> {
private static final long serialVersionUID = 2L;
+ private final List<String> partitionKeys;
+ private final Configuration conf;
+
private transient Set<String> currentNewPartitions;
private transient TreeMap<Long, Set<String>> newPartitions;
private transient Set<String> committablePartitions;
+ private transient PartitionCommitTrigger trigger;
+
public StreamingFileWriter(
long bucketCheckInterval,
StreamingFileSink.BucketsBuilder<
IN, String, ? extends
StreamingFileSink.BucketsBuilder<IN, String, ?>>
- bucketsBuilder) {
+ bucketsBuilder,
+ List<String> partitionKeys,
+ Configuration conf) {
super(bucketCheckInterval, bucketsBuilder);
+ this.partitionKeys = partitionKeys;
+ this.conf = conf;
}
@Override
public void initializeState(StateInitializationContext context) throws
Exception {
+ if (isPartitionCommitTriggerEnabled()) {
+ trigger =
+ PartitionCommitTrigger.create(
+ context.isRestored(),
+ context.getOperatorStateStore(),
Review comment:
@JingsongLi
I think we still need the real OperatorStateStore in StreamingFileWriter.
ProcTimeCommitTrigger saves each partition and its creation time to state,
and restores both on failover. If we don't save a partition's creation time,
then after a restore we have no way to know when the partition was created,
and so can't check whether it is committable according to the commit policy.
The same applies to PartitionTimeCommitTrigger, which needs to save
watermarks.
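To make the concern concrete, here is a minimal sketch of the idea. It is not the actual trigger code: `ProcTimeTriggerSketch`, its methods, and the plain `Map` standing in for operator state are all hypothetical names for illustration, and the delay check is an assumed simplification.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for a processing-time commit trigger. */
public class ProcTimeTriggerSketch {
    private final long commitDelayMs;
    // partition -> creation time; this is the data that has to live in state
    private final Map<String, Long> pendingPartitions = new HashMap<>();

    /** restoredState plays the role of operator state; null means nothing was restored. */
    public ProcTimeTriggerSketch(long commitDelayMs, Map<String, Long> restoredState) {
        this.commitDelayMs = commitDelayMs;
        if (restoredState != null) {
            pendingPartitions.putAll(restoredState);
        }
    }

    /** Records the creation time the first time a partition is seen. */
    public void addPartition(String partition, long nowMs) {
        pendingPartitions.putIfAbsent(partition, nowMs);
    }

    /** Copy of the pending partitions, as it would be written on checkpoint. */
    public Map<String, Long> snapshotState() {
        return new HashMap<>(pendingPartitions);
    }

    /** Returns and removes the partitions whose delay has elapsed at nowMs. */
    public List<String> committablePartitions(long nowMs) {
        List<String> ready = new ArrayList<>();
        pendingPartitions.entrySet().removeIf(e -> {
            if (nowMs - e.getValue() >= commitDelayMs) {
                ready.add(e.getKey());
                return true;
            }
            return false;
        });
        return ready;
    }
}
```

In this sketch, a trigger rebuilt from the snapshot can still decide when a partition becomes committable, while one rebuilt with no state has forgotten the partition and never reports it.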