[
https://issues.apache.org/jira/browse/FLINK-9750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16537058#comment-16537058
]
ASF GitHub Bot commented on FLINK-9750:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/6281#discussion_r201042618
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java ---
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.Writer;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.ResumableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.DateTimeBucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.writers.StringWriter;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Sink that emits its input elements to {@link FileSystem} files within buckets. This is
+ * integrated with the checkpointing mechanism to provide exactly once semantics.
+ *
+ * <p>When creating the sink, a {@code basePath} must be specified. The base directory contains
+ * one directory for every bucket. The bucket directories themselves contain several part files,
+ * with at least one for each parallel subtask of the sink that is writing data to that bucket.
+ * These part files contain the actual output data.
+ *
+ * <p>The sink uses a {@link Bucketer} to determine which bucket directory inside the base directory
+ * each element should be written to. The {@code Bucketer} can, for example, use time or
+ * a property of the element to determine the bucket directory. The default {@code Bucketer} is a
+ * {@link DateTimeBucketer}, which creates one new bucket every hour. You can specify
+ * a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}.
+ *
+ * <p>The filenames of the part files contain the part prefix, "part-", the parallel subtask index of the sink
+ * and a rolling counter. For example, the file {@code "part-1-17"} contains the data from
+ * {@code subtask 1} of the sink and is the {@code 17th} part file created by that subtask.
+ * When a part file becomes bigger than the user-specified part size, or when it becomes older
+ * than the user-specified rollover interval, the current part file is closed, the part counter is increased
+ * and a new part file is created. The part size defaults to {@code 384MB} and can be configured
+ * using {@link #setPartFileSize(long)}. The rollover interval defaults to {@code Long.MAX_VALUE} and
+ * can be configured using {@link #setRolloverInterval(long)}.
+ *
+ * <p>In some scenarios, the open buckets are required to change based on time. In these cases, the sink
+ * needs to determine when a bucket has become inactive, in order to flush and close the part file.
+ * To support this there are two configurable settings:
+ * <ol>
+ *     <li>the frequency to check for inactive buckets, configured by {@link #setBucketCheckInterval(long)}, and</li>
+ *     <li>the minimum amount of time a bucket has to not receive any data before it is considered inactive,
+ *     configured by {@link #setInactivityInterval(long)}.</li>
+ * </ol>
+ * Both of these parameters default to {@code 60,000 ms}, or {@code 1 min}.
+ *
+ * <p>Part files can be in one of three states: {@code in-progress}, {@code pending} or {@code finished}.
+ * The reason for this is how the sink works together with the checkpointing mechanism to provide exactly-once
+ * semantics and fault-tolerance. The part file that is currently being written to is {@code in-progress}. Once
+ * a part file is closed for writing it becomes {@code pending}. When a checkpoint is successful the currently
+ * pending files will be moved to {@code finished}.
+ *
+ * <p>In case of a failure, and in order to guarantee exactly-once semantics, the sink should roll back to the
+ * state it had when the last successful checkpoint occurred. To this end, when restoring, the restored files in
+ * {@code pending} state are transferred into the {@code finished} state while any {@code in-progress} files are
+ * rolled back, so that they do not contain data that arrived after the checkpoint from which we restore.
+ *
+ * <p><b>NOTE:</b>
+ * <ol>
+ *     <li>
+ *         If checkpointing is not enabled, the pending files will never be moved to the finished state. In that case,
+ *         the pending suffix/prefix can be set to {@code ""} to make the sink work in a non-fault-tolerant way but
+ *         still provide output without prefixes and suffixes.
+ *     </li>
+ *     <li>
+ *         The part files are written using an instance of {@link Writer}. By default, a
+ *         {@link StringWriter} is used, which writes the result of {@code toString()} for
+ *         every element, separated by newlines. You can configure the writer using
+ *         {@link #setWriter(Writer)}.
+ *     </li>
+ * </ol>
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public class StreamingFileSink<IN>
+ extends RichSinkFunction<IN>
+ implements CheckpointedFunction, CheckpointListener, ProcessingTimeCallback {
+
+ private static final long serialVersionUID = 2544039385174378235L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(StreamingFileSink.class);
+
+ private static final long DEFAULT_CHECK_INTERVAL = 60L * 1000L;
+
+ private static final long DEFAULT_INACTIVITY_INTERVAL = 60L * 1000L;
+
+ private static final long DEFAULT_ROLLOVER_INTERVAL = 60L * 1000L;
+
+ private static final long DEFAULT_PART_SIZE = 1024L * 1024L * 384L;
+
+ private final Path basePath;
+
+ private transient ResumableWriter fsWriter;
+
+ private transient Clock clock;
+
+ private transient ProcessingTimeService processingTimeService;
+
+ private Bucketer<IN> bucketer;
+
+ private Writer<IN> writer;
+
+ private long bucketCheckInterval = DEFAULT_CHECK_INTERVAL;
+
+ private long partFileSize = DEFAULT_PART_SIZE;
+
+ private long rolloverInterval = DEFAULT_ROLLOVER_INTERVAL;
+
+ private long inactivityInterval = DEFAULT_INACTIVITY_INTERVAL;
+
+ private transient Map<Path, Bucket<IN>> activeBuckets;
+
+ private long initMaxPartCounter = 0L;
+
+ private long maxPartCounterUsed = 0L;
+
+ private final ListStateDescriptor<byte[]> bucketStateDesc =
+ new ListStateDescriptor<>("bucket-states",
+ BytePrimitiveArraySerializer.INSTANCE);
+
+ private transient ListState<byte[]> restoredBucketStates;
+
+ private final ListStateDescriptor<Long> maxPartCounterStateDesc =
+ new ListStateDescriptor<>("max-part-counter",
+ LongSerializer.INSTANCE);
+
+ private transient ListState<Long> restoredMaxCounters;
+
+ private transient SimpleVersionedSerializer<Bucket.BucketState> bucketStateSerializer;
+
+ private final BucketFactory<IN> bucketFactory;
+
+ /**
+ * Creates a new {@code StreamingFileSink} that writes files to the given base directory.
+ *
+ * <p>This uses a {@link DateTimeBucketer} as {@link Bucketer} and a {@link StringWriter} as writer.
+ * The maximum part file size is set to 384 MB.
+ *
+ * @param basePath The directory to which to write the bucket files.
+ */
+ public StreamingFileSink(Path basePath) {
+ this(basePath, new DefaultBucketFactory<>());
+ }
+
+ @VisibleForTesting
+ StreamingFileSink(Path basePath, BucketFactory<IN> bucketFactory) {
+ this.basePath = Preconditions.checkNotNull(basePath);
+ this.bucketer = new DateTimeBucketer<>();
+ this.writer = new StringWriter<>();
+ this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
+ }
+
+ public StreamingFileSink<IN> setWriter(Writer<IN> writer) {
+ this.writer = Preconditions.checkNotNull(writer);
+ return this;
+ }
+
+ public StreamingFileSink<IN> setBucketer(Bucketer<IN> bucketer) {
+ this.bucketer = Preconditions.checkNotNull(bucketer);
+ return this;
+ }
+
+ public StreamingFileSink<IN> setPartFileSize(long partFileSize) {
+ this.partFileSize = partFileSize;
+ return this;
+ }
+
+ public StreamingFileSink<IN> setBucketCheckInterval(long bucketCheckInterval) {
+ this.bucketCheckInterval = bucketCheckInterval;
+ return this;
+ }
+
+ public StreamingFileSink<IN> setRolloverInterval(long rolloverInterval) {
+ this.rolloverInterval = rolloverInterval;
+ return this;
+ }
+
+ public StreamingFileSink<IN> setInactivityInterval(long inactivityInterval) {
+ this.inactivityInterval = inactivityInterval;
+ return this;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ final Iterator<Map.Entry<Path, Bucket<IN>>> activeBucketIt = activeBuckets.entrySet().iterator();
+
+ while (activeBucketIt.hasNext()) {
+ Bucket<IN> bucket = activeBucketIt.next().getValue();
+ bucket.commitUpToCheckpoint(checkpointId);
+
+ if (!bucket.isActive()) {
+ // We've dealt with all the pending files and the writer for this bucket is not currently open.
+ // Therefore this bucket is currently inactive and we can remove it from our state.
+ activeBucketIt.remove();
+ }
+ }
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ Preconditions.checkNotNull(restoredBucketStates);
+ Preconditions.checkNotNull(fsWriter);
+ Preconditions.checkNotNull(bucketStateSerializer);
+
+ restoredBucketStates.clear();
+ for (Map.Entry<Path, Bucket<IN>> bucketStateEntry : activeBuckets.entrySet()) {
+ final Bucket<IN> bucket = bucketStateEntry.getValue();
+ final Bucket.BucketState bucketState = bucket.snapshot(
+ context.getCheckpointId(),
+ context.getCheckpointTimestamp());
+ restoredBucketStates.add(bucketStateSerializer.serialize(bucketState));
+ }
+
+ restoredMaxCounters.clear();
+ restoredMaxCounters.add(maxPartCounterUsed);
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ initFileSystemWriter();
+
+ this.activeBuckets = new HashMap<>();
+
+ // Now when restoring, we start fresh. Everything gets committed and the state is empty.
+ // If in the future we want to resume the in-progress files, we should make sure that in
+ // case we receive two states for the same bucket, we merge them appropriately. This includes
+ // keeping only one in-progress file and committing the other, and committing the pending ones,
+ // as they were pending for a checkpoint previous to the last successful one.
+
+ final OperatorStateStore stateStore = context.getOperatorStateStore();
+
+ restoredBucketStates = stateStore.getListState(bucketStateDesc);
+ restoredMaxCounters = stateStore.getUnionListState(maxPartCounterStateDesc);
+
+ if (context.isRestored()) {
+ final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+
+ LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
+
+ for (long partCounter: restoredMaxCounters.get()) {
+ if (partCounter > initMaxPartCounter) {
+ initMaxPartCounter = partCounter;
+ }
+ }
+
+ final int version = bucketStateSerializer.getVersion();
--- End diff --
This needs to use the version of the state serializer that was used to
serialize the data, not of the serializer that we happen to have now.
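As a rough sketch of what this asks for (a hedged illustration, not the PR's actual code; the VersionedBytes helper name is made up), the serializer version is written next to the serialized bytes when snapshotting, and on restore it is that stored version, not the current serializer's getVersion(), that gets passed to deserialize():

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.flink.core.io.SimpleVersionedSerializer;

/** Hypothetical helper that keeps the writing serializer's version together with the bytes. */
final class VersionedBytes {

	static <T> byte[] serialize(SimpleVersionedSerializer<T> serializer, T value) throws IOException {
		final byte[] data = serializer.serialize(value);
		final ByteArrayOutputStream baos = new ByteArrayOutputStream(data.length + 8);
		try (DataOutputStream out = new DataOutputStream(baos)) {
			out.writeInt(serializer.getVersion()); // version of the serializer that actually wrote the data
			out.writeInt(data.length);
			out.write(data);
		}
		return baos.toByteArray();
	}

	static <T> T deserialize(SimpleVersionedSerializer<T> serializer, byte[] bytes) throws IOException {
		try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
			final int writtenVersion = in.readInt(); // use the version stored in state, not serializer.getVersion()
			final int length = in.readInt();
			final byte[] data = new byte[length];
			in.readFully(data);
			return serializer.deserialize(writtenVersion, data);
		}
	}
}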
> Create new StreamingFileSink that works on Flink's FileSystem abstraction
> -------------------------------------------------------------------------
>
> Key: FLINK-9750
> URL: https://issues.apache.org/jira/browse/FLINK-9750
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming Connectors
> Reporter: Stephan Ewen
> Assignee: Kostas Kloudas
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Using Flink's own file system abstraction means that we can add additional
> streaming/checkpointing related behavior.
> In addition, the new StreamingFileSink should only rely on internal
> checkpointed state to know which files are possibly in progress or need to
> roll over, and should never assume enumeration of files in the file system.
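For orientation, a minimal usage sketch of the proposed sink, based only on the constructor and setters visible in the diff above; the input source, target path and interval values are placeholders, not part of the PR:

import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class StreamingFileSinkExample {

	public static void main(String[] args) throws Exception {
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// Checkpointing must be enabled, otherwise pending part files are never moved to "finished".
		env.enableCheckpointing(10_000L);

		final DataStream<String> input = env.socketTextStream("localhost", 9999);

		// Defaults per the javadoc: DateTimeBucketer, StringWriter, 384 MB part files.
		final StreamingFileSink<String> sink = new StreamingFileSink<String>(new Path("hdfs:///tmp/output"))
				.setBucketCheckInterval(60_000L)
				.setInactivityInterval(60_000L)
				.setPartFileSize(128L * 1024L * 1024L);

		input.addSink(sink);
		env.execute("StreamingFileSink example");
	}
}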