This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c24e3b12ba [Improve][Connector-V2] Add pre-check starrocks version 
before execute alter table field name (#8237)
c24e3b12ba is described below

commit c24e3b12bafa2b5941dc81d0ce4b9e38a6d3379b
Author: 峰峰 <[email protected]>
AuthorDate: Mon Dec 16 00:26:11 2024 +0800

    [Improve][Connector-V2] Add pre-check starrocks version before execute 
alter table field name (#8237)
---
 .../connector-starrocks/pom.xml                    |  6 ++
 .../seatunnel/starrocks/util/SchemaUtils.java      | 72 ++++++++++++++--------
 2 files changed, 54 insertions(+), 24 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-starrocks/pom.xml 
b/seatunnel-connectors-v2/connector-starrocks/pom.xml
index 6653610de4..560c26ca13 100644
--- a/seatunnel-connectors-v2/connector-starrocks/pom.xml
+++ b/seatunnel-connectors-v2/connector-starrocks/pom.xml
@@ -36,6 +36,7 @@
         <mysql.version>8.0.16</mysql.version>
         <starrocks.thrift.sdk.version>1.0.1</starrocks.thrift.sdk.version>
         <arrow.version>5.0.0</arrow.version>
+        <mavenartifact.version>3.6.3</mavenartifact.version>
     </properties>
 
     <dependencies>
@@ -70,6 +71,11 @@
             <artifactId>starrocks-thrift-sdk</artifactId>
             <version>${starrocks.thrift.sdk.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.maven</groupId>
+            <artifactId>maven-artifact</artifactId>
+            <version>${mavenartifact.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/util/SchemaUtils.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/util/SchemaUtils.java
index 4e3a589085..18df8f4c77 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/util/SchemaUtils.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/util/SchemaUtils.java
@@ -32,15 +32,20 @@ import org.apache.seatunnel.common.utils.SeaTunnelException;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksTypeConverter;
 
+import org.apache.maven.artifact.versioning.ComparableVersion;
+
 import lombok.extern.slf4j.Slf4j;
 
 import java.sql.Connection;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 
 @Slf4j
 public class SchemaUtils {
 
+    private static final String MIN_VERSION_TABLE_CHANGE_COLUMN = "3.3.2";
+
     private SchemaUtils() {}
 
     /**
@@ -111,33 +116,52 @@ public class SchemaUtils {
     public static void applySchemaChange(
             Connection connection, TablePath tablePath, 
AlterTableChangeColumnEvent event)
             throws SQLException {
-        StringBuilder sqlBuilder =
-                new StringBuilder()
-                        .append("ALTER TABLE")
-                        .append(" ")
-                        .append(tablePath.getFullName())
+        ComparableVersion targetVersion = new 
ComparableVersion(MIN_VERSION_TABLE_CHANGE_COLUMN);
+        ComparableVersion currentVersion;
+        try (Statement statement = connection.createStatement();
+                ResultSet resultSet =
+                        statement.executeQuery("SELECT CURRENT_VERSION() as 
version")) {
+            resultSet.next();
+            String version = resultSet.getString(1);
+            log.debug("starrocks version: {}", version);
+            String versionOne = version.split(" ")[0];
+            currentVersion = new ComparableVersion(versionOne);
+        }
+
+        if (currentVersion.compareTo(targetVersion) >= 0) {
+            StringBuilder sqlBuilder =
+                    new StringBuilder()
+                            .append("ALTER TABLE")
+                            .append(" ")
+                            .append(tablePath.getFullName())
+                            .append(" ")
+                            .append("RENAME COLUMN")
+                            .append(" ")
+                            .append(quoteIdentifier(event.getOldColumn()))
+                            .append(" TO ")
+                            
.append(quoteIdentifier(event.getColumn().getName()));
+            if (event.getColumn().getComment() != null) {
+                sqlBuilder
                         .append(" ")
-                        .append("RENAME COLUMN")
+                        .append("COMMENT ")
+                        .append("'")
+                        .append(event.getColumn().getComment())
+                        .append("'");
+            }
+            if (event.getAfterColumn() != null) {
+                sqlBuilder
                         .append(" ")
-                        .append(quoteIdentifier(event.getOldColumn()))
-                        .append(" TO ")
-                        .append(quoteIdentifier(event.getColumn().getName()));
-        if (event.getColumn().getComment() != null) {
-            sqlBuilder
-                    .append(" ")
-                    .append("COMMENT ")
-                    .append("'")
-                    .append(event.getColumn().getComment())
-                    .append("'");
-        }
-        if (event.getAfterColumn() != null) {
-            sqlBuilder.append(" ").append("AFTER 
").append(quoteIdentifier(event.getAfterColumn()));
-        }
+                        .append("AFTER ")
+                        .append(quoteIdentifier(event.getAfterColumn()));
+            }
 
-        String changeColumnSQL = sqlBuilder.toString();
-        try (Statement statement = connection.createStatement()) {
-            log.info("Executing change column SQL: " + changeColumnSQL);
-            statement.execute(changeColumnSQL);
+            String changeColumnSQL = sqlBuilder.toString();
+            try (Statement statement = connection.createStatement()) {
+                log.info("Executing change column SQL: " + changeColumnSQL);
+                statement.execute(changeColumnSQL);
+            }
+        } else {
+            log.warn("versions prior to starrocks 3.3.2 do not support rename 
column operations");
         }
     }
 

Reply via email to