EricJoy2048 opened a new issue, #4771: URL: https://github.com/apache/incubator-seatunnel/issues/4771
### Search before asking

- [X] I had searched in the [feature](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22) and found no similar feature requirement.

### Description

## Current state:

- [ ] Discuss
- [ ] Accepted
- [ ] Rejected

## Motivation

When a data synchronization job starts, the target data source often already contains data (files or other content), and handling that data is a common requirement. We therefore need a universal interface that Sink connectors implement to handle data that already exists in the target data source. We will call this the SaveMode function.

## Public Interfaces

### org.apache.seatunnel.api.sink.DataSaveMode

DataSaveMode is an enumeration that defines the complete set of SaveMode behaviors SeaTunnel can support. Each Sink connector may implement one or more of them.

```
package org.apache.seatunnel.api.sink;

/**
 * The SaveMode for the Sink connectors that use table or other table structures to organize data
 */
public enum DataSaveMode {

    // Drops the table for MySQL; drops the path for the File connector.
    DROP_SCHEMA,

    // Drops only the data for MySQL; drops only the files in the path for the File connector.
    KEEP_SCHEMA_DROP_DATA,

    // Keeps the table and its data and continues writing to the existing table for MySQL.
    // Keeps the path and its files and creates new files in the path for the File connector.
    KEEP_SCHEMA_AND_DATA,

    // The connector provides custom processing, such as running user-provided SQL or shell
    // scripts.
    CUSTOM_PROCESSING,

    // Throws an error when the table exists for MySQL; throws an error when the path exists
    // for the File connector.
    ERROR_WHEN_EXISTS
}
```

### org.apache.seatunnel.api.sink.SupportDataSaveMode

A Sink that supports the SaveMode function must implement the SupportDataSaveMode interface.
```
package org.apache.seatunnel.api.sink;

/** The Sink Connectors which support data SaveMode should implement this interface */
public interface SupportDataSaveMode {

    /**
     * Return the value of DataSaveMode configured by user in the job config file.
     *
     * @return the DataSaveMode configured by the user
     */
    DataSaveMode getUserConfigSaveMode();

    /** The implementation of specific logic according to different {@link DataSaveMode} */
    void handleSaveMode(DataSaveMode userConfigSaveMode);
}
```

SaveMode should be processed before the job starts. In the SeaTunnel Zeta engine, the connector's handleSaveMode method can therefore be called right after the Sink is built. This code is in the MultipleTableJobConfigParser class:

```
private SinkAction<?, ?, ?, ?> createSinkAction(
        CatalogTable catalogTable,
        Map<TablePath, CatalogTable> sinkTableMap,
        Set<Action> inputActions,
        ReadonlyConfig readonlyConfig,
        ClassLoader classLoader,
        Set<URL> factoryUrls,
        String factoryId,
        int parallelism,
        int configIndex) {
    Optional<CatalogTable> insteadTable;
    if (sinkTableMap.size() == 1) {
        insteadTable = sinkTableMap.values().stream().findFirst();
    } else {
        // TODO: another table full name map
        insteadTable =
                Optional.ofNullable(sinkTableMap.get(catalogTable.getTableId().toTablePath()));
    }
    if (insteadTable.isPresent()) {
        catalogTable = insteadTable.get();
    }
    SeaTunnelSink<?, ?, ?, ?> sink =
            FactoryUtil.createAndPrepareSink(
                    catalogTable, readonlyConfig, classLoader, factoryId);
    sink.setJobContext(jobConfig.getJobContext());
    SinkConfig actionConfig =
            new SinkConfig(catalogTable.getTableId().toTablePath().toString());
    long id = idGenerator.getNextId();
    String actionName =
            JobConfigParser.createSinkActionName(
                    configIndex, factoryId, actionConfig.getMultipleRowTableId());
    SinkAction<?, ?, ?, ?> sinkAction =
            new SinkAction<>(
                    id,
                    actionName,
                    new ArrayList<>(inputActions),
                    sink,
                    factoryUrls,
                    actionConfig);
    // Apply the user-configured SaveMode once the sink has been built.
    handleSaveMode(sink);
    sinkAction.setParallelism(parallelism);
    return sinkAction;
}

public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
    // Only sinks that implement SupportDataSaveMode take part in SaveMode handling.
    if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
        SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;
        DataSaveMode dataSaveMode = saveModeSink.getUserConfigSaveMode();
        saveModeSink.handleSaveMode(dataSaveMode);
    }
}
```

The processing remains unchanged in the Spark and Flink engines. The new interface removes the checkOption method from SupportDataSaveMode, so the corresponding method calls also need to be deleted.

## Proposed Changes

### Delete the original logical code

```
public class SinkCommonOptions {
    public static final String DATA_SAVE_MODE = "save_mode";
}
```

### Original code modification

#### org.apache.seatunnel.api.table.factory.FactoryUtil

Because the getDataSaveModeOption method is defined in SupportDataSaveModeTableSinkFactory, every connector that implements DataSaveMode will, in theory, define its own savemode parameters in its factory. FactoryUtil therefore no longer needs special handling of the savemode parameter.

```
/**
 * This method is called by SeaTunnel Web to get the full option rule of a sink.
 *
 * @return Option rule
 */
public static OptionRule sinkFullOptionRule(@NonNull TableSinkFactory factory) {
    OptionRule sinkOptionRule = factory.optionRule();
    if (sinkOptionRule == null) {
        throw new FactoryException("sinkOptionRule can not be null");
    }

    try {
        TableSink sink = factory.createSink(null);
        if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
            SupportDataSaveMode supportDataSaveModeSink = (SupportDataSaveMode) sink;
            Option<DataSaveMode> saveMode =
                    Options.key(SinkCommonOptions.DATA_SAVE_MODE)
                            .singleChoice(
                                    DataSaveMode.class,
                                    supportDataSaveModeSink.supportedDataSaveModeValues())
                            .noDefaultValue()
                            .withDescription("data save mode");
            OptionRule sinkCommonOptionRule = OptionRule.builder().required(saveMode).build();
            sinkOptionRule
                    .getOptionalOptions()
                    .addAll(sinkCommonOptionRule.getOptionalOptions());
        }
    } catch (Exception e) {
        LOG.warn(
                "Add save mode option need sink connector support create sink by TableSinkFactory");
    }
    return sinkOptionRule;
}
```

Change to:

```
public static OptionRule sinkFullOptionRule(@NonNull TableSinkFactory factory) {
    OptionRule sinkOptionRule = factory.optionRule();
    if (sinkOptionRule == null) {
        throw new FactoryException("sinkOptionRule can not be null");
    }
    return sinkOptionRule;
}
```

## Compatibility

The change is not compatible at the code level: existing connectors that support SaveMode need to be modified. Users have not used this feature before, so there are no compatibility issues for them.

The following Sink connectors need to be modified:

- seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
- seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java

## Test Plan

### Test coverage scenarios

- The automatic table creation function of the above connectors can operate normally.
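
As an illustration of the behavior such tests could exercise, the sketch below shows how a sink might dispatch on each DataSaveMode value. The enum and interface are local copies of the ones proposed in this issue. `SaveModeSketch`, `InMemoryTableSink`, and its fields are hypothetical stand-ins for a real connector and are not SeaTunnel code. The sketch assumes a connector may reject modes it does not support, here CUSTOM_PROCESSING.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SaveModeSketch {

    // Local copy of the enum proposed in this issue.
    enum DataSaveMode {
        DROP_SCHEMA,
        KEEP_SCHEMA_DROP_DATA,
        KEEP_SCHEMA_AND_DATA,
        CUSTOM_PROCESSING,
        ERROR_WHEN_EXISTS
    }

    // Local copy of the interface proposed in this issue.
    interface SupportDataSaveMode {
        DataSaveMode getUserConfigSaveMode();

        void handleSaveMode(DataSaveMode userConfigSaveMode);
    }

    // Hypothetical sink: a flag and a list stand in for a real table and its rows.
    static class InMemoryTableSink implements SupportDataSaveMode {
        private final DataSaveMode configured;
        boolean tableExists;
        final List<String> rows;

        InMemoryTableSink(DataSaveMode configured, boolean tableExists, List<String> rows) {
            this.configured = configured;
            this.tableExists = tableExists;
            this.rows = new ArrayList<>(rows); // mutable copy so clear() is safe
        }

        @Override
        public DataSaveMode getUserConfigSaveMode() {
            return configured;
        }

        @Override
        public void handleSaveMode(DataSaveMode mode) {
            switch (mode) {
                case DROP_SCHEMA:
                    // Drop the table and its data; re-creation is not modeled here.
                    tableExists = false;
                    rows.clear();
                    break;
                case KEEP_SCHEMA_DROP_DATA:
                    rows.clear();
                    break;
                case KEEP_SCHEMA_AND_DATA:
                    break;
                case ERROR_WHEN_EXISTS:
                    if (tableExists) {
                        throw new IllegalStateException("table already exists");
                    }
                    break;
                default:
                    // Assumption: this sink does not implement CUSTOM_PROCESSING.
                    throw new UnsupportedOperationException("unsupported save mode: " + mode);
            }
        }
    }

    // Mirrors the static helper in the proposal: only SaveMode-aware sinks are handled.
    static void handleSaveMode(Object sink) {
        if (sink instanceof SupportDataSaveMode) {
            SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;
            saveModeSink.handleSaveMode(saveModeSink.getUserConfigSaveMode());
        }
    }

    public static void main(String[] args) {
        InMemoryTableSink sink =
                new InMemoryTableSink(
                        DataSaveMode.KEEP_SCHEMA_DROP_DATA, true, Arrays.asList("r1", "r2"));
        handleSaveMode(sink);
        System.out.println("tableExists=" + sink.tableExists + ", rows=" + sink.rows.size());
    }
}
```

A test for a real connector would replace the in-memory flag and list with checks against the actual target, for example whether the table still exists and how many rows it holds after handleSaveMode runs.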
## Rejected Alternatives

None

## Risk

None

### Usage Scenario

_No response_

### Related issues

_No response_

### Are you willing to submit a PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
