lirui-apache commented on a change in pull request #12062:
URL: https://github.com/apache/flink/pull/12062#discussion_r422836028
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
##########
@@ -467,7 +483,11 @@ public void onProcessingTime(long timestamp) throws Exception {
@Override
public void invoke(IN value, SinkFunction.Context context) throws Exception {
- buckets.onElement(value, context);
+ buckets.onElement(
Review comment:
Why do we need this change?
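For reference, the expanded call presumably mirrors the one in StreamingFileWriter.processElement later in this PR, passing the timing fields explicitly instead of the whole sink context. A sketch of what the new invocation likely looks like (the exact argument list is an assumption based on that writer code):

    buckets.onElement(
            value,
            context.currentProcessingTime(),  // processing time from the sink context
            context.timestamp(),              // element timestamp, may be null
            context.currentWatermark());      // current watermark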
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * This class holds configuration constants used by the filesystem (including Hive) connector.
+ */
+public class FileSystemOptions {
+
+	public static final ConfigOption<String> PARTITION_TIME_EXTRACTOR_TYPE =
+			key("partition.time-extractor.type")
+					.stringType()
+					.defaultValue("default")
+					.withDescription("Time extractor to extract time from partition values." +
+							" Only used if order is set to partition-time." +
+							" Supports 'default' and 'custom'." +
+							" For 'default', a timestamp pattern can be configured." +
+							" For 'custom', an extractor class should be configured.");
+
+	public static final ConfigOption<String> PARTITION_TIME_EXTRACTOR_CLASS =
+			key("partition.time-extractor.class")
+					.stringType()
+					.noDefaultValue()
+					.withDescription("The extractor class for implementing the PartitionTimeExtractor interface.");
+
+	public static final ConfigOption<String> PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN =
+			key("partition.time-extractor.timestamp-pattern")
+					.stringType()
+					.noDefaultValue()
+					.withDescription("The 'default' construction way allows users to use partition" +
+							" fields to get a legal timestamp pattern." +
+							" By default, 'yyyy-mm-dd hh:mm:ss' is read from the first field." +
+							" If the timestamp is in a single partition field 'dt', configure: '$dt'." +
+							" If the timestamp is spread across partition fields year, month, day and hour," +
+							" configure: '$year-$month-$day $hour:00:00'." +
+							" If the timestamp is in partition fields dt and hour, configure: '$dt $hour:00:00'.");
+
+	public static final ConfigOption<Duration> PARTITION_TIME_INTERVAL =
+			key("partition.time-interval")
+					.durationType()
+					.noDefaultValue()
+					.withDescription("Time interval of the partition:" +
+							" for a day partition it should be '1 d'," +
+							" for an hour partition it should be '1 h'.");
+
+	public static final ConfigOption<String> PARTITION_COMMIT_POLICY_TYPE =
+			key("partition.commit-policy.type")
+					.stringType()
+					.noDefaultValue()
+					.withDescription("");
Review comment:
add descriptions
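For instance, a hedged sketch of what such a description could look like (the wording is illustrative, not final text; 'metastore' and the success-file policy are taken from elsewhere in this PR):

    public static final ConfigOption<String> PARTITION_COMMIT_POLICY_TYPE =
            key("partition.commit-policy.type")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Policy to commit a partition, e.g. 'metastore' to add the" +
                            " partition to the metastore, or a success-file policy to write a" +
                            " marker file; multiple policies can be combined with commas.");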
##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/PartitionCommitManager.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem.stream;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.filesystem.PartitionTimeExtractor;
+import org.apache.flink.util.StringUtils;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toMills;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_INTERVAL;
+import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionValues;
+
+/**
+ * Manage partition and watermark information.
+ */
+public class PartitionCommitManager {
+
+	private static final ListStateDescriptor<Map<Long, Long>> WATERMARKS_STATE_DESC =
+			new ListStateDescriptor<>(
+					"checkpoint-id-to-watermark",
+					new MapSerializer<>(LongSerializer.INSTANCE, LongSerializer.INSTANCE));
+	private static final ListStateDescriptor<List<String>> PENDING_PARTITIONS_STATE_DESC =
+			new ListStateDescriptor<>(
+					"pending-partitions",
+					new ListSerializer<>(StringSerializer.INSTANCE));
+
+ private final ListState<Map<Long, Long>> watermarksState;
+ private final ListState<List<String>> pendingPartitionsState;
+ private final TreeMap<Long, Long> watermarks;
+ private final Set<String> pendingPartitions;
+ private final PartitionTimeExtractor extractor;
+ private final long timeIntervalMills;
+ private final List<String> partitionKeys;
+
+	public PartitionCommitManager(
+			boolean isRestored,
+			OperatorStateStore operatorStateStore,
+			ClassLoader userCodeClassLoader,
+			List<String> partitionKeys,
+			Configuration conf) throws Exception {
+		this.partitionKeys = partitionKeys;
+		String extractorKind = conf.get(PARTITION_TIME_EXTRACTOR_KIND);
+		String extractorClass = conf.get(PARTITION_TIME_EXTRACTOR_CLASS);
+		String extractorPattern = conf.get(PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN);
+		this.timeIntervalMills = conf.getOptional(PARTITION_TIME_INTERVAL)
+				.map(Duration::toMillis)
+				.orElse(Long.MAX_VALUE);
+		this.extractor = PartitionTimeExtractor.create(
+				userCodeClassLoader,
+				extractorKind,
+				extractorClass,
+				extractorPattern);
+
+		this.watermarksState = operatorStateStore.getListState(WATERMARKS_STATE_DESC);
+		this.pendingPartitionsState = operatorStateStore.getListState(PENDING_PARTITIONS_STATE_DESC);
+
+		this.watermarks = new TreeMap<>();
+		this.pendingPartitions = new HashSet<>();
+		if (isRestored) {
+			watermarks.putAll(watermarksState.get().iterator().next());
+			pendingPartitions.addAll(pendingPartitionsState.get().iterator().next());
+		}
+	}
+
+ /**
+ * Add a pending partition.
+ */
+ public void addPartition(String partition) {
+ if (!StringUtils.isNullOrWhitespaceOnly(partition)) {
+ this.pendingPartitions.add(partition);
+ }
+ }
+
+ /**
+	 * Trigger commit of pending partitions, and clean up useless watermarks and partitions.
+ */
+ public List<String> triggerCommit(long checkpointId) {
Review comment:
I don't think this method really "triggers" the commit. Seems it just
decides which partitions should be committed. So maybe rename to
`getPartitionsToCommit`?
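A signature sketch of the suggested rename (the name is the reviewer's proposal; the body, elided in this diff, would stay what triggerCommit does today):

    /**
     * Decides which pending partitions should be committed for the given checkpoint,
     * and cleans up watermarks and partitions that are no longer needed.
     */
    public List<String> getPartitionsToCommit(long checkpointId) { ... }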
##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem.stream;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.filesystem.TableMetaStoreFactory;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_COMMIT_POLICY_SUCCESS_FILE_NAME;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_COMMIT_POLICY_TYPE;
+import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath;
+import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
+
+/**
+ * Committer for {@link StreamingFileWriter}. This runs as a single (non-parallel) task.
+ */
+public class StreamingFileCommitter extends AbstractStreamOperator<Void>
+		implements OneInputStreamOperator<StreamingFileCommitter.CommitMessage, Void> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Configuration conf;
+
+ private final List<String> partitionKeys;
+
+ private final TableMetaStoreFactory metaStoreFactory;
+
+ private transient PartitionCommitManager commitManager;
+
+ private transient TaskTracker taskTracker;
+
+ private transient long currentWatermark = Long.MIN_VALUE;
+
+ private transient List<PartitionCommitPolicy> policies;
+
+	public StreamingFileCommitter(
+			List<String> partitionKeys, TableMetaStoreFactory metaStoreFactory, Configuration conf) {
+ this.partitionKeys = partitionKeys;
+ this.metaStoreFactory = metaStoreFactory;
+ this.conf = conf;
+ }
+
+ @Override
+	public void initializeState(StateInitializationContext context) throws Exception {
+		super.initializeState(context);
+		this.commitManager = new PartitionCommitManager(
+				context.isRestored(),
+				context.getOperatorStateStore(),
+				getUserCodeClassloader(),
+				partitionKeys,
+				conf);
+		this.policies = PartitionCommitPolicy.createCommitChain(
+				conf.get(PARTITION_COMMIT_POLICY_TYPE),
+				conf.get(PARTITION_COMMIT_POLICY_SUCCESS_FILE_NAME));
+	}
+
+ @Override
+	public void processElement(StreamRecord<CommitMessage> element) throws Exception {
+		CommitMessage message = element.getValue();
+		for (String partition : message.partitions) {
+			commitManager.addPartition(partition);
+		}
+
+		if (taskTracker == null) {
+			taskTracker = new TaskTracker(message.numberOfTasks);
+		}
+		boolean needCommit = taskTracker.add(message.checkpointId, message.taskId);
+		if (needCommit) {
+			commitPartitions(commitManager.triggerCommit(message.checkpointId));
+		}
+	}
+
+	private void commitPartitions(List<String> partitions) throws Exception {
+		try (TableMetaStoreFactory.TableMetaStore metaStore = metaStoreFactory.createTableMetaStore()) {
+			FileSystem fs = metaStore.getLocationPath().getFileSystem();
+			for (String partition : partitions) {
+				LinkedHashMap<String, String> partSpec = extractPartitionSpecFromPath(new Path(partition));
+				Path path = new Path(metaStore.getLocationPath(), generatePartitionPath(partSpec));
+				for (PartitionCommitPolicy policy : policies) {
+					policy.commit(partSpec, path, fs, metaStore);
+				}
+			}
+		}
+	}
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ super.processWatermark(mark);
+ this.currentWatermark = mark.getTimestamp();
+ }
+
+ @Override
+	public void snapshotState(StateSnapshotContext context) throws Exception {
+		super.snapshotState(context);
+		commitManager.snapshotState(context.getCheckpointId(), currentWatermark);
+ }
+
+ /**
+ * The message sent upstream.
+ */
+ public static class CommitMessage implements Serializable {
+
+ public long checkpointId;
+ public int taskId;
+ public int numberOfTasks;
+ public List<String> partitions;
Review comment:
Can we have some comments about which partitions should be in this list?
My understanding is it should include partitions for which some files should be
committed, right?
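If that reading is right, a Javadoc along these lines could be added (the wording assumes the list holds partitions that became inactive in the writer and therefore have files awaiting commit):

    /**
     * Partitions that have pending files to commit, i.e. partitions that became
     * inactive in the writer during the checkpoint covered by this message.
     */
    public List<String> partitions;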
##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/PartitionCommitPolicy.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem.stream;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.filesystem.TableMetaStoreFactory;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Policy for partition commit.
+ */
+public interface PartitionCommitPolicy extends Serializable {
+
+ /**
+ * Commit partition by partitionSpec and path.
+ */
+ void commit(
+ LinkedHashMap<String, String> partitionSpec,
+ Path partitionPath,
+ FileSystem fileSystem,
+			TableMetaStoreFactory.TableMetaStore metaStore) throws Exception;
+
+	static List<PartitionCommitPolicy> createCommitChain(String policy, String successFileName) {
+		if (policy == null) {
+			return Collections.emptyList();
+		}
+		String[] policyStrings = policy.split(",");
+		return Arrays.stream(policyStrings).map(name -> {
+			switch (name) {
+				case "metastore":
Review comment:
add constants for them
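For example, the string literals could be lifted into constants on the interface (names are a suggestion; SUCCESS_FILE assumes the other branch of the switch maps to SuccessFileCommitPolicy):

    /** Built-in policy names, referenced from createCommitChain instead of bare literals. */
    String METASTORE = "metastore";
    String SUCCESS_FILE = "success-file";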
##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem.stream;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.functions.sink.filesystem.Buckets;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.filesystem.stream.StreamingFileCommitter.CommitMessage;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Operator for file system sink.
+ */
+public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
+ implements OneInputStreamOperator<RowData, CommitMessage>,
ProcessingTimeCallback {
+
+ private static final long serialVersionUID = 1L;
+
+	// -------------------------- state descriptors ---------------------------
+
+	private static final ListStateDescriptor<byte[]> BUCKET_STATE_DESC =
+			new ListStateDescriptor<>("bucket-states", BytePrimitiveArraySerializer.INSTANCE);
+
+	private static final ListStateDescriptor<Long> MAX_PART_COUNTER_STATE_DESC =
+			new ListStateDescriptor<>("max-part-counter", LongSerializer.INSTANCE);
+
+	// ------------------------ configuration fields --------------------------
+
+ private final long bucketCheckInterval;
+
+	private final StreamingFileSink.BucketsBuilder<RowData, ?, ? extends
+			StreamingFileSink.BucketsBuilder<RowData, ?, ?>> bucketsBuilder;
+
+ private final FileSystemBucketListener listener;
+
+	// --------------------------- runtime fields -----------------------------
+
+ private transient Buckets<RowData, ?> buckets;
+
+ private transient ProcessingTimeService processingTimeService;
+
+ private transient long currentWatermark = Long.MIN_VALUE;
+
+ private transient Set<String> inactivePartitions;
+
+	// --------------------------- State Related Fields -----------------------------
+
+ private transient ListState<byte[]> bucketStates;
+
+ private transient ListState<Long> maxPartCountersState;
+
+	public StreamingFileWriter(
+			long bucketCheckInterval,
+			StreamingFileSink.BucketsBuilder<RowData, ?, ? extends
+					StreamingFileSink.BucketsBuilder<RowData, ?, ?>> bucketsBuilder,
+			FileSystemBucketListener listener) {
+		this.bucketCheckInterval = bucketCheckInterval;
+		this.bucketsBuilder = bucketsBuilder;
+		this.listener = listener;
+	}
+
+ @Override
+	public void initializeState(StateInitializationContext context) throws Exception {
+		super.initializeState(context);
+		final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+		this.buckets = bucketsBuilder.createBuckets(subtaskIndex);
+
+		final OperatorStateStore stateStore = context.getOperatorStateStore();
+		bucketStates = stateStore.getListState(BUCKET_STATE_DESC);
+		maxPartCountersState = stateStore.getUnionListState(MAX_PART_COUNTER_STATE_DESC);
+
+		if (context.isRestored()) {
+			buckets.initializeState(bucketStates, maxPartCountersState);
+		}
+		inactivePartitions = new HashSet<>();
+		listener.setInactiveConsumer(b -> inactivePartitions.add(b));
+	}
+
+ @Override
+	public void snapshotState(StateSnapshotContext context) throws Exception {
+		super.snapshotState(context);
+		Preconditions.checkState(
+				bucketStates != null && maxPartCountersState != null,
+				"sink has not been initialized");
+		buckets.snapshotState(
+				context.getCheckpointId(),
+				bucketStates,
+				maxPartCountersState);
+	}
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+		this.processingTimeService = getRuntimeContext().getProcessingTimeService();
+		long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
+		processingTimeService.registerTimer(currentProcessingTime + bucketCheckInterval, this);
+ }
+
+ @Override
+ public void onProcessingTime(long timestamp) throws Exception {
+		final long currentTime = processingTimeService.getCurrentProcessingTime();
+		buckets.onProcessingTime(currentTime);
+		processingTimeService.registerTimer(currentTime + bucketCheckInterval, this);
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ super.processWatermark(mark);
+ this.currentWatermark = mark.getTimestamp();
+ }
+
+ @Override
+	public void processElement(StreamRecord<RowData> element) throws Exception {
+		buckets.onElement(
+				element.getValue(),
+				getProcessingTimeService().getCurrentProcessingTime(),
+				element.getTimestamp(),
+				currentWatermark);
+ }
+
+ @Override
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		super.notifyCheckpointComplete(checkpointId);
+		buckets.commitUpToCheckpoint(checkpointId);
+		CommitMessage message = new CommitMessage(
+				checkpointId,
+				getRuntimeContext().getIndexOfThisSubtask(),
+				getRuntimeContext().getNumberOfParallelSubtasks(),
+				new ArrayList<>(inactivePartitions));
+ output.collect(new StreamRecord<>(message));
+ inactivePartitions.clear();
+ }
+
+ @Override
+ public void close() throws Exception {
Review comment:
Is it possible that there's some pending data between `close` and the
last `notifyCheckpointComplete`?
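The concern, as I read it: records written after the last successful checkpoint live in in-progress files, so a close() that simply disposes the buckets would drop them. A hedged sketch of what a defensive close might look like (assuming Buckets#close() discards in-progress part files):

    @Override
    public void close() throws Exception {
        super.close();
        if (buckets != null) {
            // Assumption: this discards in-progress files; anything not covered by
            // the last completed checkpoint is lost here, which is the question above.
            buckets.close();
        }
    }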
##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem.stream;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.filesystem.TableMetaStoreFactory;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_COMMIT_POLICY_SUCCESS_FILE_NAME;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_COMMIT_POLICY_TYPE;
+import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath;
+import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
+
+/**
+ * Committer for {@link StreamingFileWriter}. This runs as a single (non-parallel) task.
+ */
+public class StreamingFileCommitter extends AbstractStreamOperator<Void>
+		implements OneInputStreamOperator<StreamingFileCommitter.CommitMessage, Void> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Configuration conf;
+
+ private final List<String> partitionKeys;
+
+ private final TableMetaStoreFactory metaStoreFactory;
+
+ private transient PartitionCommitManager commitManager;
+
+ private transient TaskTracker taskTracker;
+
+ private transient long currentWatermark = Long.MIN_VALUE;
+
+ private transient List<PartitionCommitPolicy> policies;
+
+	public StreamingFileCommitter(
+			List<String> partitionKeys, TableMetaStoreFactory metaStoreFactory, Configuration conf) {
+ this.partitionKeys = partitionKeys;
+ this.metaStoreFactory = metaStoreFactory;
+ this.conf = conf;
+ }
+
+ @Override
+	public void initializeState(StateInitializationContext context) throws Exception {
+		super.initializeState(context);
+		this.commitManager = new PartitionCommitManager(
+				context.isRestored(),
+				context.getOperatorStateStore(),
+				getUserCodeClassloader(),
+				partitionKeys,
+				conf);
+		this.policies = PartitionCommitPolicy.createCommitChain(
+				conf.get(PARTITION_COMMIT_POLICY_TYPE),
+				conf.get(PARTITION_COMMIT_POLICY_SUCCESS_FILE_NAME));
+	}
+
+ @Override
+	public void processElement(StreamRecord<CommitMessage> element) throws Exception {
+		CommitMessage message = element.getValue();
+		for (String partition : message.partitions) {
+			commitManager.addPartition(partition);
+		}
+
+		if (taskTracker == null) {
+			taskTracker = new TaskTracker(message.numberOfTasks);
+		}
+		boolean needCommit = taskTracker.add(message.checkpointId, message.taskId);
+		if (needCommit) {
+			commitPartitions(commitManager.triggerCommit(message.checkpointId));
+		}
+	}
+
+	private void commitPartitions(List<String> partitions) throws Exception {
+		try (TableMetaStoreFactory.TableMetaStore metaStore = metaStoreFactory.createTableMetaStore()) {
Review comment:
Does this generate traffic to the HMS? If so, maybe we should only do it when `partitions` is not empty.
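A minimal guard along those lines, assuming creating the TableMetaStore is what opens the metastore connection:

    private void commitPartitions(List<String> partitions) throws Exception {
        if (partitions.isEmpty()) {
            // Skip creating a metastore connection when there is nothing to commit.
            return;
        }
        try (TableMetaStoreFactory.TableMetaStore metaStore = metaStoreFactory.createTableMetaStore()) {
            FileSystem fs = metaStore.getLocationPath().getFileSystem();
            for (String partition : partitions) {
                LinkedHashMap<String, String> partSpec = extractPartitionSpecFromPath(new Path(partition));
                Path path = new Path(metaStore.getLocationPath(), generatePartitionPath(partSpec));
                for (PartitionCommitPolicy policy : policies) {
                    policy.commit(partSpec, path, fs, metaStore);
                }
            }
        }
    }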
##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/SuccessFileCommitPolicy.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem.stream;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.filesystem.TableMetaStoreFactory;
+
+import java.util.LinkedHashMap;
+
+/**
+ * Partition commit policy to update metastore.
Review comment:
incorrect java doc
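Presumably copied from the metastore policy; for SuccessFileCommitPolicy something like this would fit (wording is a suggestion, based on the successFileName passed through createCommitChain):

    /**
     * Partition commit policy that writes a success file to the partition's
     * directory once the partition is committed. The file name is configured
     * via the PARTITION_COMMIT_POLICY_SUCCESS_FILE_NAME option.
     */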