[
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15391760#comment-15391760
]
ASF GitHub Bot commented on FLINK-4190:
---------------------------------------
Github user joshfg commented on the issue:
https://github.com/apache/flink/pull/2269
Ah I see, that makes sense.
I've begun refactoring the tests here:
https://github.com/joshfg/flink/blob/flink-4190/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
But I've run into this strange exception in the
`testNonRollingSequenceFileWithoutCompressionWriter` test:
```
java.lang.IllegalStateException: Key Class has not been initialized.
    at org.apache.flink.streaming.connectors.fs.SequenceFileWriter.open(SequenceFileWriter.java:84)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:500)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:396)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
    at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:226)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest.testNonRollingSequenceFileWithoutCompressionWriter(BucketingSinkTest.java:220)
```
Any ideas what would cause that? I've copied the HDFS cluster
initialisation exactly as it was in the original tests...
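For context, the test boils down to roughly the sketch below (names approximated from the branch, not the exact code). The one difference from the original ITCase I can see is that there, `DataStream#addSink()` invoked `setInputType()` on the sink automatically, because it implements `InputTypeConfigurable`, and that call is what initialises `SequenceFileWriter`'s key/value classes; in the harness it presumably has to be made by hand:
```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.junit.Test;

public class BucketingSinkSequenceFileSketch {

    // Set by the MiniDFSCluster @BeforeClass setup, exactly as in the
    // original RollingSink tests.
    private static String hdfsURI;

    @Test
    public void testNonRollingSequenceFileSketch() throws Exception {
        final String outPath = hdfsURI + "/seq-non-rolling-out";

        BucketingSink<Tuple2<IntWritable, Text>> sink =
                new BucketingSink<Tuple2<IntWritable, Text>>(outPath)
                        .setWriter(new SequenceFileWriter<IntWritable, Text>())
                        .setBucketer(new BasePathBucketer<Tuple2<IntWritable, Text>>());

        // Suspected missing step: in a job, DataStream#addSink() makes this
        // call automatically for InputTypeConfigurable sinks, and
        // SequenceFileWriter derives keyClass/valueClass from it. Without it,
        // open() sees keyClass == null and throws the IllegalStateException
        // above.
        TypeInformation<Tuple2<IntWritable, Text>> typeInfo = new TupleTypeInfo<>(
                TypeExtractor.getForClass(IntWritable.class),
                TypeExtractor.getForClass(Text.class));
        sink.setInputType(typeInfo, new ExecutionConfig());

        OneInputStreamOperatorTestHarness<Tuple2<IntWritable, Text>, Object> harness =
                new OneInputStreamOperatorTestHarness<>(
                        new StreamSink<Tuple2<IntWritable, Text>>(sink));
        harness.open();

        harness.processElement(new StreamRecord<>(
                new Tuple2<>(new IntWritable(0), new Text("message 0")), 0L));

        harness.close();
    }
}
```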
> Generalise RollingSink to work with arbitrary buckets
> -----------------------------------------------------
>
> Key: FLINK-4190
> URL: https://issues.apache.org/jira/browse/FLINK-4190
> Project: Flink
> Issue Type: Improvement
> Components: filesystem-connector, Streaming Connectors
> Reporter: Josh Forman-Gornall
> Assignee: Josh Forman-Gornall
> Priority: Minor
>
> The current RollingSink implementation appears to be intended for writing to
> directories that are bucketed by system time (e.g. minutely) and to only be
> writing to one file within one bucket at any point in time. When the system
> time determines that the current bucket should be changed, the current bucket
> and file are closed and a new bucket and file are created. The sink cannot be
> used for the more general problem of writing to arbitrary buckets, perhaps
> determined by an attribute on the element/tuple being processed.
> There are three limitations which prevent the existing sink from being used
> for more general problems:
> - Only bucketing by the current system time is supported, and not by e.g. an
> attribute of the element being processed by the sink (see the sketch after
> this list).
> - Whenever the sink sees a change in the bucket being written to, it flushes
> the file and moves on to the new bucket. Therefore the sink cannot have more
> than one bucket/file open at a time. Additionally the checkpointing mechanics
> only support saving the state of one active bucket and file.
> - The sink determines that it should 'close' an active bucket and file when
> the bucket path changes. We need another way to determine when a bucket has
> become inactive and needs to be closed.
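> To make the first limitation concrete, bucketing by an element attribute
> could look roughly like the sketch below, written against the element-aware
> Bucketer interface this issue proposes (the exact method signature on the
> branch may differ, and the country-code field is invented for the example):
> ```java
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.connectors.fs.Clock;
> import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
> import org.apache.hadoop.fs.Path;
>
> // Hypothetical bucketer: each distinct country code in the tuple's first
> // field gets its own bucket directory under the base path, regardless of
> // system time.
> public class CountryCodeBucketer implements Bucketer<Tuple2<String, String>> {
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public Path getBucketPath(Clock clock, Path basePath, Tuple2<String, String> element) {
>         return new Path(basePath, element.f0); // e.g. .../out/GB, .../out/US
>     }
> }
> ```
> Since several such buckets can then be open at once, closing has to be driven
> by inactivity, e.g. tracking the last write time per bucket and closing any
> bucket that has gone unwritten past a threshold, rather than by a change of
> bucket path.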
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)