This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new cd4b7ff7d2 [Feature][Connector-V2][Jdbc] Add OceanBase catalog (#5439)
cd4b7ff7d2 is described below

commit cd4b7ff7d2d10c930cb2c14cef0bf1e6d282c49a
Author: He Wang <[email protected]>
AuthorDate: Mon Oct 16 19:08:40 2023 +0800

    [Feature][Connector-V2][Jdbc] Add OceanBase catalog (#5439)
    
    * feat: add oceanbase jdbc catalog
    
    * remove oceanbase-client and disable it cases
    
    * update EXCLUDE_SCHEMAS and skip database check in OceanBase Oracle mode
---
 .../seatunnel/jdbc/catalog/JdbcCatalogOptions.java |   8 +
 .../catalog/oceanbase/OceanBaseCatalogFactory.java |  83 ++++++++++
 .../catalog/oceanbase/OceanBaseMySqlCatalog.java   |  39 +++++
 .../catalog/oceanbase/OceanBaseOracleCatalog.java  |  99 ++++++++++++
 .../jdbc/catalog/oracle/OracleCatalog.java         |   2 +-
 .../connectors/seatunnel/jdbc/AbstractJdbcIT.java  |   3 +-
 .../seatunnel/jdbc/JdbcOceanBaseITBase.java        |  73 +--------
 .../seatunnel/jdbc/JdbcOceanBaseMysqlIT.java       | 104 ++++++++----
 .../seatunnel/jdbc/JdbcOceanBaseOracleIT.java      | 179 ++++++++++++++++-----
 .../jdbc_oceanbase_mysql_source_and_sink.conf      |   4 +-
 .../jdbc_oceanbase_oracle_source_and_sink.conf     |  12 +-
 11 files changed, 466 insertions(+), 140 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
index 712eefacb8..fc58a45c28 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
@@ -50,6 +50,14 @@ public interface JdbcCatalogOptions {
                     .withDescription(
                             "for databases that support the schema parameter, 
give it priority.");
 
+    Option<String> COMPATIBLE_MODE =
+            Options.key("compatibleMode")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The compatible mode of database, required when 
the database supports multiple compatible modes. "
+                                    + "For example, when using OceanBase 
database, you need to set it to 'mysql' or 'oracle'.");
+
     OptionRule.Builder BASE_RULE =
             OptionRule.builder().required(BASE_URL).required(USERNAME, 
PASSWORD).optional(SCHEMA);
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java
new file mode 100644
index 0000000000..aa8b016d08
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase;
+
+import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.OptionValidationException;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Optional;
+
+@AutoService(Factory.class)
+public class OceanBaseCatalogFactory implements CatalogFactory {
+
+    public static final String IDENTIFIER = "OceanBase";
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        String urlWithDatabase = options.get(JdbcCatalogOptions.BASE_URL);
+        Preconditions.checkArgument(
+                StringUtils.isNoneBlank(urlWithDatabase),
+                "Miss config <base-url>! Please check your config.");
+        JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(urlWithDatabase);
+        Optional<String> defaultDatabase = urlInfo.getDefaultDatabase();
+        if (!defaultDatabase.isPresent()) {
+            throw new OptionValidationException(JdbcCatalogOptions.BASE_URL);
+        }
+
+        String compatibleMode = 
options.get(JdbcCatalogOptions.COMPATIBLE_MODE);
+        Preconditions.checkArgument(
+                StringUtils.isNoneBlank(compatibleMode),
+                "Miss config <compatibleMode>! Please check your config.");
+
+        if ("oracle".equalsIgnoreCase(compatibleMode.trim())) {
+            return new OceanBaseOracleCatalog(
+                    catalogName,
+                    options.get(JdbcCatalogOptions.USERNAME),
+                    options.get(JdbcCatalogOptions.PASSWORD),
+                    urlInfo,
+                    options.get(JdbcCatalogOptions.SCHEMA));
+        }
+        return new OceanBaseMySqlCatalog(
+                catalogName,
+                options.get(JdbcCatalogOptions.USERNAME),
+                options.get(JdbcCatalogOptions.PASSWORD),
+                urlInfo);
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return 
JdbcCatalogOptions.BASE_RULE.required(JdbcCatalogOptions.COMPATIBLE_MODE).build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
new file mode 100644
index 0000000000..58cdb5c413
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase;
+
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog;
+
+public class OceanBaseMySqlCatalog extends MySqlCatalog {
+
+    static {
+        SYS_DATABASES.clear();
+        SYS_DATABASES.add("information_schema");
+        SYS_DATABASES.add("mysql");
+        SYS_DATABASES.add("oceanbase");
+        SYS_DATABASES.add("LBACSYS");
+        SYS_DATABASES.add("ORAAUDITOR");
+        SYS_DATABASES.add("SYS");
+    }
+
+    public OceanBaseMySqlCatalog(
+            String catalogName, String username, String pwd, 
JdbcUrlUtil.UrlInfo urlInfo) {
+        super(catalogName, username, pwd, urlInfo);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java
new file mode 100644
index 0000000000..b4ece7db9c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleCatalog;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+
+public class OceanBaseOracleCatalog extends OracleCatalog {
+
+    static {
+        EXCLUDED_SCHEMAS =
+                Collections.unmodifiableList(
+                        Arrays.asList("oceanbase", "LBACSYS", "ORAAUDITOR", 
"SYS"));
+    }
+
+    public OceanBaseOracleCatalog(
+            String catalogName,
+            String username,
+            String pwd,
+            JdbcUrlUtil.UrlInfo urlInfo,
+            String defaultSchema) {
+        super(catalogName, username, pwd, urlInfo, defaultSchema);
+    }
+
+    @Override
+    protected String getListDatabaseSql() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        String dbUrl = getUrlFromDatabaseName(databaseName);
+        try {
+            return queryString(dbUrl, getListTableSql(databaseName), 
this::getTableName);
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed listing database in catalog %s", 
catalogName), e);
+        }
+    }
+
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        try {
+            return 
listTables(tablePath.getDatabaseName()).contains(getTableName(tablePath));
+        } catch (DatabaseNotExistException e) {
+            return false;
+        }
+    }
+
+    @Override
+    public void createTable(TablePath tablePath, CatalogTable table, boolean 
ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
+        checkNotNull(tablePath, "Table path cannot be null");
+
+        if (defaultSchema.isPresent()) {
+            tablePath =
+                    new TablePath(
+                            tablePath.getDatabaseName(),
+                            defaultSchema.get(),
+                            tablePath.getTableName());
+        }
+
+        if (tableExists(tablePath)) {
+            if (ignoreIfExists) {
+                return;
+            }
+            throw new TableAlreadyExistException(catalogName, tablePath);
+        }
+
+        createTableInternal(tablePath, table);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
index 570f387411..f72cf2a75d 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
@@ -54,7 +54,7 @@ public class OracleCatalog extends AbstractJdbcCatalog {
     private static final OracleDataTypeConvertor DATA_TYPE_CONVERTOR =
             new OracleDataTypeConvertor();
 
-    private static final List<String> EXCLUDED_SCHEMAS =
+    protected static List<String> EXCLUDED_SCHEMAS =
             Collections.unmodifiableList(
                     Arrays.asList(
                             "APPQOSSYS",
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
index 2738f5ccc3..3798c64c17 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
@@ -229,10 +229,11 @@ public abstract class AbstractJdbcIT extends 
TestSuiteBase implements TestResour
     @Override
     public void startUp() {
         dbServer = 
initContainer().withImagePullPolicy(PullPolicy.alwaysPull());
-        jdbcCase = getJdbcCase();
 
         Startables.deepStart(Stream.of(dbServer)).join();
 
+        jdbcCase = getJdbcCase();
+
         given().ignoreExceptions()
                 .await()
                 .atMost(360, TimeUnit.SECONDS)
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
index b8202e697a..50177ef1a8 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
@@ -17,11 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc;
 
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-
-import org.apache.commons.lang3.tuple.Pair;
-
 import org.junit.jupiter.api.Assertions;
 import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
 
@@ -29,29 +24,18 @@ import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.sql.ResultSet;
 import java.sql.Statement;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 
 public abstract class JdbcOceanBaseITBase extends AbstractJdbcIT {
 
-    private static final String OCEANBASE_DATABASE = "seatunnel";
-    private static final String OCEANBASE_SOURCE = "source";
-    private static final String OCEANBASE_SINK = "sink";
-
-    private static final String OCEANBASE_JDBC_TEMPLATE = "jdbc:oceanbase://" 
+ HOST + ":%s";
-    private static final String OCEANBASE_DRIVER_CLASS = 
"com.oceanbase.jdbc.Driver";
-
-    abstract String imageName();
+    protected static final String OCEANBASE_SOURCE = "source";
+    protected static final String OCEANBASE_SINK = "sink";
 
-    abstract String host();
+    protected static final String OCEANBASE_CATALOG_TABLE = "catalog_table";
 
-    abstract int port();
-
-    abstract String username();
-
-    abstract String password();
+    protected static final String OCEANBASE_JDBC_TEMPLATE = 
"jdbc:oceanbase://" + HOST + ":%s/%s";
+    protected static final String OCEANBASE_DRIVER_CLASS = 
"com.oceanbase.jdbc.Driver";
 
     abstract List<String> configFile();
 
@@ -59,44 +43,14 @@ public abstract class JdbcOceanBaseITBase extends 
AbstractJdbcIT {
 
     abstract String[] getFieldNames();
 
-    @Override
-    JdbcCase getJdbcCase() {
-        Map<String, String> containerEnv = new HashMap<>();
-        String jdbcUrl = String.format(OCEANBASE_JDBC_TEMPLATE, port());
-        Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
-        String[] fieldNames = testDataSet.getKey();
-
-        String insertSql = insertTable(OCEANBASE_DATABASE, OCEANBASE_SOURCE, 
fieldNames);
-
-        return JdbcCase.builder()
-                .dockerImage(imageName())
-                .networkAliases(host())
-                .containerEnv(containerEnv)
-                .driverClass(OCEANBASE_DRIVER_CLASS)
-                .host(HOST)
-                .port(port())
-                .localPort(port())
-                .jdbcTemplate(OCEANBASE_JDBC_TEMPLATE)
-                .jdbcUrl(jdbcUrl)
-                .userName(username())
-                .password(password())
-                .database(OCEANBASE_DATABASE)
-                .sourceTable(OCEANBASE_SOURCE)
-                .sinkTable(OCEANBASE_SINK)
-                .createSql(createSqlTemplate())
-                .configFile(configFile())
-                .insertSql(insertSql)
-                .testData(testDataSet)
-                .build();
-    }
+    abstract String getFullTableName(String tableName);
 
     @Override
     void compareResult() {
         String sourceSql =
-                String.format(
-                        "select * from %s.%s order by 1", OCEANBASE_DATABASE, 
OCEANBASE_SOURCE);
+                String.format("select * from %s order by 1", 
getFullTableName(OCEANBASE_SOURCE));
         String sinkSql =
-                String.format("select * from %s.%s order by 1", 
OCEANBASE_DATABASE, OCEANBASE_SINK);
+                String.format("select * from %s order by 1", 
getFullTableName(OCEANBASE_SINK));
         try {
             Statement sourceStatement = connection.createStatement();
             Statement sinkStatement = connection.createStatement();
@@ -133,15 +87,4 @@ public abstract class JdbcOceanBaseITBase extends 
AbstractJdbcIT {
     String driverUrl() {
         return 
"https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar";
     }
-
-    @Override
-    protected void createSchemaIfNeeded() {
-        String sql = "CREATE DATABASE IF NOT EXISTS " + OCEANBASE_DATABASE;
-        try {
-            connection.prepareStatement(sql).executeUpdate();
-        } catch (Exception e) {
-            throw new SeaTunnelRuntimeException(
-                    JdbcITErrorCode.CREATE_TABLE_FAILED, "Fail to execute sql 
" + sql, e);
-        }
-    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
index 548fecaee6..b4e0ff1b48 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
@@ -18,6 +18,9 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase.OceanBaseMySqlCatalog;
 
 import org.apache.commons.lang3.tuple.Pair;
 
@@ -36,39 +39,70 @@ import java.time.Duration;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 @Disabled("Disabled due to insufficient hardware resources in the CI 
environment")
 public class JdbcOceanBaseMysqlIT extends JdbcOceanBaseITBase {
 
-    @Override
-    String imageName() {
-        return "oceanbase/oceanbase-ce:4.0.0.0";
-    }
+    private static final String IMAGE = "oceanbase/oceanbase-ce:4.1.0.0";
 
-    @Override
-    String host() {
-        return "e2e_oceanbase_mysql";
-    }
+    private static final String HOSTNAME = "e2e_oceanbase_mysql";
+    private static final int PORT = 2881;
+    private static final String USERNAME = "root@test";
+    private static final String PASSWORD = "";
+    private static final String OCEANBASE_DATABASE = "seatunnel";
+    private static final String OCEANBASE_CATALOG_DATABASE = 
"seatunnel_catalog";
 
     @Override
-    int port() {
-        return 2881;
+    List<String> configFile() {
+        return 
Lists.newArrayList("/jdbc_oceanbase_mysql_source_and_sink.conf");
     }
 
     @Override
-    String username() {
-        return "root";
-    }
+    JdbcCase getJdbcCase() {
+        Map<String, String> containerEnv = new HashMap<>();
+        String jdbcUrl =
+                String.format(OCEANBASE_JDBC_TEMPLATE, 
dbServer.getMappedPort(PORT), "test");
+        Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
+        String[] fieldNames = testDataSet.getKey();
 
-    @Override
-    String password() {
-        return "";
+        String insertSql = insertTable(OCEANBASE_DATABASE, OCEANBASE_SOURCE, 
fieldNames);
+
+        return JdbcCase.builder()
+                .dockerImage(IMAGE)
+                .networkAliases(HOSTNAME)
+                .containerEnv(containerEnv)
+                .driverClass(OCEANBASE_DRIVER_CLASS)
+                .host(HOST)
+                .port(PORT)
+                .localPort(dbServer.getMappedPort(PORT))
+                .jdbcTemplate(OCEANBASE_JDBC_TEMPLATE)
+                .jdbcUrl(jdbcUrl)
+                .userName(USERNAME)
+                .password(PASSWORD)
+                .database(OCEANBASE_DATABASE)
+                .sourceTable(OCEANBASE_SOURCE)
+                .sinkTable(OCEANBASE_SINK)
+                .catalogDatabase(OCEANBASE_CATALOG_DATABASE)
+                .catalogTable(OCEANBASE_CATALOG_TABLE)
+                .createSql(createSqlTemplate())
+                .configFile(configFile())
+                .insertSql(insertSql)
+                .testData(testDataSet)
+                .build();
     }
 
     @Override
-    List<String> configFile() {
-        return 
Lists.newArrayList("/jdbc_oceanbase_mysql_source_and_sink.conf");
+    protected void createSchemaIfNeeded() {
+        String sql = "CREATE DATABASE IF NOT EXISTS " + OCEANBASE_DATABASE;
+        try {
+            connection.prepareStatement(sql).executeUpdate();
+        } catch (Exception e) {
+            throw new SeaTunnelRuntimeException(
+                    JdbcITErrorCode.CREATE_TABLE_FAILED, "Fail to execute sql 
" + sql, e);
+        }
     }
 
     @Override
@@ -239,18 +273,30 @@ public class JdbcOceanBaseMysqlIT extends 
JdbcOceanBaseITBase {
     }
 
     @Override
-    GenericContainer<?> initContainer() {
-        GenericContainer<?> container =
-                new GenericContainer<>(imageName())
-                        .withNetwork(NETWORK)
-                        .withNetworkAliases(host())
-                        .waitingFor(Wait.forLogMessage(".*boot success!.*", 1))
-                        .withStartupTimeout(Duration.ofMinutes(5))
-                        .withLogConsumer(
-                                new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(imageName())));
+    String getFullTableName(String tableName) {
+        return buildTableInfoWithSchema(OCEANBASE_DATABASE, tableName);
+    }
 
-        container.setPortBindings(Lists.newArrayList(String.format("%s:%s", 
port(), port())));
+    @Override
+    GenericContainer<?> initContainer() {
+        return new GenericContainer<>(IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(HOSTNAME)
+                .withExposedPorts(PORT)
+                .waitingFor(Wait.forLogMessage(".*boot success!.*", 1))
+                .withStartupTimeout(Duration.ofMinutes(3))
+                .withLogConsumer(new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)));
+    }
 
-        return container;
+    @Override
+    protected void initCatalog() {
+        catalog =
+                new OceanBaseMySqlCatalog(
+                        "oceanbase",
+                        USERNAME,
+                        PASSWORD,
+                        JdbcUrlUtil.getUrlInfo(
+                                jdbcCase.getJdbcUrl().replace(HOST, 
dbServer.getHost())));
+        catalog.open();
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java
index 4c3cca5ddc..f28c26b35f 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java
@@ -17,76 +17,139 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase.OceanBaseOracleCatalog;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
 import org.testcontainers.containers.GenericContainer;
 
 import com.google.common.collect.Lists;
 
 import java.math.BigDecimal;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
 import java.sql.Date;
+import java.sql.Driver;
+import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static org.awaitility.Awaitility.given;
+import java.util.Map;
+import java.util.Properties;
 
 @Disabled("Oracle mode of OceanBase Enterprise Edition does not provide docker 
environment")
 public class JdbcOceanBaseOracleIT extends JdbcOceanBaseITBase {
 
-    @Override
-    String imageName() {
-        return null;
-    }
+    private static final String HOSTNAME = "e2e_oceanbase_oracle";
+    private static final int PORT = 2883;
+    private static final String USERNAME = "TESTUSER@test";
+    private static final String PASSWORD = "";
+    private static final String SCHEMA = "TESTUSER";
 
     @Override
-    String host() {
-        return "e2e_oceanbase_oracle";
+    List<String> configFile() {
+        return 
Lists.newArrayList("/jdbc_oceanbase_oracle_source_and_sink.conf");
     }
 
     @Override
-    int port() {
-        return 2883;
+    GenericContainer<?> initContainer() {
+        throw new UnsupportedOperationException();
     }
 
+    @BeforeAll
     @Override
-    String username() {
-        return "root";
-    }
+    public void startUp() {
+        jdbcCase = getJdbcCase();
 
-    @Override
-    String password() {
-        return "";
+        try {
+            initConnection();
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to initial jdbc connection", e);
+        }
+
+        createNeededTables();
+        insertTestData();
+        initCatalog();
     }
 
     @Override
-    List<String> configFile() {
-        return 
Lists.newArrayList("/jdbc_oceanbase_oracle_source_and_sink.conf");
+    public void tearDown() throws SQLException {
+        if (connection != null) {
+            connection
+                    .createStatement()
+                    .execute("DROP TABLE " + 
getFullTableName(OCEANBASE_SOURCE));
+            connection.createStatement().execute("DROP TABLE " + 
getFullTableName(OCEANBASE_SINK));
+        }
+        super.tearDown();
     }
 
     @Override
-    GenericContainer<?> initContainer() {
-        throw new UnsupportedOperationException();
+    JdbcCase getJdbcCase() {
+        Map<String, String> containerEnv = new HashMap<>();
+        String jdbcUrl = String.format(OCEANBASE_JDBC_TEMPLATE, PORT, SCHEMA);
+        Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
+        String[] fieldNames = testDataSet.getKey();
+
+        String insertSql = insertTable(SCHEMA, OCEANBASE_SOURCE.toUpperCase(), 
fieldNames);
+
+        return JdbcCase.builder()
+                .dockerImage(null)
+                .networkAliases(HOSTNAME)
+                .containerEnv(containerEnv)
+                .driverClass(OCEANBASE_DRIVER_CLASS)
+                .host(HOST)
+                .port(PORT)
+                .localPort(PORT)
+                .jdbcTemplate(OCEANBASE_JDBC_TEMPLATE)
+                .jdbcUrl(jdbcUrl)
+                .userName(USERNAME)
+                .password(PASSWORD)
+                .schema(SCHEMA)
+                .sourceTable(OCEANBASE_SOURCE.toUpperCase())
+                .sinkTable(OCEANBASE_SINK.toUpperCase())
+                .catalogSchema(SCHEMA)
+                .catalogTable(OCEANBASE_CATALOG_TABLE)
+                .createSql(createSqlTemplate())
+                .configFile(configFile())
+                .insertSql(insertSql)
+                .testData(testDataSet)
+                .build();
     }
 
-    @Override
-    public void startUp() {
-        jdbcCase = getJdbcCase();
+    private void initConnection()
+            throws SQLException, ClassNotFoundException, MalformedURLException,
+                    InstantiationException, IllegalAccessException {
+        URLClassLoader urlClassLoader =
+                new URLClassLoader(
+                        new URL[] {new URL(driverUrl())},
+                        JdbcOceanBaseOracleIT.class.getClassLoader());
+        Thread.currentThread().setContextClassLoader(urlClassLoader);
+        Driver driver = (Driver) 
urlClassLoader.loadClass(jdbcCase.getDriverClass()).newInstance();
+        Properties props = new Properties();
 
-        given().ignoreExceptions()
-                .await()
-                .atMost(360, TimeUnit.SECONDS)
-                .untilAsserted(() -> 
this.initializeJdbcConnection(jdbcCase.getJdbcUrl()));
+        if (StringUtils.isNotBlank(jdbcCase.getUserName())) {
+            props.put("user", jdbcCase.getUserName());
+        }
 
-        createSchemaIfNeeded();
-        createNeededTables();
-        insertTestData();
+        if (StringUtils.isNotBlank(jdbcCase.getPassword())) {
+            props.put("password", jdbcCase.getPassword());
+        }
+
+        connection = driver.connect(jdbcCase.getJdbcUrl().replace(HOST, 
HOSTNAME), props);
+        connection.setAutoCommit(false);
     }
 
     @Override
@@ -108,8 +171,7 @@ public class JdbcOceanBaseOracleIT extends JdbcOceanBaseITBase {
                 + "    BINARY_FLOAT_COL              binary_float,\n"
                 + "    BINARY_DOUBLE_COL             binary_double,\n"
                 + "    DATE_COL                      date,\n"
-                + "    TIMESTAMP_WITH_3_FRAC_SEC_COL timestamp(3),\n"
-                + "    TIMESTAMP_WITH_LOCAL_TZ       timestamp with local time zone\n"
+                + "    TIMESTAMP_WITH_3_FRAC_SEC_COL timestamp(3)\n"
                 + ")";
     }
 
@@ -126,8 +188,7 @@ public class JdbcOceanBaseOracleIT extends JdbcOceanBaseITBase {
             "BINARY_FLOAT_COL",
             "BINARY_DOUBLE_COL",
             "DATE_COL",
-            "TIMESTAMP_WITH_3_FRAC_SEC_COL",
-            "TIMESTAMP_WITH_LOCAL_TZ"
+            "TIMESTAMP_WITH_3_FRAC_SEC_COL"
         };
     }
 
@@ -150,7 +211,6 @@ public class JdbcOceanBaseOracleIT extends JdbcOceanBaseITBase {
                                 Float.parseFloat("22.2"),
                                 Double.parseDouble("2.2"),
                                 Date.valueOf(LocalDate.now()),
-                                Timestamp.valueOf(LocalDateTime.now()),
                                 Timestamp.valueOf(LocalDateTime.now())
                             });
             rows.add(row);
@@ -158,4 +218,51 @@ public class JdbcOceanBaseOracleIT extends JdbcOceanBaseITBase {
 
         return Pair.of(fieldNames, rows);
     }
+
+    @Override
+    String getFullTableName(String tableName) {
+        return buildTableInfoWithSchema(SCHEMA, tableName.toUpperCase());
+    }
+
+    @Override
+    protected void clearTable(String database, String schema, String table) {
+        clearTable(schema, table);
+    }
+
+    @Override
+    protected String buildTableInfoWithSchema(String database, String schema, String table) {
+        return buildTableInfoWithSchema(schema, table);
+    }
+
+    @Override
+    protected void initCatalog() {
+        catalog =
+                new OceanBaseOracleCatalog(
+                        "oceanbase",
+                        USERNAME,
+                        PASSWORD,
+                        JdbcUrlUtil.getUrlInfo(jdbcCase.getJdbcUrl().replace(HOST, HOSTNAME)),
+                        SCHEMA);
+        catalog.open();
+    }
+
+    @Test
+    @Override
+    public void testCatalog() {
+        TablePath sourceTablePath =
+                new TablePath(
+                        jdbcCase.getDatabase(), jdbcCase.getSchema(), jdbcCase.getSourceTable());
+        TablePath targetTablePath =
+                new TablePath(
+                        jdbcCase.getCatalogDatabase(),
+                        jdbcCase.getCatalogSchema(),
+                        jdbcCase.getCatalogTable());
+
+        CatalogTable catalogTable = catalog.getTable(sourceTablePath);
+        catalog.createTable(targetTablePath, catalogTable, false);
+        Assertions.assertTrue(catalog.tableExists(targetTablePath));
+
+        catalog.dropTable(targetTablePath, false);
+        Assertions.assertFalse(catalog.tableExists(targetTablePath));
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf
index 098d3ffae2..3025ca7934 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf
@@ -31,7 +31,7 @@ source {
   Jdbc {
     driver = com.oceanbase.jdbc.Driver
     url = "jdbc:oceanbase://e2e_oceanbase_mysql:2881/seatunnel?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&serverTimezone=UTC"
-    user = root
+    user = "root@test"
     password = ""
     query = "SELECT c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, 
c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned, c_mediumint, 
c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned, c_decimal, 
c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned, 
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, 
c_date, c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, 
c_varbinary, c_binary, c_yea [...]
     compatible_mode = "mysql"
@@ -45,7 +45,7 @@ sink {
   Jdbc {
     driver = com.oceanbase.jdbc.Driver
     url = "jdbc:oceanbase://e2e_oceanbase_mysql:2881/seatunnel?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&serverTimezone=UTC"
-    user = root
+    user = "root@test"
     password = ""
     query = "insert into sink(c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, 
c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned, 
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, 
c_bigint_unsigned, c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, 
c_double, c_double_unsigned, c_char, c_tinytext, c_mediumtext, c_text, 
c_varchar, c_json, c_longtext, c_date, c_datetime, c_timestamp, c_tinyblob, 
c_mediumblob, c_blob, c_longblob, c_varbinary, c_bin [...]
     compatible_mode = "mysql"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf
index bf2b1ccf06..226a325275 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf
@@ -29,11 +29,11 @@ env {
 source {
   jdbc{
   # This is a example source plugin **only for test and demonstrate the feature source plugin**
-    url = "jdbc:oceanbase://e2e_oceanbase_oracle:2883/seatunnel"
+    url = "jdbc:oceanbase://e2e_oceanbase_oracle:2883/TESTUSER"
     driver = com.oceanbase.jdbc.Driver
-    user = "root"
+    user = "TESTUSER@test"
     password = ""
-    query = "SELECT VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ FROM source"
+    query = "SELECT VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL FROM SOURCE"
     compatible_mode = "oracle"
   }
 }
@@ -43,11 +43,11 @@ transform {
 
 sink {
   jdbc{
-    url = "jdbc:oceanbase://e2e_oceanbase_oracle:2883/seatunnel"
+    url = "jdbc:oceanbase://e2e_oceanbase_oracle:2883/TESTUSER"
     driver = com.oceanbase.jdbc.Driver
-    user = "root"
+    user = "TESTUSER@test"
     password = ""
-    query = "INSERT INTO sink (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ) VALUES(?,?,?,?,?,?,?,?,?,?,?,?)"
+    query = "INSERT INTO SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?)"
     compatible_mode = "oracle"
   }
 }


Reply via email to