EricJoy2048 commented on issue #3824:
URL: https://github.com/apache/incubator-seatunnel/issues/3824#issuecomment-1373558379

   # Background
   
   Currently, SeaTunnel's sink connectors do not support SaveMode, so I created this issue to discuss how to add a SaveMode feature to SeaTunnel.
   
   ## Unified SaveMode Type
   
   I have checked all sink connectors. At present, their SaveMode behavior can be unified into four modes:
   
   ```
   /**
    * The SaveMode for Sink connectors that organize data in tables or other table-like structures.
    */
   public enum DataSaveMode {
       // MySQL: drop the table. File connector: drop the path.
       DROP_SCHEMA,
   
       // MySQL: keep the table, drop only its data. File connector: keep the path, drop only the files in it.
       KEEP_SCHEMA_DROP_DATA,
   
       // MySQL: keep the table and its data, and continue writing to the existing table.
       // File connector: keep the path and the files in it, and create new files in the path.
       KEEP_SCHEMA_AND_DATA,
   
       // MySQL: throw an error if the table already exists. File connector: throw an error if the path already exists.
       ERROR_WHEN_EXISTS
   }
   ```
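   To make the four modes concrete, here is a minimal sketch of how a JDBC-style sink might translate each mode into MySQL statements before writing. `JdbcSaveModePlanner` and `plan` are hypothetical names, not SeaTunnel code. Recreating a dropped table is left out because table creation is designed separately.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Same four values as the DataSaveMode enum proposed above.
   enum DataSaveMode {
       DROP_SCHEMA,
       KEEP_SCHEMA_DROP_DATA,
       KEEP_SCHEMA_AND_DATA,
       ERROR_WHEN_EXISTS
   }

   // Hypothetical helper: returns the MySQL statements a JDBC sink could run
   // before writing, given the save mode and whether the target table exists.
   // The table name is assumed to be a trusted, already-quoted identifier.
   final class JdbcSaveModePlanner {
       private JdbcSaveModePlanner() {}

       static List<String> plan(DataSaveMode mode, String table, boolean tableExists) {
           List<String> statements = new ArrayList<>();
           switch (mode) {
               case DROP_SCHEMA:
                   // Drop the whole table; it has to be recreated before writing.
                   statements.add("DROP TABLE IF EXISTS " + table);
                   break;
               case KEEP_SCHEMA_DROP_DATA:
                   // Keep the table definition but remove all existing rows.
                   if (tableExists) {
                       statements.add("TRUNCATE TABLE " + table);
                   }
                   break;
               case KEEP_SCHEMA_AND_DATA:
                   // Append to the existing table: nothing to run up front.
                   break;
               case ERROR_WHEN_EXISTS:
                   // Refuse to touch an existing table.
                   if (tableExists) {
                       throw new IllegalStateException("Table already exists: " + table);
                   }
                   break;
               default:
                   throw new IllegalArgumentException("Unknown save mode: " + mode);
           }
           return statements;
       }
   }
   ```

   A File connector could use the same switch to act on a path and the files in it, rather than on a table and its rows.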
   
   ## Add interface for SaveMode
   
   I will add two interfaces for SaveMode.
   
   ### SupportDataSaveMode
   
   Sink connectors that support DataSaveMode should implement this interface.
   
   ```
   /**
    * Sink connectors that support data SaveMode should implement this interface.
    */
   public interface SupportDataSaveMode {
   
       /**
        * Every sink connector should use the same option name to configure SaveMode, so this
        * interface provides a default checkOptions implementation that verifies the `save_mode`
        * parameter is present in the config and is supported by this connector.
        *
        * @param config config of the Sink Connector
        * @throws SeaTunnelRuntimeException if `save_mode` is missing or not supported
        */
       default void checkOptions(Config config) {
           if (config.hasPath(SinkCommonOptions.DATA_SAVE_MODE)) {
               String saveModeValue = config.getString(SinkCommonOptions.DATA_SAVE_MODE);
               // valueOf throws IllegalArgumentException if the value is not a DataSaveMode name
               DataSaveMode dataSaveMode = DataSaveMode.valueOf(saveModeValue.toUpperCase(Locale.ROOT));
               if (!supportedDataSaveModeValues().contains(dataSaveMode)) {
                   throw new SeaTunnelRuntimeException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                       "This connector does not support save mode: " + dataSaveMode);
               }
           } else {
               throw new SeaTunnelRuntimeException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                   SinkCommonOptions.DATA_SAVE_MODE + " must be set in config");
           }
       }
   
       /**
        * Return the DataSaveMode this connector is configured to use.
        */
       DataSaveMode getDataSaveModeUsed();
   
       /**
        * Return the list of DataSaveModes supported by this connector.
        *
        * @return supported DataSaveMode values
        */
       List<DataSaveMode> supportedDataSaveModeValues();
   
       void handleSaveMode(DataSaveMode dataSaveMode);
   }
   
   ```
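   The default `checkOptions` logic can be exercised without a SeaTunnel runtime. The sketch below is a simplified, self-contained model that rests on a few assumptions:

   - A `Map<String, String>` stands in for the `Config` object.
   - A hypothetical `ConfigValidationException` stands in for `SeaTunnelRuntimeException`.
   - `AppendOnlySink` is an invented example connector.
   - `getDataSaveModeUsed` is omitted.

   Unlike the proposal, this `checkOptions` returns the parsed mode. It also reports unknown values as a validation error instead of letting `valueOf` throw `IllegalArgumentException`.

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.Locale;
   import java.util.Map;

   enum DataSaveMode { DROP_SCHEMA, KEEP_SCHEMA_DROP_DATA, KEEP_SCHEMA_AND_DATA, ERROR_WHEN_EXISTS }

   // Hypothetical stand-in for SeaTunnelRuntimeException(CONFIG_VALIDATION_FAILED, ...).
   class ConfigValidationException extends RuntimeException {
       ConfigValidationException(String message) {
           super(message);
       }
   }

   // Simplified SupportDataSaveMode: a Map replaces the Config object.
   interface SupportDataSaveMode {
       String DATA_SAVE_MODE = "save_mode";

       // Validates save_mode and returns the parsed mode.
       default DataSaveMode checkOptions(Map<String, String> config) {
           String raw = config.get(DATA_SAVE_MODE);
           if (raw == null) {
               throw new ConfigValidationException(DATA_SAVE_MODE + " must be set in config");
           }
           DataSaveMode mode;
           try {
               // Case-insensitive, like the proposal's toUpperCase(Locale.ROOT).
               mode = DataSaveMode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
           } catch (IllegalArgumentException e) {
               throw new ConfigValidationException("Unknown save mode: " + raw);
           }
           if (!supportedDataSaveModeValues().contains(mode)) {
               throw new ConfigValidationException("This connector does not support save mode: " + mode);
           }
           return mode;
       }

       List<DataSaveMode> supportedDataSaveModeValues();

       void handleSaveMode(DataSaveMode dataSaveMode);
   }

   // Hypothetical connector that never deletes existing data.
   class AppendOnlySink implements SupportDataSaveMode {
       DataSaveMode handled;

       @Override
       public List<DataSaveMode> supportedDataSaveModeValues() {
           return Arrays.asList(DataSaveMode.KEEP_SCHEMA_AND_DATA, DataSaveMode.ERROR_WHEN_EXISTS);
       }

       @Override
       public void handleSaveMode(DataSaveMode dataSaveMode) {
           this.handled = dataSaveMode;
       }
   }
   ```

   Returning the parsed mode is one possible design choice. It lets the caller pass the validated value straight to `handleSaveMode` without reading the config twice.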
   
   ## Add SaveMode Option to Sink Connector
   
   To unify the SaveMode parameter name across all sink connectors that support SaveMode, I added a common option key:
   
   ```
   // In SinkCommonOptions
   public static final String DATA_SAVE_MODE = "save_mode";
   ```
   
   I also added a `SingleChoiceOption` so that each connector can declare the list of `DataSaveMode` values it supports. The builder methods used to construct this option are not described in detail here.
   
   ```
   // An Option whose value must be one of optionValues
   public class SingleChoiceOption<T> extends Option<T> {
   
       @Getter
       private final List<T> optionValues;
   
       public SingleChoiceOption(String key,
                                 TypeReference<T> typeReference,
                                 List<T> optionValues,
                                 T defaultValue) {
           super(key, typeReference, defaultValue);
           this.optionValues = optionValues;
       }
   }
   ```
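   A `SingleChoiceOption` adds a list of allowed values to a normal option. A small self-contained model shows how such an option might validate user input. This is a sketch, not SeaTunnel's `Option` API. `SimpleOption`, `SimpleSingleChoiceOption` and `validate` are hypothetical names, and `Class<T>` replaces `TypeReference<T>` to keep the example dependency-free.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;

   enum DataSaveMode { DROP_SCHEMA, KEEP_SCHEMA_DROP_DATA, KEEP_SCHEMA_AND_DATA, ERROR_WHEN_EXISTS }

   // Hypothetical stand-in for an option: a key, a value type and an optional default.
   class SimpleOption<T> {
       private final String key;
       private final Class<T> type;
       private final T defaultValue;

       SimpleOption(String key, Class<T> type, T defaultValue) {
           this.key = key;
           this.type = type;
           this.defaultValue = defaultValue;
       }

       String key() { return key; }
       Class<T> type() { return type; }
       T defaultValue() { return defaultValue; }
   }

   // An option whose value must be one of a fixed list of choices.
   // T is passed through to the parent, so the option stays fully typed.
   class SimpleSingleChoiceOption<T> extends SimpleOption<T> {
       private final List<T> optionValues;

       SimpleSingleChoiceOption(String key, Class<T> type, List<T> optionValues, T defaultValue) {
           super(key, type, defaultValue);
           if (defaultValue != null && !optionValues.contains(defaultValue)) {
               throw new IllegalArgumentException("Default " + defaultValue + " is not one of " + optionValues);
           }
           // Defensive copy so later changes to the caller's list have no effect.
           this.optionValues = Collections.unmodifiableList(new ArrayList<>(optionValues));
       }

       List<T> getOptionValues() { return optionValues; }

       // Returns the value if it is an allowed choice, otherwise throws.
       T validate(T value) {
           if (!optionValues.contains(value)) {
               throw new IllegalArgumentException(key() + " must be one of " + optionValues + ", got " + value);
           }
           return value;
       }
   }
   ```

   The sketch adds two safety checks that the proposal does not mention: it copies the list of choices, and it rejects a default value that is not among them.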
   
   ## Automatically add the DataSaveMode option to the OptionRule of each connector that supports DataSaveMode
   
   To do this, I added a `SupportDataSaveMode` interface check in `FactoryUtil.sinkFullOptionRule`.
   
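   Please note that this relies on the `createSink` method of the `TableSinkFactory` interface, so every connector that implements `SupportDataSaveMode` must also implement `TableSinkFactory#createSink`. Because `sinkFullOptionRule` calls `createSink(null)`, that implementation must also tolerate a `null` context.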
   
   ```
       /**
        * This method is called by SeaTunnel Web to get the full option rule of a sink.
        *
        * @param factory the sink factory
        * @return the sink's option rule, plus the save_mode option if the sink supports DataSaveMode
        */
       public static OptionRule sinkFullOptionRule(@NonNull TableSinkFactory factory) {
           OptionRule sinkOptionRule = factory.optionRule();
           if (sinkOptionRule == null) {
               throw new FactoryException("sinkOptionRule can not be null");
           }
   
           try {
               // The sink is created with a null context only to inspect which interfaces it implements
               TableSink sink = factory.createSink(null);
               if (sink instanceof SupportDataSaveMode) {
                   SupportDataSaveMode supportDataSaveModeSink = (SupportDataSaveMode) sink;
                   Option<DataSaveMode> saveMode =
                       Options.key(SinkCommonOptions.DATA_SAVE_MODE)
                           .singleChoice(DataSaveMode.class, supportDataSaveModeSink.supportedDataSaveModeValues())
                           .noDefaultValue()
                           .withDescription("data save mode");
                   OptionRule sinkCommonOptionRule =
                       OptionRule.builder().required(saveMode).build();
                   // save_mode is declared as required, so merge the required options, not the optional ones
                   sinkOptionRule.getRequiredOptions().addAll(sinkCommonOptionRule.getRequiredOptions());
               }
           } catch (UnsupportedOperationException e) {
               LOG.warn("Adding the save_mode option requires the sink connector to support "
                   + "creating the sink through TableSinkFactory#createSink");
           }
   
           return sinkOptionRule;
       }
   ```
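   The core of `sinkFullOptionRule` is simple. If the sink implements `SupportDataSaveMode`, the method adds a `save_mode` option whose allowed values come from `supportedDataSaveModeValues()`. Because that option is built with `required(...)`, it belongs in the rule's required options.

   The sketch below models this with hypothetical, simplified types: `SimpleOptionRule`, `SimpleFactoryUtil`, `TruncatingSink` and `PlainSink`. It takes an already-created sink object instead of calling `createSink(null)`, and it uses plain string keys rather than SeaTunnel `Option` objects.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   enum DataSaveMode { DROP_SCHEMA, KEEP_SCHEMA_DROP_DATA, KEEP_SCHEMA_AND_DATA, ERROR_WHEN_EXISTS }

   interface SupportDataSaveMode {
       List<DataSaveMode> supportedDataSaveModeValues();
   }

   // Simplified option rule: option keys only, plus the allowed choices per key.
   class SimpleOptionRule {
       final List<String> requiredOptions = new ArrayList<>();
       final List<String> optionalOptions = new ArrayList<>();
       final Map<String, List<DataSaveMode>> choices = new HashMap<>();
   }

   final class SimpleFactoryUtil {
       static final String DATA_SAVE_MODE = "save_mode";

       private SimpleFactoryUtil() {}

       // Adds save_mode as a required single-choice option if the sink supports DataSaveMode.
       static SimpleOptionRule sinkFullOptionRule(SimpleOptionRule rule, Object sink) {
           if (rule == null) {
               throw new IllegalArgumentException("sinkOptionRule can not be null");
           }
           if (sink instanceof SupportDataSaveMode) {
               List<DataSaveMode> supported = ((SupportDataSaveMode) sink).supportedDataSaveModeValues();
               if (!rule.requiredOptions.contains(DATA_SAVE_MODE)) {
                   rule.requiredOptions.add(DATA_SAVE_MODE);
               }
               rule.choices.put(DATA_SAVE_MODE, supported);
           }
           return rule;
       }
   }

   // Hypothetical sinks used to exercise the rule.
   class PlainSink {}

   class TruncatingSink implements SupportDataSaveMode {
       @Override
       public List<DataSaveMode> supportedDataSaveModeValues() {
           return Arrays.asList(DataSaveMode.KEEP_SCHEMA_DROP_DATA, DataSaveMode.KEEP_SCHEMA_AND_DATA);
       }
   }
   ```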
   
   ## At what stage should DataSaveMode be processed?
   
   We will call `checkOptions` in `SinkExecuteProcessor` to validate the sink config when the sink plugins are initialized, so I updated the starter code in `org.apache.seatunnel.core.starter.flink.execution.SinkExecuteProcessor#initializePlugins`:
   
   ```
       @Override
       protected List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> initializePlugins(
               List<URL> jarPaths, List<? extends Config> pluginConfigs) {
           SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(addUrlToClassloader);
           List<URL> pluginJars = new ArrayList<>();
           List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> sinks =
               pluginConfigs.stream().map(sinkConfig -> {
                   PluginIdentifier pluginIdentifier =
                       PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, sinkConfig.getString(PLUGIN_NAME));
                   pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
                   SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
                       sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
                   seaTunnelSink.prepare(sinkConfig);
                   seaTunnelSink.setJobContext(jobContext);
                   // Validate save_mode only for sinks that implement SupportDataSaveMode
                   if (seaTunnelSink instanceof SupportDataSaveMode) {
                       SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink;
                       saveModeSink.checkOptions(sinkConfig);
                   }
                   return seaTunnelSink;
               }).distinct().collect(Collectors.toList());
           jarPaths.addAll(pluginJars);
           return sinks;
       }
   ```
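   `Class#isAssignableFrom` is easy to call in the wrong direction. `A.class.isAssignableFrom(B.class)` is true when `B` is `A` itself or a subtype of `A`. The following self-contained sketch contrasts both directions and shows the equivalent `instanceof` check. `SaveModeSink`, `SaveModeGuard` and the `Map`-based `checkOptions` signature are hypothetical simplifications.

   ```java
   import java.util.Map;

   // Simplified SupportDataSaveMode: a Map replaces the Config object.
   interface SupportDataSaveMode {
       void checkOptions(Map<String, String> config);
   }

   // Hypothetical sink that records whether checkOptions was called.
   class SaveModeSink implements SupportDataSaveMode {
       boolean checked;

       @Override
       public void checkOptions(Map<String, String> config) {
           checked = true;
       }
   }

   final class SaveModeGuard {
       private SaveModeGuard() {}

       // Correct direction: true when the sink's class is or implements SupportDataSaveMode.
       static boolean supportsSaveMode(Object sink) {
           return SupportDataSaveMode.class.isAssignableFrom(sink.getClass());
       }

       // Reversed direction: asks whether SupportDataSaveMode is a subtype of the sink's
       // class, which is false for a concrete sink class such as SaveModeSink.
       static boolean reversedCheck(Object sink) {
           return sink.getClass().isAssignableFrom(SupportDataSaveMode.class);
       }

       // Equivalent to the correct direction, written with instanceof.
       static void checkIfSupported(Object sink, Map<String, String> config) {
           if (sink instanceof SupportDataSaveMode) {
               ((SupportDataSaveMode) sink).checkOptions(config);
           }
       }
   }
   ```

   For an object instance, `instanceof` is the simpler choice. `isAssignableFrom` is mainly useful when only a `Class` object is available.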
   
   ## About the handleSaveMode method and automatic table creation
   
   @TyrantLucifer will design it.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
