EricJoy2048 commented on issue #3824:
URL: https://github.com/apache/incubator-seatunnel/issues/3824#issuecomment-1373558379
# Background
Currently, SeaTunnel's sink connectors do not support a SaveMode feature, so I created this issue to discuss how to add one to SeaTunnel.
## Unified SaveMode Type
I have checked all Sink connectors. At present, SaveMode can be unified into
four modes.
```
/**
 * The SaveMode for sink connectors that use tables or other table-like
 * structures to organize data.
 */
public enum DataSaveMode {
    // MySQL: drop the table. File connector: drop the path.
    DROP_SCHEMA,
    // MySQL: drop only the data in the table. File connector: delete only the files in the path.
    KEEP_SCHEMA_DROP_DATA,
    // MySQL: keep the table and its data, and continue writing to the existing table.
    // File connector: keep the path and its files, and create new files in the path.
    KEEP_SCHEMA_AND_DATA,
    // MySQL: throw an error if the table already exists.
    // File connector: throw an error if the path already exists.
    ERROR_WHEN_EXISTS
}
```
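To make the four modes concrete, here is a minimal sketch that applies each `DataSaveMode` to a hypothetical in-memory target standing in for a MySQL table or a file path. `InMemoryTarget`, `schemaVersion` and `write` are illustrative names, not SeaTunnel classes or methods. It also assumes that a missing (or just dropped) target is recreated before writing; the proposal leaves table creation to `handleSaveMode`, so that step is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

enum DataSaveMode {
    DROP_SCHEMA,
    KEEP_SCHEMA_DROP_DATA,
    KEEP_SCHEMA_AND_DATA,
    ERROR_WHEN_EXISTS
}

// Hypothetical stand-in for a MySQL table or a file path; not a SeaTunnel class.
class InMemoryTarget {
    boolean exists;
    int schemaVersion; // incremented whenever the target is (re)created
    final List<String> rows = new ArrayList<>();

    InMemoryTarget(boolean exists, List<String> initialRows) {
        this.exists = exists;
        this.schemaVersion = exists ? 1 : 0;
        this.rows.addAll(initialRows);
    }

    // Prepares the target according to the save mode, before new rows are written.
    void applySaveMode(DataSaveMode mode) {
        switch (mode) {
            case DROP_SCHEMA:
                // Drop the table (or path) together with its data; it is recreated below.
                exists = false;
                rows.clear();
                break;
            case KEEP_SCHEMA_DROP_DATA:
                // Keep the table (or path) but delete the existing data (or files).
                rows.clear();
                break;
            case KEEP_SCHEMA_AND_DATA:
                // Keep everything; new data is appended.
                break;
            case ERROR_WHEN_EXISTS:
                if (exists) {
                    throw new IllegalStateException("Target already exists");
                }
                break;
        }
        // Assumption: a missing target is created so that writing can proceed.
        if (!exists) {
            exists = true;
            schemaVersion++;
        }
    }

    void write(String row) {
        rows.add(row);
    }
}
```

Note that `DROP_SCHEMA` and `KEEP_SCHEMA_DROP_DATA` both leave an empty target; the difference in this sketch is that only `DROP_SCHEMA` bumps `schemaVersion`, modelling that the table or path itself was recreated.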
## Add interface for SaveMode
I will add two interfaces for SaveMode.
### SupportDataSaveMode
Sink connectors that support DataSaveMode should implement this interface.
```
/**
 * Sink connectors that support data SaveMode should implement this interface.
 */
public interface SupportDataSaveMode {

    /**
     * We want every sink connector to use the same option name to configure
     * SaveMode, so this interface provides a checkOptions method. Its default
     * implementation checks that the `save_mode` parameter is present in the
     * config and that its value is supported by this connector.
     *
     * @param config config of the sink connector
     */
    default void checkOptions(Config config) {
        if (config.hasPath(SinkCommonOptions.DATA_SAVE_MODE)) {
            String saveModeValue = config.getString(SinkCommonOptions.DATA_SAVE_MODE);
            DataSaveMode dataSaveMode =
                    DataSaveMode.valueOf(saveModeValue.toUpperCase(Locale.ROOT));
            if (!supportedDataSaveModeValues().contains(dataSaveMode)) {
                throw new SeaTunnelRuntimeException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                        "This connector doesn't support save mode: " + dataSaveMode);
            }
        } else {
            throw new SeaTunnelRuntimeException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                    SinkCommonOptions.DATA_SAVE_MODE + " must be set in config");
        }
    }

    /**
     * Return the DataSaveMode configured for this connector.
     */
    DataSaveMode getDataSaveModeUsed();

    /**
     * Return the list of DataSaveMode values supported by this connector.
     *
     * @return the supported DataSaveMode values
     */
    List<DataSaveMode> supportedDataSaveModeValues();

    /**
     * Handle the given DataSaveMode, e.g. drop the table or clear the path.
     */
    void handleSaveMode(DataSaveMode dataSaveMode);
}
```
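The validation contract of `checkOptions` can be exercised outside SeaTunnel with a small sketch. To keep it self-contained it assumes a `Map<String, String>` in place of the Typesafe `Config` object and throws `IllegalArgumentException` instead of `SeaTunnelRuntimeException`; `ExampleFileSink` is a hypothetical connector, and only `checkOptions` and `supportedDataSaveModeValues` are modelled.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;

enum DataSaveMode { DROP_SCHEMA, KEEP_SCHEMA_DROP_DATA, KEEP_SCHEMA_AND_DATA, ERROR_WHEN_EXISTS }

final class SinkCommonOptions {
    static final String DATA_SAVE_MODE = "save_mode";

    private SinkCommonOptions() {}
}

// Simplified SupportDataSaveMode: a Map<String, String> replaces the Config object and
// IllegalArgumentException replaces SeaTunnelRuntimeException (both assumptions).
interface SupportDataSaveMode {

    default void checkOptions(Map<String, String> config) {
        String value = config.get(SinkCommonOptions.DATA_SAVE_MODE);
        if (value == null) {
            throw new IllegalArgumentException(SinkCommonOptions.DATA_SAVE_MODE + " must be set in config");
        }
        DataSaveMode mode;
        try {
            mode = DataSaveMode.valueOf(value.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Turn enum parse failures into a readable config error.
            throw new IllegalArgumentException("Unknown save mode: " + value, e);
        }
        if (!supportedDataSaveModeValues().contains(mode)) {
            throw new IllegalArgumentException("This connector doesn't support save mode: " + mode);
        }
    }

    List<DataSaveMode> supportedDataSaveModeValues();
}

// Hypothetical connector that only supports the two "keep schema" modes.
class ExampleFileSink implements SupportDataSaveMode {
    @Override
    public List<DataSaveMode> supportedDataSaveModeValues() {
        return Arrays.asList(DataSaveMode.KEEP_SCHEMA_DROP_DATA, DataSaveMode.KEEP_SCHEMA_AND_DATA);
    }
}
```

Wrapping `valueOf` in a try/catch is a choice made in this sketch: without it, a typo such as `keep_data` surfaces as the enum's own `IllegalArgumentException` rather than as a config validation message that names the offending value.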
## Add SaveMode Option to Sink Connector
To unify the SaveMode parameter name across all sink connectors that support SaveMode, I added a common parameter:
```
public static final String DATA_SAVE_MODE = "save_mode";
```
I also add a `SingleChoiceOption` so that each connector can declare the list of `DataSaveMode` values it supports. The methods for constructing this Option are not described in detail here.
```
public class SingleChoiceOption<T> extends Option<T> {

    // The values this option is allowed to take
    @Getter
    private final List<T> optionValues;

    public SingleChoiceOption(String key,
                              TypeReference<T> typeReference,
                              List<T> optionValues,
                              T defaultValue) {
        super(key, typeReference, defaultValue);
        this.optionValues = optionValues;
    }
}
```
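The idea behind `SingleChoiceOption` can be shown with a stripped-down sketch. It assumes a simplified `Option<T>` holding only a key and a default value (the real SeaTunnel `Option` also carries a `TypeReference`, and Lombok's `@Getter` is replaced by a plain getter). The `accepts` method and the check that the default is one of the choices are hypothetical additions for illustration, not part of the proposal.

```java
import java.util.Collections;
import java.util.List;

enum DataSaveMode { DROP_SCHEMA, KEEP_SCHEMA_DROP_DATA, KEEP_SCHEMA_AND_DATA, ERROR_WHEN_EXISTS }

// Simplified stand-in for SeaTunnel's Option<T>: only a key and a default value.
class Option<T> {
    private final String key;
    private final T defaultValue;

    Option(String key, T defaultValue) {
        this.key = key;
        this.defaultValue = defaultValue;
    }

    String key() {
        return key;
    }

    T defaultValue() {
        return defaultValue;
    }
}

// An option whose value must be one of a fixed list of choices.
class SingleChoiceOption<T> extends Option<T> {
    private final List<T> optionValues;

    SingleChoiceOption(String key, List<T> optionValues, T defaultValue) {
        super(key, defaultValue);
        // Design choice in this sketch: reject a default that is not one of the choices.
        if (defaultValue != null && !optionValues.contains(defaultValue)) {
            throw new IllegalArgumentException("Default value must be one of " + optionValues);
        }
        this.optionValues = Collections.unmodifiableList(optionValues);
    }

    List<T> getOptionValues() {
        return optionValues;
    }

    // True if the given value is one of the allowed choices (null is never accepted).
    boolean accepts(T value) {
        return value != null && optionValues.contains(value);
    }
}
```

A connector would build one of these with its own supported modes, for example `new SingleChoiceOption<>("save_mode", List.of(DataSaveMode.KEEP_SCHEMA_AND_DATA), null)`, so a UI or validator can list and check the allowed values.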
## Automatically add the DataSaveMode Option to the OptionRule of each connector that supports DataSaveMode
To do this, I added a `SupportDataSaveMode` interface check in `FactoryUtil.sinkFullOptionRule`.
Note that this calls the `createSink` method of the `TableSinkFactory` interface, so any connector that implements `SupportDataSaveMode` must also implement `TableSinkFactory#createSink`.
```
/**
 * This method is called by SeaTunnel Web to get the full option rule of a sink.
 *
 * @return the sink's option rule, extended with `save_mode` if the sink supports it
 */
public static OptionRule sinkFullOptionRule(@NonNull TableSinkFactory factory) {
    OptionRule sinkOptionRule = factory.optionRule();
    if (sinkOptionRule == null) {
        throw new FactoryException("sinkOptionRule can not be null");
    }
    try {
        TableSink sink = factory.createSink(null);
        // instanceof also guards against a null sink
        if (sink instanceof SupportDataSaveMode) {
            SupportDataSaveMode supportDataSaveModeSink = (SupportDataSaveMode) sink;
            Option<DataSaveMode> saveMode =
                    Options.key(SinkCommonOptions.DATA_SAVE_MODE)
                            .singleChoice(DataSaveMode.class,
                                    supportDataSaveModeSink.supportedDataSaveModeValues())
                            .noDefaultValue()
                            .withDescription("data save mode");
            OptionRule sinkCommonOptionRule =
                    OptionRule.builder().required(saveMode).build();
            // saveMode is registered as required, so merge the required options;
            // the optional options of sinkCommonOptionRule are empty.
            sinkOptionRule.getRequiredOptions().addAll(sinkCommonOptionRule.getRequiredOptions());
        }
    } catch (UnsupportedOperationException e) {
        LOG.warn("Adding the save mode option requires the sink connector to support "
                + "creating sinks via TableSinkFactory", e);
    }
    return sinkOptionRule;
}
```
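The control flow of `sinkFullOptionRule` can be tried in isolation with a sketch built on simplified, hypothetical stand-ins: `SimpleOptionRule` holds option keys as strings, `TableSinkFactory#createSink` takes no context argument (the real one receives a context, passed as `null` above), and `SupportDataSaveMode` is reduced to a marker interface. Because `checkOptions` rejects configs without `save_mode`, this sketch registers the option as required.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; SeaTunnel's OptionRule, TableSink and
// TableSinkFactory have richer APIs.
final class SimpleOptionRule {
    final List<String> requiredOptions = new ArrayList<>();
    final List<String> optionalOptions = new ArrayList<>();
}

interface TableSink {}

// Marker version of SupportDataSaveMode for this sketch.
interface SupportDataSaveMode {}

interface TableSinkFactory {
    SimpleOptionRule optionRule();

    // Factories that cannot create a sink keep this default.
    default TableSink createSink() {
        throw new UnsupportedOperationException("createSink not implemented");
    }
}

// Hypothetical sink that supports save modes.
class ExampleJdbcSink implements TableSink, SupportDataSaveMode {}

final class FactoryUtil {
    static final String DATA_SAVE_MODE = "save_mode";

    static SimpleOptionRule sinkFullOptionRule(TableSinkFactory factory) {
        SimpleOptionRule rule = factory.optionRule();
        if (rule == null) {
            throw new IllegalStateException("sinkOptionRule can not be null");
        }
        try {
            TableSink sink = factory.createSink();
            // Add save_mode once, and only for sinks that support it.
            if (sink instanceof SupportDataSaveMode && !rule.requiredOptions.contains(DATA_SAVE_MODE)) {
                rule.requiredOptions.add(DATA_SAVE_MODE);
            }
        } catch (UnsupportedOperationException e) {
            System.err.println("Adding the save mode option requires createSink support: " + e.getMessage());
        }
        return rule;
    }
}
```

Factories that do not implement `createSink` still get their original rule back; they simply do not gain the `save_mode` option, mirroring the warning branch in the proposal.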
## At what stage should DataSaveMode be processed?
We will call `checkOptions` in `SinkExecuteProcessor` to validate the config, so I updated the starter code in
`org.apache.seatunnel.core.starter.flink.execution.SinkExecuteProcessor#initializePlugins`:
```
@Override
protected List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> initializePlugins(
        List<URL> jarPaths, List<? extends Config> pluginConfigs) {
    SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
            new SeaTunnelSinkPluginDiscovery(addUrlToClassloader);
    List<URL> pluginJars = new ArrayList<>();
    List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> sinks =
            pluginConfigs.stream()
                    .map(sinkConfig -> {
                        PluginIdentifier pluginIdentifier = PluginIdentifier.of(
                                ENGINE_TYPE, PLUGIN_TYPE, sinkConfig.getString(PLUGIN_NAME));
                        pluginJars.addAll(
                                sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
                        SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
                                sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
                        seaTunnelSink.prepare(sinkConfig);
                        seaTunnelSink.setJobContext(jobContext);
                        // Validate save_mode only for sinks that implement SupportDataSaveMode
                        if (seaTunnelSink instanceof SupportDataSaveMode) {
                            SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink;
                            saveModeSink.checkOptions(sinkConfig);
                        }
                        return seaTunnelSink;
                    })
                    .distinct()
                    .collect(Collectors.toList());
    jarPaths.addAll(pluginJars);
    return sinks;
}
```
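When testing whether a sink supports SaveMode via reflection, the direction of `Class#isAssignableFrom` matters: `A.class.isAssignableFrom(B.class)` asks whether a `B` can be assigned to an `A`, so the interface must be the receiver. The sketch below uses hypothetical types (`ExampleSink`, `SaveModeCheck`) to contrast the correct check, the equivalent `instanceof`, and the reversed form, which is false for any ordinary connector class.

```java
import java.io.Serializable;

// Hypothetical types standing in for the SeaTunnel interfaces.
interface SupportDataSaveMode {}

class ExampleSink implements Serializable, SupportDataSaveMode {}

final class SaveModeCheck {
    // True when the runtime class of sink implements SupportDataSaveMode.
    static boolean supportsSaveMode(Object sink) {
        return SupportDataSaveMode.class.isAssignableFrom(sink.getClass());
    }

    // Equivalent and more idiomatic; also returns false for null.
    static boolean supportsSaveModeInstanceof(Object sink) {
        return sink instanceof SupportDataSaveMode;
    }

    // Reversed direction: asks whether SupportDataSaveMode is a subtype of the
    // sink's class, which is false for a concrete connector class.
    static boolean reversedCheck(Object sink) {
        return sink.getClass().isAssignableFrom(SupportDataSaveMode.class);
    }
}
```

`instanceof` is usually preferable here because it reads naturally and needs no reflection; `isAssignableFrom` is mainly useful when only a `Class` object is available, as in plugin discovery before instantiation.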
## About the handleSaveMode method and automatic table creation
@TyrantLucifer will design it.