This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 258f931765 [Fix][Connector-V2]Oceanbase vector database is added as the source server (#7832)
258f931765 is described below
commit 258f9317656c8afa2acf152a35f62508b1690b63
Author: zhouyh <[email protected]>
AuthorDate: Thu Oct 17 10:35:39 2024 +0800
[Fix][Connector-V2]Oceanbase vector database is added as the source server (#7832)
Co-authored-by: Jia Fan <[email protected]>
---
docs/en/connector-v2/sink/Jdbc.md | 2 +-
docs/en/connector-v2/source/Jdbc.md | 2 +-
docs/zh/connector-v2/sink/Jdbc.md | 2 +-
seatunnel-connectors-v2/connector-jdbc/pom.xml | 2 +-
.../catalog/oceanbase/OceanBaseMySqlCatalog.java | 64 +++++++++++--
.../oceanbase/OceanBaseMySqlTypeConverter.java | 20 ++++
.../oceanbase/OceanBaseMysqlJdbcRowConverter.java | 16 +++-
.../seatunnel/jdbc/JdbcOceanBaseITBase.java | 2 +-
.../seatunnel/jdbc/JdbcOceanBaseMilvusIT.java | 106 +++++++++++++++++++--
.../jdbc_oceanbase_source_and_milvus_sink.conf | 43 +++++++++
10 files changed, 232 insertions(+), 27 deletions(-)
diff --git a/docs/en/connector-v2/sink/Jdbc.md
b/docs/en/connector-v2/sink/Jdbc.md
index 8df5f12bfa..9b86a27721 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -245,7 +245,7 @@ there are some reference value for params above.
| Snowflake | net.snowflake.client.jdbc.SnowflakeDriver |
jdbc:snowflake://<account_name>.snowflakecomputing.com | /
|
https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc
|
| Vertica | com.vertica.jdbc.Driver |
jdbc:vertica://localhost:5433 | /
|
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
|
| Kingbase | com.kingbase8.Driver |
jdbc:kingbase8://localhost:54321/db_test | /
|
https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar
|
-| OceanBase | com.oceanbase.jdbc.Driver |
jdbc:oceanbase://localhost:2881 | /
|
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.11/oceanbase-client-2.4.11.jar
|
+| OceanBase | com.oceanbase.jdbc.Driver |
jdbc:oceanbase://localhost:2881 | /
|
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.12/oceanbase-client-2.4.12.jar
|
| xugu | com.xugu.cloudjdbc.Driver |
jdbc:xugu://localhost:5138 | /
|
https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar
|
| InterSystems IRIS | com.intersystems.jdbc.IRISDriver |
jdbc:IRIS://localhost:1972/%SYS | /
|
https://raw.githubusercontent.com/intersystems-community/iris-driver-distribution/main/JDBC/JDK18/intersystems-jdbc-3.8.4.jar
|
| opengauss | org.opengauss.Driver |
jdbc:opengauss://localhost:5432/postgres | /
|
https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar
|
diff --git a/docs/en/connector-v2/source/Jdbc.md
b/docs/en/connector-v2/source/Jdbc.md
index f2e7486e24..2b5897cbae 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -133,7 +133,7 @@ there are some reference value for params above.
| Redshift | com.amazon.redshift.jdbc42.Driver |
jdbc:redshift://localhost:5439/testdb?defaultRowFetchSize=1000 |
https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42
|
| Vertica | com.vertica.jdbc.Driver |
jdbc:vertica://localhost:5433 |
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
|
| Kingbase | com.kingbase8.Driver |
jdbc:kingbase8://localhost:54321/db_test |
https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar
|
-| OceanBase | com.oceanbase.jdbc.Driver |
jdbc:oceanbase://localhost:2881 |
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.11/oceanbase-client-2.4.11.jar
|
+| OceanBase | com.oceanbase.jdbc.Driver |
jdbc:oceanbase://localhost:2881 |
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.12/oceanbase-client-2.4.12.jar
|
| Hive | org.apache.hive.jdbc.HiveDriver |
jdbc:hive2://localhost:10000 |
https://repo1.maven.org/maven2/org/apache/hive/hive-jdbc/3.1.3/hive-jdbc-3.1.3-standalone.jar
|
| xugu | com.xugu.cloudjdbc.Driver |
jdbc:xugu://localhost:5138 |
https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar
|
| InterSystems IRIS | com.intersystems.jdbc.IRISDriver |
jdbc:IRIS://localhost:1972/%SYS |
https://raw.githubusercontent.com/intersystems-community/iris-driver-distribution/main/JDBC/JDK18/intersystems-jdbc-3.8.4.jar
|
diff --git a/docs/zh/connector-v2/sink/Jdbc.md
b/docs/zh/connector-v2/sink/Jdbc.md
index 6a7253da61..4370af2002 100644
--- a/docs/zh/connector-v2/sink/Jdbc.md
+++ b/docs/zh/connector-v2/sink/Jdbc.md
@@ -235,7 +235,7 @@ Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md)
| Snowflake | net.snowflake.client.jdbc.SnowflakeDriver |
jdbc:snowflake://<account_name>.snowflakecomputing.com | /
|
https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc
|
| Vertica | com.vertica.jdbc.Driver |
jdbc:vertica://localhost:5433 | /
|
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
|
| Kingbase | com.kingbase8.Driver |
jdbc:kingbase8://localhost:54321/db_test | /
|
https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar
|
-| OceanBase | com.oceanbase.jdbc.Driver |
jdbc:oceanbase://localhost:2881 | /
|
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.11/oceanbase-client-2.4.11.jar
|
+| OceanBase | com.oceanbase.jdbc.Driver |
jdbc:oceanbase://localhost:2881 | /
|
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.12/oceanbase-client-2.4.12.jar
|
| opengauss | org.opengauss.Driver |
jdbc:opengauss://localhost:5432/postgres | /
|
https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar
|
## 示例
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index a6be0dd03b..60e324be4c 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -49,7 +49,7 @@
<postgis.jdbc.version>2.5.1</postgis.jdbc.version>
<kingbase8.version>8.6.0</kingbase8.version>
<hive.jdbc.version>3.1.3</hive.jdbc.version>
- <oceanbase.jdbc.version>2.4.11</oceanbase.jdbc.version>
+ <oceanbase.jdbc.version>2.4.12</oceanbase.jdbc.version>
<xugu.jdbc.version>12.2.0</xugu.jdbc.version>
<iris.jdbc.version>3.0.0</iris.jdbc.version>
<tikv.version>3.2.0</tikv.version>
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
index 33aa2f8ccd..046a16f01b 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
@@ -20,26 +20,32 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMySqlTypeConverter;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMySqlTypeMapper;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase.OceanBaseMysqlType;
+import org.apache.commons.lang.StringUtils;
+
import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
@@ -197,12 +203,54 @@ public class OceanBaseMySqlCatalog extends
AbstractJdbcCatalog {
@Override
public CatalogTable getTable(String sqlQuery) throws SQLException {
- Connection defaultConnection = getConnection(defaultUrl);
- try (Statement statement = defaultConnection.createStatement();
- ResultSet resultSet = statement.executeQuery(sqlQuery)) {
- ResultSetMetaData metaData = resultSet.getMetaData();
- return CatalogUtils.getCatalogTable(
- metaData, new OceanBaseMySqlTypeMapper(typeConverter),
sqlQuery);
+ try (Connection connection = getConnection(defaultUrl)) {
+ String tableName = null;
+ String databaseName = null;
+ String schemaName = null;
+ String catalogName = "jdbc_catalog";
+ TableSchema.Builder schemaBuilder = TableSchema.builder();
+
+ try (Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sqlQuery)) {
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ tableName = metaData.getTableName(1);
+ databaseName = metaData.getCatalogName(1);
+ schemaName = metaData.getSchemaName(1);
+ catalogName = metaData.getCatalogName(1);
+ }
+ databaseName = StringUtils.defaultIfBlank(databaseName, null);
+ schemaName = StringUtils.defaultIfBlank(schemaName, null);
+
+ TablePath tablePath =
+ StringUtils.isBlank(tableName)
+ ? TablePath.DEFAULT
+ : TablePath.of(databaseName, schemaName,
tableName);
+
+ try (PreparedStatement ps =
+
connection.prepareStatement(getSelectColumnsSql(tablePath));
+ ResultSet columnResultSet = ps.executeQuery();
+ ResultSet primaryKeys =
+ connection
+ .getMetaData()
+ .getPrimaryKeys(catalogName, schemaName,
tableName)) {
+ while (primaryKeys.next()) {
+ String primaryKeyColumnName =
primaryKeys.getString("COLUMN_NAME");
+ schemaBuilder.primaryKey(
+ PrimaryKey.of(
+ primaryKeyColumnName,
+
Collections.singletonList(primaryKeyColumnName)));
+ }
+ while (columnResultSet.next()) {
+ schemaBuilder.column(buildColumn(columnResultSet));
+ }
+ }
+ return CatalogTable.of(
+ TableIdentifier.of(catalogName, tablePath),
+ schemaBuilder.build(),
+ new HashMap<>(),
+ new ArrayList<>(),
+ "",
+ catalogName);
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
index 4e9fa04d0d..78c8415a88 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.VectorType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
@@ -100,6 +101,9 @@ public class OceanBaseMySqlTypeConverter
public static final long POWER_2_32 = (long) Math.pow(2, 32);
public static final long MAX_VARBINARY_LENGTH = POWER_2_16 - 4;
+ private static final String VECTOR_TYPE_NAME = "";
+ private static final String VECTOR_NAME = "VECTOR";
+
@Override
public String identifier() {
return DatabaseIdentifier.OCENABASE;
@@ -289,6 +293,17 @@ public class OceanBaseMySqlTypeConverter
builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
builder.scale(typeDefine.getScale());
break;
+ case VECTOR_TYPE_NAME:
+ String columnType = typeDefine.getColumnType();
+ if (columnType.startsWith("vector(") &&
columnType.endsWith(")")) {
+ Integer number =
+ Integer.parseInt(
+ columnType.substring(
+ columnType.indexOf("(") + 1,
columnType.indexOf(")")));
+ builder.dataType(VectorType.VECTOR_FLOAT_TYPE);
+ builder.scale(number);
+ }
+ break;
default:
throw CommonError.convertToSeaTunnelTypeError(
DatabaseIdentifier.OCENABASE, mysqlDataType,
typeDefine.getName());
@@ -501,6 +516,11 @@ public class OceanBaseMySqlTypeConverter
builder.columnType(MYSQL_DATETIME);
}
break;
+ case FLOAT_VECTOR:
+ builder.nativeType(VECTOR_NAME);
+ builder.columnType(String.format("%s(%s)", VECTOR_NAME,
column.getScale()));
+ builder.dataType(VECTOR_NAME);
+ break;
default:
throw CommonError.convertToConnectorTypeError(
DatabaseIdentifier.OCENABASE,
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
index a498879138..0a52e6a90b 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
@@ -32,6 +32,8 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.Abstrac
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;
+import org.apache.commons.lang3.StringUtils;
+
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Date;
@@ -89,12 +91,16 @@ public class OceanBaseMysqlJdbcRowConverter extends
AbstractJdbcRowConverter {
fields[fieldIndex] = JdbcFieldTypeUtils.getFloat(rs,
resultSetIndex);
break;
case FLOAT_VECTOR:
- Object[] objects = (Object[]) rs.getObject(fieldIndex);
- Float[] arrays = new Float[objects.length];
- for (int i = 0; i < objects.length; i++) {
- arrays[i] = Float.parseFloat(objects[i].toString());
+ String result = JdbcFieldTypeUtils.getString(rs,
resultSetIndex);
+ if (StringUtils.isNotBlank(result)) {
+ result = result.replace("[", "").replace("]", "");
+ String[] stringArray = result.split(",");
+ Float[] arrays = new Float[stringArray.length];
+ for (int i = 0; i < stringArray.length; i++) {
+ arrays[i] = Float.parseFloat(stringArray[i]);
+ }
+ fields[fieldIndex] = BufferUtils.toByteBuffer(arrays);
}
- fields[fieldIndex] = BufferUtils.toByteBuffer(arrays);
break;
case DOUBLE:
fields[fieldIndex] = JdbcFieldTypeUtils.getDouble(rs,
resultSetIndex);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
index e9674c9f10..732dbb72b0 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
@@ -87,6 +87,6 @@ public abstract class JdbcOceanBaseITBase extends
AbstractJdbcIT {
@Override
String driverUrl() {
- return
"https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.11/oceanbase-client-2.4.11.jar";
+ return
"https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.12/oceanbase-client-2.4.12.jar";
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMilvusIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMilvusIT.java
index a70852fc6d..e91eaed2de 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMilvusIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMilvusIT.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.e2e.common.TestResource;
@@ -28,6 +29,7 @@ import
org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
@@ -44,7 +46,6 @@ import org.testcontainers.milvus.MilvusContainer;
import org.testcontainers.oceanbase.OceanBaseCEContainer;
import org.testcontainers.utility.DockerLoggerFactory;
-import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.milvus.client.MilvusServiceClient;
@@ -74,7 +75,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Random;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.DoubleStream;
import java.util.stream.Stream;
import static org.awaitility.Awaitility.given;
@@ -127,7 +131,7 @@ public class JdbcOceanBaseMilvusIT extends TestSuiteBase
implements TestResource
};
String driverUrl() {
- return
"https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.11/oceanbase-client-2.4.11.jar";
+ return
"https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.12/oceanbase-client-2.4.12.jar";
}
@BeforeAll
@@ -263,7 +267,8 @@ public class JdbcOceanBaseMilvusIT extends TestSuiteBase
implements TestResource
@TestTemplate
public void testMilvusToOceanBase(TestContainer container) throws
Exception {
try {
- Container.ExecResult execResult =
container.executeJob(configFile().get(0));
+ Container.ExecResult execResult =
+
container.executeJob("/jdbc_milvus_source_and_oceanbase_sink.conf");
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
} finally {
clearTable(jdbcCase.getDatabase(), jdbcCase.getSchema(),
jdbcCase.getSinkTable());
@@ -274,13 +279,72 @@ public class JdbcOceanBaseMilvusIT extends TestSuiteBase
implements TestResource
public void testFakeToOceanBase(TestContainer container)
throws IOException, InterruptedException {
try {
- Container.ExecResult execResult =
container.executeJob(configFile().get(1));
+ Container.ExecResult execResult =
+ container.executeJob("/jdbc_fake_to_oceanbase_sink.conf");
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
} finally {
clearTable(jdbcCase.getDatabase(), jdbcCase.getSchema(),
jdbcCase.getSinkTable());
}
}
+ @TestTemplate
+ public void testOceanBaseToMilvus(TestContainer container) throws
Exception {
+ try {
+ initOceanBaseTestData();
+ Container.ExecResult execResult =
+
container.executeJob("/jdbc_oceanbase_source_and_milvus_sink.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ } finally {
+ clearTable(jdbcCase.getDatabase(), jdbcCase.getSchema(),
jdbcCase.getSinkTable());
+ }
+ }
+
+ private void initOceanBaseTestData() {
+ try (Statement statement = connection.createStatement()) {
+ statement.execute(insertTable());
+ connection.commit();
+ } catch (SQLException e) {
+ try {
+ connection.rollback();
+ } catch (SQLException exception) {
+ throw new
SeaTunnelRuntimeException(JdbcITErrorCode.CLEAR_TABLE_FAILED, exception);
+ }
+ throw new
SeaTunnelRuntimeException(JdbcITErrorCode.CLEAR_TABLE_FAILED, e);
+ }
+ }
+
+ public String insertTable() {
+ Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
+ String[] fieldNames = testDataSet.getKey();
+ String columns =
+ Arrays.stream(fieldNames)
+ .map(this::quoteIdentifier)
+ .collect(Collectors.joining(", "));
+ List<Object[]> fields =
+ testDataSet.getValue().stream()
+ .map(SeaTunnelRow::getFields)
+ .collect(Collectors.toList());
+
+ StringBuilder sqlBuilder = new StringBuilder();
+ sqlBuilder
+ .append("INSERT INTO ")
+ .append(buildTableInfoWithSchema(OCEANBASE_DATABASE,
OCEANBASE_SINK))
+ .append(" (")
+ .append(columns)
+ .append(") VALUES ");
+
+ int valuesCount = fields.size();
+ for (int i = 0; i < valuesCount; i++) {
+ String fieldData = Arrays.toString(fields.get(i));
+ sqlBuilder.append("(").append(fieldData, 1, fieldData.length() -
1).append(")");
+
+ if (i < valuesCount - 1) {
+ sqlBuilder.append(", ");
+ }
+ }
+ return sqlBuilder.toString();
+ }
+
private void clearTable(String database, String schema, String table) {
clearTable(database, table);
}
@@ -322,11 +386,6 @@ public class JdbcOceanBaseMilvusIT extends TestSuiteBase
implements TestResource
.build();
}
- List<String> configFile() {
- return Lists.newArrayList(
- "/jdbc_milvus_source_and_oceanbase_sink.conf",
"/jdbc_fake_to_oceanbase_sink.conf");
- }
-
private void initializeJdbcConnection(String jdbcUrl)
throws SQLException, InstantiationException,
IllegalAccessException {
Driver driver = (Driver) loadDriverClass().newInstance();
@@ -433,4 +492,33 @@ public class JdbcOceanBaseMilvusIT extends TestSuiteBase
implements TestResource
return quoteIdentifier(table);
}
}
+
+ private String[] getFieldNames() {
+ return new String[] {
+ "book_id", "book_intro", "book_title",
+ };
+ }
+
+ private Pair<String[], List<SeaTunnelRow>> initTestData() {
+ String[] fieldNames = getFieldNames();
+
+ List<SeaTunnelRow> rows = new ArrayList<>();
+ Random random = new Random();
+ for (int i = 0; i < 100; i++) {
+ SeaTunnelRow row =
+ new SeaTunnelRow(
+ new Object[] {
+ i + 100,
+ "'"
+ + DoubleStream.generate(() ->
random.nextDouble() * 10)
+ .limit(VECTOR_DIM)
+ .mapToObj(num ->
String.format("%.4f", num))
+ .collect(Collectors.joining(",
", "[", "]"))
+ + "'",
+ "\"" + "test" + i + "\"",
+ });
+ rows.add(row);
+ }
+ return Pair.of(fieldNames, rows);
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_source_and_milvus_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_source_and_milvus_sink.conf
new file mode 100644
index 0000000000..6875a6974c
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_source_and_milvus_sink.conf
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ jdbc {
+ url = "jdbc:oceanbase://e2e_oceanbase_vector:2881/seatunnel"
+ driver = "com.oceanbase.jdbc.Driver"
+ user = "root@test"
+ password = ""
+ compatible_mode="mysql"
+ database = "seatunnel"
+ table = "simple_example"
+ query = "select * from simple_example"
+ }
+}
+
+sink {
+ Milvus {
+ url = "http://milvus-e2e:19530"
+ token = "root:Milvus"
+ database = "default"
+ collection="simple_example"
+ }
+}
\ No newline at end of file