CalvinKirs opened a new pull request, #2391:
URL: https://github.com/apache/incubator-seatunnel/pull/2391
Support local file storage plugin
Support proto-stuff serializer
Support SPI load check-point storage
CheckPoint storage is an important part of the SeaTunnel engine's checkpoint
mechanism. This PR mainly covers its design and implementation.
The `check-point-storage-api` module defines the API for storage plugins and
some common classes.
We use SPI to load storage plugins. All sub-plugins need to implement
CheckPointStorageFactory. In fact, this is the only class a plugin implementer
needs to pay attention to as the entry point.
````
public interface CheckPointStorageFactory {
/**
* Returns the name of the storage plugin
*/
String name();
/**
* create storage plugin instance
*
* @param configuration storage system config params
* key: storage system config key
* value: storage system config value
* e.g.
* key: "FS_DEFAULT_NAME_KEY"
* value: "fs.defaultFS"
* @return storage plugin instance
*/
CheckPointStorage create(Map<String, String> configuration);
}
````
Local file storage is implemented by default; see the
`check-point-storage-local-file` module for details.
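As a rough illustration of the SPI lookup described above, the sketch below picks a factory by name from whatever `java.util.ServiceLoader` discovers. The `findByName` and `loadFromClasspath` helpers, the `InMemoryStorageFactory` plugin, and the trimmed-down `CheckPointStorage` interface are assumptions made for this example and are not taken from the PR.

```java
import java.util.Map;
import java.util.ServiceLoader;

class FactoryLookupSketch {

    /** Simplified stand-in; the real interface declares the storage methods. */
    interface CheckPointStorage { }

    /** Same shape as the factory interface shown above. */
    interface CheckPointStorageFactory {
        String name();
        CheckPointStorage create(Map<String, String> configuration);
    }

    /** Hypothetical plugin, used only to exercise the lookup. */
    static class InMemoryStorageFactory implements CheckPointStorageFactory {
        @Override
        public String name() {
            return "in-memory";
        }

        @Override
        public CheckPointStorage create(Map<String, String> configuration) {
            return new CheckPointStorage() { };
        }
    }

    /** Returns the first factory whose name matches, ignoring case. */
    static CheckPointStorageFactory findByName(
            Iterable<? extends CheckPointStorageFactory> factories, String name) {
        for (CheckPointStorageFactory factory : factories) {
            if (factory.name().equalsIgnoreCase(name)) {
                return factory;
            }
        }
        throw new IllegalArgumentException("No checkpoint storage plugin named: " + name);
    }

    /** Searches the factories registered under META-INF/services on the classpath. */
    static CheckPointStorageFactory loadFromClasspath(String name) {
        return findByName(ServiceLoader.load(CheckPointStorageFactory.class), name);
    }
}
```

For `loadFromClasspath` to find anything, a plugin jar would have to list its factory class in a `META-INF/services` file named after the fully qualified factory interface. That is the standard `ServiceLoader` contract.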
CheckPointStorage is the concrete plug-in instance; implementations need to
provide the following methods.
````
public interface CheckPointStorage {
/**
* init storage and create parent directory if not exists
*
* @param configuration storage system config params
* @throws CheckPointStorageException if init failed
*/
void initStorage(Map<String, String> configuration) throws
CheckPointStorageException;
/**
* save checkpoint to storage
*
* @param state PipelineState
* @throws CheckPointStorageException if save checkpoint failed
*/
String storeCheckPoint(PipelineState state) throws
CheckPointStorageException;
/**
* get all checkpoints from storage
*
* @param jobId job id
* @return All job's checkpoint data from storage
* @throws CheckPointStorageException if get checkpoint failed
*/
List<PipelineState> getAllCheckpoints(String jobId) throws
CheckPointStorageException;
/**
* get latest checkpoint from storage
*
* @param jobId job id
* @return latest checkpoint data from storage
* @throws CheckPointStorageException if get checkpoint failed
*/
PipelineState getLatestCheckpoint(String jobId) throws
CheckPointStorageException;
/**
* get checkpoint from storage. If there are multiple records, one of them
* is returned arbitrarily
*
* @param jobId job id
* @param pipelineId pipeline id
* @return checkpoint data from storage
* @throws CheckPointStorageException if get checkpoint failed or no
* checkpoint found
*/
PipelineState getCheckpointByJobIdAndPipelineId(String jobId, String
pipelineId) throws CheckPointStorageException;
/**
* Delete all checkpoint data under the job
*
* @param jobId job id
* @throws CheckPointStorageException if delete checkpoint failed
*/
void deleteCheckpoint(String jobId) throws CheckPointStorageException;
}
````
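To make the contract above more concrete, here is a minimal sketch of how a local-file backend might store, look up, and delete checkpoints. It is not the code of the `check-point-storage-local-file` module. The class name, the `<baseDir>/<jobId>/<checkpointId>-<pipelineId>.ckpt` layout, and the use of raw `byte[]` payloads in place of `PipelineState` are all assumptions chosen to keep the example self-contained.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Simplified local-file storage; names and layout are assumptions. */
class LocalFileStorageSketch {

    private final Path baseDir;

    LocalFileStorageSketch(Path baseDir) {
        this.baseDir = baseDir;
    }

    /** Convenience factory that stores checkpoints under a fresh temp directory. */
    static LocalFileStorageSketch inTempDir() {
        try {
            return new LocalFileStorageSketch(Files.createTempDirectory("checkpoints"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes one already-serialized checkpoint and returns the file path. */
    String storeCheckPoint(String jobId, String pipelineId, long checkpointId, byte[] payload) {
        try {
            Path jobDir = Files.createDirectories(baseDir.resolve(jobId));
            // Zero-padded id so that lexicographic file order matches checkpoint order.
            Path file = jobDir.resolve(String.format("%019d-%s.ckpt", checkpointId, pipelineId));
            Files.write(file, payload);
            return file.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the payload of the checkpoint with the highest id, if the job has any. */
    Optional<byte[]> getLatestCheckpoint(String jobId) {
        Path jobDir = baseDir.resolve(jobId);
        if (!Files.isDirectory(jobDir)) {
            return Optional.empty();
        }
        try (Stream<Path> files = Files.list(jobDir)) {
            Optional<Path> latest =
                    files.max(Comparator.comparing((Path p) -> p.getFileName().toString()));
            return latest.isPresent()
                    ? Optional.of(Files.readAllBytes(latest.get()))
                    : Optional.empty();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Deletes every checkpoint file of the job, then the job directory itself. */
    void deleteCheckpoint(String jobId) {
        Path jobDir = baseDir.resolve(jobId);
        if (!Files.isDirectory(jobDir)) {
            return;
        }
        try (Stream<Path> files = Files.walk(jobDir)) {
            // Reverse order puts children before their parent directory.
            List<Path> toDelete =
                    files.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            for (Path path : toDelete) {
                Files.delete(path);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The sketch wraps `IOException` in `UncheckedIOException` for brevity; an implementation of the interface above would presumably translate such failures into `CheckPointStorageException` instead.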
`org.apache.seatunnel.engine.checkpoint.storage.api.AbstractCheckPointStorage`
contains some abstract methods plus common helper methods, such as
serialization and deserialization, file naming, etc.
We use `proto-stuff` for serialization and deserialization by default. All
stored file content is serialized data, which is deserialized when it needs to
be read back. If you need a different serialization method, it can be swapped
in quickly.
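One way such a swap could look is a small serializer abstraction that the storage code depends on. In the sketch below, the `Serializer` interface and its method signatures are assumptions, and the implementation deliberately uses plain JDK serialization as a stand-in rather than `proto-stuff`, so that the example runs without extra dependencies.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

class SerializerSketch {

    /** Hypothetical abstraction; the name and method shapes are assumptions. */
    interface Serializer {
        <T> byte[] serialize(T obj);

        <T> T deserialize(byte[] data, Class<T> type);
    }

    /** Stand-in implementation based on JDK serialization, not proto-stuff. */
    static class JdkSerializer implements Serializer {

        @Override
        public <T> byte[] serialize(T obj) {
            try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                 ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
                // Flush so buffered object data reaches the byte array before it is copied.
                out.flush();
                return bytes.toByteArray();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        @Override
        public <T> T deserialize(byte[] data, Class<T> type) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
                return type.cast(in.readObject());
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }
    }
}
```

With this shape, storage code would call only `serialize` and `deserialize`, so replacing the serializer means providing another implementation of the same two methods.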
Remaining todo:
Asynchronous support
HDFS plugin support