lirui-apache commented on a change in pull request #12062:
URL: https://github.com/apache/flink/pull/12062#discussion_r422836028



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
##########
@@ -467,7 +483,11 @@ public void onProcessingTime(long timestamp) throws Exception {
 
        @Override
        public void invoke(IN value, SinkFunction.Context context) throws Exception {
-               buckets.onElement(value, context);
+               buckets.onElement(

Review comment:
       Why do we need this change?

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * This class holds configuration constants used by the filesystem (including Hive) connector.
+ */
+public class FileSystemOptions {
+
+       public static final ConfigOption<String> PARTITION_TIME_EXTRACTOR_TYPE =
+                       key("partition.time-extractor.type")
+                                       .stringType()
+                                       .defaultValue("default")
+                                       .withDescription("Time extractor to extract time from partition values. Only" +
+                                                       " used if order is set to partition-time. Supports 'default' and 'custom'." +
+                                                       " For 'default', a timestamp pattern can be configured." +
+                                                       " For 'custom', an extractor class must be configured.");
+
+       public static final ConfigOption<String> PARTITION_TIME_EXTRACTOR_CLASS =
+                       key("partition.time-extractor.class")
+                                       .stringType()
+                                       .noDefaultValue()
+                                       .withDescription("The extractor class, which should implement the PartitionTimeExtractor interface.");
+
+       public static final ConfigOption<String> PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN =
+                       key("partition.time-extractor.timestamp-pattern")
+                                       .stringType()
+                                       .noDefaultValue()
+                                       .withDescription("Allows users to build a legal timestamp pattern from the" +
+                                                       " partition fields when using the 'default' extractor." +
+                                                       " By default 'yyyy-mm-dd hh:mm:ss' is read from the first field." +
+                                                       " If the timestamp is in a single partition field 'dt', configure: '$dt'." +
+                                                       " If it is spread across year, month, day and hour fields," +
+                                                       " configure: '$year-$month-$day $hour:00:00'." +
+                                                       " If it is in fields dt and hour, configure: '$dt $hour:00:00'.");
+
+       public static final ConfigOption<Duration> PARTITION_TIME_INTERVAL =
+                       key("partition.time-interval")
+                                       .durationType()
+                                       .noDefaultValue()
+                                       .withDescription("Time interval of the partition:" +
+                                                       " for a daily partition it should be '1 d'," +
+                                                       " for an hourly partition it should be '1 h'.");
+
+       public static final ConfigOption<String> PARTITION_COMMIT_POLICY_TYPE =
+                       key("partition.commit-policy.type")
+                                       .stringType()
+                                       .noDefaultValue()
+                                       .withDescription("");

Review comment:
       add descriptions
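       For example, something along these lines (the wording is only a suggestion,
       and 'success-file' assumes that is the name of the other built-in policy):

```java
public static final ConfigOption<String> PARTITION_COMMIT_POLICY_TYPE =
		key("partition.commit-policy.type")
				.stringType()
				.noDefaultValue()
				.withDescription("Policy to commit a partition once it is ready." +
						" Supports a comma-separated list of 'metastore' (add the partition" +
						" to the metastore) and 'success-file' (write a success file into" +
						" the partition directory).");
```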

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/PartitionCommitManager.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem.stream;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.filesystem.PartitionTimeExtractor;
+import org.apache.flink.util.StringUtils;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toMills;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_INTERVAL;
+import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionValues;
+
+/**
+ * Manage partition and watermark information.
+ */
+public class PartitionCommitManager {
+
+       private static final ListStateDescriptor<Map<Long, Long>> WATERMARKS_STATE_DESC =
+                       new ListStateDescriptor<>(
+                                       "checkpoint-id-to-watermark",
+                                       new MapSerializer<>(LongSerializer.INSTANCE, LongSerializer.INSTANCE));
+       private static final ListStateDescriptor<List<String>> PENDING_PARTITIONS_STATE_DESC =
+                       new ListStateDescriptor<>(
+                                       "pending-partitions",
+                                       new ListSerializer<>(StringSerializer.INSTANCE));
+
+       private final ListState<Map<Long, Long>> watermarksState;
+       private final ListState<List<String>> pendingPartitionsState;
+       private final TreeMap<Long, Long> watermarks;
+       private final Set<String> pendingPartitions;
+       private final PartitionTimeExtractor extractor;
+       private final long timeIntervalMills;
+       private final List<String> partitionKeys;
+
+       public PartitionCommitManager(
+                       boolean isRestored,
+                       OperatorStateStore operatorStateStore,
+                       ClassLoader userCodeClassLoader,
+                       List<String> partitionKeys,
+                       Configuration conf) throws Exception {
+               this.partitionKeys = partitionKeys;
+               String extractorKind = conf.get(PARTITION_TIME_EXTRACTOR_KIND);
+               String extractorClass = conf.get(PARTITION_TIME_EXTRACTOR_CLASS);
+               String extractorPattern = conf.get(PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN);
+               this.timeIntervalMills = conf.getOptional(PARTITION_TIME_INTERVAL)
+                               .map(Duration::toMillis)
+                               .orElse(Long.MAX_VALUE);
+               this.extractor = PartitionTimeExtractor.create(
+                               userCodeClassLoader,
+                               extractorKind,
+                               extractorClass,
+                               extractorPattern);
+
+               this.watermarksState = operatorStateStore.getListState(WATERMARKS_STATE_DESC);
+               this.pendingPartitionsState = operatorStateStore.getListState(PENDING_PARTITIONS_STATE_DESC);
+
+               this.watermarks = new TreeMap<>();
+               this.pendingPartitions = new HashSet<>();
+               if (isRestored) {
+                       watermarks.putAll(watermarksState.get().iterator().next());
+                       pendingPartitions.addAll(pendingPartitionsState.get().iterator().next());
+               }
+       }
+
+       /**
+        * Add a pending partition.
+        */
+       public void addPartition(String partition) {
+               if (!StringUtils.isNullOrWhitespaceOnly(partition)) {
+                       this.pendingPartitions.add(partition);
+               }
+       }
+
+       /**
+        * Trigger the commit of pending partitions, and clean up watermarks and partitions that are no longer needed.
+        */
+       public List<String> triggerCommit(long checkpointId) {

Review comment:
       I don't think this method really "triggers" the commit. It seems to just
       decide which partitions should be committed, so maybe rename it to
       `getPartitionsToCommit`?
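       Something like this (only the name and Javadoc change, the body stays the same):

```java
/**
 * Decides which pending partitions can be committed for the given checkpoint,
 * and cleans up watermarks and partitions that are no longer needed.
 */
public List<String> getPartitionsToCommit(long checkpointId) {
    // same body as the current triggerCommit
}
```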

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem.stream;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.filesystem.TableMetaStoreFactory;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_COMMIT_POLICY_SUCCESS_FILE_NAME;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_COMMIT_POLICY_TYPE;
+import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath;
+import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
+
+/**
+ * Committer for {@link StreamingFileWriter}. This runs as a single (non-parallel) task.
+ */
+public class StreamingFileCommitter extends AbstractStreamOperator<Void>
+               implements OneInputStreamOperator<StreamingFileCommitter.CommitMessage, Void> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final Configuration conf;
+
+       private final List<String> partitionKeys;
+
+       private final TableMetaStoreFactory metaStoreFactory;
+
+       private transient PartitionCommitManager commitManager;
+
+       private transient TaskTracker taskTracker;
+
+       private transient long currentWatermark = Long.MIN_VALUE;
+
+       private transient List<PartitionCommitPolicy> policies;
+
+       public StreamingFileCommitter(
+                       List<String> partitionKeys, TableMetaStoreFactory metaStoreFactory, Configuration conf) {
+               this.partitionKeys = partitionKeys;
+               this.metaStoreFactory = metaStoreFactory;
+               this.conf = conf;
+       }
+
+       @Override
+       public void initializeState(StateInitializationContext context) throws Exception {
+               super.initializeState(context);
+               this.commitManager = new PartitionCommitManager(
+                               context.isRestored(),
+                               context.getOperatorStateStore(),
+                               getUserCodeClassloader(),
+                               partitionKeys,
+                               conf);
+               this.policies = PartitionCommitPolicy.createCommitChain(
+                               conf.get(PARTITION_COMMIT_POLICY_TYPE),
+                               conf.get(PARTITION_COMMIT_POLICY_SUCCESS_FILE_NAME));
+       }
+
+       @Override
+       public void processElement(StreamRecord<CommitMessage> element) throws Exception {
+               CommitMessage message = element.getValue();
+               for (String partition : message.partitions) {
+                       commitManager.addPartition(partition);
+               }
+
+               if (taskTracker == null) {
+                       taskTracker = new TaskTracker(message.numberOfTasks);
+               }
+               boolean needCommit = taskTracker.add(message.checkpointId, message.taskId);
+               if (needCommit) {
+                       commitPartitions(commitManager.triggerCommit(message.checkpointId));
+               }
+       }
+
+       private void commitPartitions(List<String> partitions) throws Exception {
+               try (TableMetaStoreFactory.TableMetaStore metaStore = metaStoreFactory.createTableMetaStore()) {
+                       FileSystem fs = metaStore.getLocationPath().getFileSystem();
+                       for (String partition : partitions) {
+                               LinkedHashMap<String, String> partSpec = extractPartitionSpecFromPath(new Path(partition));
+                               Path path = new Path(metaStore.getLocationPath(), generatePartitionPath(partSpec));
+                               for (PartitionCommitPolicy policy : policies) {
+                                       policy.commit(partSpec, path, fs, metaStore);
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public void processWatermark(Watermark mark) throws Exception {
+               super.processWatermark(mark);
+               this.currentWatermark = mark.getTimestamp();
+       }
+
+       @Override
+       public void snapshotState(StateSnapshotContext context) throws Exception {
+               super.snapshotState(context);
+               commitManager.snapshotState(context.getCheckpointId(), currentWatermark);
+       }
+
+       /**
+        * The message sent from the upstream {@link StreamingFileWriter} tasks to the committer.
+        */
+       public static class CommitMessage implements Serializable {
+
+               public long checkpointId;
+               public int taskId;
+               public int numberOfTasks;
+               public List<String> partitions;

Review comment:
       Can we have some comments about which partitions should be in this list? 
My understanding is it should include partitions for which some files should be 
committed, right?
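       Maybe something along these lines, if that understanding is correct:

```java
/**
 * Partitions that became inactive in this writer task during the last
 * checkpoint interval, i.e. partitions whose pending files can now be
 * committed. May be empty if no partition became inactive.
 */
public List<String> partitions;
```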

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/PartitionCommitPolicy.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem.stream;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.filesystem.TableMetaStoreFactory;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Policy for partition commit.
+ */
+public interface PartitionCommitPolicy extends Serializable {
+
+       /**
+        * Commit a partition, given its partition spec and path.
+        */
+       void commit(
+                       LinkedHashMap<String, String> partitionSpec,
+                       Path partitionPath,
+                       FileSystem fileSystem,
+                       TableMetaStoreFactory.TableMetaStore metaStore) throws Exception;
+
+       static List<PartitionCommitPolicy> createCommitChain(String policy, String successFileName) {
+               if (policy == null) {
+                       return Collections.emptyList();
+               }
+               String[] policyStrings = policy.split(",");
+               return Arrays.stream(policyStrings).map(name -> {
+                       switch (name) {
+                               case "metastore":

Review comment:
       add constants for them
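       A sketch (the constant names are just a suggestion, and 'success-file' assumes
       that is the other policy name):

```java
// Implicitly public static final in an interface.
String METASTORE = "metastore";
String SUCCESS_FILE = "success-file";
```

       and then `case METASTORE:` etc. in the switch, so the names stay in sync with
       the documentation of `partition.commit-policy.type`.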

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem.stream;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.functions.sink.filesystem.Buckets;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.filesystem.stream.StreamingFileCommitter.CommitMessage;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Operator for file system sink.
+ */
+public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
+               implements OneInputStreamOperator<RowData, CommitMessage>, ProcessingTimeCallback {
+
+       private static final long serialVersionUID = 1L;
+
+       // -------------------------- state descriptors ---------------------------
+
+       private static final ListStateDescriptor<byte[]> BUCKET_STATE_DESC =
+                       new ListStateDescriptor<>("bucket-states", BytePrimitiveArraySerializer.INSTANCE);
+
+       private static final ListStateDescriptor<Long> MAX_PART_COUNTER_STATE_DESC =
+                       new ListStateDescriptor<>("max-part-counter", LongSerializer.INSTANCE);
+
+       // ------------------------ configuration fields --------------------------
+
+       private final long bucketCheckInterval;
+
+       private final StreamingFileSink.BucketsBuilder<RowData, ?, ? extends
+                       StreamingFileSink.BucketsBuilder<RowData, ?, ?>> bucketsBuilder;
+
+       private final FileSystemBucketListener listener;
+
+       // --------------------------- runtime fields -----------------------------
+
+       private transient Buckets<RowData, ?> buckets;
+
+       private transient ProcessingTimeService processingTimeService;
+
+       private transient long currentWatermark = Long.MIN_VALUE;
+
+       private transient Set<String> inactivePartitions;
+
+       // --------------------------- state related fields -----------------------------
+
+       private transient ListState<byte[]> bucketStates;
+
+       private transient ListState<Long> maxPartCountersState;
+
+       public StreamingFileWriter(
+                       long bucketCheckInterval,
+                       StreamingFileSink.BucketsBuilder<RowData, ?, ? extends
+                                       StreamingFileSink.BucketsBuilder<RowData, ?, ?>> bucketsBuilder,
+                       FileSystemBucketListener listener) {
+               this.bucketCheckInterval = bucketCheckInterval;
+               this.bucketsBuilder = bucketsBuilder;
+               this.listener = listener;
+       }
+
+       @Override
+       public void initializeState(StateInitializationContext context) throws Exception {
+               super.initializeState(context);
+               final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+               this.buckets = bucketsBuilder.createBuckets(subtaskIndex);
+
+               final OperatorStateStore stateStore = context.getOperatorStateStore();
+               bucketStates = stateStore.getListState(BUCKET_STATE_DESC);
+               maxPartCountersState = stateStore.getUnionListState(MAX_PART_COUNTER_STATE_DESC);
+
+               if (context.isRestored()) {
+                       buckets.initializeState(bucketStates, maxPartCountersState);
+               }
+               inactivePartitions = new HashSet<>();
+               listener.setInactiveConsumer(inactivePartitions::add);
+       }
+
+       @Override
+       public void snapshotState(StateSnapshotContext context) throws Exception {
+               super.snapshotState(context);
+               Preconditions.checkState(bucketStates != null && maxPartCountersState != null, "sink has not been initialized");
+               buckets.snapshotState(
+                               context.getCheckpointId(),
+                               bucketStates,
+                               maxPartCountersState);
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               this.processingTimeService = getRuntimeContext().getProcessingTimeService();
+               long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
+               processingTimeService.registerTimer(currentProcessingTime + bucketCheckInterval, this);
+       }
+
+       @Override
+       public void onProcessingTime(long timestamp) throws Exception {
+               final long currentTime = processingTimeService.getCurrentProcessingTime();
+               buckets.onProcessingTime(currentTime);
+               processingTimeService.registerTimer(currentTime + bucketCheckInterval, this);
+       }
+
+       @Override
+       public void processWatermark(Watermark mark) throws Exception {
+               super.processWatermark(mark);
+               this.currentWatermark = mark.getTimestamp();
+       }
+
+       @Override
+       public void processElement(StreamRecord<RowData> element) throws Exception {
+               buckets.onElement(
+                               element.getValue(),
+                               getProcessingTimeService().getCurrentProcessingTime(),
+                               element.getTimestamp(),
+                               currentWatermark);
+       }
+
+       @Override
+       public void notifyCheckpointComplete(long checkpointId) throws Exception {
+               super.notifyCheckpointComplete(checkpointId);
+               buckets.commitUpToCheckpoint(checkpointId);
+               CommitMessage message = new CommitMessage(
+                               checkpointId,
+                               getRuntimeContext().getIndexOfThisSubtask(),
+                               getRuntimeContext().getNumberOfParallelSubtasks(),
+                               new ArrayList<>(inactivePartitions));
+               output.collect(new StreamRecord<>(message));
+               inactivePartitions.clear();
+       }
+
+       @Override
+       public void close() throws Exception {

Review comment:
       Is it possible that there's some pending data between `close` and the 
last `notifyCheckpointComplete`?
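       If so, a guard in `close` might at least make the behavior explicit. A rough
       sketch (assuming `Buckets#close` only releases resources, so data that became
       inactive after the last `notifyCheckpointComplete` would be dropped silently):

```java
@Override
public void close() throws Exception {
    super.close();
    if (!inactivePartitions.isEmpty()) {
        // These partitions were never reported to the committer because no
        // checkpoint completed after they became inactive. Either emit a final
        // CommitMessage here, or document why dropping them is safe.
    }
    if (buckets != null) {
        buckets.close();
    }
}
```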

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem.stream;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.filesystem.TableMetaStoreFactory;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_COMMIT_POLICY_SUCCESS_FILE_NAME;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_COMMIT_POLICY_TYPE;
+import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath;
+import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
+
+/**
+ * Committer for {@link StreamingFileWriter}. This runs as a single (non-parallel) task.
+ */
+public class StreamingFileCommitter extends AbstractStreamOperator<Void>
+               implements OneInputStreamOperator<StreamingFileCommitter.CommitMessage, Void> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final Configuration conf;
+
+       private final List<String> partitionKeys;
+
+       private final TableMetaStoreFactory metaStoreFactory;
+
+       private transient PartitionCommitManager commitManager;
+
+       private transient TaskTracker taskTracker;
+
+       private transient long currentWatermark = Long.MIN_VALUE;
+
+       private transient List<PartitionCommitPolicy> policies;
+
+       public StreamingFileCommitter(
+                       List<String> partitionKeys, TableMetaStoreFactory metaStoreFactory, Configuration conf) {
+               this.partitionKeys = partitionKeys;
+               this.metaStoreFactory = metaStoreFactory;
+               this.conf = conf;
+       }
+
+       @Override
+       public void initializeState(StateInitializationContext context) throws Exception {
+               super.initializeState(context);
+               this.commitManager = new PartitionCommitManager(
+                               context.isRestored(),
+                               context.getOperatorStateStore(),
+                               getUserCodeClassloader(),
+                               partitionKeys,
+                               conf);
+               this.policies = PartitionCommitPolicy.createCommitChain(
+                               conf.get(PARTITION_COMMIT_POLICY_TYPE),
+                               conf.get(PARTITION_COMMIT_POLICY_SUCCESS_FILE_NAME));
+       }
+
+       @Override
+       public void processElement(StreamRecord<CommitMessage> element) throws Exception {
+               CommitMessage message = element.getValue();
+               for (String partition : message.partitions) {
+                       commitManager.addPartition(partition);
+               }
+
+               if (taskTracker == null) {
+                       taskTracker = new TaskTracker(message.numberOfTasks);
+               }
+               boolean needCommit = taskTracker.add(message.checkpointId, message.taskId);
+               if (needCommit) {
+                       commitPartitions(commitManager.triggerCommit(message.checkpointId));
+               }
+       }
+
+       private void commitPartitions(List<String> partitions) throws Exception {
+               try (TableMetaStoreFactory.TableMetaStore metaStore = metaStoreFactory.createTableMetaStore()) {

Review comment:
       Does this invoke traffic to HMS? If so, maybe we should only do it when
       `partitions` is not empty.
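       I.e. an early return before the metastore client is created:

```java
private void commitPartitions(List<String> partitions) throws Exception {
    if (partitions.isEmpty()) {
        // Nothing to commit -- avoid creating a metastore client.
        return;
    }
    try (TableMetaStoreFactory.TableMetaStore metaStore = metaStoreFactory.createTableMetaStore()) {
        // ... existing commit loop ...
    }
}
```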

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/SuccessFileCommitPolicy.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem.stream;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.filesystem.TableMetaStoreFactory;
+
+import java.util.LinkedHashMap;
+
+/**
+ * Partition commit policy to update metastore.

Review comment:
       Incorrect Javadoc: this policy writes a success file rather than updating the metastore.
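       Maybe something like:

```java
/**
 * Partition commit policy that writes a success file to the directory of the
 * committed partition.
 */
```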




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

