[
https://issues.apache.org/jira/browse/FLINK-32714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-32714:
-----------------------------------
Labels: pull-request-available stale-major (was: pull-request-available)
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issue has been marked as
Major but is unassigned and neither it nor its Sub-Tasks have been updated
for 60 days. I have gone ahead and added a "stale-major" label to the issue. If this
ticket is a Major, please either assign yourself or give an update. Afterwards,
please remove the label or in 7 days the issue will be deprioritized.
> JDBC: Add dialect for OceanBase database
> ----------------------------------------
>
> Key: FLINK-32714
> URL: https://issues.apache.org/jira/browse/FLINK-32714
> Project: Flink
> Issue Type: New Feature
> Reporter: He Wang
> Priority: Major
> Labels: pull-request-available, stale-major
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> OceanBase is a distributed relational database. The community edition of
> OceanBase is open source at [https://github.com/oceanbase/oceanbase].
> The enterprise edition of OceanBase is compatible with MySQL and Oracle,
> which means we can reuse almost all of the existing dialect rules.
> Unlike other databases, OceanBase requires the compatibility mode to be
> provided first so that the connector can determine which dialect to use, so a
> startup option like 'compatible-mode' is needed.
> A dialect implementation for OceanBase could look like this:
> {code:java}
> package org.apache.flink.connector.jdbc.databases.oceanbase;
>
> import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
> import org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect;
> import org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialect;
> import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
> import org.apache.flink.table.types.logical.LogicalTypeRoot;
> import org.apache.flink.table.types.logical.RowType;
>
> import javax.annotation.Nonnull;
>
> import java.util.Optional;
> import java.util.Set;
>
> /** JDBC dialect for OceanBase. */
> public class OceanBaseDialect extends AbstractDialect {
>
>     private static final long serialVersionUID = 1L;
>
>     private final AbstractDialect dialect;
>
>     public OceanBaseDialect(@Nonnull String compatibleMode) {
>         switch (compatibleMode.toLowerCase()) {
>             case "mysql":
>                 this.dialect = new MySqlDialect();
>                 break;
>             case "oracle":
>                 this.dialect = new OracleDialect();
>                 break;
>             default:
>                 throw new IllegalArgumentException(
>                         "Unsupported compatible mode: " + compatibleMode);
>         }
>     }
>
>     @Override
>     public String dialectName() {
>         return "OceanBase";
>     }
>
>     @Override
>     public Optional<String> defaultDriverName() {
>         return Optional.of("com.oceanbase.jdbc.Driver");
>     }
>
>     @Override
>     public Set<LogicalTypeRoot> supportedTypes() {
>         return dialect.supportedTypes();
>     }
>
>     @Override
>     public JdbcRowConverter getRowConverter(RowType rowType) {
>         return dialect.getRowConverter(rowType);
>     }
>
>     @Override
>     public String getLimitClause(long limit) {
>         return dialect.getLimitClause(limit);
>     }
>
>     @Override
>     public String quoteIdentifier(String identifier) {
>         return dialect.quoteIdentifier(identifier);
>     }
>
>     @Override
>     public Optional<String> getUpsertStatement(
>             String tableName, String[] fieldNames, String[] conditionFields) {
>         return dialect.getUpsertStatement(tableName, fieldNames, conditionFields);
>     }
>
>     @Override
>     public Optional<Range> timestampPrecisionRange() {
>         return dialect.timestampPrecisionRange();
>     }
>
>     @Override
>     public Optional<Range> decimalPrecisionRange() {
>         return dialect.decimalPrecisionRange();
>     }
>
>     @Override
>     public String appendDefaultUrlProperties(String url) {
>         return dialect.appendDefaultUrlProperties(url);
>     }
> }
> {code}
>
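To make the effect of the proposed 'compatible-mode' option concrete, here is a minimal, dependency-free sketch of the same delegation idea. Nothing in it is a Flink class: `CompatibleModeSketch`, `SketchDialect`, `MySqlStyle`, `OracleStyle` and `forMode` are hypothetical names introduced only for illustration. The quoting and limit rules are assumptions modelled on common MySQL syntax (backticks, `LIMIT n`) and Oracle syntax (unquoted identifiers, `FETCH FIRST n ROWS ONLY`), so check the real dialect classes before relying on them.

```java
import java.util.Locale;

/** Illustrative sketch only; none of these types belong to Flink. */
final class CompatibleModeSketch {

    /** Hypothetical stand-in for two of the methods the proposed dialect delegates. */
    interface SketchDialect {
        String quoteIdentifier(String identifier);

        String getLimitClause(long limit);
    }

    /** Assumed MySQL-style rules: backtick quoting and a LIMIT clause. */
    static final class MySqlStyle implements SketchDialect {
        @Override
        public String quoteIdentifier(String identifier) {
            return "`" + identifier + "`";
        }

        @Override
        public String getLimitClause(long limit) {
            return "LIMIT " + limit;
        }
    }

    /** Assumed Oracle-style rules: identifiers left as-is and a FETCH FIRST clause. */
    static final class OracleStyle implements SketchDialect {
        @Override
        public String quoteIdentifier(String identifier) {
            return identifier;
        }

        @Override
        public String getLimitClause(long limit) {
            return "FETCH FIRST " + limit + " ROWS ONLY";
        }
    }

    /** Picks the delegate from the compatible mode, case-insensitively. */
    static SketchDialect forMode(String compatibleMode) {
        switch (compatibleMode.toLowerCase(Locale.ROOT)) {
            case "mysql":
                return new MySqlStyle();
            case "oracle":
                return new OracleStyle();
            default:
                throw new IllegalArgumentException(
                        "Unsupported compatible mode: " + compatibleMode);
        }
    }

    public static void main(String[] args) {
        // Build the same logical query under each mode to show how the SQL text differs.
        for (String mode : new String[] {"mysql", "oracle"}) {
            SketchDialect d = forMode(mode);
            System.out.println(
                    mode + ": SELECT " + d.quoteIdentifier("id")
                            + " FROM " + d.quoteIdentifier("t")
                            + " " + d.getLimitClause(10));
        }
    }
}
```

Because the delegate is chosen once from the mode string, an unsupported value fails immediately with an `IllegalArgumentException` rather than producing SQL in the wrong syntax later. The sketch uses `Locale.ROOT` for lowercasing so that the match does not depend on the JVM's default locale.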