[
https://issues.apache.org/jira/browse/FLINK-30807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rion Williams updated FLINK-30807:
----------------------------------
Description:
Currently there is no overwrite support when using the State Processor API to
create a savepoint at a given location. For applications that generate a
savepoint at the same location on a periodic basis (e.g. a cron job or nightly
process), this can result in an exception if the job was previously run and a
savepoint already exists at that location.
This ticket proposes amending the existing `SavepointWriter` class to support
passing the preferred overwrite mode as an optional parameter when writing the
savepoint, similar to the example below:
{code:java}
SavepointWriter
    .newSavepoint(env, new HashMapStateBackend(), maxParallelism)
    .withOperator(OperatorIdentifier.forUid("uid1"), transformation1)
    .withOperator(OperatorIdentifier.forUid("uid2"), transformation2)
    .write(savepointPath, FileSystem.WriteMode.OVERWRITE);
{code}
This lines up with the underlying writer, which currently hardcodes
`FileSystem.WriteMode.NO_OVERWRITE` in the `FileCopyFunction` class, as seen
below:
{code:java}
public final class FileCopyFunction implements OutputFormat<Path> {
    ...
    @Override
    public void writeRecord(Path sourcePath) throws IOException {
        Path destPath = new Path(path, sourcePath.getName());
        try (FSDataOutputStream os =
                        destPath.getFileSystem()
                                .create(destPath, FileSystem.WriteMode.NO_OVERWRITE);
                FSDataInputStream is = sourcePath.getFileSystem().open(sourcePath)) {
            IOUtils.copyBytes(is, os);
        }
    }
    ...
}
{code}
An alternative solution might be to explicitly check whether the file already
exists at the destination and delete it before writing, although the approach
above seems much more elegant.
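As a rough illustration of that alternative, the sketch below removes an
existing target directory before a new savepoint would be written. It is only a
stand-in: it uses the JDK's `java.nio.file` API on a local path rather than
Flink's `FileSystem` abstraction, and the class name `SavepointPathCleaner` and
method `deleteIfExists` are hypothetical names chosen for this example, not
part of Flink.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of Flink: clears a local target directory
// so that a subsequent write with NO_OVERWRITE semantics would not collide.
final class SavepointPathCleaner {

    private SavepointPathCleaner() {}

    /**
     * Deletes the target and everything below it, if it exists.
     *
     * @return true if something was deleted, false if the target did not exist
     */
    static boolean deleteIfExists(Path target) throws IOException {
        if (!Files.exists(target)) {
            return false;
        }
        List<Path> ordered;
        try (Stream<Path> paths = Files.walk(target)) {
            // Reverse order puts children before their parent directories.
            ordered = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : ordered) {
            Files.delete(p);
        }
        return true;
    }
}
```

A periodic job would call `SavepointPathCleaner.deleteIfExists(...)` on the
target path right before writing the savepoint. The check and the delete are
not atomic, and a real implementation would need to go through Flink's own
file system layer to work on remote storage, which is part of why the
write-mode parameter looks like the cleaner option.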
> State Processor API - Overwrite Support for Savepoints
> ------------------------------------------------------
>
> Key: FLINK-30807
> URL: https://issues.apache.org/jira/browse/FLINK-30807
> Project: Flink
> Issue Type: New Feature
> Components: API / State Processor
> Reporter: Rion Williams
> Priority: Major