[ 
https://issues.apache.org/jira/browse/FLINK-30807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682988#comment-17682988
 ] 

Yanfei Lei commented on FLINK-30807:
------------------------------------

Thanks for sharing your use case. I'm not sure it's worth adding an option 
for this requirement. The State Processor module is relatively independent of 
Flink. If the need is urgent, you could build a local jar after changing 
FileSystem.WriteMode.NO_OVERWRITE to FileSystem.WriteMode.OVERWRITE.

Let's hear [~chesnay]'s opinion.

> State Processor API - Overwrite Support for Savepoints
> ------------------------------------------------------
>
>                 Key: FLINK-30807
>                 URL: https://issues.apache.org/jira/browse/FLINK-30807
>             Project: Flink
>          Issue Type: New Feature
>          Components: API / State Processor
>            Reporter: Rion Williams
>            Priority: Major
>
> Currently there is no overwrite support when using the State Processor API to 
> create a savepoint at a given location. For applications that may run or 
> generate a given savepoint on a periodic basis (e.g. cron job, nightly 
> process, etc.) this can result in an exception if the job was previously run.
> This ticket proposes amending the existing `SavepointWriter` class to support 
> passing the preferred overwrite mode as an optional parameter when writing 
> the savepoint similar to the example below:
> {code:java}
> SavepointWriter
>     .newSavepoint(env, new HashMapStateBackend(), maxParallelism)
>     .withOperator(OperatorIdentifier.forUid("uid1"), transformation1)
>     .withOperator(OperatorIdentifier.forUid("uid2"), transformation2)
>     .write(savepointPath, FileSystem.WriteMode.OVERWRITE); {code}
> This coincides with the underlying writer, which explicitly declares the 
> use of `FileSystem.WriteMode.NO_OVERWRITE` within the `FileCopyFunction` 
> class, as seen below:
> {code:java}
> public final class FileCopyFunction implements OutputFormat<Path> {
>     ...
>     @Override
>     public void writeRecord(Path sourcePath) throws IOException {
>         Path destPath = new Path(path, sourcePath.getName());
>         try (FSDataOutputStream os =
>                         destPath.getFileSystem()
>                                 .create(destPath, FileSystem.WriteMode.NO_OVERWRITE);
>                 FSDataInputStream is = sourcePath.getFileSystem().open(sourcePath)) {
>             IOUtils.copyBytes(is, os);
>         }
>     }
>     ...
> } {code}
> An alternative solution might be to explicitly check whether the file exists 
> at the destination and delete it before writing, although the above seems 
> much more elegant.
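
The delete-before-write alternative mentioned in the description can be
sketched with plain java.nio.file, so that it runs without Flink on the
classpath. This is only an illustration under stated assumptions, not Flink
code: the class DeleteThenCopy and the method copyInto are hypothetical, and
StandardOpenOption.CREATE_NEW stands in for FileSystem.WriteMode.NO_OVERWRITE
because both refuse to write over an existing file.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper, not part of Flink. It mirrors the shape of
// FileCopyFunction#writeRecord but uses java.nio.file instead of
// Flink's FileSystem abstraction.
final class DeleteThenCopy {
    private DeleteThenCopy() {}

    // Copies source into destDir under the same file name.
    // If deleteExisting is true, an existing destination file is removed first.
    static Path copyInto(Path source, Path destDir, boolean deleteExisting)
            throws IOException {
        Path dest = destDir.resolve(source.getFileName());
        if (deleteExisting) {
            // No-op when the file is absent, so no separate exists() check is needed.
            Files.deleteIfExists(dest);
        }
        // CREATE_NEW throws FileAlreadyExistsException if dest is still present,
        // which is the stand-in here for NO_OVERWRITE behavior.
        try (InputStream is = Files.newInputStream(source);
                OutputStream os = Files.newOutputStream(
                        dest, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
            is.transferTo(os);
        }
        return dest;
    }
}
```

With deleteExisting set to false, a second run against the same destination
fails, which matches the exception described for periodic jobs. With it set to
true, the second run succeeds. Note that delete-then-create is two separate
steps rather than one atomic operation, so a concurrent writer could still
recreate the file in between.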



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
