[ 
https://issues.apache.org/jira/browse/FLINK-9603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9603:
-------------------------------

    Assignee: vinoyang

> Incorrect indexing of part files, when part suffix is specified 
> (FileAlreadyExistsException)
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-9603
>                 URL: https://issues.apache.org/jira/browse/FLINK-9603
>             Project: Flink
>          Issue Type: Bug
>          Components: filesystem-connector
>    Affects Versions: 1.5.0
>            Reporter: Rinat Sharipov
>            Assignee: vinoyang
>            Priority: Major
>
> Hi mates, since 1.5 release, BucketingSink has ability to configure suffix of 
> the part file. It’s very useful, when it’s necessary to set specific 
> extension of the file.
>  
> During the usage, I’ve found the issue - when new part file is created, it 
> has the same part index, as index of just closed file. 
> So, when Flink tries to move it into final state, we have a 
> FileAlreadyExistsException.
>  
> This problem is related with the following code:
> *{color:#e32400}Here we are trying to find the max index of part file, that 
> doesn’t exist in bucket directory, the problem is, that the partSuffix is not 
> involved into path assembly. This means, that path always doesn’t 
> exist{color}*
> *{color:#e32400}and partCounter wouldn’t be ever incremented.{color}*
>  
> {code:java}
> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
> bucketState.partCounter);
> while (fs.exists(partPath) ||
>       fs.exists(getPendingPathFor(partPath)) ||
>       fs.exists(getInProgressPathFor(partPath))) {
>    bucketState.partCounter++;
>    partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
> bucketState.partCounter);
> }
> bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
> {code}
> *{color:#e32400}Before creating of writer, we appending the partSuffix here, 
> but it should be already appended, before index checks{color}***
> {code:java}
> if (partSuffix != null) {
>    partPath = partPath.suffix(partSuffix);
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to