RocMarshal commented on code in PR #109:
URL:
https://github.com/apache/flink-connector-jdbc/pull/109#discussion_r1666467123
##########
docs/content/docs/connectors/table/jdbc.md:
##########
@@ -654,6 +655,37 @@ SELECT * FROM crate.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;
```
+### JDBC Catalog for OceanBase
+
+#### OceanBase Metaspace Mapping
+
+OceanBase database supports multi-tenant management, and each tenant can work in MySQL compatible mode or Oracle compatible mode. In MySQL mode of OceanBase, a tenant has databases and tables but no schemas; these objects are just like databases and tables in MySQL. In Oracle mode of OceanBase, a tenant has schemas and tables but no databases; these objects are just like schemas and tables in Oracle.
+
+In Flink, when querying tables registered by the OceanBase Catalog, users can use either `database.table_name` or just `table_name` in OceanBase MySQL mode, or either `schema.table_name` or just `table_name` in OceanBase Oracle mode.
+
+Therefore, the metaspace mapping between the Flink Catalog and OceanBase is as follows:
+
+| Flink Catalog Metaspace Structure    | OceanBase Metaspace Structure (MySQL Mode) | OceanBase Metaspace Structure (Oracle Mode) |
+|:-------------------------------------|:-------------------------------------------|---------------------------------------------|
+| catalog name (defined in Flink only) | N/A                                        | N/A                                         |
+| database name                        | database name                              | schema name                                 |
+| table name                           | table name                                 | table name                                  |
+
+The full path of OceanBase table in Flink should be "`<catalog>`.`<db or schema>`.`<table>`".
+
+Here are some examples to access OceanBase tables:
+
+```sql
+-- scan table 'test_table', the default database or schema is 'mydb'.
+SELECT * FROM mysql_catalog.mydb.test_table;
Review Comment:
```suggestion
SELECT * FROM oceanbase_catalog.mydb.test_table;
```
##########
docs/content.zh/docs/connectors/table/jdbc.md:
##########
@@ -657,6 +658,42 @@ SELECT * FROM mycatalog.crate.`custom_schema.test_table2`
SELECT * FROM crate.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;
```
+
+<a name="jdbc-catalog-for-oceanbase"></a>
+
+### JDBC Catalog for OceanBase
+
+<a name="oceanbase-metaspace-mapping"></a>
+
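+#### OceanBase Metaspace Mapping
+
+OceanBase database supports multi-tenant management, and each tenant can work in MySQL compatible mode or Oracle compatible mode. In OceanBase MySQL mode, a tenant has databases and tables, just like databases and tables in a MySQL database, but no schema. In OceanBase Oracle mode, a tenant has schemas and tables, just like schemas and tables in an Oracle database, but no database.
+
+In Flink, when querying tables registered by the OceanBase Catalog, you can use either `database.table_name` or just `table_name` in OceanBase MySQL mode, and either `schema.table_name` or just `table_name` in OceanBase Oracle mode.
+
+Therefore, the metaspace mapping between the Flink Catalog and the OceanBase catalog is as follows: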
+
+| Flink Catalog Metaspace Structure    | OceanBase Metaspace Structure (MySQL Mode) | OceanBase Metaspace Structure (Oracle Mode) |
+|:-------------------------------------|:-------------------------------------------|---------------------------------------------|
+| catalog name (defined in Flink only) | N/A                                        | N/A                                         |
+| database name                        | database name                              | schema name                                 |
+| table name                           | table name                                 | table name                                  |
+
+The full path of an OceanBase table in Flink should be ``"`<catalog>`.`<db or schema>`.`<table>`"``.
+
+Here are some examples of accessing OceanBase tables:
+
+```sql
+-- scan the 'test_table' table in the default database 'mydb'
+SELECT * FROM mysql_catalog.mydb.test_table;
Review Comment:
```suggestion
SELECT * FROM oceanbase_catalog.mydb.test_table;
```
##########
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseOracleDynamicTableSourceITCase.java:
##########
@@ -32,11 +32,12 @@
import java.util.Arrays;
import java.util.List;
+import static org.apache.flink.connector.jdbc.databases.oceanbase.OceanBaseOracleTestBase.tableRow;
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
/** The Table Source ITCase for OceanBase Oracle mode. */
-@Disabled
+@Disabled("OceanBase Oracle mode can only be tested locally.")
Review Comment:
👍
##########
docs/content.zh/docs/connectors/table/jdbc.md:
##########
@@ -657,6 +658,42 @@ SELECT * FROM mycatalog.crate.`custom_schema.test_table2`
SELECT * FROM crate.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;
```
+
+<a name="jdbc-catalog-for-oceanbase"></a>
+
+### JDBC Catalog for OceanBase
+
+<a name="oceanbase-metaspace-mapping"></a>
+
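+#### OceanBase Metaspace Mapping
+
+OceanBase database supports multi-tenant management, and each tenant can work in MySQL compatible mode or Oracle compatible mode. In OceanBase MySQL mode, a tenant has databases and tables, just like databases and tables in a MySQL database, but no schema. In OceanBase Oracle mode, a tenant has schemas and tables, just like schemas and tables in an Oracle database, but no database.
+
+In Flink, when querying tables registered by the OceanBase Catalog, you can use either `database.table_name` or just `table_name` in OceanBase MySQL mode, and either `schema.table_name` or just `table_name` in OceanBase Oracle mode.
+
+Therefore, the metaspace mapping between the Flink Catalog and the OceanBase catalog is as follows: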
+
+| Flink Catalog Metaspace Structure    | OceanBase Metaspace Structure (MySQL Mode) | OceanBase Metaspace Structure (Oracle Mode) |
+|:-------------------------------------|:-------------------------------------------|---------------------------------------------|
+| catalog name (defined in Flink only) | N/A                                        | N/A                                         |
+| database name                        | database name                              | schema name                                 |
+| table name                           | table name                                 | table name                                  |
+
+The full path of an OceanBase table in Flink should be ``"`<catalog>`.`<db or schema>`.`<table>`"``.
+
+Here are some examples of accessing OceanBase tables:
+
+```sql
+-- scan the 'test_table' table in the default database 'mydb'
+SELECT * FROM mysql_catalog.mydb.test_table;
+SELECT * FROM mydb.test_table;
+SELECT * FROM test_table;
+
+-- scan the 'test_table2' table in the 'given_database' database
+SELECT * FROM mysql_catalog.given_database.test_table2;
Review Comment:
```suggestion
SELECT * FROM oceanbase_catalog.given_database.test_table2;
```
##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java:
##########
@@ -82,6 +84,14 @@ public static AbstractJdbcCatalog createCatalog(
} else if (dialect instanceof MySqlDialect) {
return new MySqlCatalog(
userClassLoader, catalogName, defaultDatabase, baseUrl, connectionProperties);
+ } else if (dialect instanceof OceanBaseDialect) {
Review Comment:
How about moving the creation of the xxxCatalog into `JdbcDialect`? If so,
the `if ... else ...` block could be replaced by Java polymorphism:
--> JdbcDialect.createCatalog(....)
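
To make the suggestion concrete, here is a minimal sketch of the idea. Every type and method name in it (`Dialect`, `Catalog`, `MySqlDialectSketch`, `OceanBaseDialectSketch`, and the two-argument `createCatalog`) is a hypothetical stand-in for illustration, not the actual `JdbcDialect` or `AbstractJdbcCatalog` API, and the real signature would presumably also need the class loader, base URL, and connection properties.

```java
/** Hypothetical stand-ins only; these are not the real Flink JDBC connector types. */
public class DialectCatalogSketch {

    /** Minimal catalog abstraction used only for this illustration. */
    interface Catalog {
        String describe();
    }

    /** Each dialect builds its own catalog, so the caller needs no instanceof chain. */
    interface Dialect {
        Catalog createCatalog(String catalogName, String defaultDatabase);
    }

    static final class MySqlDialectSketch implements Dialect {
        @Override
        public Catalog createCatalog(String catalogName, String defaultDatabase) {
            return () -> "MySqlCatalog(" + catalogName + ", " + defaultDatabase + ")";
        }
    }

    static final class OceanBaseDialectSketch implements Dialect {
        private final String compatibleMode;

        OceanBaseDialectSketch(String compatibleMode) {
            this.compatibleMode = compatibleMode;
        }

        @Override
        public Catalog createCatalog(String catalogName, String defaultDatabase) {
            return () ->
                    "OceanBaseCatalog("
                            + compatibleMode + ", " + catalogName + ", " + defaultDatabase + ")";
        }
    }

    /** The former if / else-if chain collapses into a single virtual call. */
    static Catalog createCatalog(Dialect dialect, String catalogName, String defaultDatabase) {
        return dialect.createCatalog(catalogName, defaultDatabase);
    }

    public static void main(String[] args) {
        Dialect[] dialects = {new MySqlDialectSketch(), new OceanBaseDialectSketch("mysql")};
        for (Dialect dialect : dialects) {
            System.out.println(createCatalog(dialect, "my_catalog", "mydb").describe());
        }
    }
}
```

With this shape, supporting a new database means adding a dialect class rather than another branch in the utility method.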
##########
docs/content/docs/connectors/table/jdbc.md:
##########
@@ -654,6 +655,37 @@ SELECT * FROM crate.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;
```
+### JDBC Catalog for OceanBase
+
+#### OceanBase Metaspace Mapping
+
+OceanBase database supports multi-tenant management, and each tenant can work in MySQL compatible mode or Oracle compatible mode. In MySQL mode of OceanBase, a tenant has databases and tables but no schemas; these objects are just like databases and tables in MySQL. In Oracle mode of OceanBase, a tenant has schemas and tables but no databases; these objects are just like schemas and tables in Oracle.
+
+In Flink, when querying tables registered by the OceanBase Catalog, users can use either `database.table_name` or just `table_name` in OceanBase MySQL mode, or either `schema.table_name` or just `table_name` in OceanBase Oracle mode.
+
+Therefore, the metaspace mapping between the Flink Catalog and OceanBase is as follows:
+
+| Flink Catalog Metaspace Structure    | OceanBase Metaspace Structure (MySQL Mode) | OceanBase Metaspace Structure (Oracle Mode) |
+|:-------------------------------------|:-------------------------------------------|---------------------------------------------|
+| catalog name (defined in Flink only) | N/A                                        | N/A                                         |
+| database name                        | database name                              | schema name                                 |
+| table name                           | table name                                 | table name                                  |
+
+The full path of OceanBase table in Flink should be "`<catalog>`.`<db or schema>`.`<table>`".
+
+Here are some examples to access OceanBase tables:
+
+```sql
+-- scan table 'test_table', the default database or schema is 'mydb'.
+SELECT * FROM mysql_catalog.mydb.test_table;
+SELECT * FROM mydb.test_table;
+SELECT * FROM test_table;
+
+-- scan table 'test_table2' with the given database or schema.
+SELECT * FROM mysql_catalog.given_database.test_table2;
Review Comment:
```suggestion
SELECT * FROM oceanbase_catalog.given_database.test_table2;
```
##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/catalog/OceanBaseTypeMapper.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.oceanbase.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeMapper;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+
+/** OceanBaseTypeMapper util class. */
+@Internal
+public class OceanBaseTypeMapper implements JdbcDialectTypeMapper {
+
+ private static final int RAW_TIME_LENGTH = 10;
+ private static final int RAW_TIMESTAMP_LENGTH = 19;
+
+ private static final int TYPE_BINARY_FLOAT = 100;
+ private static final int TYPE_BINARY_DOUBLE = 101;
+
+ private final String compatibleMode;
+
+ public OceanBaseTypeMapper(String compatibleMode) {
+ this.compatibleMode = compatibleMode;
+ }
+
+ @Override
+ public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
+ throws SQLException {
+ String typeName = metadata.getColumnTypeName(colIndex).toUpperCase();
+ int jdbcType = metadata.getColumnType(colIndex);
+ String columnName = metadata.getColumnName(colIndex);
+ int precision = metadata.getPrecision(colIndex);
+ int scale = metadata.getScale(colIndex);
+ switch (jdbcType) {
+ case Types.BIT:
+ return DataTypes.BOOLEAN();
+ case Types.TINYINT:
+ if (isUnsignedType(typeName) || precision > 4) {
Review Comment:
How about converting lines like this to `return condition ? valueA : valueB;`?
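
As a rough illustration of the suggested form: the hunk above is cut off before the branch bodies, so the `"SMALLINT"` and `"TINYINT"` return values and the `isUnsignedType` helper below are assumptions made up for this sketch, not the mapper's actual code. Plain strings stand in for `DataTypes` so the example runs without Flink on the classpath.

```java
/** Hypothetical sketch; return values and the helper are assumed, not taken from the PR. */
public class TernaryReturnSketch {

    /** Assumed helper: treats any type name containing "UNSIGNED" as unsigned. */
    static boolean isUnsignedType(String typeName) {
        return typeName.contains("UNSIGNED");
    }

    /** The if/else shape that the review comment refers to. */
    static String mapTinyIntIfElse(String typeName, int precision) {
        if (isUnsignedType(typeName) || precision > 4) {
            return "SMALLINT";
        } else {
            return "TINYINT";
        }
    }

    /** The same decision written as a single conditional expression. */
    static String mapTinyIntTernary(String typeName, int precision) {
        return isUnsignedType(typeName) || precision > 4 ? "SMALLINT" : "TINYINT";
    }

    public static void main(String[] args) {
        String[] typeNames = {"TINYINT", "TINYINT UNSIGNED"};
        int[] precisions = {3, 5};
        for (String typeName : typeNames) {
            for (int precision : precisions) {
                // Both forms should always agree.
                System.out.println(
                        typeName + "/" + precision + " -> "
                                + mapTinyIntIfElse(typeName, precision) + " / "
                                + mapTinyIntTernary(typeName, precision));
            }
        }
    }
}
```

The two methods are behaviorally identical; the ternary only shortens branches whose bodies are a single `return`.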