[ https://issues.apache.org/jira/browse/FLINK-9603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16515072#comment-16515072 ]
Rinat Sharipov commented on FLINK-9603:
---------------------------------------
Hi mates, I have the following proposal for fixing this issue:
* build the part path in a single method (or some kind of builder) instead of
repeating the same logic across the code (DRY)
* test that the part index is properly incremented when a part suffix is specified and
** an in-progress file exists
** a pending file exists
** a file in the final state exists
* test the same cases when no part suffix is specified
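A rough sketch of what the proposal could look like, in plain Java without Flink or Hadoop dependencies. One hypothetical helper (`partName`) assembles every part name, including the optional suffix, and a hypothetical `nextPartCounter` runs the existence checks against exactly those names. A `Set<String>` stands in for the bucket directory, and the `_` prefix plus the `.pending`/`.in-progress` suffixes are assumed to match BucketingSink's defaults. The `main` method runs the six cases listed above: a final, pending, or in-progress file, each with and without a suffix.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Sketch only: a single place builds part names, so the existence checks and
 * the writer always see the same (suffixed) name. A Set<String> of file names
 * stands in for the bucket directory. The "_" prefix and ".pending" /
 * ".in-progress" suffixes are ASSUMED BucketingSink defaults.
 */
public class PartPathSketch {

    static final String PENDING_PREFIX = "_";
    static final String PENDING_SUFFIX = ".pending";
    static final String IN_PROGRESS_PREFIX = "_";
    static final String IN_PROGRESS_SUFFIX = ".in-progress";

    /** Hypothetical helper: the only place where a final part file name is assembled. */
    static String partName(String prefix, int subtaskIndex, int counter, String suffix) {
        String name = prefix + "-" + subtaskIndex + "-" + counter;
        return suffix == null ? name : name + suffix;
    }

    static String pendingName(String partName) {
        return PENDING_PREFIX + partName + PENDING_SUFFIX;
    }

    static String inProgressName(String partName) {
        return IN_PROGRESS_PREFIX + partName + IN_PROGRESS_SUFFIX;
    }

    /** Returns the smallest counter >= start whose final, pending and in-progress names are all free. */
    static int nextPartCounter(Set<String> bucket, String prefix, int subtaskIndex, int start, String suffix) {
        int counter = start;
        String name = partName(prefix, subtaskIndex, counter, suffix);
        while (bucket.contains(name)
                || bucket.contains(pendingName(name))
                || bucket.contains(inProgressName(name))) {
            counter++;
            name = partName(prefix, subtaskIndex, counter, suffix);
        }
        return counter;
    }

    public static void main(String[] args) {
        for (String suffix : new String[] {".avro", null}) {
            String existing = partName("part", 0, 0, suffix);
            // One bucket per case: only the final, pending or in-progress file for counter 0 exists.
            for (String file : new String[] {existing, pendingName(existing), inProgressName(existing)}) {
                Set<String> bucket = new HashSet<>();
                bucket.add(file);
                int next = nextPartCounter(bucket, "part", 0, 0, suffix);
                if (next != 1) {
                    throw new AssertionError("expected counter 1 for " + file + ", got " + next);
                }
            }
        }
        System.out.println("all six cases pass");
    }
}
```

Keeping the suffix logic inside `partName` means a future change to the naming scheme only has to be made once, which is the point of the DRY item.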
> Incorrect indexing of part files, when part suffix is specified
> (FileAlreadyExistsException)
> --------------------------------------------------------------------------------------------
>
> Key: FLINK-9603
> URL: https://issues.apache.org/jira/browse/FLINK-9603
> Project: Flink
> Issue Type: Bug
> Components: filesystem-connector
> Affects Versions: 1.5.0
> Reporter: Rinat Sharipov
> Assignee: vinoyang
> Priority: Major
>
> Hi mates, since the 1.5 release, BucketingSink has the ability to configure
> the suffix of part files. It’s very useful when a specific file extension
> is needed.
>
> While using it, I’ve found an issue: when a new part file is created, it
> gets the same part index as the file that was just closed.
> So, when Flink tries to move it into the final state, we get a
> FileAlreadyExistsException.
>
> This problem is related to the following code:
> *{color:#e32400}Here we are trying to find the first part index whose file
> doesn’t exist in the bucket directory. The problem is that the partSuffix is
> not included in the path assembly, so with a suffix configured, the checked
> path never exists{color}*
> *{color:#e32400}and the loop never increments partCounter.{color}*
>
> {code:java}
> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
> while (fs.exists(partPath) ||
>         fs.exists(getPendingPathFor(partPath)) ||
>         fs.exists(getInProgressPathFor(partPath))) {
>     bucketState.partCounter++;
>     partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
> }
> bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
> {code}
> *{color:#e32400}The partSuffix is only appended here, right before the writer
> is created, but it should already be appended before the index checks.{color}*
> {code:java}
> if (partSuffix != null) {
>     partPath = partPath.suffix(partSuffix);
> }
> {code}
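To illustrate the ordering problem, here is a small self-contained sketch that compares the quoted lookup with one that appends the suffix before the checks. The class and method names are hypothetical, a `Set<String>` replaces `fs.exists()`, and the `_` prefix with `.pending`/`.in-progress` suffixes is an assumed default naming. With `part-0-0.avro` already in the bucket and the counter at 0, the quoted order picks the same name again, while the reordered version moves on to `part-0-1.avro`.

```java
import java.util.Collections;
import java.util.Set;

/**
 * Sketch only: contrasts the quoted order (suffix appended after the loop) with
 * appending the suffix before the existence checks. A Set<String> of file names
 * stands in for fs.exists(); the "_" prefix and ".pending" / ".in-progress"
 * suffixes are ASSUMED defaults.
 */
public class SuffixOrderDemo {

    /** True if the final, pending or in-progress file for this name exists. */
    static boolean taken(Set<String> bucket, String name) {
        return bucket.contains(name)
                || bucket.contains("_" + name + ".pending")
                || bucket.contains("_" + name + ".in-progress");
    }

    /** Mirrors the quoted code: checks unsuffixed names, appends the suffix afterwards. */
    static String buggyNextPart(Set<String> bucket, int subtask, int counter, String suffix) {
        String name = "part-" + subtask + "-" + counter;
        while (taken(bucket, name)) {
            counter++;
            name = "part-" + subtask + "-" + counter;
        }
        return suffix == null ? name : name + suffix;
    }

    /** Reordered: the suffix is part of the name before any existence check. */
    static String fixedNextPart(Set<String> bucket, int subtask, int counter, String suffix) {
        String tail = suffix == null ? "" : suffix;
        String name = "part-" + subtask + "-" + counter + tail;
        while (taken(bucket, name)) {
            counter++;
            name = "part-" + subtask + "-" + counter + tail;
        }
        return name;
    }

    public static void main(String[] args) {
        // The previously closed part file already sits in the bucket with its suffix.
        Set<String> bucket = Collections.singleton("part-0-0.avro");
        System.out.println(buggyNextPart(bucket, 0, 0, ".avro")); // part-0-0.avro (collides)
        System.out.println(fixedNextPart(bucket, 0, 0, ".avro")); // part-0-1.avro
    }
}
```

The buggy variant never sees `part-0-0.avro`, because it only ever asks about `part-0-0`, `_part-0-0.pending` and `_part-0-0.in-progress`.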
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)