EricJoy2048 opened a new issue, #4771:
URL: https://github.com/apache/incubator-seatunnel/issues/4771

   ### Search before asking
   
   - [X] I had searched in the [feature](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22) and found no similar feature requirement.
   
   
   ### Description
   
   ## Current state:
   - [ ] Discuss
   - [ ] Accepted
   - [ ] Rejected
   
   ## Motivation
   A common requirement is deciding what to do with data (files or other content) that already exists in the target data source when a data synchronization job starts. We therefore need a universal interface, implemented by Sink connectors, that handles data already present in the target data source.
   We call this function SaveMode.
   
   ## Public Interfaces
   ### org.apache.seatunnel.api.sink.DataSaveMode
   DataSaveMode is an enumeration that defines the complete set of SaveMode behaviors SeaTunnel can support. Each Sink connector implements one or more of them.
   
   ```
   package org.apache.seatunnel.api.sink;
    
   /**
    * The SaveMode for Sink connectors that organize data in tables or other
    * table-like structures.
    */
   public enum DataSaveMode {
       // MySQL: drop the table. File connector: drop the path.
       DROP_SCHEMA,
    
       // MySQL: drop only the data in the table. File connector: drop only the files in the path.
       KEEP_SCHEMA_DROP_DATA,
    
       // MySQL: keep the table and its data, and continue writing to the existing table.
       // File connector: keep the path and its files, and create new files in the path.
       KEEP_SCHEMA_AND_DATA,
    
       // The connector provides custom processing, such as running user-provided SQL or shell scripts.
       CUSTOM_PROCESSING,
    
       // MySQL: throw an error if the table exists. File connector: throw an error if the path exists.
       ERROR_WHEN_EXISTS
   }
   ```
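   
   To make the MySQL comments above more concrete, here is a minimal sketch of how a JDBC-style sink might translate each mode into the statements it runs before writing. `JdbcSaveModeSketch` and `statementsFor` are hypothetical names, not part of SeaTunnel. The sketch assumes the caller already knows whether the target table exists, leaves re-creating a dropped table to the connector's own table-creation step, and rejects `CUSTOM_PROCESSING` because that behavior is connector-defined.
   
   ```java
   import java.util.Collections;
   import java.util.List;

   // Local copy of the proposed enum so the sketch compiles on its own.
   enum DataSaveMode {
       DROP_SCHEMA,
       KEEP_SCHEMA_DROP_DATA,
       KEEP_SCHEMA_AND_DATA,
       CUSTOM_PROCESSING,
       ERROR_WHEN_EXISTS
   }

   // Hypothetical helper (not SeaTunnel API): maps a mode to the SQL a MySQL-like sink could run first.
   final class JdbcSaveModeSketch {
       private JdbcSaveModeSketch() {}

       static List<String> statementsFor(DataSaveMode mode, String table, boolean tableExists) {
           switch (mode) {
               case DROP_SCHEMA:
                   // Drop the whole table; re-creating it is assumed to happen in a later step.
                   return Collections.singletonList("DROP TABLE IF EXISTS " + table);
               case KEEP_SCHEMA_DROP_DATA:
                   // Keep the table definition but remove its rows.
                   return tableExists
                           ? Collections.singletonList("TRUNCATE TABLE " + table)
                           : Collections.emptyList();
               case KEEP_SCHEMA_AND_DATA:
                   // Nothing to do: new rows are appended to the existing table.
                   return Collections.emptyList();
               case ERROR_WHEN_EXISTS:
                   if (tableExists) {
                       throw new IllegalStateException("Table already exists: " + table);
                   }
                   return Collections.emptyList();
               default:
                   // CUSTOM_PROCESSING is connector-specific (e.g. user-provided SQL), so it is not modeled.
                   throw new UnsupportedOperationException("Unsupported save mode: " + mode);
           }
       }
   }
   ```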
   
   ### org.apache.seatunnel.api.sink.SupportDataSaveMode
   The SupportDataSaveMode interface must be implemented by every Sink connector that supports the SaveMode function.
   
   ```
   package org.apache.seatunnel.api.sink;
    
   /**
    * Sink connectors that support data SaveMode should implement this interface.
    */
   public interface SupportDataSaveMode {
       /**
        * Returns the DataSaveMode configured by the user in the job config file.
        *
        * @return the user-configured DataSaveMode
        */
       DataSaveMode getUserConfigSaveMode();
    
       /**
        * Implements the connector-specific logic for each {@link DataSaveMode}.
        *
        * @param userConfigSaveMode the DataSaveMode configured by the user
        */
       void handleSaveMode(DataSaveMode userConfigSaveMode);
   }
   ```
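   
   For the file-connector side, a minimal sketch of a sink implementing this interface against the local file system might look as follows. `LocalFileSinkSketch` is hypothetical, and the enum and interface are redeclared so the block compiles on its own; a real file connector would presumably go through its own file-system abstraction (for example HDFS or S3) rather than `java.nio.file`.
   
   ```java
   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.util.Comparator;
   import java.util.List;
   import java.util.stream.Collectors;
   import java.util.stream.Stream;

   // Local copies of the proposed API so the sketch compiles on its own.
   enum DataSaveMode {
       DROP_SCHEMA, KEEP_SCHEMA_DROP_DATA, KEEP_SCHEMA_AND_DATA, CUSTOM_PROCESSING, ERROR_WHEN_EXISTS
   }

   interface SupportDataSaveMode {
       DataSaveMode getUserConfigSaveMode();

       void handleSaveMode(DataSaveMode userConfigSaveMode);
   }

   // Hypothetical file sink that writes all of its output files under one directory.
   class LocalFileSinkSketch implements SupportDataSaveMode {
       private final Path outputPath;
       private final DataSaveMode configuredMode;

       LocalFileSinkSketch(Path outputPath, DataSaveMode configuredMode) {
           this.outputPath = outputPath;
           this.configuredMode = configuredMode;
       }

       @Override
       public DataSaveMode getUserConfigSaveMode() {
           return configuredMode;
       }

       @Override
       public void handleSaveMode(DataSaveMode userConfigSaveMode) {
           try {
               switch (userConfigSaveMode) {
                   case DROP_SCHEMA:
                       // Drop the path itself, including everything below it.
                       deleteTree(outputPath, false);
                       break;
                   case KEEP_SCHEMA_DROP_DATA:
                       // Keep the directory but delete everything inside it.
                       deleteTree(outputPath, true);
                       break;
                   case KEEP_SCHEMA_AND_DATA:
                       // Keep existing files; the writer simply adds new files.
                       break;
                   case ERROR_WHEN_EXISTS:
                       if (Files.exists(outputPath)) {
                           throw new IllegalStateException("Path already exists: " + outputPath);
                       }
                       break;
                   default:
                       throw new UnsupportedOperationException("Unsupported save mode: " + userConfigSaveMode);
               }
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }

       // Deletes every entry under root, and root itself unless keepRoot is true.
       private static void deleteTree(Path root, boolean keepRoot) throws IOException {
           if (!Files.exists(root)) {
               return;
           }
           List<Path> paths;
           try (Stream<Path> walk = Files.walk(root)) {
               // Reverse natural order visits every child before its parent directory.
               paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
           }
           for (Path path : paths) {
               if (keepRoot && path.equals(root)) {
                   continue;
               }
               Files.delete(path);
           }
       }
   }
   ```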
   
   SaveMode must be processed before the job starts, so in the SeaTunnel Zeta engine the connector's handleSaveMode method is called right after the Sink is built.
   This code is in the MultipleTableJobConfigParser class:
   
   ```
       private SinkAction<?, ?, ?, ?> createSinkAction(
               CatalogTable catalogTable,
               Map<TablePath, CatalogTable> sinkTableMap,
               Set<Action> inputActions,
               ReadonlyConfig readonlyConfig,
               ClassLoader classLoader,
               Set<URL> factoryUrls,
               String factoryId,
               int parallelism,
               int configIndex) {
           Optional<CatalogTable> insteadTable;
           if (sinkTableMap.size() == 1) {
               insteadTable = sinkTableMap.values().stream().findFirst();
           } else {
               // TODO: another table full name map
               insteadTable =
                       Optional.ofNullable(sinkTableMap.get(catalogTable.getTableId().toTablePath()));
           }
           if (insteadTable.isPresent()) {
               catalogTable = insteadTable.get();
           }
           SeaTunnelSink<?, ?, ?, ?> sink =
                   FactoryUtil.createAndPrepareSink(
                           catalogTable, readonlyConfig, classLoader, factoryId);
           sink.setJobContext(jobConfig.getJobContext());
           SinkConfig actionConfig =
                   new SinkConfig(catalogTable.getTableId().toTablePath().toString());
           long id = idGenerator.getNextId();
           String actionName =
                   JobConfigParser.createSinkActionName(
                           configIndex, factoryId, actionConfig.getMultipleRowTableId());
           SinkAction<?, ?, ?, ?> sinkAction =
                   new SinkAction<>(
                           id,
                           actionName,
                           new ArrayList<>(inputActions),
                           sink,
                           factoryUrls,
                           actionConfig);
           handleSaveMode(sink);
           sinkAction.setParallelism(parallelism);
           return sinkAction;
       }
    
       public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
           if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
               SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;
               DataSaveMode dataSaveMode = saveModeSink.getUserConfigSaveMode();
               saveModeSink.handleSaveMode(dataSaveMode);
           }
       }
   ```
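   
   The dispatch can be exercised without the engine. The sketch below reproduces the same check with stand-in types: `Sink`, `PlainSink`, `RecordingSink`, and `SaveModeDispatch` are hypothetical substitutes for `SeaTunnelSink`, real connectors, and the parser. It uses `instanceof`, which for a non-null sink is equivalent to the `isAssignableFrom` check above, and shows that a sink without SaveMode support is simply skipped.
   
   ```java
   // Local copies of the proposed API so the sketch compiles on its own.
   enum DataSaveMode {
       DROP_SCHEMA, KEEP_SCHEMA_DROP_DATA, KEEP_SCHEMA_AND_DATA, CUSTOM_PROCESSING, ERROR_WHEN_EXISTS
   }

   interface SupportDataSaveMode {
       DataSaveMode getUserConfigSaveMode();

       void handleSaveMode(DataSaveMode userConfigSaveMode);
   }

   // Hypothetical stand-in for SeaTunnelSink.
   interface Sink {}

   // A sink without SaveMode support: the dispatch must leave it alone.
   class PlainSink implements Sink {}

   // A sink that records which mode it was asked to handle.
   class RecordingSink implements Sink, SupportDataSaveMode {
       private final DataSaveMode configured;
       DataSaveMode handled; // stays null until handleSaveMode is called

       RecordingSink(DataSaveMode configured) {
           this.configured = configured;
       }

       @Override
       public DataSaveMode getUserConfigSaveMode() {
           return configured;
       }

       @Override
       public void handleSaveMode(DataSaveMode userConfigSaveMode) {
           handled = userConfigSaveMode;
       }
   }

   final class SaveModeDispatch {
       private SaveModeDispatch() {}

       // Same logic as handleSaveMode(SeaTunnelSink) in MultipleTableJobConfigParser.
       static void handleSaveMode(Sink sink) {
           if (sink instanceof SupportDataSaveMode) {
               SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;
               saveModeSink.handleSaveMode(saveModeSink.getUserConfigSaveMode());
           }
       }
   }
   ```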
   
   Processing in the Spark and Flink engines remains unchanged. Because the new interface removes the checkOption method from SupportDataSaveMode, the corresponding calls to checkOption must also be deleted.
   
   ## Proposed Changes
   ### Delete the original code
   ```
   public class SinkCommonOptions {
    
       public static final String DATA_SAVE_MODE = "save_mode";
   }
   ```
   
   ### Original code modification
   #### org.apache.seatunnel.api.table.factory.FactoryUtil
   Because the getDataSaveModeOption method is defined in SupportDataSaveModeTableSinkFactory, every connector that implements DataSaveMode is expected to define its own save mode option in its factory. FactoryUtil therefore no longer needs special handling of the save_mode parameter.
   
   ```
   /**
    * This method is called by SeaTunnel Web to get the full option rule of a sink.
    *
    * @return Option rule
    */
   public static OptionRule sinkFullOptionRule(@NonNull TableSinkFactory factory) {
       OptionRule sinkOptionRule = factory.optionRule();
       if (sinkOptionRule == null) {
           throw new FactoryException("sinkOptionRule can not be null");
       }
    
       try {
           TableSink sink = factory.createSink(null);
           if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
               SupportDataSaveMode supportDataSaveModeSink = (SupportDataSaveMode) sink;
               Option<DataSaveMode> saveMode =
                       Options.key(SinkCommonOptions.DATA_SAVE_MODE)
                               .singleChoice(
                                       DataSaveMode.class,
                                       supportDataSaveModeSink.supportedDataSaveModeValues())
                               .noDefaultValue()
                               .withDescription("data save mode");
               OptionRule sinkCommonOptionRule = OptionRule.builder().required(saveMode).build();
               sinkOptionRule
                       .getOptionalOptions()
                       .addAll(sinkCommonOptionRule.getOptionalOptions());
           }
       } catch (Exception e) {
           LOG.warn(
                   "Add save mode option need sink connector support create sink by TableSinkFactory");
       }
    
       return sinkOptionRule;
   }
   ```
   
   Change to:
   
   ```
   public static OptionRule sinkFullOptionRule(@NonNull TableSinkFactory factory) {
       OptionRule sinkOptionRule = factory.optionRule();
       if (sinkOptionRule == null) {
           throw new FactoryException("sinkOptionRule can not be null");
       }
       return sinkOptionRule;
   }
   ```
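   
   With this special handling gone, each connector's factory owns its `save_mode` option. The removed code declared that option as a single choice over the connector's supported modes with no default value. The following dependency-free sketch models the validation such an option performs, assuming the raw job config is available as a `Map<String, String>` and that values must match the enum names exactly; `SaveModeOptionSketch` and `parseSaveMode` are hypothetical and are not SeaTunnel's `Option` API.
   
   ```java
   import java.util.Map;
   import java.util.Set;

   // Local copy of the proposed enum so the sketch compiles on its own.
   enum DataSaveMode {
       DROP_SCHEMA, KEEP_SCHEMA_DROP_DATA, KEEP_SCHEMA_AND_DATA, CUSTOM_PROCESSING, ERROR_WHEN_EXISTS
   }

   // Hypothetical per-connector option handling (not SeaTunnel's Option/OptionRule API).
   final class SaveModeOptionSketch {
       static final String SAVE_MODE_KEY = "save_mode";

       private SaveModeOptionSketch() {}

       // Reads save_mode and accepts it only if it names one of the connector's supported modes.
       static DataSaveMode parseSaveMode(Map<String, String> config, Set<DataSaveMode> supported) {
           String raw = config.get(SAVE_MODE_KEY);
           if (raw == null) {
               // Mirrors noDefaultValue(): the user must choose a mode explicitly.
               throw new IllegalArgumentException("Missing option: " + SAVE_MODE_KEY);
           }
           DataSaveMode mode;
           try {
               mode = DataSaveMode.valueOf(raw);
           } catch (IllegalArgumentException e) {
               throw new IllegalArgumentException("Unknown " + SAVE_MODE_KEY + ": " + raw, e);
           }
           if (!supported.contains(mode)) {
               // Mirrors singleChoice(...): only the connector's own subset is allowed.
               throw new IllegalArgumentException(
                       SAVE_MODE_KEY + " " + mode + " is not supported; choose one of " + supported);
           }
           return mode;
       }
   }
   ```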
   
   ## Compatibility
   These changes are not backward compatible at the code level: existing connectors that support SaveMode must be modified. Because users have not yet had access to this feature, there are no user-facing compatibility issues.
   The following Sink Connectors need to be modified:
   - seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
   - seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
   
   ## Test Plan
   ### Test coverage scenarios
   - The automatic table creation function of the connectors listed above continues to work correctly.
   
   ## Rejected Alternatives
   None
   
   ## Risk
   None
   
   ### Usage Scenario
   
   _No response_
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

