This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 0b8f9de465 [Improve][Connector-V2] MaxComputeSink support create
partition in savemode (#8474)
0b8f9de465 is described below
commit 0b8f9de4656b4140ec9bc0995ef1e26468af4818
Author: Jia Fan <[email protected]>
AuthorDate: Sun Jan 12 20:24:45 2025 +0800
[Improve][Connector-V2] MaxComputeSink support create partition in savemode
(#8474)
---
docs/en/connector-v2/sink/Maxcompute.md | 4 +-
.../maxcompute/catalog/MaxComputeCatalog.java | 31 +++++++++-
.../maxcompute/sink/MaxComputeSaveModeHandler.java | 68 ++++++++++++++++++++++
.../seatunnel/maxcompute/sink/MaxcomputeSink.java | 6 +-
4 files changed, 103 insertions(+), 6 deletions(-)
diff --git a/docs/en/connector-v2/sink/Maxcompute.md
b/docs/en/connector-v2/sink/Maxcompute.md
index 2f42834189..7210ee5bd3 100644
--- a/docs/en/connector-v2/sink/Maxcompute.md
+++ b/docs/en/connector-v2/sink/Maxcompute.md
@@ -91,8 +91,8 @@ You can use the following placeholders
Before the synchronous task is turned on, different treatment schemes are
selected for the existing surface structure of the target side.
Option introduction:
-`RECREATE_SCHEMA` :Will create when the table does not exist, delete and
rebuild when the table is saved
-`CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the table does not exist,
skipped when the table is saved
+`RECREATE_SCHEMA` :The table is created when it does not exist, and dropped
and recreated when it already exists. If `partition_spec` is set, the partition
is deleted and recreated as well.
+`CREATE_SCHEMA_WHEN_NOT_EXIST` :The table is created when it does not exist,
and creation is skipped when it already exists. If `partition_spec` is set, the
partition is created.
`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not
exist
`IGNORE` :Ignore the treatment of the table
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
index f90a5c4086..a8229a078f 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
@@ -271,6 +271,27 @@ public class MaxComputeCatalog implements Catalog {
}
}
+ public void createPartition(TablePath tablePath, PartitionSpec
partitionSpec) {
+ try {
+ Odps odps = getOdps(tablePath.getDatabaseName());
+ Table odpsTable = odps.tables().get(tablePath.getTableName());
+ odpsTable.createPartition(partitionSpec, true);
+ } catch (Exception e) {
+ throw new CatalogException("create partition error", e);
+ }
+ }
+
+ public void truncatePartition(TablePath tablePath, PartitionSpec
partitionSpec) {
+ try {
+ Odps odps = getOdps(tablePath.getDatabaseName());
+ Table odpsTable = odps.tables().get(tablePath.getTableName());
+ odpsTable.deletePartition(partitionSpec, true);
+ odpsTable.createPartition(partitionSpec, true);
+ } catch (Exception e) {
+ throw new CatalogException("create partition error", e);
+ }
+ }
+
@Override
public boolean isExistsData(TablePath tablePath) {
throw new UnsupportedOperationException();
@@ -280,7 +301,15 @@ public class MaxComputeCatalog implements Catalog {
public void executeSql(TablePath tablePath, String sql) {
try {
Odps odps = getOdps(tablePath.getDatabaseName());
- SQLTask.run(odps, sql).waitForSuccess();
+ String[] sqls = sql.split(";");
+ for (String s : sqls) {
+ if (!s.trim().isEmpty()) {
+ if (!s.trim().endsWith(";")) {
+ s = s.trim() + ";";
+ }
+ SQLTask.run(odps, s).waitForSuccess();
+ }
+ }
} catch (OdpsException e) {
throw new CatalogException("execute sql error", e);
}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxComputeSaveModeHandler.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxComputeSaveModeHandler.java
new file mode 100644
index 0000000000..e8fa3832b9
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxComputeSaveModeHandler.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog.MaxComputeCatalog;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.aliyun.odps.PartitionSpec;
+
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
+
+public class MaxComputeSaveModeHandler extends DefaultSaveModeHandler {
+
+ private final ReadonlyConfig readonlyConfig;
+
+ public MaxComputeSaveModeHandler(
+ SchemaSaveMode schemaSaveMode,
+ DataSaveMode dataSaveMode,
+ Catalog catalog,
+ CatalogTable catalogTable,
+ String customSql,
+ ReadonlyConfig readonlyConfig) {
+ super(schemaSaveMode, dataSaveMode, catalog, catalogTable, customSql);
+ this.readonlyConfig = readonlyConfig;
+ }
+
+ @Override
+ protected void createSchemaWhenNotExist() {
+ super.createSchemaWhenNotExist();
+ if (StringUtils.isNotEmpty(readonlyConfig.get(PARTITION_SPEC))) {
+ ((MaxComputeCatalog) catalog)
+ .createPartition(
+ tablePath, new
PartitionSpec(readonlyConfig.get(PARTITION_SPEC)));
+ }
+ }
+
+ @Override
+ protected void recreateSchema() {
+ super.recreateSchema();
+ if (StringUtils.isNotEmpty(readonlyConfig.get(PARTITION_SPEC))) {
+ ((MaxComputeCatalog) catalog)
+ .createPartition(
+ tablePath, new
PartitionSpec(readonlyConfig.get(PARTITION_SPEC)));
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
index ddfe1de08b..de3d90f6a7 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
@@ -20,7 +20,6 @@ package
org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.DataSaveMode;
-import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
@@ -94,12 +93,13 @@ public class MaxcomputeSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
}
return Optional.of(
- new DefaultSaveModeHandler(
+ new MaxComputeSaveModeHandler(
readonlyConfig.get(MaxcomputeConfig.SCHEMA_SAVE_MODE),
dataSaveMode,
catalog,
catalogTable,
- readonlyConfig.get(MaxcomputeConfig.CUSTOM_SQL)));
+ readonlyConfig.get(MaxcomputeConfig.CUSTOM_SQL),
+ readonlyConfig));
}
@Override