Hisoka-X opened a new issue, #4005:
URL: https://github.com/apache/incubator-seatunnel/issues/4005

   ### Search before asking
   
   - [X] I had searched in the 
[feature](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22)
 and found no similar feature requirement.
   
   
   ### Description
   
   ## Status quo
   To implement automatic table creation in SaveMode, the Sink side needs to 
receive the type information of the data passed from upstream and then build 
its own table from it.
   ## Question
   Taking ClickHouse as an example: to create a data table automatically, the 
table must be generated from the upstream data structure before the program 
starts reading and writing data.
   The upstream type structure is:
   ```java
   id int,
   name String,
   age int
   ```
   The table creation statement we expect to be executed is:
   ```sql
   CREATE TABLE user (
        id UInt32,
        name String DEFAULT 'Unknown',
        age UInt32
   ) ENGINE = MergeTree()
   PRIMARY KEY id
   ```
   - Question 1:  
   How do we know that the primary key is id and not name?
   - Question 2:
   Even if we know that the primary key is id, how can we let the program know 
that the primary key is id, and then help us automatically create this table?
   - Question 3:
   What if I want to use ReplacingMergeTree instead of MergeTree?
   - Question 4:
   Where can I find out the default value?
   ## Solution
   A complete and usable table creation statement needs information from two 
sources: field-level information and table-level information.
   Field-level information includes the field name, field type, field default 
value, and whether the field is a primary key. Table-level information 
includes the table name, the engine type of the table, and other properties 
specific to each data source.
   The current data structure cannot carry this information, so we need to 
make some changes to it:
   ```java
   // The Code Segment
   public class SeaTunnelRowType implements CompositeType<SeaTunnelRow> {
        private static final long serialVersionUID = 2L;
   
        /**
         * The field name of the {@link SeaTunnelRow}.
         */
        private final String[] fieldNames;
        /**
         * The type of the field.
         */
        private final SeaTunnelDataType<?>[] fieldTypes;
   }
   ```
   becomes:
   ```java
   // The Code Segment
   public class SeaTunnelRowType implements CompositeType<SeaTunnelRow> {
        private static final long serialVersionUID = 2L;
   
        /**
         * The field name of the {@link SeaTunnelRow}.
         */
        private final String[] fieldNames;
        /**
         * The columns of the {@link SeaTunnelRow}.
         */
        private final SeaTunnelColumn<?>[] columns;
   }
   
   public class SeaTunnelColumn<T> implements Serializable {
   
        private SeaTunnelDataType<T> fieldType;
   
        private String fieldName;
   
        private boolean isPrimaryKey = false;
   
        private boolean isNullable = true;
   
        private T defaultValue;
   
   }
   
   ```
   The field information is stored in the new SeaTunnelColumn type and read 
back in the Sink Connector. This solves problems 2 and 4; problem 1 is 
addressed next.
   Usually, the Source defines the SeaTunnelRowType. It now also needs to 
define the SeaTunnelColumn information for each field when it defines the 
SeaTunnelRowType, based on the structural information it reads from the 
source system. If the SeaTunnelColumn definition of a field needs to change 
during data processing, the FieldMapper Transform can do this (the Transform 
needs to be adapted and modified).
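   Once the Source has populated the column information, a Sink Connector 
could derive the field list and primary key from it. The following is only a 
minimal sketch: `ColumnSketch` is a hypothetical, simplified stand-in for the 
proposed SeaTunnelColumn, `ClickhouseDdlSketch` and its methods are 
illustrative names, and the sink type is assumed to be already mapped to a 
ClickHouse type name. Nullability is left out for brevity.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   /** Hypothetical, simplified stand-in for the proposed SeaTunnelColumn. */
   final class ColumnSketch {
       final String fieldName;
       final String sinkType;      // assumption: already mapped to a ClickHouse type name
       final boolean isPrimaryKey;
       final Object defaultValue;  // null means "no default value"

       ColumnSketch(String fieldName, String sinkType, boolean isPrimaryKey, Object defaultValue) {
           this.fieldName = fieldName;
           this.sinkType = sinkType;
           this.isPrimaryKey = isPrimaryKey;
           this.defaultValue = defaultValue;
       }
   }

   /** Illustrative helper; not part of any existing SeaTunnel API. */
   final class ClickhouseDdlSketch {

       /** Renders one "name type [DEFAULT literal]" entry per column, comma-separated. */
       static String renderFields(List<ColumnSketch> columns) {
           List<String> entries = new ArrayList<>();
           for (ColumnSketch c : columns) {
               StringBuilder sb = new StringBuilder(c.fieldName).append(' ').append(c.sinkType);
               if (c.defaultValue != null) {
                   sb.append(" DEFAULT ").append(literal(c.defaultValue));
               }
               entries.add(sb.toString());
           }
           return String.join(",\n", entries);
       }

       /** Collects the names of all columns flagged as primary key. */
       static String renderPrimaryKey(List<ColumnSketch> columns) {
           List<String> keys = new ArrayList<>();
           for (ColumnSketch c : columns) {
               if (c.isPrimaryKey) {
                   keys.add(c.fieldName);
               }
           }
           return String.join(", ", keys);
       }

       /** Simplified literal rendering: strings are quoted, single quotes are doubled. */
       private static String literal(Object value) {
           if (value instanceof String) {
               return "'" + ((String) value).replace("'", "''") + "'";
           }
           return String.valueOf(value);
       }

       public static void main(String[] args) {
           List<ColumnSketch> columns = Arrays.asList(
                   new ColumnSketch("id", "UInt32", true, null),
                   new ColumnSketch("name", "String", false, "Unknown"),
                   new ColumnSketch("age", "UInt32", false, null));
           System.out.println(renderFields(columns));
           System.out.println("PRIMARY KEY " + renderPrimaryKey(columns));
       }
   }
   ```

   Keeping the rendering inside the connector means each sink can apply its 
own type mapping and quoting rules to the same column metadata.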
   Next, we use templates to solve problem 3.
   Different sinks have different syntax for creating tables, so we take SQL 
as an example. Elasticsearch index creation and HBase table creation can be 
handled in a similar way. The template is optional: if the user does not 
configure it, a default template is used to create the table, which keeps the 
user's configuration simple.
   Add a template field to the Sink's config, such as:
   ```javascript
   sink {
        clickhouse {
            host = "clickhouse:8123"
            database = "default"
            table = "sink_table"
            username = "default"
            password = ""
           
            save_mode = "DROP_SCHEMA"
            save_mode_create_template = """
                                            CREATE TABLE user (
                                                ${rowtype_fields}
                                            ) ENGINE = MergeTree()
                                            PRIMARY KEY ${rowtype_primary_key}
                                        """
        }
   }
   ```
   In this way, the ClickHouse Sink Connector can fill in `rowtype_fields` and 
`rowtype_primary_key` from the SeaTunnelRowType it receives. The 
`save_mode_create_template` is therefore a convention that connectors follow, 
rather than logic built into the framework layer.
   By default, `save_mode_create_template` can be left unset. When we need to 
change the engine or other creation-specific values, we can set 
`save_mode_create_template`, for example to change the engine to 
ReplacingMergeTree:
   ```javascript
   sink {
        clickhouse {
            host = "clickhouse:8123"
            database = "default"
            table = "sink_table"
            username = "default"
            password = ""
           
            save_mode = "DROP_SCHEMA"
            save_mode_create_template = """
                                            CREATE TABLE user (
                                                ${rowtype_fields}
                                            ) ENGINE = ReplacingMergeTree()
                                            PRIMARY KEY ${rowtype_primary_key}
                                        """
        }
   }
   ```
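   The placeholder substitution described above could be sketched roughly as 
follows. This is an illustration under assumptions, not the connector's 
actual code: the class `CreateTemplateSketch`, its methods, and the contents 
of `DEFAULT_TEMPLATE` are hypothetical. Only the placeholder names 
`rowtype_fields` and `rowtype_primary_key` come from the proposal.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;

   /** Illustrative helper; not part of any existing SeaTunnel API. */
   final class CreateTemplateSketch {

       /** Hypothetical default used when save_mode_create_template is not configured. */
       static final String DEFAULT_TEMPLATE =
               "CREATE TABLE user (\n${rowtype_fields}\n) ENGINE = MergeTree()\n"
                       + "PRIMARY KEY ${rowtype_primary_key}";

       // Matches placeholders of the form ${name}.
       private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{(\\w+)\\}");

       /** Falls back to the default template when the user did not configure one. */
       static String resolveTemplate(String userTemplate) {
           return userTemplate != null ? userTemplate : DEFAULT_TEMPLATE;
       }

       /** Replaces every ${name} with its value; unknown placeholders are rejected. */
       static String fill(String template, Map<String, String> values) {
           Matcher matcher = PLACEHOLDER.matcher(template);
           StringBuffer out = new StringBuffer();
           while (matcher.find()) {
               String key = matcher.group(1);
               String value = values.get(key);
               if (value == null) {
                   throw new IllegalArgumentException("Unknown placeholder: " + key);
               }
               // quoteReplacement keeps '$' and '\' in the value from being interpreted.
               matcher.appendReplacement(out, Matcher.quoteReplacement(value));
           }
           matcher.appendTail(out);
           return out.toString();
       }

       public static void main(String[] args) {
           Map<String, String> values = new HashMap<>();
           values.put("rowtype_fields", "id UInt32,\nname String DEFAULT 'Unknown',\nage UInt32");
           values.put("rowtype_primary_key", "id");
           System.out.println(fill(resolveTemplate(null), values));
       }
   }
   ```

   Rejecting unknown placeholders early is one possible design choice: a typo 
in the template then fails before any statement is sent to the database.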
   
   ## Disadvantages
   The current RowType handling for Spark/Flink converts the SeaTunnelRowType 
into the RowType used by the engine's generated operators and then generates 
a new SeaTunnelRowType from it, so the additional SeaTunnelRowType content is 
lost. This logic needs to be modified to skip the RowType generated by the 
engine and directly use the SeaTunnelRowType generated inside SeaTunnel. With 
this change, Transform-V1 cannot be supported, but V1 will be dropped in the 
next version, so this has no effect.
   
   ### Usage Scenario
   
   _No response_
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   

