aljoscha commented on a change in pull request #10859: [FLINK-15278] Update the
StreamingFileSink docs
URL: https://github.com/apache/flink/pull/10859#discussion_r366760554
##########
File path: docs/dev/connectors/streamfile_sink.md
##########
@@ -361,7 +266,174 @@ input.addSink(sink)
The SequenceFileWriterFactory supports additional constructor parameters to
specify compression settings.
-### Important Considerations for S3
+## Bucket Assignment
+
+The bucketing logic defines how the data will be structured into
subdirectories inside the base output directory.
+
+Both row and bulk formats (see [File Formats](#file-formats)) use the
[DateTimeBucketAssigner]({{ site.javadocs_baseurl
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.html)
as the default assigner.
+By default, the `DateTimeBucketAssigner` creates hourly buckets based on the system default timezone
+with the following format: `yyyy-MM-dd--HH`. Both the date format (*i.e.* the bucket granularity) and the timezone can be
+configured manually.
+
+We can specify a custom [BucketAssigner]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.html) by calling `.withBucketAssigner(assigner)` on the format builders (a sketch of a custom assigner follows the list below).
+
+Flink comes with two built-in BucketAssigners:
+
 - [DateTimeBucketAssigner]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.html) : The default, time-based assigner
 - [BasePathBucketAssigner]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/BasePathBucketAssigner.html) : Assigner that stores all part files in the base path (single global bucket)
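+
+Implementing a custom assigner takes only two methods: one that computes the bucket id for an element, and one that provides a serializer for that id. As an illustration, here is a minimal sketch of a `KeyBucketAssigner` like the one used in the examples further below; it buckets `Tuple2<Integer, Integer>` records by their first field (the element type and class name are assumptions of this sketch):
+
+{% highlight java %}
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+
+/** Illustrative assigner that buckets records by the first field of the tuple. */
+public final class KeyBucketAssigner implements BucketAssigner<Tuple2<Integer, Integer>, String> {
+
    @Override
    public String getBucketId(final Tuple2<Integer, Integer> element, final Context context) {
        // The returned string becomes the bucket (subdirectory) name under the base output path.
        return String.valueOf(element.f0);
    }
+
    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        // String bucket ids can reuse Flink's built-in versioned string serializer.
        return SimpleVersionedStringSerializer.INSTANCE;
    }
+}
+{% endhighlight %}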
+
+## Rolling Policy
+
+The [RollingPolicy]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.html) defines when a given in-progress part file is closed and moved to the pending state, and later to the finished state.
+Part files in the "finished" state are the ones that are ready for viewing and are guaranteed to contain valid data that will not be reverted in case of failure.
+The rolling policy, in combination with the checkpointing interval (pending files become finished on the next checkpoint), controls how quickly
+part files become available for downstream readers and also the size and number of these parts.
+
+Flink comes with two built-in RollingPolicies:
+
+ - [DefaultRollingPolicy]({{ site.javadocs_baseurl
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.html)
+ - [OnCheckpointRollingPolicy]({{ site.javadocs_baseurl
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.html)
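+
+For example, a row-format sink can combine size, time, and inactivity thresholds. The following is a sketch assuming the Flink 1.10 `DefaultRollingPolicy.builder()` API; `outputPath` and the concrete thresholds are placeholders, and checkpointing must be enabled for pending files to ever become finished:
+
+{% highlight java %}
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+// Pending files only become finished on checkpoints, so checkpointing must be enabled.
+env.enableCheckpointing(TimeUnit.MINUTES.toMillis(1));
+
+StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))  // roll at least every 15 minutes
            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) // roll after 5 minutes without new records
            .withMaxPartSize(1024 * 1024 * 1024)                  // roll once a part file reaches 1 GB
            .build())
    .build();
+{% endhighlight %}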
+
+## Part file lifecycle
+
+In order to use the output of the `StreamingFileSink` in downstream systems,
we need to understand the naming and lifecycle of the output files produced.
+
+Part files can be in one of three states:
 1. **In-progress** : The part file is currently being written to
 2. **Pending** : An in-progress file that was closed (due to the specified rolling policy) and is waiting to be committed
 3. **Finished** : A pending file that transitioned to "finished" on a successful checkpoint
+
+Only finished files are safe for downstream systems to read, as they are guaranteed not to be modified later.
+
+<div class="alert alert-info">
 <b>IMPORTANT:</b> Part file indexes are strictly increasing for any given subtask, in the order the files were created. However, these indexes are not always sequential: when the job restarts, the next part index for all subtasks will be `max part index + 1`,
+where `max` is computed across all subtasks.
+</div>
+
+Each writer subtask will have a single in-progress part file at any given time
for every active bucket, but there can be several pending and finished files.
+
+**Part file example**
+
+To better understand the lifecycle of these files let's look at a simple
example with 2 sink subtasks:
+
+```
+└── 2019-08-25--12
    ├── part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
    └── part-1-0.inprogress.ea65a428-a1d0-4a0b-bbc5-7a436a75e575
+```
+
+When the part file `part-1-0` is rolled (let's say it becomes too large), it becomes pending, but it is not renamed. The sink then opens a new part file, `part-1-1`:
+
+```
+└── 2019-08-25--12
    ├── part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
    ├── part-1-0.inprogress.ea65a428-a1d0-4a0b-bbc5-7a436a75e575
    └── part-1-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
+```
+
+As `part-1-0` is now pending completion, it is finalized after the next successful checkpoint:
+
+```
+└── 2019-08-25--12
    ├── part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
    ├── part-1-0
    └── part-1-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
+```
+
+New buckets are created as dictated by the bucketing policy, and this doesn't
affect currently in-progress files:
+
+```
+└── 2019-08-25--12
    ├── part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
    ├── part-1-0
    └── part-1-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
+└── 2019-08-25--13
    └── part-0-2.inprogress.2b475fec-1482-4dea-9946-eb4353b475f1
+```
+
+Old buckets can still receive new records as the bucketing policy is evaluated
on a per-record basis.
+
+### Part file configuration
+
+Finished files can be distinguished from in-progress ones only by their naming scheme.
+
+By default, the file naming strategy is as follows:
+ - **In-progress / Pending**:
`part-<subtaskIndex>-<partFileIndex>.inprogress.uid`
+ - **Finished:** `part-<subtaskIndex>-<partFileIndex>`
+
+Flink allows the user to specify a prefix and/or a suffix for part files.
+This can be done using an `OutputFileConfig`.
+For example, with the prefix "prefix" and the suffix ".ext", the sink will create the following files:
+
+```
+└── 2019-08-25--12
    ├── prefix-0-0.ext
    ├── prefix-0-1.ext.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
    ├── prefix-1-0.ext
    └── prefix-1-1.ext.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
+```
+
+The user can specify an `OutputFileConfig` in the following way:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+OutputFileConfig config = OutputFileConfig
+ .builder()
+ .withPartPrefix("prefix")
+ .withPartSuffix(".ext")
+ .build();
+
+StreamingFileSink<Tuple2<Integer, Integer>> sink = StreamingFileSink
 .forRowFormat(new Path(outputPath), new SimpleStringEncoder<>("UTF-8"))
+ .withBucketAssigner(new KeyBucketAssigner())
+ .withRollingPolicy(OnCheckpointRollingPolicy.build())
+ .withOutputFileConfig(config)
+ .build();
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+val config = OutputFileConfig
+ .builder()
+ .withPartPrefix("prefix")
+ .withPartSuffix(".ext")
+ .build()
+
+val sink = StreamingFileSink
+ .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
+ .withBucketAssigner(new KeyBucketAssigner())
+ .withRollingPolicy(OnCheckpointRollingPolicy.build())
+ .withOutputFileConfig(config)
+ .build()
+
+{% endhighlight %}
+</div>
+</div>
+
+## Important Considerations
+
+### General
+
+<span class="label label-danger">Important Note 1</span>: When using Hadoop <
2.7, please use
+the `OnCheckpointRollingPolicy` which rolls part files on every checkpoint.
The reason is that if part files "traverse"
+checkpoint interval, then, upon recovery from a failure the
`StreamingFileSink` may use the `truncate()` method of the
Review comment:
```suggestion
the checkpoint interval, then, upon recovery from a failure the
`StreamingFileSink` may use the `truncate()` method of the
```