This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0b8f9de465 [Improve][Connector-V2] MaxComputeSink support create 
partition in savemode (#8474)
0b8f9de465 is described below

commit 0b8f9de4656b4140ec9bc0995ef1e26468af4818
Author: Jia Fan <[email protected]>
AuthorDate: Sun Jan 12 20:24:45 2025 +0800

    [Improve][Connector-V2] MaxComputeSink support create partition in savemode 
(#8474)
---
 docs/en/connector-v2/sink/Maxcompute.md            |  4 +-
 .../maxcompute/catalog/MaxComputeCatalog.java      | 31 +++++++++-
 .../maxcompute/sink/MaxComputeSaveModeHandler.java | 68 ++++++++++++++++++++++
 .../seatunnel/maxcompute/sink/MaxcomputeSink.java  |  6 +-
 4 files changed, 103 insertions(+), 6 deletions(-)

diff --git a/docs/en/connector-v2/sink/Maxcompute.md 
b/docs/en/connector-v2/sink/Maxcompute.md
index 2f42834189..7210ee5bd3 100644
--- a/docs/en/connector-v2/sink/Maxcompute.md
+++ b/docs/en/connector-v2/sink/Maxcompute.md
@@ -91,8 +91,8 @@ You can use the following placeholders
 
 Before the synchronous task is turned on, different treatment schemes are 
selected for the existing surface structure of the target side.  
 Option introduction:  
-`RECREATE_SCHEMA` :Will create when the table does not exist, delete and 
rebuild when the table is saved        
-`CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the table does not exist, 
skipped when the table is saved        
+`RECREATE_SCHEMA` :Will create when the table does not exist, delete and 
rebuild when the table is saved. If the `partition_spec` is set, the partition 
will be deleted and rebuilt.        
+`CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the table does not exist, 
skipped when the table is saved. If the `partition_spec` is set, the partition 
will be created.        
 `ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not 
exist  
 `IGNORE` :Ignore the treatment of the table
 
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
index f90a5c4086..a8229a078f 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
@@ -271,6 +271,27 @@ public class MaxComputeCatalog implements Catalog {
         }
     }
 
+    public void createPartition(TablePath tablePath, PartitionSpec 
partitionSpec) {
+        try {
+            Odps odps = getOdps(tablePath.getDatabaseName());
+            Table odpsTable = odps.tables().get(tablePath.getTableName());
+            odpsTable.createPartition(partitionSpec, true);
+        } catch (Exception e) {
+            throw new CatalogException("create partition error", e);
+        }
+    }
+
+    public void truncatePartition(TablePath tablePath, PartitionSpec 
partitionSpec) {
+        try {
+            Odps odps = getOdps(tablePath.getDatabaseName());
+            Table odpsTable = odps.tables().get(tablePath.getTableName());
+            odpsTable.deletePartition(partitionSpec, true);
+            odpsTable.createPartition(partitionSpec, true);
+        } catch (Exception e) {
+            throw new CatalogException("create partition error", e);
+        }
+    }
+
     @Override
     public boolean isExistsData(TablePath tablePath) {
         throw new UnsupportedOperationException();
@@ -280,7 +301,15 @@ public class MaxComputeCatalog implements Catalog {
     public void executeSql(TablePath tablePath, String sql) {
         try {
             Odps odps = getOdps(tablePath.getDatabaseName());
-            SQLTask.run(odps, sql).waitForSuccess();
+            String[] sqls = sql.split(";");
+            for (String s : sqls) {
+                if (!s.trim().isEmpty()) {
+                    if (!s.trim().endsWith(";")) {
+                        s = s.trim() + ";";
+                    }
+                    SQLTask.run(odps, s).waitForSuccess();
+                }
+            }
         } catch (OdpsException e) {
             throw new CatalogException("execute sql error", e);
         }
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxComputeSaveModeHandler.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxComputeSaveModeHandler.java
new file mode 100644
index 0000000000..e8fa3832b9
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxComputeSaveModeHandler.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog.MaxComputeCatalog;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.aliyun.odps.PartitionSpec;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
+
+public class MaxComputeSaveModeHandler extends DefaultSaveModeHandler {
+
+    private final ReadonlyConfig readonlyConfig;
+
+    public MaxComputeSaveModeHandler(
+            SchemaSaveMode schemaSaveMode,
+            DataSaveMode dataSaveMode,
+            Catalog catalog,
+            CatalogTable catalogTable,
+            String customSql,
+            ReadonlyConfig readonlyConfig) {
+        super(schemaSaveMode, dataSaveMode, catalog, catalogTable, customSql);
+        this.readonlyConfig = readonlyConfig;
+    }
+
+    @Override
+    protected void createSchemaWhenNotExist() {
+        super.createSchemaWhenNotExist();
+        if (StringUtils.isNotEmpty(readonlyConfig.get(PARTITION_SPEC))) {
+            ((MaxComputeCatalog) catalog)
+                    .createPartition(
+                            tablePath, new 
PartitionSpec(readonlyConfig.get(PARTITION_SPEC)));
+        }
+    }
+
+    @Override
+    protected void recreateSchema() {
+        super.recreateSchema();
+        if (StringUtils.isNotEmpty(readonlyConfig.get(PARTITION_SPEC))) {
+            ((MaxComputeCatalog) catalog)
+                    .createPartition(
+                            tablePath, new 
PartitionSpec(readonlyConfig.get(PARTITION_SPEC)));
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
index ddfe1de08b..de3d90f6a7 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
@@ -20,7 +20,6 @@ package 
org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.DataSaveMode;
-import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
 import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSink;
@@ -94,12 +93,13 @@ public class MaxcomputeSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
         }
 
         return Optional.of(
-                new DefaultSaveModeHandler(
+                new MaxComputeSaveModeHandler(
                         readonlyConfig.get(MaxcomputeConfig.SCHEMA_SAVE_MODE),
                         dataSaveMode,
                         catalog,
                         catalogTable,
-                        readonlyConfig.get(MaxcomputeConfig.CUSTOM_SQL)));
+                        readonlyConfig.get(MaxcomputeConfig.CUSTOM_SQL),
+                        readonlyConfig));
     }
 
     @Override

Reply via email to