[ 
https://issues.apache.org/jira/browse/FLINK-9603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinat Sharipov updated FLINK-9603:
----------------------------------
    Description: 
Hi mates, since the 1.5 release, BucketingSink can be configured with a suffix 
for the part file. This is very useful when the file needs a specific 
extension.
 
While using it, I found an issue: when a new part file is created, it gets 
the same part index as the file that was just closed. 
So, when Flink tries to move it into the final state, we get a 
FileAlreadyExistsException.
 
The problem is related to the following code:
*{color:#e32400}Here we try to find the next part file index that does not 
yet exist in the bucket directory. The problem is that partSuffix is not 
included in the path assembly, so the checked path never exists{color}*
*{color:#e32400}and partCounter is never incremented.{color}*
 
{code:java}
Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
while (fs.exists(partPath) ||
      fs.exists(getPendingPathFor(partPath)) ||
      fs.exists(getInProgressPathFor(partPath))) {
   bucketState.partCounter++;
   partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
}

bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
{code}
*{color:#e32400}Before creating the writer, we append the partSuffix here, 
but it should already be appended before the index checks{color}*
{code:java}
if (partSuffix != null) {
   partPath = partPath.suffix(partSuffix);
}
{code}
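
To illustrate the point, here is a minimal, self-contained sketch of the index search with and without the suffix applied before the existence checks. It is not the actual Flink code or a patch: a Set of file names stands in for the bucket directory, and PartIndexSketch, partName, pendingName, inProgressName and nextFreeIndex are hypothetical names made up for this example (the pending and in-progress naming is an assumption as well).

```java
import java.util.HashSet;
import java.util.Set;

/** Simulates the part-index search; a Set of names stands in for the bucket directory. */
public class PartIndexSketch {

    // Hypothetical stand-ins for the sink's pending / in-progress path helpers.
    public static String pendingName(String name) {
        return "_" + name + ".pending";
    }

    public static String inProgressName(String name) {
        return "_" + name + ".in-progress";
    }

    // Builds "<prefix>-<subtask>-<counter>" and appends the suffix if one is given.
    public static String partName(String prefix, int subtask, int counter, String suffix) {
        String name = prefix + "-" + subtask + "-" + counter;
        return suffix != null ? name + suffix : name;
    }

    /**
     * Returns the first counter >= start for which no final, pending or
     * in-progress file exists. Passing null as suffixForCheck mimics the
     * reported behaviour, where the suffix is appended only after the checks.
     */
    public static int nextFreeIndex(Set<String> existing, String prefix, int subtask,
                                    int start, String suffixForCheck) {
        int counter = start;
        String name = partName(prefix, subtask, counter, suffixForCheck);
        while (existing.contains(name)
                || existing.contains(pendingName(name))
                || existing.contains(inProgressName(name))) {
            counter++;
            name = partName(prefix, subtask, counter, suffixForCheck);
        }
        return counter;
    }

    public static void main(String[] args) {
        Set<String> bucket = new HashSet<>();
        bucket.add("part-0-0.avro"); // a just-closed part file written with suffix ".avro"

        // Suffix ignored during the check: index 0 looks free although it is taken.
        System.out.println(nextFreeIndex(bucket, "part", 0, 0, null));    // prints 0
        // Suffix applied before the check: index 0 is detected, so 1 is returned.
        System.out.println(nextFreeIndex(bucket, "part", 0, 0, ".avro")); // prints 1
    }
}
```

With the suffix included in the checked name, the counter moves past the already existing file, which is the behaviour I would expect from the sink.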



> Incorrect indexing of part files, when part suffix is specified 
> (FileAlreadyExistsException)
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-9603
>                 URL: https://issues.apache.org/jira/browse/FLINK-9603
>             Project: Flink
>          Issue Type: Bug
>          Components: filesystem-connector
>    Affects Versions: 1.5.0
>            Reporter: Rinat Sharipov
>            Priority: Major


