yzeng1618 commented on code in PR #9743:
URL: https://github.com/apache/seatunnel/pull/9743#discussion_r2329185061


##########
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSaveModeHandler.java:
##########
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
+import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveFormatUtils;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTypeConvertor;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class HiveSaveModeHandler implements SaveModeHandler, AutoCloseable {
+
+    private final ReadonlyConfig readonlyConfig;
+    private final CatalogTable catalogTable;
+    private final SchemaSaveMode schemaSaveMode;
+    private final String createTemplate;
+    private final TablePath tablePath;
+    private final String dbName;
+    private final String tableName;
+    private final TableSchema tableSchema;
+    private final List<String> partitionFields;
+    private final List<String> sourceFieldNames;
+    private final List<String> partitionFieldsFromSource;
+    private final List<String> nonPartitionFields;
+
+    private HiveMetaStoreProxy hiveMetaStoreProxy;
+
+    public HiveSaveModeHandler(
+            ReadonlyConfig readonlyConfig,
+            CatalogTable catalogTable,
+            SchemaSaveMode schemaSaveMode,
+            String createTemplate) {
+        this.readonlyConfig = readonlyConfig;
+        this.catalogTable = catalogTable;
+        this.schemaSaveMode = schemaSaveMode;
+        this.createTemplate = createTemplate;
+        this.tablePath = TablePath.of(readonlyConfig.get(HiveOptions.TABLE_NAME));
+        this.dbName = tablePath.getDatabaseName();
+        this.tableName = tablePath.getTableName();
+        this.tableSchema = catalogTable.getTableSchema();
+
+        // Initialize partition fields and validation
+        this.partitionFields = readonlyConfig.get(HiveSinkOptions.PARTITION_FIELDS);
+        this.sourceFieldNames =
+                tableSchema.getColumns().stream()
+                        .map(org.apache.seatunnel.api.table.catalog.Column::getName)
+                        .collect(Collectors.toList());
+
+        // Validate and categorize partition fields
+        validatePartitionFields();
+        this.partitionFieldsFromSource =
+                partitionFields.stream()
+                        .filter(sourceFieldNames::contains)
+                        .collect(Collectors.toList());
+        this.nonPartitionFields =
+                sourceFieldNames.stream()
+                        .filter(field -> !partitionFieldsFromSource.contains(field))
+                        .collect(Collectors.toList());
+    }
+
+    @Override
+    public void open() {
+        this.hiveMetaStoreProxy = HiveMetaStoreProxy.getInstance(readonlyConfig);
+    }
+
+    @Override
+    public void handleSchemaSaveModeWithRestore() {
+        // For Hive, we use the same logic as handleSchemaSaveMode
+        handleSchemaSaveMode();
+    }
+
+    @Override
+    public TablePath getHandleTablePath() {
+        return tablePath;
+    }
+
+    @Override
+    public Catalog getHandleCatalog() {
+        // Hive doesn't use Catalog interface directly, return null
+        return null;
+    }
+
+    @Override
+    public SchemaSaveMode getSchemaSaveMode() {
+        return schemaSaveMode;
+    }
+
+    @Override
+    public DataSaveMode getDataSaveMode() {
+        // Hive uses OVERWRITE parameter for data handling
+        return DataSaveMode.APPEND_DATA;
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (hiveMetaStoreProxy != null) {
+            hiveMetaStoreProxy.close();
+        }
+    }
+
+    @Override
+    public void handleSchemaSaveMode() {
+        try {
+            switch (schemaSaveMode) {
+                case RECREATE_SCHEMA:
+                    handleRecreateSchema();
+                    break;
+                case CREATE_SCHEMA_WHEN_NOT_EXIST:
+                    handleCreateSchemaWhenNotExist();
+                    break;
+                case ERROR_WHEN_SCHEMA_NOT_EXIST:
+                    handleErrorWhenSchemaNotExist();
+                    break;
+                case IGNORE:
+                    log.info(
+                            "Ignore schema save mode, skip schema handling for 
table {}.{}",
+                            dbName,
+                            tableName);
+                    break;
+                default:
+                    throw new HiveConnectorException(
+                            HiveConnectorErrorCode.CREATE_HIVE_TABLE_FAILED,
+                            "Unsupported schema save mode: " + schemaSaveMode);
+            }
+        } catch (Exception e) {
+            throw new HiveConnectorException(
+                    HiveConnectorErrorCode.CREATE_HIVE_TABLE_FAILED,
+                    "Failed to handle schema save mode: " + e.getMessage(),
+                    e);
+        }
+    }
+
+    @Override
+    public void handleDataSaveMode() {
+        // For Hive, data save mode is handled by the existing OVERWRITE parameter
+        // No additional data handling is needed here
+        log.info(
+                "Data save mode handling is managed by existing OVERWRITE 
parameter for table {}.{}",
+                dbName,
+                tableName);
+    }
+
+    private void handleRecreateSchema() throws TException {
+        log.info("Recreate schema mode: dropping and recreating table {}.{}", 
dbName, tableName);
+
+        // Create database if not exists
+        createDatabaseIfNotExists();
+
+        // Drop table if exists
+        if (hiveMetaStoreProxy.tableExists(dbName, tableName)) {
+            hiveMetaStoreProxy.dropTable(dbName, tableName);
+            log.info("Dropped existing table {}.{}", dbName, tableName);
+        }
+
+        // Create table
+        createTable();
+    }
+
+    private void handleCreateSchemaWhenNotExist() throws TException {
+        log.info("Create schema when not exist mode for table {}.{}", dbName, 
tableName);
+
+        // Create database if not exists
+        createDatabaseIfNotExists();
+
+        // Create table if not exists
+        if (!hiveMetaStoreProxy.tableExists(dbName, tableName)) {
+            createTable();
+        }
+    }
+
+    private void handleErrorWhenSchemaNotExist() throws TException {
+
+        // Check if database exists
+        if (!hiveMetaStoreProxy.databaseExists(dbName)) {
+            throw new HiveConnectorException(
+                    HiveConnectorErrorCode.CREATE_HIVE_TABLE_FAILED,
+                    "Database " + dbName + " does not exist");
+        }
+
+        if (!hiveMetaStoreProxy.tableExists(dbName, tableName)) {
+            throw new HiveConnectorException(
+                    HiveConnectorErrorCode.CREATE_HIVE_TABLE_FAILED,
+                    "Table " + dbName + "." + tableName + " does not exist");
+        }
+    }
+
+    private void createDatabaseIfNotExists() throws TException {
+        hiveMetaStoreProxy.createDatabaseIfNotExists(dbName);
+    }
+
+    private void createTable() throws TException {
+        String defaultTemplate = HiveSinkOptions.SAVE_MODE_CREATE_TEMPLATE.defaultValue();
+        boolean useCustomTemplate = !defaultTemplate.equals(createTemplate);
+
+        if (useCustomTemplate) {
+            createTableUsingTemplate();
+        } else {
+            createTableUsingAPI();
+        }
+    }
+
+    private void createTableUsingAPI() throws TException {
+        // Create table using Hive MetaStore API (more reliable than SQL)
+        Table table = buildTableFromSchema();
+        hiveMetaStoreProxy.createTableIfNotExists(table);
+    }
+
+    private void createTableUsingTemplate() throws TException {
+        processCreateTemplate();

Review Comment:
   Previously, table creation was done by calling the metastore API directly. That approach has been removed; tables are now created from the default template or a custom template.
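
   For illustration, a minimal sketch of what a custom template might look like (a sketch only — the `${database}`, `${table_name}` and `${rowtype_fields}` placeholder names are assumptions borrowed from the JDBC-style `save_mode_create_template` convention, not confirmed for Hive here):

   ```java
   // Hypothetical custom create-table template; the placeholder names are assumptions.
   private static final String EXAMPLE_CREATE_TEMPLATE =
           "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n"
                   + "  ${rowtype_fields}\n"
                   + ") STORED AS PARQUET";
   ```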



##########
seatunnel-connectors-v2/connector-hive/pom.xml:
##########
@@ -82,32 +83,30 @@
             <groupId>org.apache.hive</groupId>
             <artifactId>hive-exec</artifactId>
             <version>${hive.exec.version}</version>
-            <scope>provided</scope>
             <exclusions>
+                <!-- Exclude logging dependencies to avoid conflicts -->
                 <exclusion>
                     <groupId>log4j</groupId>
                     <artifactId>log4j</artifactId>
                 </exclusion>
                 <exclusion>
                     <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-1.2-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-slf4j-impl</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-web</artifactId>
+                    <artifactId>*</artifactId>
                 </exclusion>
                 <exclusion>
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-log4j12</artifactId>
                 </exclusion>
+                <!-- Exclude format dependencies to avoid version conflicts -->
                 <exclusion>
                     <groupId>org.apache.parquet</groupId>
-                    <artifactId>parquet-hadoop-bundle</artifactId>
+                    <artifactId>*</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+                <!-- Exclude unnecessary dependencies -->

Review Comment:
   These dependencies have now been removed.



##########
docs/en/connector-v2/sink/Hive.md:
##########
@@ -100,6 +100,22 @@ Support writing Parquet INT96 from a timestamp, only valid for parquet files.
 
 Flag to decide whether to use overwrite mode when inserting data into Hive. If 
set to true, for non-partitioned tables, the existing data in the table will be 
deleted before inserting new data. For partitioned tables, the data in the 
relevant partition will be deleted before inserting new data.
 
+### schema_save_mode [enum]

Review Comment:
   Has been added. **Default value**: `CREATE_SCHEMA_WHEN_NOT_EXIST`.
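
   For reference, a minimal sketch of the option definition this implies (a sketch only, assuming SeaTunnel's standard `Options` builder; the exact field location in this PR may differ):

   ```java
   import org.apache.seatunnel.api.configuration.Option;
   import org.apache.seatunnel.api.configuration.Options;
   import org.apache.seatunnel.api.sink.SchemaSaveMode;

   // schema_save_mode option with the documented default value.
   public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
           Options.key("schema_save_mode")
                   .enumType(SchemaSaveMode.class)
                   .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
                   .withDescription("Schema save mode for the Hive sink");
   ```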



##########
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSaveModeHandler.java:
##########
@@ -55,6 +55,7 @@ public class HiveSaveModeHandler implements SaveModeHandler, AutoCloseable {
     private final List<String> partitionFields;
 
     private HiveMetaStoreProxy hiveMetaStoreProxy;
+    private Catalog optionalCatalog; // Optional Catalog support

Review Comment:
   Has been modified



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/auto_table_creation/fake_to_hive_create_when_not_exist.conf:
##########
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint

Review Comment:
   Okay, I will add unit tests and E2E tests covering the full set of types.
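
   As a starting point, a minimal sketch of such a type-mapping unit test (assuming JUnit 5 and the `seatunnelToHiveType` mapping shown in this PR; the test class name is illustrative):

   ```java
   import org.apache.seatunnel.api.table.type.BasicType;
   import org.apache.seatunnel.api.table.type.DecimalType;
   import org.apache.seatunnel.api.table.type.LocalTimeType;

   import org.junit.jupiter.api.Assertions;
   import org.junit.jupiter.api.Test;

   class HiveTypeConvertorTest {

       @Test
       void shouldMapSeaTunnelTypesToHiveTypes() {
           // Scalar mappings taken from the switch in seatunnelToHiveType
           Assertions.assertEquals("string", HiveTypeConvertor.seatunnelToHiveType(BasicType.STRING_TYPE));
           Assertions.assertEquals("bigint", HiveTypeConvertor.seatunnelToHiveType(BasicType.LONG_TYPE));
           Assertions.assertEquals("timestamp", HiveTypeConvertor.seatunnelToHiveType(LocalTimeType.LOCAL_DATE_TIME_TYPE));
           // Decimal keeps its precision and scale
           Assertions.assertEquals("decimal(10,2)", HiveTypeConvertor.seatunnelToHiveType(new DecimalType(10, 2)));
       }
   }
   ```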



##########
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSaveModeHandler.java:
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
+import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreCatalog;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTableTemplateUtils;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTypeConvertor;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+public class HiveSaveModeHandler implements SaveModeHandler, AutoCloseable {
+
+    private final ReadonlyConfig readonlyConfig;
+    private final CatalogTable catalogTable;
+    private final SchemaSaveMode schemaSaveMode;
+    private final TablePath tablePath;
+    private final String dbName;
+    private final String tableName;
+    private final TableSchema tableSchema;
+    private final List<String> partitionFields;
+
+    private HiveMetaStoreCatalog hiveCatalog;
+    private Catalog optionalCatalog;
+
+    public HiveSaveModeHandler(
+            ReadonlyConfig readonlyConfig,
+            CatalogTable catalogTable,
+            SchemaSaveMode schemaSaveMode) {
+        this.readonlyConfig = readonlyConfig;
+        this.catalogTable = catalogTable;
+        this.schemaSaveMode = schemaSaveMode;
+        this.tablePath = TablePath.of(readonlyConfig.get(HiveOptions.TABLE_NAME));
+        this.dbName = tablePath.getDatabaseName();
+        this.tableName = tablePath.getTableName();
+        this.tableSchema = catalogTable.getTableSchema();
+
+        // Initialize partition fields from template if available
+        this.partitionFields = extractPartitionFieldsFromConfig();
+    }
+
+    public HiveSaveModeHandler(
+            ReadonlyConfig readonlyConfig,
+            CatalogTable catalogTable,
+            SchemaSaveMode schemaSaveMode,
+            Catalog catalog) {
+        this(readonlyConfig, catalogTable, schemaSaveMode);
+        this.optionalCatalog = catalog;
+    }
+
+    @Override
+    public void open() {
+        this.hiveCatalog = HiveMetaStoreCatalog.create(readonlyConfig);
+        if (this.optionalCatalog == null) {
+            this.optionalCatalog = this.hiveCatalog;
+        }
+    }
+
+    @Override
+    public void handleSchemaSaveModeWithRestore() {
+        // For Hive, we use the same logic as handleSchemaSaveMode
+        handleSchemaSaveMode();
+    }
+
+    @Override
+    public TablePath getHandleTablePath() {
+        return tablePath;
+    }
+
+    @Override
+    public Catalog getHandleCatalog() {
+        return optionalCatalog;
+    }
+
+    @Override
+    public SchemaSaveMode getSchemaSaveMode() {
+        return schemaSaveMode;
+    }
+
+    @Override
+    public DataSaveMode getDataSaveMode() {
+        // Hive uses OVERWRITE parameter for data handling
+        return DataSaveMode.APPEND_DATA;
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (optionalCatalog != null) {
+            optionalCatalog.close();
+        }
+        if (hiveCatalog != null && hiveCatalog != optionalCatalog) {
+            hiveCatalog.close();
+        }
+    }
+
+    @Override
+    public void handleSchemaSaveMode() {
+        try {
+            switch (schemaSaveMode) {
+                case RECREATE_SCHEMA:
+                    handleRecreateSchema();
+                    break;
+                case CREATE_SCHEMA_WHEN_NOT_EXIST:
+                    handleCreateSchemaWhenNotExist();
+                    break;
+                case ERROR_WHEN_SCHEMA_NOT_EXIST:
+                    handleErrorWhenSchemaNotExist();
+                    break;
+                case IGNORE:
+                    log.info(
+                            "Ignore schema save mode, skip schema handling for 
table {}.{}",
+                            dbName,
+                            tableName);
+                    break;
+                default:
+                    throw new HiveConnectorException(
+                            HiveConnectorErrorCode.CREATE_HIVE_TABLE_FAILED,
+                            "Unsupported schema save mode: " + schemaSaveMode);
+            }
+        } catch (Exception e) {
+            throw new HiveConnectorException(
+                    HiveConnectorErrorCode.CREATE_HIVE_TABLE_FAILED,
+                    "Failed to handle schema save mode: " + e.getMessage(),
+                    e);
+        }
+    }
+
+    @Override
+    public void handleDataSaveMode() {
+        // For Hive, data save mode is handled by the existing OVERWRITE parameter
+        // No additional data handling is needed here
+        log.info(
+                "Data save mode handling is managed by existing OVERWRITE 
parameter for table {}.{}",
+                dbName,
+                tableName);
+    }
+
+    private void handleRecreateSchema() throws TException {
+        log.info("Recreate schema mode: dropping and recreating table {}.{}", 
dbName, tableName);
+
+        // Do NOT create database automatically. Ensure database exists first.
+        if (!hiveCatalog.databaseExists(dbName)) {
+            throw new HiveConnectorException(
+                    HiveConnectorErrorCode.CREATE_HIVE_TABLE_FAILED,
+                    "Database " + dbName + " does not exist. Please create it 
manually.");
+        }
+
+        // Drop table if exists
+        if (hiveCatalog.tableExists(dbName, tableName)) {
+            hiveCatalog.dropTable(dbName, tableName);
+            log.info("Dropped existing table {}.{}", dbName, tableName);
+        }
+
+        // Create table using template
+        createTable();
+    }
+
+    private void handleCreateSchemaWhenNotExist() throws TException {
+        log.info("Create schema when not exist mode for table {}.{}", dbName, 
tableName);
+
+        if (!hiveCatalog.databaseExists(dbName)) {
+            throw new HiveConnectorException(
+                    HiveConnectorErrorCode.CREATE_HIVE_TABLE_FAILED,
+                    "Database " + dbName + " does not exist. Please create it 
manually.");
+        }
+
+        if (!hiveCatalog.tableExists(dbName, tableName)) {
+            createTable();
+        } else {
+            log.info("Table {}.{} already exists, skipping creation", dbName, 
tableName);
+        }
+    }
+
+    private void handleErrorWhenSchemaNotExist() throws TException {
+        log.info("Error when schema not exist mode: checking table {}.{}", 
dbName, tableName);
+
+        // Check if database exists
+        if (!hiveCatalog.databaseExists(dbName)) {
+            throw new HiveConnectorException(
+                    HiveConnectorErrorCode.CREATE_HIVE_TABLE_FAILED,
+                    "Database " + dbName + " does not exist");
+        }
+
+        // Check if table exists
+        if (!hiveCatalog.tableExists(dbName, tableName)) {
+            throw new HiveConnectorException(
+                    HiveConnectorErrorCode.CREATE_HIVE_TABLE_FAILED,
+                    "Table " + dbName + "." + tableName + " does not exist");
+        }
+    }
+
+    private void createTable() throws TException {
+        log.info("Creating table {}.{} using template-based approach", dbName, 
tableName);
+        Table table = buildTableFromTemplate();
+        hiveCatalog.createTableFromTemplate(table);
+        log.info("Successfully created table {}.{}", dbName, tableName);
+    }
+
+    private List<String> extractPartitionFieldsFromConfig() {
+        if (readonlyConfig.getOptional(HiveSinkOptions.SAVE_MODE_CREATE_TEMPLATE).isPresent()) {
+            String template = readonlyConfig.get(HiveSinkOptions.SAVE_MODE_CREATE_TEMPLATE);
+            return HiveTableTemplateUtils.extractPartitionFieldsFromTemplate(template);
+        }
+        return new ArrayList<>();
+    }
+
+    private Table buildTableFromTemplate() {
+        log.info("Building table {}.{} from template", dbName, tableName);
+
+        if (readonlyConfig.getOptional(HiveSinkOptions.SAVE_MODE_CREATE_TEMPLATE).isPresent()) {
+            return buildTableFromCustomTemplate();
+        } else {
+            return buildTableFromDefaultTemplate();
+        }
+    }
+
+    private Table buildTableFromDefaultTemplate() {
+        log.info(
+                "Building table {}.{} using default template (PARQUET, no 
partitions)",
+                dbName,
+                tableName);
+
+        Table table = new Table();
+        table.setDbName(dbName);
+        table.setTableName(tableName);
+        table.setOwner(System.getProperty("user.name", "seatunnel"));
+        table.setCreateTime((int) (System.currentTimeMillis() / 1000));
+        table.setTableType("MANAGED_TABLE");
+
+        table.setPartitionKeys(new ArrayList<>());
+
+        // Set storage descriptor
+        StorageDescriptor sd = new StorageDescriptor();
+
+        // Initialize SerDe
+        org.apache.hadoop.hive.metastore.api.SerDeInfo serdeInfo =
+                new org.apache.hadoop.hive.metastore.api.SerDeInfo();
+        serdeInfo.setName(table.getTableName());
+        sd.setSerdeInfo(serdeInfo);
+
+        // Set all columns as regular columns (no partitions in default template)
+        List<FieldSchema> cols = new ArrayList<>();
+        tableSchema
+                .getColumns()
+                .forEach(
+                        column -> {
+                            String hiveType =
+                                    HiveTypeConvertor.seatunnelToHiveType(column.getDataType());
+                            String comment = column.getComment();
+                            cols.add(new FieldSchema(column.getName(), hiveType, comment));
+                        });
+        sd.setCols(cols);
+
+        // Set table location
+        String tableLocation = HiveTableTemplateUtils.getDefaultTableLocation(dbName, tableName);
+        sd.setLocation(tableLocation);
+
+        configureStorageDescriptor(sd, "PARQUET");
+        sd.setCompressed(false);
+        sd.setStoredAsSubDirectories(false);
+
+        table.setSd(sd);
+
+        // Set table parameters
+        table.putToParameters("seatunnel.creation.mode", "default_template");
+        table.putToParameters("seatunnel.created.time", 
String.valueOf(System.currentTimeMillis()));
+
+        return table;
+    }
+
+    private Table buildTableFromCustomTemplate() {
+        log.info("Building table {}.{} using custom template", dbName, 
tableName);
+
+        Table table = new Table();
+        table.setDbName(dbName);
+        table.setTableName(tableName);
+        table.setOwner(System.getProperty("user.name", "seatunnel"));
+        table.setCreateTime((int) (System.currentTimeMillis() / 1000));
+        table.setTableType("MANAGED_TABLE");
+
+        List<String> partitionFields = extractPartitionFieldsFromConfig();
+        List<FieldSchema> partitionKeys = new ArrayList<>();
+        for (String partitionField : partitionFields) {
+            tableSchema.getColumns().stream()
+                    .filter(column -> column.getName().equals(partitionField))
+                    .findFirst()
+                    .ifPresent(
+                            column -> {
+                                String hiveType =
+                                        HiveTypeConvertor.seatunnelToHiveType(column.getDataType());
+                                String comment = column.getComment();
+                                partitionKeys.add(
+                                        new FieldSchema(partitionField, hiveType, comment));
+                            });
+        }
+        table.setPartitionKeys(partitionKeys);
+
+        // Set storage descriptor
+        StorageDescriptor sd = new StorageDescriptor();
+
+        // Initialize SerDe
+        org.apache.hadoop.hive.metastore.api.SerDeInfo serdeInfo =
+                new org.apache.hadoop.hive.metastore.api.SerDeInfo();
+        serdeInfo.setName(table.getTableName());
+        sd.setSerdeInfo(serdeInfo);
+
+        // Set columns (exclude partition fields from regular columns)
+        List<FieldSchema> cols = new ArrayList<>();
+        tableSchema.getColumns().stream()
+                .filter(column -> !partitionFields.contains(column.getName()))
+                .forEach(
+                        column -> {
+                            String hiveType =
+                                    HiveTypeConvertor.seatunnelToHiveType(column.getDataType());
+                            String comment = column.getComment();
+                            cols.add(new FieldSchema(column.getName(), hiveType, comment));
+                        });
+        sd.setCols(cols);
+
+        // Set table location
+        String tableLocation = HiveTableTemplateUtils.getDefaultTableLocation(dbName, tableName);

Review Comment:
   Modified in the latest commit.



##########
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTypeConvertor.java:
##########
@@ -56,4 +57,53 @@ public static SeaTunnelDataType<?> covertHiveTypeToSeaTunnelType(String name, St
         }
         return SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(name, hiveType);
     }
+
+    public static String seatunnelToHiveType(SeaTunnelDataType<?> seaTunnelType) {
+        switch (seaTunnelType.getSqlType()) {
+            case STRING:
+                return "string";
+            case BOOLEAN:
+                return "boolean";
+            case TINYINT:
+                return "tinyint";
+            case SMALLINT:
+                return "smallint";
+            case INT:
+                return "int";
+            case BIGINT:
+                return "bigint";
+            case FLOAT:
+                return "float";
+            case DOUBLE:
+                return "double";
+            case DECIMAL:
+                if (seaTunnelType instanceof DecimalType) {
+                    DecimalType decimalType = (DecimalType) seaTunnelType;
+                    return String.format(
+                            "decimal(%d,%d)", decimalType.getPrecision(), 
decimalType.getScale());
+                }
+                return "decimal(38,18)";
+            case BYTES:
+                return "binary";
+            case DATE:
+                return "date";
+            case TIME:
+                return "string";
+            case TIMESTAMP:
+                return "timestamp";
+            case ROW:
+                return "struct";
+            case ARRAY:
+                return "array";

Review Comment:
   > ditto
   
   I will add proper handling for complex (nested) types such as array, map and struct.
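
   For reference, a hedged sketch (not this PR's final code) of how ARRAY/MAP/ROW could be mapped recursively instead of returning bare `array`/`struct`:

   ```java
   // Sketch only: recursive mapping for nested types, reusing the scalar
   // mapping above for leaf types. Assumes SeaTunnel's ArrayType, MapType
   // and SeaTunnelRowType accessors.
   private static String toHiveType(SeaTunnelDataType<?> type) {
       switch (type.getSqlType()) {
           case ARRAY:
               ArrayType<?, ?> arrayType = (ArrayType<?, ?>) type;
               return "array<" + toHiveType(arrayType.getElementType()) + ">";
           case MAP:
               MapType<?, ?> mapType = (MapType<?, ?>) type;
               return "map<" + toHiveType(mapType.getKeyType()) + "," + toHiveType(mapType.getValueType()) + ">";
           case ROW:
               SeaTunnelRowType rowType = (SeaTunnelRowType) type;
               StringBuilder struct = new StringBuilder("struct<");
               for (int i = 0; i < rowType.getTotalFields(); i++) {
                   if (i > 0) {
                       struct.append(",");
                   }
                   struct.append(rowType.getFieldName(i)).append(":").append(toHiveType(rowType.getFieldType(i)));
               }
               return struct.append(">").toString();
           default:
               return seatunnelToHiveType(type); // fall back to the scalar switch above
       }
   }
   ```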



##########
docs/en/connector-v2/sink/Hive.md:
##########
@@ -100,6 +100,22 @@ Support writing Parquet INT96 from a timestamp, only valid for parquet files.
 
 Flag to decide whether to use overwrite mode when inserting data into Hive. If 
set to true, for non-partitioned tables, the existing data in the table will be 
deleted before inserting new data. For partitioned tables, the data in the 
relevant partition will be deleted before inserting new data.
 
+### schema_save_mode [enum]
+
+Before starting the synchronization task, different processing schemes are 
selected for the existing table structure on the target side.
+
+Option values:
+- `RECREATE_SCHEMA`: Will create when the table does not exist, delete and 
rebuild when the table exists
+- `CREATE_SCHEMA_WHEN_NOT_EXIST`: Will create when the table does not exist, 
skip when the table exists
+- `ERROR_WHEN_SCHEMA_NOT_EXIST`: Error will be reported when the table does 
not exist
+- `IGNORE`: Ignore the treatment of the table
+
+
+
+### save_mode_create_template [string]

Review Comment:
   Has been added



##########
seatunnel-connectors-v2/connector-hive/pom.xml:
##########
@@ -116,12 +115,81 @@
                     <groupId>org.pentaho</groupId>
                     <artifactId>pentaho-aggdesigner-algorithm</artifactId>
                 </exclusion>
+                <!-- Exclude to include separately with proper exclusions -->
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-metastore</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <!-- Hive Common dependency - contains HiveConf class -->
+        <dependency>

Review Comment:
   These dependencies have now been removed.



##########
seatunnel-connectors-v2/connector-hive/pom.xml:
##########
@@ -82,32 +83,30 @@
             <groupId>org.apache.hive</groupId>
             <artifactId>hive-exec</artifactId>
             <version>${hive.exec.version}</version>
-            <scope>provided</scope>

Review Comment:
   These dependencies have now been removed.



##########
docs/en/connector-v2/sink/Hive.md:
##########
@@ -100,6 +100,26 @@ Support writing Parquet INT96 from a timestamp, only valid for parquet files.
 
 Flag to decide whether to use overwrite mode when inserting data into Hive. If 
set to true, for non-partitioned tables, the existing data in the table will be 
deleted before inserting new data. For partitioned tables, the data in the 
relevant partition will be deleted before inserting new data.
 
+### schema_save_mode [enum]
+
+Before starting the synchronization task, different processing schemes are 
selected for the existing table structure on the target side.
+
+Option values:
+- `RECREATE_SCHEMA`: Will create when the table does not exist, delete and 
rebuild when the table exists
+- `CREATE_SCHEMA_WHEN_NOT_EXIST`: Will create when the table does not exist, 
skip when the table exists
+- `ERROR_WHEN_SCHEMA_NOT_EXIST`: Error will be reported when the table does 
not exist
+- `IGNORE`: Ignore the treatment of the table
+
+
+
+### table_format [string]

Review Comment:
   The latest change removes the save_mode_create_template parameter. Instead, table creation is implemented by calling the Hive API directly, driven by the parameters set via schema_save_mode, and this approach is more reliable.
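
   For context, a hedged sketch of the idempotent create this enables through the metastore API (assuming a `HiveMetaStoreClient` field named `client`; field and table names are illustrative):

   ```java
   // Create the table if it does not exist yet; treat a concurrent create as success.
   try {
       client.createTable(table);
   } catch (AlreadyExistsException e) {
       log.info("Table {}.{} already exists, skipping creation", table.getDbName(), table.getTableName());
   }
   ```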



##########
seatunnel-connectors-v2/connector-hive/pom.xml:
##########
@@ -116,12 +115,81 @@
                     <groupId>org.pentaho</groupId>
                     <artifactId>pentaho-aggdesigner-algorithm</artifactId>
                 </exclusion>
+                <!-- Exclude to include separately with proper exclusions -->
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-metastore</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <!-- Hive Common dependency - contains HiveConf class -->
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-common</artifactId>
+            <version>${hive.exec.version}</version>
+            <exclusions>
+                <!-- Exclude logging dependencies to avoid conflicts -->
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- Hive MetaStore dependency - contains HiveMetaStoreClient, 
AlreadyExistsException and other metastore API classes -->
+        <dependency>

Review Comment:
   These dependencies have now been removed.



##########
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSaveModeHandler.java:
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
+import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreCatalog;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTableTemplateUtils;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTypeConvertor;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+public class HiveSaveModeHandler implements SaveModeHandler, AutoCloseable {
+
+    private final ReadonlyConfig readonlyConfig;
+    private final CatalogTable catalogTable;
+    private final SchemaSaveMode schemaSaveMode;
+    private final TablePath tablePath;
+    private final String dbName;
+    private final String tableName;
+    private final TableSchema tableSchema;
+    private final List<String> partitionFields;
+
+    private HiveMetaStoreCatalog hiveCatalog;
+    private Catalog optionalCatalog;
+
+    public HiveSaveModeHandler(
+            ReadonlyConfig readonlyConfig,
+            CatalogTable catalogTable,
+            SchemaSaveMode schemaSaveMode) {
+        this.readonlyConfig = readonlyConfig;
+        this.catalogTable = catalogTable;
+        this.schemaSaveMode = schemaSaveMode;
+        this.tablePath = TablePath.of(readonlyConfig.get(HiveOptions.TABLE_NAME));
+        this.dbName = tablePath.getDatabaseName();
+        this.tableName = tablePath.getTableName();
+        this.tableSchema = catalogTable.getTableSchema();
+
+        // Initialize partition fields from template if available
+        this.partitionFields = extractPartitionFieldsFromConfig();
+    }
+
+    public HiveSaveModeHandler(
+            ReadonlyConfig readonlyConfig,
+            CatalogTable catalogTable,
+            SchemaSaveMode schemaSaveMode,
+            Catalog catalog) {
+        this(readonlyConfig, catalogTable, schemaSaveMode);
+        this.optionalCatalog = catalog;
+    }
+
+    @Override
+    public void open() {
+        this.hiveCatalog = HiveMetaStoreCatalog.create(readonlyConfig);
+        if (this.optionalCatalog == null) {
+            this.optionalCatalog = this.hiveCatalog;
+        }
+    }
+
+    @Override
+    public void handleSchemaSaveModeWithRestore() {
+        // For Hive, we use the same logic as handleSchemaSaveMode
+        handleSchemaSaveMode();
+    }
+
+    @Override
+    public TablePath getHandleTablePath() {
+        return tablePath;
+    }
+
+    @Override
+    public Catalog getHandleCatalog() {
+        return optionalCatalog;
+    }
+
+    @Override
+    public SchemaSaveMode getSchemaSaveMode() {
+        return schemaSaveMode;
+    }
+
+    @Override
+    public DataSaveMode getDataSaveMode() {
+        // Hive uses OVERWRITE parameter for data handling
+        return DataSaveMode.APPEND_DATA;
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (optionalCatalog != null) {
+            optionalCatalog.close();
+        }
+        if (hiveCatalog != null && hiveCatalog != optionalCatalog) {
+            hiveCatalog.close();
+        }
+    }
+
+    @Override
+    public void handleSchemaSaveMode() {
+        try {
+            switch (schemaSaveMode) {
+                case RECREATE_SCHEMA:
+                    handleRecreateSchema();
+                    break;
+                case CREATE_SCHEMA_WHEN_NOT_EXIST:
+                    handleCreateSchemaWhenNotExist();
+                    break;
+                case ERROR_WHEN_SCHEMA_NOT_EXIST:
+                    handleErrorWhenSchemaNotExist();
+                    break;
+                case IGNORE:
+                    log.info(
+                            "Ignore schema save mode, skip schema handling for 
table {}.{}",
+                            dbName,
+                            tableName);
+                    break;
+                default:
+                    throw new HiveConnectorException(
+                            HiveConnectorErrorCode.CREATE_HIVE_TABLE_FAILED,
+                            "Unsupported schema save mode: " + schemaSaveMode);
+            }
+        } catch (Exception e) {
+            throw new HiveConnectorException(
+                    HiveConnectorErrorCode.CREATE_HIVE_TABLE_FAILED,
+                    "Failed to handle schema save mode: " + e.getMessage(),
+                    e);
+        }
+    }
+
+    @Override
+    public void handleDataSaveMode() {
+        // For Hive, data save mode is handled by the existing OVERWRITE parameter
+        // No additional data handling is needed here
+        log.info(
+                "Data save mode handling is managed by existing OVERWRITE 
parameter for table {}.{}",
+                dbName,
+                tableName);
+    }
+
+    private void handleRecreateSchema() throws TException {
+        log.info("Recreate schema mode: dropping and recreating table {}.{}", 
dbName, tableName);
+
+        // Do NOT create database automatically. Ensure database exists first.
+        if (!hiveCatalog.databaseExists(dbName)) {
+            throw new HiveConnectorException(
+                    HiveConnectorErrorCode.CREATE_HIVE_TABLE_FAILED,
+                    "Database " + dbName + " does not exist. Please create it 
manually.");
+        }
+
+        // Drop table if exists
+        if (hiveCatalog.tableExists(dbName, tableName)) {
+            hiveCatalog.dropTable(dbName, tableName);
+            log.info("Dropped existing table {}.{}", dbName, tableName);
+        }
+
+        // Create table using template
+        createTable();
+    }
+
+    private void handleCreateSchemaWhenNotExist() throws TException {
+        log.info("Create schema when not exist mode for table {}.{}", dbName, 
tableName);
+
+        if (!hiveCatalog.databaseExists(dbName)) {
+            throw new HiveConnectorException(
+                    HiveConnectorErrorCode.CREATE_HIVE_TABLE_FAILED,
+                    "Database " + dbName + " does not exist. Please create it 
manually.");
+        }
+
+        if (!hiveCatalog.tableExists(dbName, tableName)) {
+            createTable();
+        } else {
+            log.info("Table {}.{} already exists, skipping creation", dbName, 
tableName);
+        }
+    }
+
+    private void handleErrorWhenSchemaNotExist() throws TException {
+        log.info("Error when schema not exist mode: checking table {}.{}", 
dbName, tableName);
+
+        // Check if database exists
+        if (!hiveCatalog.databaseExists(dbName)) {
+            throw new HiveConnectorException(
+                    HiveConnectorErrorCode.CREATE_HIVE_TABLE_FAILED,
+                    "Database " + dbName + " does not exist");
+        }
+
+        // Check if table exists
+        if (!hiveCatalog.tableExists(dbName, tableName)) {
+            throw new HiveConnectorException(
+                    HiveConnectorErrorCode.CREATE_HIVE_TABLE_FAILED,
+                    "Table " + dbName + "." + tableName + " does not exist");
+        }
+    }
+
+    private void createTable() throws TException {
+        log.info("Creating table {}.{} using template-based approach", dbName, 
tableName);
+        Table table = buildTableFromTemplate();
+        hiveCatalog.createTableFromTemplate(table);
+        log.info("Successfully created table {}.{}", dbName, tableName);
+    }
+
+    private List<String> extractPartitionFieldsFromConfig() {
+        if (readonlyConfig.getOptional(HiveSinkOptions.SAVE_MODE_CREATE_TEMPLATE).isPresent()) {
+            String template = readonlyConfig.get(HiveSinkOptions.SAVE_MODE_CREATE_TEMPLATE);
+            return HiveTableTemplateUtils.extractPartitionFieldsFromTemplate(template);
+        }
+        return new ArrayList<>();
+    }
+
+    private Table buildTableFromTemplate() {
+        log.info("Building table {}.{} from template", dbName, tableName);
+
+        if (readonlyConfig.getOptional(HiveSinkOptions.SAVE_MODE_CREATE_TEMPLATE).isPresent()) {
+            return buildTableFromCustomTemplate();
+        } else {
+            return buildTableFromDefaultTemplate();
+        }
+    }
+
+    private Table buildTableFromDefaultTemplate() {
+        log.info(
+                "Building table {}.{} using default template (PARQUET, no 
partitions)",
+                dbName,
+                tableName);
+
+        Table table = new Table();
+        table.setDbName(dbName);
+        table.setTableName(tableName);
+        table.setOwner(System.getProperty("user.name", "seatunnel"));
+        table.setCreateTime((int) (System.currentTimeMillis() / 1000));
+        table.setTableType("MANAGED_TABLE");
+
+        table.setPartitionKeys(new ArrayList<>());
+
+        // Set storage descriptor
+        StorageDescriptor sd = new StorageDescriptor();
+
+        // Initialize SerDe
+        org.apache.hadoop.hive.metastore.api.SerDeInfo serdeInfo =
+                new org.apache.hadoop.hive.metastore.api.SerDeInfo();
+        serdeInfo.setName(table.getTableName());
+        sd.setSerdeInfo(serdeInfo);
+
+        // Set all columns as regular columns (no partitions in default template)
+        List<FieldSchema> cols = new ArrayList<>();
+        tableSchema
+                .getColumns()
+                .forEach(
+                        column -> {
+                            String hiveType =
+                                    HiveTypeConvertor.seatunnelToHiveType(column.getDataType());
+                            String comment = column.getComment();
+                            cols.add(new FieldSchema(column.getName(), hiveType, comment));
+                        });
+        sd.setCols(cols);
+
+        // Set table location
+        String tableLocation = HiveTableTemplateUtils.getDefaultTableLocation(dbName, tableName);
+        sd.setLocation(tableLocation);
+
+        configureStorageDescriptor(sd, "PARQUET");
+        sd.setCompressed(false);
+        sd.setStoredAsSubDirectories(false);
+
+        table.setSd(sd);
+
+        // Set table parameters
+        table.putToParameters("seatunnel.creation.mode", "default_template");
+        table.putToParameters("seatunnel.created.time", 
String.valueOf(System.currentTimeMillis()));
+
+        return table;
+    }
+
+    private Table buildTableFromCustomTemplate() {
+        log.info("Building table {}.{} using custom template", dbName, 
tableName);
+
+        Table table = new Table();
+        table.setDbName(dbName);
+        table.setTableName(tableName);
+        table.setOwner(System.getProperty("user.name", "seatunnel"));
+        table.setCreateTime((int) (System.currentTimeMillis() / 1000));
+        table.setTableType("MANAGED_TABLE");

Review Comment:
   Modified in the latest commit.



##########
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java:
##########
@@ -47,11 +55,12 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
 @Slf4j
-public class HiveMetaStoreProxy implements Closeable, Serializable {
+public class HiveMetaStoreProxy implements Catalog, Closeable, Serializable {

Review Comment:
   @Hisoka-X Could you please take some time to check the unit11 module in the CI pipeline? Similar failures currently seem to occur there consistently.



##########
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java:
##########
@@ -47,11 +55,12 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
 @Slf4j
-public class HiveMetaStoreProxy implements Closeable, Serializable {
+public class HiveMetaStoreProxy implements Catalog, Closeable, Serializable {

Review Comment:
   Has been modified



##########
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSaveModeHandler.java:
##########
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
+import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreCatalog;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTableTemplateUtils;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTypeConvertor;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+public class HiveSaveModeHandler implements SaveModeHandler, AutoCloseable {
+
+    private final ReadonlyConfig readonlyConfig;
+    private final CatalogTable catalogTable;
+    private final SchemaSaveMode schemaSaveMode;
+    private final TablePath tablePath;
+    private final String dbName;
+    private final String tableName;
+    private final TableSchema tableSchema;
+    private final List<String> partitionFields;
+
+    private HiveMetaStoreCatalog hiveCatalog;
+    private Catalog optionalCatalog;
+
+    public HiveSaveModeHandler(
+            ReadonlyConfig readonlyConfig,
+            CatalogTable catalogTable,
+            SchemaSaveMode schemaSaveMode) {
+        this.readonlyConfig = readonlyConfig;
+        this.catalogTable = catalogTable;
+        this.schemaSaveMode = schemaSaveMode;
+        this.tablePath = TablePath.of(readonlyConfig.get(HiveOptions.TABLE_NAME));
+        this.dbName = tablePath.getDatabaseName();
+        this.tableName = tablePath.getTableName();
+        this.tableSchema = catalogTable.getTableSchema();
+
+        // Initialize partition fields from template if available
+        this.partitionFields = extractPartitionFieldsFromConfig();
+    }
+
+    public HiveSaveModeHandler(
+            ReadonlyConfig readonlyConfig,
+            CatalogTable catalogTable,
+            SchemaSaveMode schemaSaveMode,
+            Catalog catalog) {
+        this(readonlyConfig, catalogTable, schemaSaveMode);
+        this.optionalCatalog = catalog;
+    }
+
+    @Override
+    public void open() {
+        this.hiveCatalog = HiveMetaStoreCatalog.create(readonlyConfig);
+        if (this.optionalCatalog == null) {
+            this.optionalCatalog = this.hiveCatalog;
+        }
+    }
+
+    @Override
+    public void handleSchemaSaveModeWithRestore() {
+        // For Hive, we use the same logic as handleSchemaSaveMode
+        handleSchemaSaveMode();
+    }
+
+    @Override
+    public TablePath getHandleTablePath() {
+        return tablePath;
+    }
+
+    @Override
+    public Catalog getHandleCatalog() {
+        return optionalCatalog;
+    }
+
+    @Override
+    public SchemaSaveMode getSchemaSaveMode() {
+        return schemaSaveMode;
+    }
+
+    @Override
+    public DataSaveMode getDataSaveMode() {
+        // Hive uses OVERWRITE parameter for data handling
+        return DataSaveMode.APPEND_DATA;
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (optionalCatalog != null) {
+            optionalCatalog.close();
+        }
+        if (hiveCatalog != null && hiveCatalog != optionalCatalog) {
+            hiveCatalog.close();
+        }
+    }
+
+    @Override
+    public void handleSchemaSaveMode() {
+        try {
+            switch (schemaSaveMode) {
+                case RECREATE_SCHEMA:
+                    handleRecreateSchema();
+                    break;
+                case CREATE_SCHEMA_WHEN_NOT_EXIST:
+                    handleCreateSchemaWhenNotExist();
+                    break;
+                case ERROR_WHEN_SCHEMA_NOT_EXIST:
+                    handleErrorWhenSchemaNotExist();
+                    break;
+                case IGNORE:
+                    log.info(
+                            "Ignore schema save mode, skip schema handling for 
table {}.{}",
+                            dbName,
+                            tableName);
+                    break;
+                default:
+                    throw new HiveConnectorException(
+                            HiveConnectorErrorCode.CREATE_HIVE_TABLE_FAILED,
+                            "Unsupported schema save mode: " + schemaSaveMode);
+            }
+        } catch (Exception e) {
+            throw new HiveConnectorException(
+                    HiveConnectorErrorCode.CREATE_HIVE_TABLE_FAILED,
+                    "Failed to handle schema save mode: " + e.getMessage(),
+                    e);
+        }
+    }
+
+    @Override
+    public void handleDataSaveMode() {
+        // For Hive, data save mode is handled by the existing OVERWRITE parameter
+        // No additional data handling is needed here
+        log.info(
+                "Data save mode handling is managed by existing OVERWRITE 
parameter for table {}.{}",
+                dbName,
+                tableName);

Review Comment:
   > Let's merge overwrite logic with datasavemode. We can set `overwrite=true` 
or `datasavemode=TRUNCATE` to do same thing.
   
   The corresponding logic has been modified.
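
   For context, a minimal sketch of what that merge could look like (assuming the connector exposes an `OVERWRITE` boolean option; the option name here is an assumption):

   ```java
   @Override
   public DataSaveMode getDataSaveMode() {
       // overwrite=true maps to DROP_DATA (clear existing data before writing);
       // otherwise keep the current APPEND_DATA behaviour.
       return Boolean.TRUE.equals(readonlyConfig.get(HiveSinkOptions.OVERWRITE))
               ? DataSaveMode.DROP_DATA
               : DataSaveMode.APPEND_DATA;
   }
   ```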



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java:
##########
@@ -259,4 +256,36 @@ public void testFakeSinkHiveOnOSS(TestContainer container) throws Exception {
     public void testFakeSinkHiveOnCos(TestContainer container) throws Exception {
         executeJob(container, "/fake_to_hive_on_cos.conf", "/hive_on_cos_to_assert.conf");
     }
+
+    @TestTemplate
+    public void testAutoTableCreationCreateWhenNotExist(TestContainer container) throws Exception {
+        executeJob(
+                container,
+                "/auto_table_creation/fake_to_hive_create_when_not_exist.conf",
+                "/auto_table_creation/hive_auto_create_to_assert.conf");

Review Comment:
   > why use new seatunnel job to verify table? Maybe we can just verify new table by hiveclient.
   
   This is to keep the verification format consistent with the existing HiveIT tests, and this approach is relatively concise.
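
   For reference, the hive-client check suggested above could look roughly like this in HiveIT (a sketch: the metastore URI, database, and table names are placeholders for whatever the e2e environment actually exposes):
   
   ```java
   import org.apache.hadoop.hive.conf.HiveConf;
   import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
   import org.apache.hadoop.hive.metastore.api.Table;
   import org.junit.jupiter.api.Assertions;
   
   private void assertTableAutoCreated() throws Exception {
       HiveConf hiveConf = new HiveConf();
       hiveConf.set("hive.metastore.uris", "thrift://metastore:9083"); // placeholder URI
       HiveMetaStoreClient client = new HiveMetaStoreClient(hiveConf);
       try {
           // placeholder db/table names: the auto-created table must exist with columns
           Table table = client.getTable("test_hive_sink", "auto_created_table");
           Assertions.assertFalse(table.getSd().getCols().isEmpty());
       } finally {
           client.close();
       }
   }
   ```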



##########
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSaveModeHandler.java:
##########
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
+import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreCatalog;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTableTemplateUtils;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTypeConvertor;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+public class HiveSaveModeHandler implements SaveModeHandler, AutoCloseable {
+
+    private final ReadonlyConfig readonlyConfig;
+    private final CatalogTable catalogTable;
+    private final SchemaSaveMode schemaSaveMode;
+    private final TablePath tablePath;
+    private final String dbName;
+    private final String tableName;
+    private final TableSchema tableSchema;
+    private final List<String> partitionFields;
+
+    private HiveMetaStoreCatalog hiveCatalog;
+    private Catalog optionalCatalog;
+
+    public HiveSaveModeHandler(
+            ReadonlyConfig readonlyConfig,
+            CatalogTable catalogTable,
+            SchemaSaveMode schemaSaveMode) {
+        this.readonlyConfig = readonlyConfig;
+        this.catalogTable = catalogTable;
+        this.schemaSaveMode = schemaSaveMode;
+        this.tablePath = TablePath.of(readonlyConfig.get(HiveOptions.TABLE_NAME));
+        this.dbName = tablePath.getDatabaseName();
+        this.tableName = tablePath.getTableName();
+        this.tableSchema = catalogTable.getTableSchema();
+
+        // Initialize partition fields from template if available
+        this.partitionFields = extractPartitionFieldsFromConfig();
+    }
+
+    public HiveSaveModeHandler(

Review Comment:
   > useless?
   
   It has been removed.



##########
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSaveModeHandler.java:
##########
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
+import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreCatalog;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTableTemplateUtils;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTypeConvertor;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+public class HiveSaveModeHandler implements SaveModeHandler, AutoCloseable {
+
+    private final ReadonlyConfig readonlyConfig;
+    private final CatalogTable catalogTable;
+    private final SchemaSaveMode schemaSaveMode;
+    private final TablePath tablePath;
+    private final String dbName;
+    private final String tableName;
+    private final TableSchema tableSchema;
+    private final List<String> partitionFields;
+
+    private HiveMetaStoreCatalog hiveCatalog;
+    private Catalog optionalCatalog;

Review Comment:
   > why not direct use `hiveCatalog` but `optionalCatalog`?
   
   It has been removed; HiveSaveModeHandler#getHandleCatalog directly returns hiveCatalog.
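
   The simplified lifecycle after that removal might look like this (a sketch assembled from the methods already visible in this diff):
   
   ```java
   @Override
   public void open() {
       this.hiveCatalog = HiveMetaStoreCatalog.create(readonlyConfig);
   }
   
   @Override
   public Catalog getHandleCatalog() {
       // single catalog reference, no optionalCatalog indirection
       return hiveCatalog;
   }
   
   @Override
   public void close() throws Exception {
       if (hiveCatalog != null) {
           hiveCatalog.close();
       }
   }
   ```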



##########
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSaveModeHandler.java:
##########
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
+import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreCatalog;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTableTemplateUtils;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTypeConvertor;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+public class HiveSaveModeHandler implements SaveModeHandler, AutoCloseable {
+
+    private final ReadonlyConfig readonlyConfig;
+    private final CatalogTable catalogTable;
+    private final SchemaSaveMode schemaSaveMode;
+    private final TablePath tablePath;
+    private final String dbName;
+    private final String tableName;
+    private final TableSchema tableSchema;
+    private final List<String> partitionFields;
+
+    private HiveMetaStoreCatalog hiveCatalog;
+    private Catalog optionalCatalog;
+
+    public HiveSaveModeHandler(
+            ReadonlyConfig readonlyConfig,
+            CatalogTable catalogTable,
+            SchemaSaveMode schemaSaveMode) {
+        this.readonlyConfig = readonlyConfig;
+        this.catalogTable = catalogTable;
+        this.schemaSaveMode = schemaSaveMode;
+        this.tablePath = TablePath.of(readonlyConfig.get(HiveOptions.TABLE_NAME));
+        this.dbName = tablePath.getDatabaseName();
+        this.tableName = tablePath.getTableName();
+        this.tableSchema = catalogTable.getTableSchema();
+
+        // Initialize partition fields from template if available
+        this.partitionFields = extractPartitionFieldsFromConfig();

Review Comment:
   > I found this field only used by test case? We should use another way to verify partition fields in test case, not in runtime code.
   
   The class fields and the isPartitionedTable() method have been removed.
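
   Moving the check into the e2e test can be as direct as asserting on the metastore's partition keys (sketch; client setup as in the earlier HiveIT example, and "dt" plus the table name are placeholders):
   
   ```java
   // Assumes java.util.* and Collectors imports, a HiveMetaStoreClient `client`,
   // and that the job is expected to create one partition column named "dt".
   List<String> partitionKeys =
           client.getTable("test_hive_sink", "auto_created_partitioned_table")
                   .getPartitionKeys().stream()
                   .map(FieldSchema::getName)
                   .collect(Collectors.toList());
   Assertions.assertEquals(Collections.singletonList("dt"), partitionKeys);
   ```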



##########
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTypeConvertor.java:
##########
@@ -56,4 +57,53 @@ public static SeaTunnelDataType<?> covertHiveTypeToSeaTunnelType(String name, St
         }
         return SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(name, hiveType);
     }
+
+    public static String seatunnelToHiveType(SeaTunnelDataType<?> seaTunnelType) {
+        switch (seaTunnelType.getSqlType()) {
+            case STRING:
+                return "string";
+            case BOOLEAN:
+                return "boolean";
+            case TINYINT:
+                return "tinyint";
+            case SMALLINT:
+                return "smallint";
+            case INT:
+                return "int";
+            case BIGINT:
+                return "bigint";
+            case FLOAT:
+                return "float";
+            case DOUBLE:
+                return "double";
+            case DECIMAL:
+                if (seaTunnelType instanceof DecimalType) {
+                    DecimalType decimalType = (DecimalType) seaTunnelType;
+                    return String.format(
+                            "decimal(%d,%d)", decimalType.getPrecision(), decimalType.getScale());
+                }
+                return "decimal(38,18)";
+            case BYTES:
+                return "binary";
+            case DATE:
+                return "date";
+            case TIME:
+                return "string";
+            case TIMESTAMP:
+                return "timestamp";
+            case ROW:
+                return "struct";

Review Comment:
   > Is there an error here when struct needs to map fields?
   
   I will extend the conversion to generate fully parameterized Hive types for complex structures.
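
   Hive DDL expects fully parameterized complex types (e.g. `struct<name:string,age:int>`), so the enhancement could recurse into nested types, roughly like this sketch (it assumes the SeaTunnelRowType/ArrayType/MapType accessors from seatunnel-api and is not the final code):
   
   ```java
   import org.apache.seatunnel.api.table.type.ArrayType;
   import org.apache.seatunnel.api.table.type.MapType;
   import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
   import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
   
   public static String toHiveComplexType(SeaTunnelDataType<?> type) {
       switch (type.getSqlType()) {
           case ROW: {
               SeaTunnelRowType rowType = (SeaTunnelRowType) type;
               StringBuilder struct = new StringBuilder("struct<");
               for (int i = 0; i < rowType.getTotalFields(); i++) {
                   if (i > 0) {
                       struct.append(",");
                   }
                   // field name plus the recursively converted field type
                   struct.append(rowType.getFieldName(i))
                           .append(":")
                           .append(toHiveComplexType(rowType.getFieldType(i)));
               }
               return struct.append(">").toString();
           }
           case ARRAY:
               return "array<" + toHiveComplexType(((ArrayType<?, ?>) type).getElementType()) + ">";
           case MAP: {
               MapType<?, ?> mapType = (MapType<?, ?>) type;
               return "map<" + toHiveComplexType(mapType.getKeyType()) + ","
                       + toHiveComplexType(mapType.getValueType()) + ">";
           }
           default:
               // primitive mapping stays in seatunnelToHiveType as above
               return seatunnelToHiveType(type);
       }
   }
   ```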



##########
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSaveModeHandler.java:
##########
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
+import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreCatalog;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTableTemplateUtils;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTypeConvertor;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+public class HiveSaveModeHandler implements SaveModeHandler, AutoCloseable {

Review Comment:
   > Why not implement `DefaultSaveModeHandler`? Many code is duplicated.
   
   Since Hive's "template-based table creation" involves SQL-level template 
parsing followed by Metastore API calls, which falls outside the general scope 
of CatalogTable, we will continue to maintain Hive's dedicated SaveModeHandler 
(instead of reusing DefaultSaveModeHandler), while making every effort to 
minimize code duplication.



##########
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java:
##########
@@ -36,4 +37,26 @@ public class HiveSinkOptions extends HiveOptions {
                     .defaultValue(false)
                     .withDescription(
                             "Flag to decide whether to use overwrite mode when 
inserting data into Hive. If set to true, for non-partitioned tables, the 
existing data in the table will be deleted before inserting new data. For 
partitioned tables, the data in the relevant partition will be deleted before 
inserting new data.");
+
+    // SaveMode related options
+    public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
+            Options.key("schema_save_mode")
+                    .enumType(SchemaSaveMode.class)
+                    .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
+                    .withDescription(
+                            "Schema save mode for auto table creation. "
+                                    + "CREATE_SCHEMA_WHEN_NOT_EXIST: Create table when not exists (default). "
+                                    + "RECREATE_SCHEMA: Drop and recreate table. "
+                                    + "ERROR_WHEN_SCHEMA_NOT_EXIST: Throw error when table not exists. "
+                                    + "IGNORE: Skip table creation.");
+
+    public static final Option<String> SAVE_MODE_CREATE_TEMPLATE =
+            Options.key("save_mode_create_template")
+                    .stringType()
+                    .noDefaultValue()

Review Comment:
   > default value?
   
   You can refer to the buildTableFromDefaultTemplate method: we use templates to automatically create Hive tables, generating the CREATE TABLE statement from the upstream data types and schema, and the default template can be customized as needed. Available template variables: ${database}, ${table}, ${rowtype_fields}, ${rowtype_partition_fields}, ${table_location}.
   
   **Default value**: When not specified, a default PARQUET non-partitioned table template is used:
   ```sql
   CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
     ${rowtype_fields}
   )
   STORED AS PARQUET
   LOCATION '${table_location}'
   ```
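
   For illustration, the substitution step behind that template might look roughly like this (a sketch: the helper name is made up, while `tableSchema`, `dbName`, `tableName`, and `HiveTypeConvertor.seatunnelToHiveType` come from this PR):
   
   ```java
   import java.util.stream.Collectors;
   
   private String renderCreateTemplate(String template) {
       // "`name` type" pairs joined for ${rowtype_fields}
       String rowtypeFields =
               tableSchema.getColumns().stream()
                       .map(column -> String.format(
                               "`%s` %s",
                               column.getName(),
                               HiveTypeConvertor.seatunnelToHiveType(column.getDataType())))
                       .collect(Collectors.joining(",\n  "));
       return template.replace("${database}", dbName)
               .replace("${table}", tableName)
               .replace("${rowtype_fields}", rowtypeFields);
   }
   ```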



##########
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTypeConvertor.java:
##########
@@ -56,4 +57,53 @@ public static SeaTunnelDataType<?> covertHiveTypeToSeaTunnelType(String name, St
         }
         return SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(name, hiveType);
     }
+
+    public static String seatunnelToHiveType(SeaTunnelDataType<?> seaTunnelType) {
+        switch (seaTunnelType.getSqlType()) {
+            case STRING:
+                return "string";
+            case BOOLEAN:
+                return "boolean";
+            case TINYINT:
+                return "tinyint";
+            case SMALLINT:
+                return "smallint";
+            case INT:
+                return "int";
+            case BIGINT:
+                return "bigint";
+            case FLOAT:
+                return "float";
+            case DOUBLE:
+                return "double";
+            case DECIMAL:
+                if (seaTunnelType instanceof DecimalType) {
+                    DecimalType decimalType = (DecimalType) seaTunnelType;
+                    return String.format(
+                            "decimal(%d,%d)", decimalType.getPrecision(), decimalType.getScale());
+                }
+                return "decimal(38,18)";
+            case BYTES:
+                return "binary";
+            case DATE:
+                return "date";
+            case TIME:
+                return "string";
+            case TIMESTAMP:
+                return "timestamp";
+            case ROW:
+                return "struct";
+            case ARRAY:
+                return "array";
+            case MAP:
+                return "map";

Review Comment:
   > ditto
   
   As above, I will extend the conversion to generate fully parameterized Hive types for complex structures.
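
   With the recursive sketch from the earlier comment on this method, nested types would render fully parameterized instead of as bare keywords (example assumes seatunnel-api's BasicType constants and a public MapType constructor):
   
   ```java
   SeaTunnelRowType row =
           new SeaTunnelRowType(
                   new String[] {"name", "age"},
                   new SeaTunnelDataType<?>[] {BasicType.STRING_TYPE, BasicType.INT_TYPE});
   toHiveComplexType(row); // -> "struct<name:string,age:int>"
   toHiveComplexType(new MapType<>(BasicType.STRING_TYPE, ArrayType.INT_ARRAY_TYPE));
   // -> "map<string,array<int>>"
   ```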



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
