This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new cd4b7ff7d2 [Feature][Connector-V2][Jdbc] Add OceanBase catalog (#5439)
cd4b7ff7d2 is described below
commit cd4b7ff7d2d10c930cb2c14cef0bf1e6d282c49a
Author: He Wang <[email protected]>
AuthorDate: Mon Oct 16 19:08:40 2023 +0800
[Feature][Connector-V2][Jdbc] Add OceanBase catalog (#5439)
* feat: add oceanbase jdbc catalog
* remove oceanbase-client and disable the IT cases
* update EXCLUDED_SCHEMAS and skip the database check in OceanBase Oracle mode
---
.../seatunnel/jdbc/catalog/JdbcCatalogOptions.java | 8 +
.../catalog/oceanbase/OceanBaseCatalogFactory.java | 83 ++++++++++
.../catalog/oceanbase/OceanBaseMySqlCatalog.java | 39 +++++
.../catalog/oceanbase/OceanBaseOracleCatalog.java | 99 ++++++++++++
.../jdbc/catalog/oracle/OracleCatalog.java | 2 +-
.../connectors/seatunnel/jdbc/AbstractJdbcIT.java | 3 +-
.../seatunnel/jdbc/JdbcOceanBaseITBase.java | 73 +--------
.../seatunnel/jdbc/JdbcOceanBaseMysqlIT.java | 104 ++++++++----
.../seatunnel/jdbc/JdbcOceanBaseOracleIT.java | 179 ++++++++++++++++-----
.../jdbc_oceanbase_mysql_source_and_sink.conf | 4 +-
.../jdbc_oceanbase_oracle_source_and_sink.conf | 12 +-
11 files changed, 466 insertions(+), 140 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
index 712eefacb8..fc58a45c28 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
@@ -50,6 +50,14 @@ public interface JdbcCatalogOptions {
.withDescription(
"for databases that support the schema parameter,
give it priority.");
+ /**
+ * Compatible mode of the target database, exposed under the key "compatibleMode".
+ * String-typed and declared without a default value; per its description it is needed
+ * for databases that support several compatible modes (e.g. OceanBase: 'mysql' or 'oracle').
+ */
+ Option<String> COMPATIBLE_MODE =
+ Options.key("compatibleMode")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The compatible mode of database, required when
the database supports multiple compatible modes. "
+ + "For example, when using OceanBase
database, you need to set it to 'mysql' or 'oracle'.");
+
OptionRule.Builder BASE_RULE =
OptionRule.builder().required(BASE_URL).required(USERNAME,
PASSWORD).optional(SCHEMA);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java
new file mode 100644
index 0000000000..aa8b016d08
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase;
+
+import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.OptionValidationException;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Optional;
+
+/**
+ * {@link CatalogFactory} for OceanBase.
+ *
+ * <p>Depending on the configured {@link JdbcCatalogOptions#COMPATIBLE_MODE}, it creates either an
+ * {@link OceanBaseOracleCatalog} (mode {@code oracle}, case-insensitive) or an {@link
+ * OceanBaseMySqlCatalog} (any other non-blank mode).
+ */
+@AutoService(Factory.class)
+public class OceanBaseCatalogFactory implements CatalogFactory {
+
+    /** Identifier under which this factory is registered. */
+    public static final String IDENTIFIER = "OceanBase";
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Creates the catalog matching the configured compatible mode.
+     *
+     * @param catalogName name given to the created catalog
+     * @param options catalog options; must contain a non-blank base url that includes a default
+     *     database, and a non-blank compatible mode
+     * @return an {@link OceanBaseOracleCatalog} when the mode is {@code oracle}, otherwise an
+     *     {@link OceanBaseMySqlCatalog}
+     * @throws IllegalArgumentException if the base url or the compatible mode is blank
+     * @throws OptionValidationException if the base url does not carry a default database
+     */
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        String urlWithDatabase = options.get(JdbcCatalogOptions.BASE_URL);
+        // isNotBlank is the single-value check; isNoneBlank is the varargs variant.
+        Preconditions.checkArgument(
+                StringUtils.isNotBlank(urlWithDatabase),
+                "Miss config <base-url>! Please check your config.");
+        JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(urlWithDatabase);
+        Optional<String> defaultDatabase = urlInfo.getDefaultDatabase();
+        if (!defaultDatabase.isPresent()) {
+            throw new OptionValidationException(JdbcCatalogOptions.BASE_URL);
+        }
+
+        String compatibleMode = options.get(JdbcCatalogOptions.COMPATIBLE_MODE);
+        Preconditions.checkArgument(
+                StringUtils.isNotBlank(compatibleMode),
+                "Miss config <compatibleMode>! Please check your config.");
+
+        if ("oracle".equalsIgnoreCase(compatibleMode.trim())) {
+            return new OceanBaseOracleCatalog(
+                    catalogName,
+                    options.get(JdbcCatalogOptions.USERNAME),
+                    options.get(JdbcCatalogOptions.PASSWORD),
+                    urlInfo,
+                    options.get(JdbcCatalogOptions.SCHEMA));
+        }
+        // Every other non-blank mode falls back to the MySQL-compatible catalog.
+        return new OceanBaseMySqlCatalog(
+                catalogName,
+                options.get(JdbcCatalogOptions.USERNAME),
+                options.get(JdbcCatalogOptions.PASSWORD),
+                urlInfo);
+    }
+
+    /**
+     * Returns the option rule of this factory: the same options as {@code
+     * JdbcCatalogOptions.BASE_RULE} plus the required compatible mode.
+     */
+    @Override
+    public OptionRule optionRule() {
+        // Build a fresh rule instead of calling required(...) on the shared static
+        // JdbcCatalogOptions.BASE_RULE builder: appending to that shared instance would
+        // presumably register compatibleMode for every other factory that builds from it, and
+        // again on each call of this method (NOTE(review): confirm Builder is mutable).
+        return OptionRule.builder()
+                .required(JdbcCatalogOptions.BASE_URL)
+                .required(JdbcCatalogOptions.USERNAME, JdbcCatalogOptions.PASSWORD)
+                .optional(JdbcCatalogOptions.SCHEMA)
+                .required(JdbcCatalogOptions.COMPATIBLE_MODE)
+                .build();
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
new file mode 100644
index 0000000000..58cdb5c413
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase;
+
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog;
+
+/**
+ * Catalog for OceanBase in MySQL compatible mode; all catalog behaviour comes from {@link
+ * MySqlCatalog}, only the set of system databases is replaced.
+ */
+public class OceanBaseMySqlCatalog extends MySqlCatalog {
+
+    static {
+        // Swap the inherited system-database names for the OceanBase ones.
+        // NOTE(review): SYS_DATABASES is static state that is not declared in this class, so
+        // this presumably also changes what plain MySqlCatalog instances see in the same JVM -
+        // confirm that is intended.
+        SYS_DATABASES.clear();
+        for (String systemDatabase :
+                new String[] {
+                    "information_schema", "mysql", "oceanbase", "LBACSYS", "ORAAUDITOR", "SYS"
+                }) {
+            SYS_DATABASES.add(systemDatabase);
+        }
+    }
+
+    /**
+     * @param catalogName name of the catalog
+     * @param username user used for the JDBC connection
+     * @param pwd password used for the JDBC connection
+     * @param urlInfo parsed JDBC url
+     */
+    public OceanBaseMySqlCatalog(
+            String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) {
+        super(catalogName, username, pwd, urlInfo);
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java
new file mode 100644
index 0000000000..b4ece7db9c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleCatalog;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Catalog for OceanBase in Oracle compatible mode. It builds on {@link OracleCatalog} but does
+ * not support listing databases, and lists tables / creates tables without first checking that
+ * the database exists.
+ */
+public class OceanBaseOracleCatalog extends OracleCatalog {
+
+    static {
+        // NOTE(review): EXCLUDED_SCHEMAS is a static field declared in OracleCatalog, so this
+        // assignment replaces the list for plain OracleCatalog instances in the same JVM as
+        // well, once this class has been initialized.
+        EXCLUDED_SCHEMAS =
+                Collections.unmodifiableList(
+                        Arrays.asList("oceanbase", "LBACSYS", "ORAAUDITOR", "SYS"));
+    }
+
+    /**
+     * @param catalogName name of the catalog
+     * @param username user used for the JDBC connection
+     * @param pwd password used for the JDBC connection
+     * @param urlInfo parsed JDBC url
+     * @param defaultSchema schema passed on to {@link OracleCatalog}
+     */
+    public OceanBaseOracleCatalog(
+            String catalogName,
+            String username,
+            String pwd,
+            JdbcUrlUtil.UrlInfo urlInfo,
+            String defaultSchema) {
+        super(catalogName, username, pwd, urlInfo, defaultSchema);
+    }
+
+    /**
+     * Always fails: this catalog provides no SQL for listing databases.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    protected String getListDatabaseSql() {
+        throw new UnsupportedOperationException(
+                "Listing databases is not supported by the OceanBase Oracle mode catalog");
+    }
+
+    /**
+     * Lists the tables of the given database by running the list-table query directly, without a
+     * preceding database existence check.
+     *
+     * @param databaseName database whose tables are listed
+     * @return the table names produced by {@code getTableName} for each result row
+     * @throws CatalogException if running the query fails for any reason
+     */
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        String dbUrl = getUrlFromDatabaseName(databaseName);
+        try {
+            return queryString(dbUrl, getListTableSql(databaseName), this::getTableName);
+        } catch (Exception e) {
+            // The original message said "Failed listing database", which misreports the
+            // operation; name the operation and the database that was queried.
+            throw new CatalogException(
+                    String.format(
+                            "Failed listing tables of database %s in catalog %s",
+                            databaseName, catalogName),
+                    e);
+        }
+    }
+
+    /**
+     * Checks whether the table is contained in the table list of its database.
+     *
+     * @return {@code true} if the table is listed; {@code false} if it is not listed or the
+     *     database is reported as missing
+     */
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        try {
+            return listTables(tablePath.getDatabaseName()).contains(getTableName(tablePath));
+        } catch (DatabaseNotExistException e) {
+            return false;
+        }
+    }
+
+    /**
+     * Creates the table unless it already exists.
+     *
+     * @param tablePath target path; when a default schema is configured it replaces the schema
+     *     of this path
+     * @param table definition of the table to create
+     * @param ignoreIfExists if {@code true}, an existing table is silently kept
+     * @throws TableAlreadyExistException if the table exists and {@code ignoreIfExists} is false
+     */
+    @Override
+    public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+        checkNotNull(tablePath, "Table path cannot be null");
+
+        if (defaultSchema.isPresent()) {
+            tablePath =
+                    new TablePath(
+                            tablePath.getDatabaseName(),
+                            defaultSchema.get(),
+                            tablePath.getTableName());
+        }
+
+        if (tableExists(tablePath)) {
+            if (ignoreIfExists) {
+                return;
+            }
+            throw new TableAlreadyExistException(catalogName, tablePath);
+        }
+
+        createTableInternal(tablePath, table);
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
index 570f387411..f72cf2a75d 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
@@ -54,7 +54,7 @@ public class OracleCatalog extends AbstractJdbcCatalog {
private static final OracleDataTypeConvertor DATA_TYPE_CONVERTOR =
new OracleDataTypeConvertor();
- private static final List<String> EXCLUDED_SCHEMAS =
+ protected static List<String> EXCLUDED_SCHEMAS =
Collections.unmodifiableList(
Arrays.asList(
"APPQOSSYS",
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
index 2738f5ccc3..3798c64c17 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
@@ -229,10 +229,11 @@ public abstract class AbstractJdbcIT extends
TestSuiteBase implements TestResour
@Override
public void startUp() {
dbServer =
initContainer().withImagePullPolicy(PullPolicy.alwaysPull());
- jdbcCase = getJdbcCase();
Startables.deepStart(Stream.of(dbServer)).join();
+ jdbcCase = getJdbcCase();
+
given().ignoreExceptions()
.await()
.atMost(360, TimeUnit.SECONDS)
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
index b8202e697a..50177ef1a8 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
@@ -17,11 +17,6 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-
-import org.apache.commons.lang3.tuple.Pair;
-
import org.junit.jupiter.api.Assertions;
import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
@@ -29,29 +24,18 @@ import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.sql.Statement;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
public abstract class JdbcOceanBaseITBase extends AbstractJdbcIT {
- private static final String OCEANBASE_DATABASE = "seatunnel";
- private static final String OCEANBASE_SOURCE = "source";
- private static final String OCEANBASE_SINK = "sink";
-
- private static final String OCEANBASE_JDBC_TEMPLATE = "jdbc:oceanbase://"
+ HOST + ":%s";
- private static final String OCEANBASE_DRIVER_CLASS =
"com.oceanbase.jdbc.Driver";
-
- abstract String imageName();
+ protected static final String OCEANBASE_SOURCE = "source";
+ protected static final String OCEANBASE_SINK = "sink";
- abstract String host();
+ protected static final String OCEANBASE_CATALOG_TABLE = "catalog_table";
- abstract int port();
-
- abstract String username();
-
- abstract String password();
+ protected static final String OCEANBASE_JDBC_TEMPLATE =
"jdbc:oceanbase://" + HOST + ":%s/%s";
+ protected static final String OCEANBASE_DRIVER_CLASS =
"com.oceanbase.jdbc.Driver";
abstract List<String> configFile();
@@ -59,44 +43,14 @@ public abstract class JdbcOceanBaseITBase extends
AbstractJdbcIT {
abstract String[] getFieldNames();
- @Override
- JdbcCase getJdbcCase() {
- Map<String, String> containerEnv = new HashMap<>();
- String jdbcUrl = String.format(OCEANBASE_JDBC_TEMPLATE, port());
- Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
- String[] fieldNames = testDataSet.getKey();
-
- String insertSql = insertTable(OCEANBASE_DATABASE, OCEANBASE_SOURCE,
fieldNames);
-
- return JdbcCase.builder()
- .dockerImage(imageName())
- .networkAliases(host())
- .containerEnv(containerEnv)
- .driverClass(OCEANBASE_DRIVER_CLASS)
- .host(HOST)
- .port(port())
- .localPort(port())
- .jdbcTemplate(OCEANBASE_JDBC_TEMPLATE)
- .jdbcUrl(jdbcUrl)
- .userName(username())
- .password(password())
- .database(OCEANBASE_DATABASE)
- .sourceTable(OCEANBASE_SOURCE)
- .sinkTable(OCEANBASE_SINK)
- .createSql(createSqlTemplate())
- .configFile(configFile())
- .insertSql(insertSql)
- .testData(testDataSet)
- .build();
- }
+ abstract String getFullTableName(String tableName);
@Override
void compareResult() {
String sourceSql =
- String.format(
- "select * from %s.%s order by 1", OCEANBASE_DATABASE,
OCEANBASE_SOURCE);
+ String.format("select * from %s order by 1",
getFullTableName(OCEANBASE_SOURCE));
String sinkSql =
- String.format("select * from %s.%s order by 1",
OCEANBASE_DATABASE, OCEANBASE_SINK);
+ String.format("select * from %s order by 1",
getFullTableName(OCEANBASE_SINK));
try {
Statement sourceStatement = connection.createStatement();
Statement sinkStatement = connection.createStatement();
@@ -133,15 +87,4 @@ public abstract class JdbcOceanBaseITBase extends
AbstractJdbcIT {
String driverUrl() {
return
"https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar";
}
-
- @Override
- protected void createSchemaIfNeeded() {
- String sql = "CREATE DATABASE IF NOT EXISTS " + OCEANBASE_DATABASE;
- try {
- connection.prepareStatement(sql).executeUpdate();
- } catch (Exception e) {
- throw new SeaTunnelRuntimeException(
- JdbcITErrorCode.CREATE_TABLE_FAILED, "Fail to execute sql
" + sql, e);
- }
- }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
index 548fecaee6..b4e0ff1b48 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
@@ -18,6 +18,9 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase.OceanBaseMySqlCatalog;
import org.apache.commons.lang3.tuple.Pair;
@@ -36,39 +39,70 @@ import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
@Disabled("Disabled due to insufficient hardware resources in the CI
environment")
public class JdbcOceanBaseMysqlIT extends JdbcOceanBaseITBase {
- @Override
- String imageName() {
- return "oceanbase/oceanbase-ce:4.0.0.0";
- }
+ private static final String IMAGE = "oceanbase/oceanbase-ce:4.1.0.0";
- @Override
- String host() {
- return "e2e_oceanbase_mysql";
- }
+ private static final String HOSTNAME = "e2e_oceanbase_mysql";
+ private static final int PORT = 2881;
+ private static final String USERNAME = "root@test";
+ private static final String PASSWORD = "";
+ private static final String OCEANBASE_DATABASE = "seatunnel";
+ private static final String OCEANBASE_CATALOG_DATABASE =
"seatunnel_catalog";
@Override
- int port() {
- return 2881;
+ List<String> configFile() {
+ return
Lists.newArrayList("/jdbc_oceanbase_mysql_source_and_sink.conf");
}
@Override
- String username() {
- return "root";
- }
+ JdbcCase getJdbcCase() {
+ Map<String, String> containerEnv = new HashMap<>();
+ String jdbcUrl =
+ String.format(OCEANBASE_JDBC_TEMPLATE,
dbServer.getMappedPort(PORT), "test");
+ Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
+ String[] fieldNames = testDataSet.getKey();
- @Override
- String password() {
- return "";
+ String insertSql = insertTable(OCEANBASE_DATABASE, OCEANBASE_SOURCE,
fieldNames);
+
+ return JdbcCase.builder()
+ .dockerImage(IMAGE)
+ .networkAliases(HOSTNAME)
+ .containerEnv(containerEnv)
+ .driverClass(OCEANBASE_DRIVER_CLASS)
+ .host(HOST)
+ .port(PORT)
+ .localPort(dbServer.getMappedPort(PORT))
+ .jdbcTemplate(OCEANBASE_JDBC_TEMPLATE)
+ .jdbcUrl(jdbcUrl)
+ .userName(USERNAME)
+ .password(PASSWORD)
+ .database(OCEANBASE_DATABASE)
+ .sourceTable(OCEANBASE_SOURCE)
+ .sinkTable(OCEANBASE_SINK)
+ .catalogDatabase(OCEANBASE_CATALOG_DATABASE)
+ .catalogTable(OCEANBASE_CATALOG_TABLE)
+ .createSql(createSqlTemplate())
+ .configFile(configFile())
+ .insertSql(insertSql)
+ .testData(testDataSet)
+ .build();
}
@Override
- List<String> configFile() {
- return
Lists.newArrayList("/jdbc_oceanbase_mysql_source_and_sink.conf");
+ protected void createSchemaIfNeeded() {
+ String sql = "CREATE DATABASE IF NOT EXISTS " + OCEANBASE_DATABASE;
+ try {
+ connection.prepareStatement(sql).executeUpdate();
+ } catch (Exception e) {
+ throw new SeaTunnelRuntimeException(
+ JdbcITErrorCode.CREATE_TABLE_FAILED, "Fail to execute sql
" + sql, e);
+ }
}
@Override
@@ -239,18 +273,30 @@ public class JdbcOceanBaseMysqlIT extends
JdbcOceanBaseITBase {
}
@Override
- GenericContainer<?> initContainer() {
- GenericContainer<?> container =
- new GenericContainer<>(imageName())
- .withNetwork(NETWORK)
- .withNetworkAliases(host())
- .waitingFor(Wait.forLogMessage(".*boot success!.*", 1))
- .withStartupTimeout(Duration.ofMinutes(5))
- .withLogConsumer(
- new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(imageName())));
+ String getFullTableName(String tableName) {
+ return buildTableInfoWithSchema(OCEANBASE_DATABASE, tableName);
+ }
- container.setPortBindings(Lists.newArrayList(String.format("%s:%s",
port(), port())));
+ @Override
+ GenericContainer<?> initContainer() {
+ return new GenericContainer<>(IMAGE)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(HOSTNAME)
+ .withExposedPorts(PORT)
+ .waitingFor(Wait.forLogMessage(".*boot success!.*", 1))
+ .withStartupTimeout(Duration.ofMinutes(3))
+ .withLogConsumer(new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)));
+ }
- return container;
+ @Override
+ protected void initCatalog() {
+ catalog =
+ new OceanBaseMySqlCatalog(
+ "oceanbase",
+ USERNAME,
+ PASSWORD,
+ JdbcUrlUtil.getUrlInfo(
+ jdbcCase.getJdbcUrl().replace(HOST,
dbServer.getHost())));
+ catalog.open();
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java
index 4c3cca5ddc..f28c26b35f 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java
@@ -17,76 +17,139 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase.OceanBaseOracleCatalog;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import com.google.common.collect.Lists;
import java.math.BigDecimal;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
import java.sql.Date;
+import java.sql.Driver;
+import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static org.awaitility.Awaitility.given;
+import java.util.Map;
+import java.util.Properties;
@Disabled("Oracle mode of OceanBase Enterprise Edition does not provide docker
environment")
public class JdbcOceanBaseOracleIT extends JdbcOceanBaseITBase {
- @Override
- String imageName() {
- return null;
- }
+ private static final String HOSTNAME = "e2e_oceanbase_oracle";
+ private static final int PORT = 2883;
+ private static final String USERNAME = "TESTUSER@test";
+ private static final String PASSWORD = "";
+ private static final String SCHEMA = "TESTUSER";
@Override
- String host() {
- return "e2e_oceanbase_oracle";
+ List<String> configFile() {
+ return
Lists.newArrayList("/jdbc_oceanbase_oracle_source_and_sink.conf");
}
@Override
- int port() {
- return 2883;
+ GenericContainer<?> initContainer() {
+ throw new UnsupportedOperationException();
}
+ @BeforeAll
@Override
- String username() {
- return "root";
- }
+ public void startUp() {
+ jdbcCase = getJdbcCase();
- @Override
- String password() {
- return "";
+ try {
+ initConnection();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to initial jdbc connection", e);
+ }
+
+ createNeededTables();
+ insertTestData();
+ initCatalog();
}
@Override
- List<String> configFile() {
- return
Lists.newArrayList("/jdbc_oceanbase_oracle_source_and_sink.conf");
+ public void tearDown() throws SQLException {
+ if (connection != null) {
+ connection
+ .createStatement()
+ .execute("DROP TABLE " +
getFullTableName(OCEANBASE_SOURCE));
+ connection.createStatement().execute("DROP TABLE " +
getFullTableName(OCEANBASE_SINK));
+ }
+ super.tearDown();
}
@Override
- GenericContainer<?> initContainer() {
- throw new UnsupportedOperationException();
+ JdbcCase getJdbcCase() {
+ Map<String, String> containerEnv = new HashMap<>();
+ String jdbcUrl = String.format(OCEANBASE_JDBC_TEMPLATE, PORT, SCHEMA);
+ Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
+ String[] fieldNames = testDataSet.getKey();
+
+ String insertSql = insertTable(SCHEMA, OCEANBASE_SOURCE.toUpperCase(),
fieldNames);
+
+ return JdbcCase.builder()
+ .dockerImage(null)
+ .networkAliases(HOSTNAME)
+ .containerEnv(containerEnv)
+ .driverClass(OCEANBASE_DRIVER_CLASS)
+ .host(HOST)
+ .port(PORT)
+ .localPort(PORT)
+ .jdbcTemplate(OCEANBASE_JDBC_TEMPLATE)
+ .jdbcUrl(jdbcUrl)
+ .userName(USERNAME)
+ .password(PASSWORD)
+ .schema(SCHEMA)
+ .sourceTable(OCEANBASE_SOURCE.toUpperCase())
+ .sinkTable(OCEANBASE_SINK.toUpperCase())
+ .catalogSchema(SCHEMA)
+ .catalogTable(OCEANBASE_CATALOG_TABLE)
+ .createSql(createSqlTemplate())
+ .configFile(configFile())
+ .insertSql(insertSql)
+ .testData(testDataSet)
+ .build();
}
- @Override
- public void startUp() {
- jdbcCase = getJdbcCase();
+ private void initConnection()
+ throws SQLException, ClassNotFoundException, MalformedURLException,
+ InstantiationException, IllegalAccessException {
+ URLClassLoader urlClassLoader =
+ new URLClassLoader(
+ new URL[] {new URL(driverUrl())},
+ JdbcOceanBaseOracleIT.class.getClassLoader());
+ Thread.currentThread().setContextClassLoader(urlClassLoader);
+ Driver driver = (Driver)
urlClassLoader.loadClass(jdbcCase.getDriverClass()).newInstance();
+ Properties props = new Properties();
- given().ignoreExceptions()
- .await()
- .atMost(360, TimeUnit.SECONDS)
- .untilAsserted(() ->
this.initializeJdbcConnection(jdbcCase.getJdbcUrl()));
+ if (StringUtils.isNotBlank(jdbcCase.getUserName())) {
+ props.put("user", jdbcCase.getUserName());
+ }
- createSchemaIfNeeded();
- createNeededTables();
- insertTestData();
+ if (StringUtils.isNotBlank(jdbcCase.getPassword())) {
+ props.put("password", jdbcCase.getPassword());
+ }
+
+ connection = driver.connect(jdbcCase.getJdbcUrl().replace(HOST,
HOSTNAME), props);
+ connection.setAutoCommit(false);
}
@Override
@@ -108,8 +171,7 @@ public class JdbcOceanBaseOracleIT extends
JdbcOceanBaseITBase {
+ " BINARY_FLOAT_COL binary_float,\n"
+ " BINARY_DOUBLE_COL binary_double,\n"
+ " DATE_COL date,\n"
- + " TIMESTAMP_WITH_3_FRAC_SEC_COL timestamp(3),\n"
- + " TIMESTAMP_WITH_LOCAL_TZ timestamp with local time
zone\n"
+ + " TIMESTAMP_WITH_3_FRAC_SEC_COL timestamp(3)\n"
+ ")";
}
@@ -126,8 +188,7 @@ public class JdbcOceanBaseOracleIT extends
JdbcOceanBaseITBase {
"BINARY_FLOAT_COL",
"BINARY_DOUBLE_COL",
"DATE_COL",
- "TIMESTAMP_WITH_3_FRAC_SEC_COL",
- "TIMESTAMP_WITH_LOCAL_TZ"
+ "TIMESTAMP_WITH_3_FRAC_SEC_COL"
};
}
@@ -150,7 +211,6 @@ public class JdbcOceanBaseOracleIT extends
JdbcOceanBaseITBase {
Float.parseFloat("22.2"),
Double.parseDouble("2.2"),
Date.valueOf(LocalDate.now()),
- Timestamp.valueOf(LocalDateTime.now()),
Timestamp.valueOf(LocalDateTime.now())
});
rows.add(row);
@@ -158,4 +218,51 @@ public class JdbcOceanBaseOracleIT extends
JdbcOceanBaseITBase {
return Pair.of(fieldNames, rows);
}
+
+ @Override
+ String getFullTableName(String tableName) {
+ return buildTableInfoWithSchema(SCHEMA, tableName.toUpperCase());
+ }
+
+ @Override
+ protected void clearTable(String database, String schema, String table) {
+ clearTable(schema, table);
+ }
+
+ @Override
+ protected String buildTableInfoWithSchema(String database, String schema,
String table) {
+ return buildTableInfoWithSchema(schema, table);
+ }
+
+ @Override
+ protected void initCatalog() {
+ catalog =
+ new OceanBaseOracleCatalog(
+ "oceanbase",
+ USERNAME,
+ PASSWORD,
+
JdbcUrlUtil.getUrlInfo(jdbcCase.getJdbcUrl().replace(HOST, HOSTNAME)),
+ SCHEMA);
+ catalog.open();
+ }
+
+ @Test
+ @Override
+ public void testCatalog() {
+ TablePath sourceTablePath =
+ new TablePath(
+ jdbcCase.getDatabase(), jdbcCase.getSchema(),
jdbcCase.getSourceTable());
+ TablePath targetTablePath =
+ new TablePath(
+ jdbcCase.getCatalogDatabase(),
+ jdbcCase.getCatalogSchema(),
+ jdbcCase.getCatalogTable());
+
+ CatalogTable catalogTable = catalog.getTable(sourceTablePath);
+ catalog.createTable(targetTablePath, catalogTable, false);
+ Assertions.assertTrue(catalog.tableExists(targetTablePath));
+
+ catalog.dropTable(targetTablePath, false);
+ Assertions.assertFalse(catalog.tableExists(targetTablePath));
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf
index 098d3ffae2..3025ca7934 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf
@@ -31,7 +31,7 @@ source {
Jdbc {
driver = com.oceanbase.jdbc.Driver
url = "jdbc:oceanbase://e2e_oceanbase_mysql:2881/seatunnel?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&serverTimezone=UTC"
- user = root
+ user = "root@test"
password = ""
query = "SELECT c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean,
c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned, c_mediumint,
c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned, c_decimal,
c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext,
c_date, c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob,
c_varbinary, c_binary, c_yea [...]
compatible_mode = "mysql"
@@ -45,7 +45,7 @@ sink {
Jdbc {
driver = com.oceanbase.jdbc.Driver
url = "jdbc:oceanbase://e2e_oceanbase_mysql:2881/seatunnel?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&serverTimezone=UTC"
- user = root
+ user = "root@test"
password = ""
query = "insert into sink(c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64,
c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint,
c_bigint_unsigned, c_decimal, c_decimal_unsigned, c_float, c_float_unsigned,
c_double, c_double_unsigned, c_char, c_tinytext, c_mediumtext, c_text,
c_varchar, c_json, c_longtext, c_date, c_datetime, c_timestamp, c_tinyblob,
c_mediumblob, c_blob, c_longblob, c_varbinary, c_bin [...]
compatible_mode = "mysql"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf
index bf2b1ccf06..226a325275 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf
@@ -29,11 +29,11 @@ env {
source {
jdbc{
# This is a example source plugin **only for test and demonstrate the feature source plugin**
- url = "jdbc:oceanbase://e2e_oceanbase_oracle:2883/seatunnel"
+ url = "jdbc:oceanbase://e2e_oceanbase_oracle:2883/TESTUSER"
driver = com.oceanbase.jdbc.Driver
- user = "root"
+ user = "TESTUSER@test"
password = ""
- query = "SELECT VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ FROM source"
+ query = "SELECT VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL FROM SOURCE"
compatible_mode = "oracle"
}
}
@@ -43,11 +43,11 @@ transform {
sink {
jdbc{
- url = "jdbc:oceanbase://e2e_oceanbase_oracle:2883/seatunnel"
+ url = "jdbc:oceanbase://e2e_oceanbase_oracle:2883/TESTUSER"
driver = com.oceanbase.jdbc.Driver
- user = "root"
+ user = "TESTUSER@test"
password = ""
- query = "INSERT INTO sink (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ) VALUES(?,?,?,?,?,?,?,?,?,?,?,?)"
+ query = "INSERT INTO SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?)"
compatible_mode = "oracle"
}
}