[ https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15668572#comment-15668572 ]
ASF GitHub Bot commented on FLINK-5056:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/2797#discussion_r88125887
--- Diff: flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java ---
@@ -70,7 +71,37 @@
     private static org.apache.hadoop.fs.FileSystem dfs;
     private static String hdfsURI;
-    private OneInputStreamOperatorTestHarness<String, Object> createTestSink(File dataDir) throws Exception {
+    private static final String PENDING_SUFFIX = ".pending";
+    private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+    private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+    private OneInputStreamOperatorTestHarness<String, Object> createRescalingTestSink(
+        File outDir, int totalParallelism, int taskIdx, long inactivityInterval) throws Exception {
+
+        BucketingSink<String> sink = new BucketingSink<String>(outDir.getAbsolutePath())
+            .setBucketer(new Bucketer<String>() {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public Path getBucketPath(Clock clock, Path basePath, String element) {
+                    return new Path(basePath, element);
+                }
+            })
+            .setWriter(new StringWriter<String>())
+            .setInactiveBucketCheckInterval(inactivityInterval)
+            .setInactiveBucketThreshold(inactivityInterval)
+            .setPartPrefix("part")
--- End diff --
The part prefix ("part") should also be moved into a field, like the suffixes above, since we use it in checkFS as well.
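To illustrate the review comment above, here is a minimal, self-contained sketch of sharing the prefix through a constant. The constant name `PART_PREFIX`, the class `PartFileNames`, and the method `countFiles` are hypothetical; the body of checkFS is not shown in this diff, so the classification logic below is only an assumption about what such a check might do.

```java
import java.util.List;

/** Hypothetical helper; not part of the pull request. */
class PartFileNames {
    // Assumed constant name for the value passed to setPartPrefix("part").
    static final String PART_PREFIX = "part";
    static final String PENDING_SUFFIX = ".pending";
    static final String IN_PROGRESS_SUFFIX = ".in-progress";
    static final String VALID_LENGTH_SUFFIX = ".valid";

    /** Returns the counts {inProgress, pending, completed} for the given file names. */
    static int[] countFiles(List<String> fileNames) {
        int inProgress = 0;
        int pending = 0;
        int completed = 0;
        for (String name : fileNames) {
            if (name.endsWith(IN_PROGRESS_SUFFIX)) {
                inProgress++;
            } else if (name.endsWith(PENDING_SUFFIX)) {
                pending++;
            } else if (name.endsWith(VALID_LENGTH_SUFFIX)) {
                // Valid-length marker files are not counted in this sketch.
            } else if (name.contains(PART_PREFIX)) {
                // Same constant as the one the sink would be configured with.
                completed++;
            }
        }
        return new int[] {inProgress, pending, completed};
    }
}
```

With a single constant, the sink configuration (`.setPartPrefix(PART_PREFIX)`) and the file check cannot drift apart if the prefix is ever changed.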
> BucketingSink deletes valid data when checkpoint notification is slow.
> ----------------------------------------------------------------------
>
> Key: FLINK-5056
> URL: https://issues.apache.org/jira/browse/FLINK-5056
> Project: Flink
> Issue Type: Bug
> Components: filesystem-connector
> Affects Versions: 1.1.3
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently, if BucketingSink receives no data after a checkpoint and then a
> notification about a previous checkpoint arrives, it clears its state. As a
> result, valid data belonging to intermediate checkpoints for which a
> notification has not yet arrived may never be committed. A simple sequence
> that illustrates the problem:
> -> input data
> -> snapshot(0)
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> The last call will clear the state of the sink without committing as final
> the data that arrived for checkpoint 1.
>
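The sequence in the issue description can be reproduced with a small, self-contained model. This is only a sketch under stated assumptions: `PendingFilesSketch` and all of its methods are hypothetical and greatly simplified, they are not Flink's BucketingSink, and the "buggy" variant merely imitates the behaviour described above (clearing all state when no new data has arrived).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, simplified model of per-checkpoint pending files. */
class PendingFilesSketch {
    private final List<String> current = new ArrayList<>();
    private final TreeMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();
    final List<String> committed = new ArrayList<>();

    void write(String file) {
        current.add(file);
    }

    void snapshot(long checkpointId) {
        // Files written since the last snapshot become pending for this checkpoint.
        pendingPerCheckpoint.put(checkpointId, new ArrayList<>(current));
        current.clear();
    }

    /** Imitates the reported bug: with no new data, all remaining state is dropped. */
    void notifyCompleteBuggy(long checkpointId) {
        commitUpTo(checkpointId);
        if (current.isEmpty()) {
            pendingPerCheckpoint.clear(); // also discards later checkpoints
        }
    }

    /** Commits only checkpoints up to the notified id and keeps later ones. */
    void notifyCompleteFixed(long checkpointId) {
        commitUpTo(checkpointId);
    }

    private void commitUpTo(long checkpointId) {
        // headMap(..., true) is a live view of all entries with key <= checkpointId.
        Map<Long, List<String>> done = pendingPerCheckpoint.headMap(checkpointId, true);
        for (List<String> files : done.values()) {
            committed.addAll(files);
        }
        done.clear(); // removes the committed entries from the backing map
    }

    int uncommittedCheckpoints() {
        return pendingPerCheckpoint.size();
    }
}
```

Running write, snapshot(0), write, snapshot(1), then a notification for checkpoint 0 leaves the buggy variant with no state for checkpoint 1, so a later notification for checkpoint 1 has nothing to commit. The fixed variant still holds checkpoint 1's pending files and commits them when that notification arrives.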