This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 258f931765 [Fix][Connector-V2]Oceanbase vector database is added as 
the source server (#7832)
258f931765 is described below

commit 258f9317656c8afa2acf152a35f62508b1690b63
Author: zhouyh <[email protected]>
AuthorDate: Thu Oct 17 10:35:39 2024 +0800

    [Fix][Connector-V2]Oceanbase vector database is added as the source server 
(#7832)
    
    Co-authored-by: Jia Fan <[email protected]>
---
 docs/en/connector-v2/sink/Jdbc.md                  |   2 +-
 docs/en/connector-v2/source/Jdbc.md                |   2 +-
 docs/zh/connector-v2/sink/Jdbc.md                  |   2 +-
 seatunnel-connectors-v2/connector-jdbc/pom.xml     |   2 +-
 .../catalog/oceanbase/OceanBaseMySqlCatalog.java   |  64 +++++++++++--
 .../oceanbase/OceanBaseMySqlTypeConverter.java     |  20 ++++
 .../oceanbase/OceanBaseMysqlJdbcRowConverter.java  |  16 +++-
 .../seatunnel/jdbc/JdbcOceanBaseITBase.java        |   2 +-
 .../seatunnel/jdbc/JdbcOceanBaseMilvusIT.java      | 106 +++++++++++++++++++--
 .../jdbc_oceanbase_source_and_milvus_sink.conf     |  43 +++++++++
 10 files changed, 232 insertions(+), 27 deletions(-)

diff --git a/docs/en/connector-v2/sink/Jdbc.md 
b/docs/en/connector-v2/sink/Jdbc.md
index 8df5f12bfa..9b86a27721 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -245,7 +245,7 @@ there are some reference value for params above.
 | Snowflake         | net.snowflake.client.jdbc.SnowflakeDriver    | 
jdbc&#58;snowflake://<account_name>.snowflakecomputing.com         | /          
                                        | 
https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc                 
                                              |
 | Vertica           | com.vertica.jdbc.Driver                      | 
jdbc:vertica://localhost:5433                                      | /          
                                        | 
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
                               |
 | Kingbase          | com.kingbase8.Driver                         | 
jdbc:kingbase8://localhost:54321/db_test                           | /          
                                        | 
https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar
                                            |
-| OceanBase         | com.oceanbase.jdbc.Driver                    | 
jdbc:oceanbase://localhost:2881                                    | /          
                                        | 
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.11/oceanbase-client-2.4.11.jar
                              |
+| OceanBase         | com.oceanbase.jdbc.Driver                    | 
jdbc:oceanbase://localhost:2881                                    | /          
                                        | 
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.12/oceanbase-client-2.4.12.jar
                              |
 | xugu              | com.xugu.cloudjdbc.Driver                    | 
jdbc:xugu://localhost:5138                                         | /          
                                        | 
https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar 
                                              |
 | InterSystems IRIS | com.intersystems.jdbc.IRISDriver             | 
jdbc:IRIS://localhost:1972/%SYS                                    | /          
                                        | 
https://raw.githubusercontent.com/intersystems-community/iris-driver-distribution/main/JDBC/JDK18/intersystems-jdbc-3.8.4.jar
 |
 | opengauss         | org.opengauss.Driver                         | 
jdbc:opengauss://localhost:5432/postgres                           | /          
                                        | 
https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar
                              |
diff --git a/docs/en/connector-v2/source/Jdbc.md 
b/docs/en/connector-v2/source/Jdbc.md
index f2e7486e24..2b5897cbae 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -133,7 +133,7 @@ there are some reference value for params above.
 | Redshift          | com.amazon.redshift.jdbc42.Driver                   | 
jdbc:redshift://localhost:5439/testdb?defaultRowFetchSize=1000         | 
https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42          
                                              |
 | Vertica           | com.vertica.jdbc.Driver                             | 
jdbc:vertica://localhost:5433                                          | 
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
                               |
 | Kingbase          | com.kingbase8.Driver                                | 
jdbc:kingbase8://localhost:54321/db_test                               | 
https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar
                                            |
-| OceanBase         | com.oceanbase.jdbc.Driver                           | 
jdbc:oceanbase://localhost:2881                                        | 
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.11/oceanbase-client-2.4.11.jar
                              |
+| OceanBase         | com.oceanbase.jdbc.Driver                           | 
jdbc:oceanbase://localhost:2881                                        | 
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.12/oceanbase-client-2.4.12.jar
                              |
 | Hive              | org.apache.hive.jdbc.HiveDriver                     | 
jdbc:hive2://localhost:10000                                           | 
https://repo1.maven.org/maven2/org/apache/hive/hive-jdbc/3.1.3/hive-jdbc-3.1.3-standalone.jar
                                 |
 | xugu              | com.xugu.cloudjdbc.Driver                           | 
jdbc:xugu://localhost:5138                                             | 
https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar 
                                              |
 | InterSystems IRIS | com.intersystems.jdbc.IRISDriver                    | 
jdbc:IRIS://localhost:1972/%SYS                                        | 
https://raw.githubusercontent.com/intersystems-community/iris-driver-distribution/main/JDBC/JDK18/intersystems-jdbc-3.8.4.jar
 |
diff --git a/docs/zh/connector-v2/sink/Jdbc.md 
b/docs/zh/connector-v2/sink/Jdbc.md
index 6a7253da61..4370af2002 100644
--- a/docs/zh/connector-v2/sink/Jdbc.md
+++ b/docs/zh/connector-v2/sink/Jdbc.md
@@ -235,7 +235,7 @@ Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md)
 | Snowflake  | net.snowflake.client.jdbc.SnowflakeDriver    | 
jdbc&#58;snowflake://<account_name>.snowflakecomputing.com         | /          
                                        | 
https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc                 
                     |
 | Vertica    | com.vertica.jdbc.Driver                      | 
jdbc:vertica://localhost:5433                                      | /          
                                        | 
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
      |
 | Kingbase   | com.kingbase8.Driver                         | 
jdbc:kingbase8://localhost:54321/db_test                           | /          
                                        | 
https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar
                   |
-| OceanBase  | com.oceanbase.jdbc.Driver                    | 
jdbc:oceanbase://localhost:2881                                    | /          
                                        | 
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.11/oceanbase-client-2.4.11.jar
     |
+| OceanBase  | com.oceanbase.jdbc.Driver                    | 
jdbc:oceanbase://localhost:2881                                    | /          
                                        | 
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.12/oceanbase-client-2.4.12.jar
     |
 | opengauss  | org.opengauss.Driver                         | 
jdbc:opengauss://localhost:5432/postgres                           | /          
                                        | 
https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar
     |
 
 ## 示例
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml 
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index a6be0dd03b..60e324be4c 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -49,7 +49,7 @@
         <postgis.jdbc.version>2.5.1</postgis.jdbc.version>
         <kingbase8.version>8.6.0</kingbase8.version>
         <hive.jdbc.version>3.1.3</hive.jdbc.version>
-        <oceanbase.jdbc.version>2.4.11</oceanbase.jdbc.version>
+        <oceanbase.jdbc.version>2.4.12</oceanbase.jdbc.version>
         <xugu.jdbc.version>12.2.0</xugu.jdbc.version>
         <iris.jdbc.version>3.0.0</iris.jdbc.version>
         <tikv.version>3.2.0</tikv.version>
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
index 33aa2f8ccd..046a16f01b 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
@@ -20,26 +20,32 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMySqlTypeConverter;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMySqlTypeMapper;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMysqlType;
 
+import org.apache.commons.lang.StringUtils;
+
 import com.google.common.base.Preconditions;
 import lombok.extern.slf4j.Slf4j;
 
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
@@ -197,12 +203,54 @@ public class OceanBaseMySqlCatalog extends 
AbstractJdbcCatalog {
 
     @Override
     public CatalogTable getTable(String sqlQuery) throws SQLException {
-        Connection defaultConnection = getConnection(defaultUrl);
-        try (Statement statement = defaultConnection.createStatement();
-                ResultSet resultSet = statement.executeQuery(sqlQuery)) {
-            ResultSetMetaData metaData = resultSet.getMetaData();
-            return CatalogUtils.getCatalogTable(
-                    metaData, new OceanBaseMySqlTypeMapper(typeConverter), 
sqlQuery);
+        try (Connection connection = getConnection(defaultUrl)) {
+            String tableName = null;
+            String databaseName = null;
+            String schemaName = null;
+            String catalogName = "jdbc_catalog";
+            TableSchema.Builder schemaBuilder = TableSchema.builder();
+
+            try (Statement statement = connection.createStatement();
+                    ResultSet resultSet = statement.executeQuery(sqlQuery)) {
+                ResultSetMetaData metaData = resultSet.getMetaData();
+                tableName = metaData.getTableName(1);
+                databaseName = metaData.getCatalogName(1);
+                schemaName = metaData.getSchemaName(1);
+                catalogName = metaData.getCatalogName(1);
+            }
+            databaseName = StringUtils.defaultIfBlank(databaseName, null);
+            schemaName = StringUtils.defaultIfBlank(schemaName, null);
+
+            TablePath tablePath =
+                    StringUtils.isBlank(tableName)
+                            ? TablePath.DEFAULT
+                            : TablePath.of(databaseName, schemaName, 
tableName);
+
+            try (PreparedStatement ps =
+                            
connection.prepareStatement(getSelectColumnsSql(tablePath));
+                    ResultSet columnResultSet = ps.executeQuery();
+                    ResultSet primaryKeys =
+                            connection
+                                    .getMetaData()
+                                    .getPrimaryKeys(catalogName, schemaName, 
tableName)) {
+                while (primaryKeys.next()) {
+                    String primaryKeyColumnName = 
primaryKeys.getString("COLUMN_NAME");
+                    schemaBuilder.primaryKey(
+                            PrimaryKey.of(
+                                    primaryKeyColumnName,
+                                    
Collections.singletonList(primaryKeyColumnName)));
+                }
+                while (columnResultSet.next()) {
+                    schemaBuilder.column(buildColumn(columnResultSet));
+                }
+            }
+            return CatalogTable.of(
+                    TableIdentifier.of(catalogName, tablePath),
+                    schemaBuilder.build(),
+                    new HashMap<>(),
+                    new ArrayList<>(),
+                    "",
+                    catalogName);
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
index 4e9fa04d0d..78c8415a88 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
 import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.VectorType;
 import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
@@ -100,6 +101,9 @@ public class OceanBaseMySqlTypeConverter
     public static final long POWER_2_32 = (long) Math.pow(2, 32);
     public static final long MAX_VARBINARY_LENGTH = POWER_2_16 - 4;
 
+    private static final String VECTOR_TYPE_NAME = "";
+    private static final String VECTOR_NAME = "VECTOR";
+
     @Override
     public String identifier() {
         return DatabaseIdentifier.OCENABASE;
@@ -289,6 +293,17 @@ public class OceanBaseMySqlTypeConverter
                 builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
                 builder.scale(typeDefine.getScale());
                 break;
+            case VECTOR_TYPE_NAME:
+                String columnType = typeDefine.getColumnType();
+                if (columnType.startsWith("vector(") && 
columnType.endsWith(")")) {
+                    Integer number =
+                            Integer.parseInt(
+                                    columnType.substring(
+                                            columnType.indexOf("(") + 1, 
columnType.indexOf(")")));
+                    builder.dataType(VectorType.VECTOR_FLOAT_TYPE);
+                    builder.scale(number);
+                }
+                break;
             default:
                 throw CommonError.convertToSeaTunnelTypeError(
                         DatabaseIdentifier.OCENABASE, mysqlDataType, 
typeDefine.getName());
@@ -501,6 +516,11 @@ public class OceanBaseMySqlTypeConverter
                     builder.columnType(MYSQL_DATETIME);
                 }
                 break;
+            case FLOAT_VECTOR:
+                builder.nativeType(VECTOR_NAME);
+                builder.columnType(String.format("%s(%s)", VECTOR_NAME, 
column.getScale()));
+                builder.dataType(VECTOR_NAME);
+                break;
             default:
                 throw CommonError.convertToConnectorTypeError(
                         DatabaseIdentifier.OCENABASE,
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
index a498879138..0a52e6a90b 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
@@ -32,6 +32,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.Abstrac
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.sql.Date;
@@ -89,12 +91,16 @@ public class OceanBaseMysqlJdbcRowConverter extends 
AbstractJdbcRowConverter {
                     fields[fieldIndex] = JdbcFieldTypeUtils.getFloat(rs, 
resultSetIndex);
                     break;
                 case FLOAT_VECTOR:
-                    Object[] objects = (Object[]) rs.getObject(fieldIndex);
-                    Float[] arrays = new Float[objects.length];
-                    for (int i = 0; i < objects.length; i++) {
-                        arrays[i] = Float.parseFloat(objects[i].toString());
+                    String result = JdbcFieldTypeUtils.getString(rs, 
resultSetIndex);
+                    if (StringUtils.isNotBlank(result)) {
+                        result = result.replace("[", "").replace("]", "");
+                        String[] stringArray = result.split(",");
+                        Float[] arrays = new Float[stringArray.length];
+                        for (int i = 0; i < stringArray.length; i++) {
+                            arrays[i] = Float.parseFloat(stringArray[i]);
+                        }
+                        fields[fieldIndex] = BufferUtils.toByteBuffer(arrays);
                     }
-                    fields[fieldIndex] = BufferUtils.toByteBuffer(arrays);
                     break;
                 case DOUBLE:
                     fields[fieldIndex] = JdbcFieldTypeUtils.getDouble(rs, 
resultSetIndex);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
index e9674c9f10..732dbb72b0 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
@@ -87,6 +87,6 @@ public abstract class JdbcOceanBaseITBase extends 
AbstractJdbcIT {
 
     @Override
     String driverUrl() {
-        return 
"https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.11/oceanbase-client-2.4.11.jar";
+        return 
"https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.12/oceanbase-client-2.4.12.jar";
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMilvusIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMilvusIT.java
index a70852fc6d..e91eaed2de 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMilvusIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMilvusIT.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc;
 
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.e2e.common.TestResource;
@@ -28,6 +29,7 @@ import 
org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterAll;
@@ -44,7 +46,6 @@ import org.testcontainers.milvus.MilvusContainer;
 import org.testcontainers.oceanbase.OceanBaseCEContainer;
 import org.testcontainers.utility.DockerLoggerFactory;
 
-import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
 import io.milvus.client.MilvusServiceClient;
@@ -74,7 +75,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.DoubleStream;
 import java.util.stream.Stream;
 
 import static org.awaitility.Awaitility.given;
@@ -127,7 +131,7 @@ public class JdbcOceanBaseMilvusIT extends TestSuiteBase 
implements TestResource
             };
 
     String driverUrl() {
-        return 
"https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.11/oceanbase-client-2.4.11.jar";
+        return 
"https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.12/oceanbase-client-2.4.12.jar";
     }
 
     @BeforeAll
@@ -263,7 +267,8 @@ public class JdbcOceanBaseMilvusIT extends TestSuiteBase 
implements TestResource
     @TestTemplate
     public void testMilvusToOceanBase(TestContainer container) throws 
Exception {
         try {
-            Container.ExecResult execResult = 
container.executeJob(configFile().get(0));
+            Container.ExecResult execResult =
+                    
container.executeJob("/jdbc_milvus_source_and_oceanbase_sink.conf");
             Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
         } finally {
             clearTable(jdbcCase.getDatabase(), jdbcCase.getSchema(), 
jdbcCase.getSinkTable());
@@ -274,13 +279,72 @@ public class JdbcOceanBaseMilvusIT extends TestSuiteBase 
implements TestResource
     public void testFakeToOceanBase(TestContainer container)
             throws IOException, InterruptedException {
         try {
-            Container.ExecResult execResult = 
container.executeJob(configFile().get(1));
+            Container.ExecResult execResult =
+                    container.executeJob("/jdbc_fake_to_oceanbase_sink.conf");
             Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
         } finally {
             clearTable(jdbcCase.getDatabase(), jdbcCase.getSchema(), 
jdbcCase.getSinkTable());
         }
     }
 
+    @TestTemplate
+    public void testOceanBaseToMilvus(TestContainer container) throws 
Exception {
+        try {
+            initOceanBaseTestData();
+            Container.ExecResult execResult =
+                    
container.executeJob("/jdbc_oceanbase_source_and_milvus_sink.conf");
+            Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+        } finally {
+            clearTable(jdbcCase.getDatabase(), jdbcCase.getSchema(), 
jdbcCase.getSinkTable());
+        }
+    }
+
+    private void initOceanBaseTestData() {
+        try (Statement statement = connection.createStatement()) {
+            statement.execute(insertTable());
+            connection.commit();
+        } catch (SQLException e) {
+            try {
+                connection.rollback();
+            } catch (SQLException exception) {
+                throw new 
SeaTunnelRuntimeException(JdbcITErrorCode.CLEAR_TABLE_FAILED, exception);
+            }
+            throw new 
SeaTunnelRuntimeException(JdbcITErrorCode.CLEAR_TABLE_FAILED, e);
+        }
+    }
+
+    public String insertTable() {
+        Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
+        String[] fieldNames = testDataSet.getKey();
+        String columns =
+                Arrays.stream(fieldNames)
+                        .map(this::quoteIdentifier)
+                        .collect(Collectors.joining(", "));
+        List<Object[]> fields =
+                testDataSet.getValue().stream()
+                        .map(SeaTunnelRow::getFields)
+                        .collect(Collectors.toList());
+
+        StringBuilder sqlBuilder = new StringBuilder();
+        sqlBuilder
+                .append("INSERT INTO ")
+                .append(buildTableInfoWithSchema(OCEANBASE_DATABASE, 
OCEANBASE_SINK))
+                .append(" (")
+                .append(columns)
+                .append(") VALUES ");
+
+        int valuesCount = fields.size();
+        for (int i = 0; i < valuesCount; i++) {
+            String fieldData = Arrays.toString(fields.get(i));
+            sqlBuilder.append("(").append(fieldData, 1, fieldData.length() - 
1).append(")");
+
+            if (i < valuesCount - 1) {
+                sqlBuilder.append(", ");
+            }
+        }
+        return sqlBuilder.toString();
+    }
+
     private void clearTable(String database, String schema, String table) {
         clearTable(database, table);
     }
@@ -322,11 +386,6 @@ public class JdbcOceanBaseMilvusIT extends TestSuiteBase 
implements TestResource
                 .build();
     }
 
-    List<String> configFile() {
-        return Lists.newArrayList(
-                "/jdbc_milvus_source_and_oceanbase_sink.conf", 
"/jdbc_fake_to_oceanbase_sink.conf");
-    }
-
     private void initializeJdbcConnection(String jdbcUrl)
             throws SQLException, InstantiationException, 
IllegalAccessException {
         Driver driver = (Driver) loadDriverClass().newInstance();
@@ -433,4 +492,33 @@ public class JdbcOceanBaseMilvusIT extends TestSuiteBase 
implements TestResource
             return quoteIdentifier(table);
         }
     }
+
+    private String[] getFieldNames() {
+        return new String[] {
+            "book_id", "book_intro", "book_title",
+        };
+    }
+
+    private Pair<String[], List<SeaTunnelRow>> initTestData() {
+        String[] fieldNames = getFieldNames();
+
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        Random random = new Random();
+        for (int i = 0; i < 100; i++) {
+            SeaTunnelRow row =
+                    new SeaTunnelRow(
+                            new Object[] {
+                                i + 100,
+                                "'"
+                                        + DoubleStream.generate(() -> 
random.nextDouble() * 10)
+                                                .limit(VECTOR_DIM)
+                                                .mapToObj(num -> 
String.format("%.4f", num))
+                                                .collect(Collectors.joining(", 
", "[", "]"))
+                                        + "'",
+                                "\"" + "test" + i + "\"",
+                            });
+            rows.add(row);
+        }
+        return Pair.of(fieldNames, rows);
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_source_and_milvus_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_source_and_milvus_sink.conf
new file mode 100644
index 0000000000..6875a6974c
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_source_and_milvus_sink.conf
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  jdbc {
+    url = "jdbc:oceanbase://e2e_oceanbase_vector:2881/seatunnel"
+    driver = "com.oceanbase.jdbc.Driver"
+    user = "root@test"
+    password = ""
+    compatible_mode="mysql"
+    database = "seatunnel"
+    table = "simple_example"
+    query = "select * from simple_example"
+  }
+}
+
+sink {
+  Milvus {
+    url = "http://milvus-e2e:19530"
+    token = "root:Milvus"
+    database = "default"
+    collection="simple_example"
+  }
+}
\ No newline at end of file

Reply via email to