[
https://issues.apache.org/jira/browse/FLINK-9603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16536705#comment-16536705
]
ASF GitHub Bot commented on FLINK-9603:
---------------------------------------
Github user kent2171 commented on the issue:
https://github.com/apache/flink/pull/6176
Nice to hear, thank you @kl0u
> Incorrect indexing of part files, when part suffix is specified
> (FileAlreadyExistsException)
> --------------------------------------------------------------------------------------------
>
> Key: FLINK-9603
> URL: https://issues.apache.org/jira/browse/FLINK-9603
> Project: Flink
> Issue Type: Bug
> Components: filesystem-connector
> Affects Versions: 1.5.0, 1.6.0
> Reporter: Rinat Sharipov
> Assignee: Kostas Kloudas
> Priority: Major
> Labels: pull-request-available
>
> Hi mates, since the 1.5 release, BucketingSink has been able to configure the
> suffix of the part file. This is very useful when the file needs a specific
> extension.
>
> While using it, I found an issue: when a new part file is created, it
> gets the same part index as the file that was just closed.
> So, when Flink tries to move it into its final state, we get a
> FileAlreadyExistsException.
>
> The problem is related to the following code.
> Here we try to find the first part index whose file does not yet exist in the
> bucket directory. The problem is that partSuffix is not included when the
> path is assembled. This means that, when a suffix is set, the checked path
> never exists and partCounter is never incremented.
>
> {code:java}
> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
> while (fs.exists(partPath) ||
>         fs.exists(getPendingPathFor(partPath)) ||
>         fs.exists(getInProgressPathFor(partPath))) {
>     bucketState.partCounter++;
>     partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
> }
> bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
> {code}
> Before the writer is created, partSuffix is appended here, but it should
> already have been appended before the index checks:
> {code:java}
> if (partSuffix != null) {
>     partPath = partPath.suffix(partSuffix);
> }
> {code}
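The reordering the report asks for can be sketched roughly as follows. This is only an illustration, not the actual patch from the pull request: the class PartIndexSketch and its methods are hypothetical, a Set<String> of file names stands in for the Hadoop FileSystem, and only the final file name is checked (the pending and in-progress checks from the quoted loop are left out for brevity). The point is that the suffix is part of the name before any existence check runs.

```java
import java.util.Set;

// Hypothetical stand-in for the index search quoted above; not Flink code.
// A Set<String> of file names plays the role of the bucket directory.
final class PartIndexSketch {

    // Builds the part file name and appends the optional suffix up front,
    // so that every existence check sees the name the file will really have.
    static String partName(String partPrefix, int subtaskIndex,
                           int partCounter, String partSuffix) {
        String name = partPrefix + "-" + subtaskIndex + "-" + partCounter;
        return partSuffix != null ? name + partSuffix : name;
    }

    // Returns the first counter, starting at startCounter, whose suffixed
    // file name is not yet present in existingFiles.
    static int nextFreeCounter(Set<String> existingFiles, String partPrefix,
                               int subtaskIndex, String partSuffix,
                               int startCounter) {
        int counter = startCounter;
        while (existingFiles.contains(
                partName(partPrefix, subtaskIndex, counter, partSuffix))) {
            counter++;
        }
        return counter;
    }
}
```

With a directory that already holds part-0-0.avro and part-0-1.avro, calling nextFreeCounter with the suffix ".avro" and a start counter of 0 skips both files and returns 2. Passing null as the suffix mimics the check described in the report: the unsuffixed names are never found, so the counter stays at 0 and the new file would collide with part-0-0.avro.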