[ 
https://issues.apache.org/jira/browse/FLINK-30807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rion Williams updated FLINK-30807:
----------------------------------
    Description: 
Currently there is no overwrite support when using the State Processor API to 
create a savepoint at a given location. For applications that may run or 
generate a given savepoint on a periodic basis (e.g. cron job, nightly process, 
etc.) this can result in an exception if the job was previously run.

This ticket proposes amending the existing `SavePointWriter` class to support 
passing the preferred overwrite mode as an optional parameter when writing the 
savepoint similar to the example below:
{code:java}
SavepointWriter
    .newSavepoint(env, new HashMapStateBackend(), maxParallelism)
    .withOperator(OperatorIdentifier.forUid("uid1"), transformation1)
    .withOperator(OperatorIdentifier.forUid("uid2"), transformation2)
    .write(savepointPath, FileSystem.WriteMode.OVERWRITE); {code}
This coincides with the underlying writer class which explicitly declares the 
use of  `FileSystem.WriteMode.NO_OVERWRITE` within the `FileCopyFunction` class 
as seen below:
{code:java}
public final class FileCopyFunction implements OutputFormat<Path> {
    ...
    @Override
    public void writeRecord(Path sourcePath) throws IOException {
        Path destPath = new Path(path, sourcePath.getName());
        try (FSDataOutputStream os =
                        destPath.getFileSystem()
                                .create(destPath, 
FileSystem.WriteMode.NO_OVERWRITE);
                FSDataInputStream is = 
sourcePath.getFileSystem().open(sourcePath))
{             IOUtils.copyBytes(is, os);         }
    }
    ...
} {code}
An alternative solution might be to explicitly check for the existence of the 
file at the destination and deleting it, although the above seems much more 
elegant.

  was:
Currently there is no overwrite support when using the State Processor API to 
create a savepoint at a given location. For applications that may run or 
generate a given savepoint on a periodic basis (e.g. cron job, nightly process, 
etc.) this can result in an exception if the job was previously run.

This ticket proposes amending the existing `SavePointWriter` class to support 
passing the preferred overwrite mode as an optional parameter when writing the 
savepoint similar to the example below:

```

```

This coincides with the underlying writer class which explicitly declares the 
use of  `FileSystem.WriteMode.NO_OVERWRITE` within the `FileCopyFunction` class 
as seen below:
{code:java}
public final class FileCopyFunction implements OutputFormat<Path> {
    ...
    @Override
    public void writeRecord(Path sourcePath) throws IOException {
        Path destPath = new Path(path, sourcePath.getName());
        try (FSDataOutputStream os =
                        destPath.getFileSystem()
                                .create(destPath, 
FileSystem.WriteMode.NO_OVERWRITE);
                FSDataInputStream is = 
sourcePath.getFileSystem().open(sourcePath))
{             IOUtils.copyBytes(is, os);         }
    }
    ...
} {code}
An alternative solution might be to explicitly check for the existence of the 
file at the destination and deleting it, although the above seems much more 
elegant.


> State Processor API - Overwrite Support for Savepoints
> ------------------------------------------------------
>
>                 Key: FLINK-30807
>                 URL: https://issues.apache.org/jira/browse/FLINK-30807
>             Project: Flink
>          Issue Type: New Feature
>          Components: API / State Processor
>            Reporter: Rion Williams
>            Priority: Major
>
> Currently there is no overwrite support when using the State Processor API to 
> create a savepoint at a given location. For applications that may run or 
> generate a given savepoint on a periodic basis (e.g. cron job, nightly 
> process, etc.) this can result in an exception if the job was previously run.
> This ticket proposes amending the existing `SavePointWriter` class to support 
> passing the preferred overwrite mode as an optional parameter when writing 
> the savepoint similar to the example below:
> {code:java}
> SavepointWriter
>     .newSavepoint(env, new HashMapStateBackend(), maxParallelism)
>     .withOperator(OperatorIdentifier.forUid("uid1"), transformation1)
>     .withOperator(OperatorIdentifier.forUid("uid2"), transformation2)
>     .write(savepointPath, FileSystem.WriteMode.OVERWRITE); {code}
> This coincides with the underlying writer class which explicitly declares the 
> use of  `FileSystem.WriteMode.NO_OVERWRITE` within the `FileCopyFunction` 
> class as seen below:
> {code:java}
> public final class FileCopyFunction implements OutputFormat<Path> {
>     ...
>     @Override
>     public void writeRecord(Path sourcePath) throws IOException {
>         Path destPath = new Path(path, sourcePath.getName());
>         try (FSDataOutputStream os =
>                         destPath.getFileSystem()
>                                 .create(destPath, 
> FileSystem.WriteMode.NO_OVERWRITE);
>                 FSDataInputStream is = 
> sourcePath.getFileSystem().open(sourcePath))
> {             IOUtils.copyBytes(is, os);         }
>     }
>     ...
> } {code}
> An alternative solution might be to explicitly check for the existence of the 
> file at the destination and deleting it, although the above seems much more 
> elegant.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to