This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a6b188d552 [Feature][Connector-V2][OceanBase] Support vector types on
OceanBase (#7375)
a6b188d552 is described below
commit a6b188d5528ba914c289f65bb910621659390fe3
Author: zhouyh <[email protected]>
AuthorDate: Wed Aug 21 22:24:33 2024 +0800
[Feature][Connector-V2][OceanBase] Support vector types on OceanBase (#7375)
---
.../oceanbase/OceanBaseMysqlJdbcRowConverter.java | 206 ++++++++++
.../connector-jdbc-e2e-part-2/pom.xml | 22 +-
.../seatunnel/jdbc/JdbcOceanBaseMilvusIT.java | 435 +++++++++++++++++++++
.../resources/jdbc_fake_to_oceanbase_sink.conf | 69 ++++
.../jdbc_milvus_source_and_oceanbase_sink.conf | 47 +++
.../container/seatunnel/SeaTunnelContainer.java | 4 +-
6 files changed, 780 insertions(+), 3 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
index 2033518108..2092a54f98 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
@@ -17,14 +17,32 @@
package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;
+import java.math.BigDecimal;
+import java.sql.Date;
import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
public class OceanBaseMysqlJdbcRowConverter extends AbstractJdbcRowConverter {
@Override
@@ -39,4 +57,192 @@ public class OceanBaseMysqlJdbcRowConverter extends
AbstractJdbcRowConverter {
statement.setTimestamp(
index,
java.sql.Timestamp.valueOf(LocalDateTime.of(LocalDate.now(), time)));
}
+
+ @Override
+ public SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema)
throws SQLException {
+ SeaTunnelRowType typeInfo = tableSchema.toPhysicalRowDataType();
+ Object[] fields = new Object[typeInfo.getTotalFields()];
+ for (int fieldIndex = 0; fieldIndex < typeInfo.getTotalFields();
fieldIndex++) {
+ SeaTunnelDataType<?> seaTunnelDataType =
typeInfo.getFieldType(fieldIndex);
+ String fieldName = typeInfo.getFieldName(fieldIndex);
+ int resultSetIndex = fieldIndex + 1;
+ switch (seaTunnelDataType.getSqlType()) {
+ case STRING:
+ fields[fieldIndex] = JdbcFieldTypeUtils.getString(rs,
resultSetIndex);
+ break;
+ case BOOLEAN:
+ fields[fieldIndex] = JdbcFieldTypeUtils.getBoolean(rs,
resultSetIndex);
+ break;
+ case TINYINT:
+ fields[fieldIndex] = JdbcFieldTypeUtils.getByte(rs,
resultSetIndex);
+ break;
+ case SMALLINT:
+ fields[fieldIndex] = JdbcFieldTypeUtils.getShort(rs,
resultSetIndex);
+ break;
+ case INT:
+ fields[fieldIndex] = JdbcFieldTypeUtils.getInt(rs,
resultSetIndex);
+ break;
+ case BIGINT:
+ fields[fieldIndex] = JdbcFieldTypeUtils.getLong(rs,
resultSetIndex);
+ break;
+ case FLOAT:
+ fields[fieldIndex] = JdbcFieldTypeUtils.getFloat(rs,
resultSetIndex);
+ break;
+ case FLOAT_VECTOR:
+ List<Float> vector = new ArrayList<>();
+ for (Object o : (Object[]) rs.getObject(fieldIndex)) {
+ vector.add(Float.parseFloat(o.toString()));
+ }
+ fields[fieldIndex] = vector;
+ case DOUBLE:
+ fields[fieldIndex] = JdbcFieldTypeUtils.getDouble(rs,
resultSetIndex);
+ break;
+ case DECIMAL:
+ fields[fieldIndex] = JdbcFieldTypeUtils.getBigDecimal(rs,
resultSetIndex);
+ break;
+ case DATE:
+ Date sqlDate = JdbcFieldTypeUtils.getDate(rs,
resultSetIndex);
+ fields[fieldIndex] =
+ Optional.ofNullable(sqlDate).map(e ->
e.toLocalDate()).orElse(null);
+ break;
+ case TIME:
+ fields[fieldIndex] = readTime(rs, resultSetIndex);
+ break;
+ case TIMESTAMP:
+ Timestamp sqlTimestamp =
JdbcFieldTypeUtils.getTimestamp(rs, resultSetIndex);
+ fields[fieldIndex] =
+ Optional.ofNullable(sqlTimestamp)
+ .map(e -> e.toLocalDateTime())
+ .orElse(null);
+ break;
+ case BYTES:
+ fields[fieldIndex] = JdbcFieldTypeUtils.getBytes(rs,
resultSetIndex);
+ break;
+ case NULL:
+ fields[fieldIndex] = null;
+ break;
+ case ARRAY:
+ fields[fieldIndex] =
+ convertToArray(rs, resultSetIndex,
seaTunnelDataType, fieldName);
+ break;
+ case MAP:
+ case ROW:
+ default:
+ throw CommonError.unsupportedDataType(
+ converterName(),
seaTunnelDataType.getSqlType().toString(), fieldName);
+ }
+ }
+ return new SeaTunnelRow(fields);
+ }
+
+    /**
+     * Binds every field of {@code row} to the matching positional parameter of {@code statement}.
+     *
+     * <p>Field {@code i} is bound to JDBC parameter {@code i + 1}. A {@code null} field is bound
+     * with {@code setObject(index, null)} regardless of its declared type. {@code FLOAT_VECTOR}
+     * values are written as a string of the form {@code [v1, v2, ...]}.
+     *
+     * @param tableSchema schema whose physical row type drives the conversion
+     * @param row row supplying the values
+     * @param statement prepared statement to populate
+     * @return the same {@code statement}, with all parameters bound
+     * @throws JdbcConnectorException with {@code DATA_TYPE_CAST_FAILED} if binding any field
+     *     fails; the message names the offending field and the cause is preserved
+     * @throws SQLException declared by the overridden method
+     */
+    @Override
+    public PreparedStatement toExternal(
+            TableSchema tableSchema, SeaTunnelRow row, PreparedStatement statement)
+            throws SQLException {
+        SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
+        for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) {
+            try {
+                SeaTunnelDataType<?> seaTunnelDataType = rowType.getFieldType(fieldIndex);
+                // JDBC parameters are 1-based, SeaTunnel fields are 0-based.
+                int statementIndex = fieldIndex + 1;
+                Object fieldValue = row.getField(fieldIndex);
+                if (fieldValue == null) {
+                    statement.setObject(statementIndex, null);
+                    continue;
+                }
+                switch (seaTunnelDataType.getSqlType()) {
+                    case STRING:
+                        statement.setString(statementIndex, (String) fieldValue);
+                        break;
+                    case BOOLEAN:
+                        statement.setBoolean(statementIndex, (Boolean) fieldValue);
+                        break;
+                    case TINYINT:
+                        statement.setByte(statementIndex, (Byte) fieldValue);
+                        break;
+                    case SMALLINT:
+                        statement.setShort(statementIndex, (Short) fieldValue);
+                        break;
+                    case INT:
+                        statement.setInt(statementIndex, (Integer) fieldValue);
+                        break;
+                    case BIGINT:
+                        statement.setLong(statementIndex, (Long) fieldValue);
+                        break;
+                    case FLOAT:
+                        statement.setFloat(statementIndex, (Float) fieldValue);
+                        break;
+                    case FLOAT_VECTOR:
+                        {
+                            // Silently skipping an unexpected representation would leave the
+                            // parameter unbound (or holding the previous row's value), so fail
+                            // loudly instead.
+                            if (!(fieldValue instanceof Float[])) {
+                                throw new JdbcConnectorException(
+                                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+                                        "Unexpected vector value type: "
+                                                + fieldValue.getClass().getName());
+                            }
+                            Float[] floatArray = (Float[]) fieldValue;
+                            // Separator is written before every element but the first, so an
+                            // empty array yields "[]" instead of failing on a trim.
+                            StringBuilder vector = new StringBuilder("[");
+                            for (int i = 0; i < floatArray.length; i++) {
+                                if (i > 0) {
+                                    vector.append(", ");
+                                }
+                                vector.append(floatArray[i]);
+                            }
+                            vector.append("]");
+                            statement.setString(statementIndex, vector.toString());
+                            break;
+                        }
+                    case DOUBLE:
+                        statement.setDouble(statementIndex, (Double) fieldValue);
+                        break;
+                    case DECIMAL:
+                        statement.setBigDecimal(statementIndex, (BigDecimal) fieldValue);
+                        break;
+                    case DATE:
+                        LocalDate localDate = (LocalDate) fieldValue;
+                        statement.setDate(statementIndex, java.sql.Date.valueOf(localDate));
+                        break;
+                    case TIME:
+                        writeTime(statement, statementIndex, (LocalTime) fieldValue);
+                        break;
+                    case TIMESTAMP:
+                        LocalDateTime localDateTime = (LocalDateTime) fieldValue;
+                        statement.setTimestamp(
+                                statementIndex, java.sql.Timestamp.valueOf(localDateTime));
+                        break;
+                    case BYTES:
+                        statement.setBytes(statementIndex, (byte[]) fieldValue);
+                        break;
+                    case NULL:
+                        statement.setNull(statementIndex, java.sql.Types.NULL);
+                        break;
+                    case ARRAY:
+                        SeaTunnelDataType elementType =
+                                ((ArrayType) seaTunnelDataType).getElementType();
+                        // fieldValue is known to be non-null here (handled above).
+                        Object[] array = (Object[]) fieldValue;
+                        if (SqlType.TINYINT.equals(elementType.getSqlType())) {
+                            // TINYINT elements are widened to Short before being handed over.
+                            Short[] shortArray = new Short[array.length];
+                            for (int i = 0; i < array.length; i++) {
+                                shortArray[i] = Short.valueOf(array[i].toString());
+                            }
+                            statement.setObject(statementIndex, shortArray);
+                        } else {
+                            statement.setObject(statementIndex, array);
+                        }
+                        break;
+                    case MAP:
+                    case ROW:
+                    default:
+                        throw new JdbcConnectorException(
+                                CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+                                "Unexpected value: " + seaTunnelDataType);
+                }
+            } catch (Exception e) {
+                // Every failure, including the unsupported-type errors above, is reported with
+                // the name of the field that triggered it.
+                throw new JdbcConnectorException(
+                        JdbcConnectorErrorCode.DATA_TYPE_CAST_FAILED,
+                        "error field:" + rowType.getFieldNames()[fieldIndex],
+                        e);
+            }
+        }
+        return statement;
+    }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/pom.xml
index d8f631d3e8..d5f689f973 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/pom.xml
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/pom.xml
@@ -24,7 +24,9 @@
<artifactId>connector-jdbc-e2e-part-2</artifactId>
<name>SeaTunnel : E2E : Connector V2 : Jdbc : Part 2</name>
-
+ <properties>
+ <testcontainer.milvus.version>1.19.8</testcontainer.milvus.version>
+ </properties>
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
@@ -33,7 +35,23 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
-
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.8.9</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-milvus</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+        <!-- Testcontainers Milvus module: used only by the e2e tests, so it is test-scoped like the dependencies above. -->
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>milvus</artifactId>
+            <version>${testcontainer.milvus.version}</version>
+            <scope>test</scope>
+        </dependency>
<!-- drivers -->
<dependency>
<groupId>com.aliyun.phoenix</groupId>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMilvusIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMilvusIT.java
new file mode 100644
index 0000000000..36e66ca9d8
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMilvusIT.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.PullPolicy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.milvus.MilvusContainer;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import io.milvus.client.MilvusServiceClient;
+import io.milvus.grpc.DataType;
+import io.milvus.grpc.MutationResult;
+import io.milvus.param.ConnectParam;
+import io.milvus.param.IndexType;
+import io.milvus.param.MetricType;
+import io.milvus.param.R;
+import io.milvus.param.RpcStatus;
+import io.milvus.param.collection.CreateCollectionParam;
+import io.milvus.param.collection.FieldType;
+import io.milvus.param.collection.LoadCollectionParam;
+import io.milvus.param.dml.InsertParam;
+import io.milvus.param.index.CreateIndexParam;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+@Slf4j
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK and FLINK not support adapt")
+public class JdbcOceanBaseMilvusIT extends TestSuiteBase implements
TestResource {
+
+ private static final String IMAGE = "oceanbase/oceanbase-ce:vector";
+
+ private static final String HOSTNAME = "e2e_oceanbase_vector";
+ private static final int PORT = 2881;
+ private static final String USERNAME = "root@test";
+ private static final String PASSWORD = "";
+ private static final String OCEANBASE_DATABASE = "seatunnel";
+ private GenericContainer<?> dbServer;
+ private Connection connection;
+ private JdbcCase jdbcCase;
+ private static final String OCEANBASE_SINK = "simple_example";
+
+ private static final String HOST = "HOST";
+    // JDBC URL template: the HOST placeholder is substituted at connect time, then port and database.
+    private static final String OCEANBASE_JDBC_TEMPLATE = "jdbc:oceanbase://" + HOST + ":%s/%s";
+ private static final String OCEANBASE_DRIVER_CLASS =
"com.oceanbase.jdbc.Driver";
+
+ private static final String MILVUS_HOST = "milvus-e2e";
+ private static final String MILVUS_IMAGE =
"milvusdb/milvus:2.4-20240711-7e2a9d6b";
+ private static final String TOKEN = "root:Milvus";
+ private MilvusContainer container;
+ private MilvusServiceClient milvusClient;
+ private static final String COLLECTION_NAME = "simple_example";
+ private static final String ID_FIELD = "book_id";
+ private static final String VECTOR_FIELD = "book_intro";
+ private static final String TITLE_FIELD = "book_title";
+ private static final Integer VECTOR_DIM = 4;
+ private static final Gson gson = new Gson();
+
+    /**
+     * Container hook that downloads the OceanBase JDBC driver (see {@link #driverUrl()}) into
+     * {@code /tmp/seatunnel/plugins/Jdbc/lib} and asserts that the shell command exited with 0.
+     */
+    @TestContainerExtension
+    private final ContainerExtendedFactory extendedFactory =
+            container -> {
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "bash",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && wget "
+                                        + driverUrl());
+                Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr());
+            };
+
+ /** Returns the Maven Central download URL of the OceanBase JDBC driver jar (oceanbase-client 2.4.3). */
+ String driverUrl() {
+ return
"https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar";
+ }
+
+    /**
+     * Brings up the test fixtures in order: the OceanBase container, a JDBC connection to it
+     * (retried for up to 360 seconds), the target database and sink table, the Milvus container,
+     * the Milvus client, and finally the Milvus source data.
+     *
+     * @throws Exception if any container, connection or data-initialisation step fails
+     */
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        dbServer = initOceanbaseContainer();
+
+        Startables.deepStart(Stream.of(dbServer)).join();
+        jdbcCase = getJdbcCase();
+        given().ignoreExceptions()
+                .await()
+                .atMost(360, TimeUnit.SECONDS)
+                .untilAsserted(() -> this.initializeJdbcConnection(jdbcCase.getJdbcUrl()));
+
+        createSchemaIfNeeded();
+        createNeededTables();
+        this.container =
+                new MilvusContainer(MILVUS_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(MILVUS_HOST);
+        Startables.deepStart(Stream.of(this.container)).join();
+        log.info("Milvus host is {}", container.getHost());
+        log.info("Milvus container started");
+        // The condition was previously built but never awaited (no until*() call), so it did
+        // nothing. Attach it to the client creation so a failing attempt is actually retried.
+        Awaitility.given()
+                .ignoreExceptions()
+                .await()
+                .atMost(720L, TimeUnit.SECONDS)
+                .untilAsserted(this::initMilvus);
+        this.initSourceData();
+    }
+
+    /**
+     * Creates the Milvus client against the started Milvus container's endpoint, authenticating
+     * with {@code TOKEN}.
+     */
+    private void initMilvus() {
+        // The former throws clause listed checked exceptions that nothing in this body raises.
+        milvusClient =
+                new MilvusServiceClient(
+                        ConnectParam.newBuilder()
+                                .withUri(this.container.getEndpoint())
+                                .withToken(TOKEN)
+                                .build());
+    }
+
+    /**
+     * Creates the Milvus source collection (id, float vector and title fields), builds a FLAT/L2
+     * index on the vector field, loads the collection and inserts 10 rows.
+     *
+     * @throws RuntimeException if creating the collection, creating the index, loading the
+     *     collection or inserting the rows does not report success
+     */
+    private void initSourceData() {
+        // Define fields
+        List<FieldType> fieldsSchema =
+                Arrays.asList(
+                        FieldType.newBuilder()
+                                .withName(ID_FIELD)
+                                .withDataType(DataType.Int64)
+                                .withPrimaryKey(true)
+                                .withAutoID(false)
+                                .build(),
+                        FieldType.newBuilder()
+                                .withName(VECTOR_FIELD)
+                                .withDataType(DataType.FloatVector)
+                                .withDimension(VECTOR_DIM)
+                                .build(),
+                        FieldType.newBuilder()
+                                .withName(TITLE_FIELD)
+                                .withDataType(DataType.VarChar)
+                                .withMaxLength(64)
+                                .build());
+
+        // Create the collection with 3 fields
+        R<RpcStatus> ret =
+                milvusClient.createCollection(
+                        CreateCollectionParam.newBuilder()
+                                .withCollectionName(COLLECTION_NAME)
+                                .withFieldTypes(fieldsSchema)
+                                .build());
+        if (ret.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException("Failed to create collection! Error: " + ret.getMessage());
+        }
+
+        // Specify an index type on the vector field.
+        ret =
+                milvusClient.createIndex(
+                        CreateIndexParam.newBuilder()
+                                .withCollectionName(COLLECTION_NAME)
+                                .withFieldName(VECTOR_FIELD)
+                                .withIndexType(IndexType.FLAT)
+                                .withMetricType(MetricType.L2)
+                                .build());
+        if (ret.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException(
+                    "Failed to create index on vector field! Error: " + ret.getMessage());
+        }
+
+        // Call loadCollection() to enable automatically loading data into memory for searching.
+        // Its status is checked like the other calls so a failed load is not silently ignored.
+        ret =
+                milvusClient.loadCollection(
+                        LoadCollectionParam.newBuilder()
+                                .withCollectionName(COLLECTION_NAME)
+                                .build());
+        if (ret.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException("Failed to load collection! Error: " + ret.getMessage());
+        }
+
+        log.info("Collection created");
+
+        // Insert 10 records into the collection; row i carries the vector (i, i, i, i).
+        List<JsonObject> rows = new ArrayList<>();
+        for (long i = 1L; i <= 10; ++i) {
+
+            JsonObject row = new JsonObject();
+            row.add(ID_FIELD, gson.toJsonTree(i));
+            List<Float> vector = Arrays.asList((float) i, (float) i, (float) i, (float) i);
+            row.add(VECTOR_FIELD, gson.toJsonTree(vector));
+            row.addProperty(TITLE_FIELD, "Tom and Jerry " + i);
+            rows.add(row);
+        }
+
+        R<MutationResult> insertRet =
+                milvusClient.insert(
+                        InsertParam.newBuilder()
+                                .withCollectionName(COLLECTION_NAME)
+                                .withRows(rows)
+                                .build());
+        if (insertRet.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException("Failed to insert! Error: " + insertRet.getMessage());
+        }
+        log.info("Milvus test data created");
+    }
+
+    /**
+     * Releases the JDBC connection, the Milvus client and both containers.
+     *
+     * <p>The closes are chained with {@code finally} so that a failure while closing one
+     * resource does not prevent the remaining ones from being closed.
+     *
+     * @throws Exception if closing any resource fails
+     */
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } finally {
+            try {
+                if (milvusClient != null) {
+                    milvusClient.close();
+                }
+            } finally {
+                try {
+                    if (dbServer != null) {
+                        dbServer.close();
+                    }
+                } finally {
+                    if (container != null) {
+                        container.close();
+                    }
+                }
+            }
+        }
+    }
+
+ /**
+ * Runs the first config returned by configFile() (Milvus source to OceanBase sink) and asserts
+ * that the job exits with code 0. Only the exit code is checked here, not the written rows.
+ * The sink table is truncated afterwards whether or not the job succeeded.
+ */
+ @TestTemplate
+ public void testMilvusToOceanBase(TestContainer container) throws
Exception {
+ try {
+ Container.ExecResult execResult =
container.executeJob(configFile().get(0));
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ } finally {
+ clearTable(jdbcCase.getDatabase(), jdbcCase.getSchema(),
jdbcCase.getSinkTable());
+ }
+ }
+
+ /**
+ * Runs the second config returned by configFile() (FakeSource to OceanBase sink) and asserts
+ * that the job exits with code 0. Only the exit code is checked here, not the written rows.
+ * The sink table is truncated afterwards whether or not the job succeeded.
+ */
+ @TestTemplate
+ public void testFakeToOceanBase(TestContainer container)
+ throws IOException, InterruptedException {
+ try {
+ Container.ExecResult execResult =
container.executeJob(configFile().get(1));
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ } finally {
+ clearTable(jdbcCase.getDatabase(), jdbcCase.getSchema(),
jdbcCase.getSinkTable());
+ }
+ }
+
+ // Three-argument convenience overload: the schema argument is ignored and the database
+ // name is passed on as the qualifier of the two-argument clearTable.
+ private void clearTable(String database, String schema, String table) {
+ clearTable(database, table);
+ }
+
+ /**
+ * Truncates the given table and commits.
+ *
+ * On SQLException the connection is rolled back and a SeaTunnelRuntimeException with
+ * CLEAR_TABLE_FAILED is thrown; if the rollback itself fails, the rollback exception is
+ * reported instead of the original one.
+ */
+ public void clearTable(String schema, String table) {
+ try (Statement statement = connection.createStatement()) {
+ statement.execute("TRUNCATE TABLE " +
buildTableInfoWithSchema(schema, table));
+ connection.commit();
+ } catch (SQLException e) {
+ try {
+ connection.rollback();
+ } catch (SQLException exception) {
+ throw new
SeaTunnelRuntimeException(JdbcITErrorCode.CLEAR_TABLE_FAILED, exception);
+ }
+ throw new
SeaTunnelRuntimeException(JdbcITErrorCode.CLEAR_TABLE_FAILED, e);
+ }
+ }
+
+ /**
+ * Builds the JdbcCase describing the OceanBase fixture. Must be called after dbServer has
+ * started, because it reads the container's mapped port.
+ */
+ JdbcCase getJdbcCase() {
+ Map<String, String> containerEnv = new HashMap<>();
+ // The URL still contains the literal HOST placeholder (replaced in
+ // initializeJdbcConnection) and targets the "test" database, whereas the case's
+ // database below is OCEANBASE_DATABASE.
+ String jdbcUrl =
+ String.format(OCEANBASE_JDBC_TEMPLATE,
dbServer.getMappedPort(PORT), "test");
+
+ return JdbcCase.builder()
+ .dockerImage(IMAGE)
+ .networkAliases(HOSTNAME)
+ .containerEnv(containerEnv)
+ .driverClass(OCEANBASE_DRIVER_CLASS)
+ .host(HOST)
+ .port(PORT)
+ .localPort(dbServer.getMappedPort(PORT))
+ .jdbcTemplate(OCEANBASE_JDBC_TEMPLATE)
+ .jdbcUrl(jdbcUrl)
+ .userName(USERNAME)
+ .password(PASSWORD)
+ .database(OCEANBASE_DATABASE)
+ .sinkTable(OCEANBASE_SINK)
+ .createSql(createSqlTemplate())
+ .build();
+ }
+
+ /**
+ * Returns the job config resources used by the tests: index 0 is the Milvus-to-OceanBase
+ * job, index 1 is the FakeSource-to-OceanBase job. The tests select them by index.
+ */
+ List<String> configFile() {
+ return Lists.newArrayList(
+ "/jdbc_milvus_source_and_oceanbase_sink.conf",
"/jdbc_fake_to_oceanbase_sink.conf");
+ }
+
+ /**
+ * Opens the JDBC connection used by the test and turns auto-commit off.
+ *
+ * User and password are only passed to the driver when they are not blank. The HOST
+ * placeholder in jdbcUrl is replaced with the container's host when dbServer is set.
+ */
+ private void initializeJdbcConnection(String jdbcUrl)
+ throws SQLException, InstantiationException,
IllegalAccessException {
+ Driver driver = (Driver) loadDriverClass().newInstance();
+ Properties props = new Properties();
+
+ if (StringUtils.isNotBlank(jdbcCase.getUserName())) {
+ props.put("user", jdbcCase.getUserName());
+ }
+
+ if (StringUtils.isNotBlank(jdbcCase.getPassword())) {
+ props.put("password", jdbcCase.getPassword());
+ }
+
+ if (dbServer != null) {
+ jdbcUrl = jdbcUrl.replace(HOST, dbServer.getHost());
+ }
+
+ this.connection = driver.connect(jdbcUrl, props);
+ connection.setAutoCommit(false);
+ }
+
+ /**
+ * Loads the JDBC driver class named by jdbcCase; any failure is rethrown as a
+ * RuntimeException that names the class and keeps the cause.
+ */
+ private Class<?> loadDriverClass() {
+ try {
+ return Class.forName(jdbcCase.getDriverClass());
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to load driver class: " +
jdbcCase.getDriverClass(), e);
+ }
+ }
+
+    /**
+     * Creates the {@code OCEANBASE_DATABASE} database if it does not exist yet.
+     *
+     * @throws SeaTunnelRuntimeException with {@code CREATE_TABLE_FAILED} if the statement fails
+     */
+    private void createSchemaIfNeeded() {
+        String sql = "CREATE DATABASE IF NOT EXISTS " + OCEANBASE_DATABASE;
+        // try-with-resources so the statement is closed; it was previously never released.
+        try (Statement statement = connection.createStatement()) {
+            statement.executeUpdate(sql);
+        } catch (Exception e) {
+            throw new SeaTunnelRuntimeException(
+                    JdbcITErrorCode.CREATE_TABLE_FAILED, "Fail to execute sql " + sql, e);
+        }
+        log.info("oceanbase schema created,sql is" + sql);
+    }
+
+ /**
+ * Returns the CREATE TABLE template for the sink table; the single %s placeholder is the
+ * (already quoted) table name. The table has a varchar primary key, a 4-dimensional
+ * vector column and a varchar title column.
+ */
+ String createSqlTemplate() {
+ return "CREATE TABLE IF NOT EXISTS %s\n"
+ + "(\n"
+ + "book_id varchar(20) NOT NULL,\n"
+ + "book_intro vector(4) DEFAULT NULL,\n"
+ + "book_title varchar(64) DEFAULT NULL,\n"
+ + "primary key (book_id)\n"
+ + ");";
+ }
+
+ /**
+ * Configures (but does not start) the OceanBase container: shared test network with the
+ * HOSTNAME alias, exposed PORT, image always pulled, and readiness detected by the
+ * "boot success!" log line within a 5 minute startup timeout.
+ */
+ GenericContainer<?> initOceanbaseContainer() {
+ return new GenericContainer<>(IMAGE)
+ .withEnv("MODE", "slim")
+ .withEnv("OB_DATAFILE_SIZE", "2G")
+ .withNetwork(NETWORK)
+ .withNetworkAliases(HOSTNAME)
+ .withExposedPorts(PORT)
+ .withImagePullPolicy(PullPolicy.alwaysPull())
+ .waitingFor(Wait.forLogMessage(".*boot success!.*", 1))
+ .withStartupTimeout(Duration.ofMinutes(5))
+ .withLogConsumer(new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)));
+ }
+
+ /**
+ * Creates the sink table unless jdbcCase says the table is created through save mode,
+ * then commits. Any failure is logged and rethrown as a SeaTunnelRuntimeException with
+ * CREATE_TABLE_FAILED.
+ */
+ private void createNeededTables() {
+ try (Statement statement = connection.createStatement()) {
+ String createTemplate = jdbcCase.getCreateSql();
+
+ if (!jdbcCase.isUseSaveModeCreateTable()) {
+ // A dedicated sink DDL, when present, takes precedence over the generic one.
+ if (jdbcCase.getSinkCreateSql() != null) {
+ createTemplate = jdbcCase.getSinkCreateSql();
+ }
+ String createSink =
+ String.format(
+ createTemplate,
+ buildTableInfoWithSchema(
+ jdbcCase.getDatabase(),
+ jdbcCase.getSchema(),
+ jdbcCase.getSinkTable()));
+ statement.execute(createSink);
+ log.info("oceanbase table created,sql is" + createSink);
+ }
+
+ connection.commit();
+ } catch (Exception exception) {
+ log.error(ExceptionUtils.getMessage(exception));
+ throw new
SeaTunnelRuntimeException(JdbcITErrorCode.CREATE_TABLE_FAILED, exception);
+ }
+ log.info("oceanbase table created success!");
+ }
+
+ // Three-argument convenience overload: the schema argument is ignored and the database
+ // name is used as the qualifier of the two-argument buildTableInfoWithSchema.
+ private String buildTableInfoWithSchema(String database, String schema,
String table) {
+ return buildTableInfoWithSchema(database, table);
+ }
+
+ /** Wraps the identifier in backticks. Backticks inside the identifier are not escaped. */
+ public String quoteIdentifier(String field) {
+ return "`" + field + "`";
+ }
+
+ /**
+ * Returns the quoted table reference: `schema`.`table` when schema is not blank,
+ * otherwise just `table`.
+ */
+ public String buildTableInfoWithSchema(String schema, String table) {
+ if (StringUtils.isNotBlank(schema)) {
+ return quoteIdentifier(schema) + "." + quoteIdentifier(table);
+ } else {
+ return quoteIdentifier(table);
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_fake_to_oceanbase_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_fake_to_oceanbase_sink.conf
new file mode 100644
index 0000000000..4a5ae17cd0
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_fake_to_oceanbase_sink.conf
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 10
+ vector.dimension= 4
+ schema = {
+ table = "simple_example_1"
+ columns = [
+ {
+ name = book_id
+ type = bigint
+ nullable = false
+ defaultValue = 0
+ comment = "primary key id"
+ },
+ {
+ name = book_intro
+ type = float_vector
+ columnScale =4
+ comment = "vector"
+ },
+ {
+ name = book_title
+ type = string
+ nullable = true
+ comment = "topic"
+ }
+ ]
+ primaryKey {
+ name = book_id
+ columnNames = [book_id]
+ }
+ }
+ }
+}
+
+sink {
+ jdbc {
+ url = "jdbc:oceanbase://e2e_oceanbase_vector:2881/seatunnel"
+ driver = "com.oceanbase.jdbc.Driver"
+ user = "root@test"
+ password = ""
+ generate_sink_sql =true
+ compatible_mode="mysql"
+ database = "seatunnel"
+ table = "simple_example"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_milvus_source_and_oceanbase_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_milvus_source_and_oceanbase_sink.conf
new file mode 100644
index 0000000000..9d7ad806f4
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_milvus_source_and_oceanbase_sink.conf
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file demonstrates a batch job in SeaTunnel that reads from Milvus and writes to OceanBase through the JDBC sink
+######
+env {
+ job.mode = "BATCH"
+}
+
+source {
+ Milvus {
+ url = "http://milvus-e2e:19530"
+ token = "root:Milvus"
+ database = "default"
+ collection="simple_example"
+ }
+}
+
+transform {
+}
+
+sink {
+ jdbc {
+ url = "jdbc:oceanbase://e2e_oceanbase_vector:2881/seatunnel"
+ driver = "com.oceanbase.jdbc.Driver"
+ user = "root@test"
+ password = ""
+ generate_sink_sql =true
+ compatible_mode="mysql"
+ database = "seatunnel"
+ table = "simple_example"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 802b1c32fb..7c2d6dda70 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -433,7 +433,9 @@ public class SeaTunnelContainer extends
AbstractTestContainer {
|| threadName.contains("Timer for 's3a-file-system' metrics
system")
|| threadName.startsWith("MutableQuantiles-")
// JDBC Hana driver
- || threadName.startsWith("Thread-");
+ || threadName.startsWith("Thread-")
+ // JNA Cleaner
+ || threadName.startsWith("JNA Cleaner");
}
@Override