[
https://issues.apache.org/jira/browse/FLINK-32714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-32714:
-----------------------------------
Labels: auto-deprioritized-major pull-request-available (was:
pull-request-available stale-major)
Priority: Minor (was: Major)
This issue was labeled "stale-major" 7 days ago and has not received any
updates, so it is being deprioritized. If this ticket is actually Major, please
raise the priority and ask a committer to assign you the issue or revive the
public discussion.
> JDBC: Add dialect for OceanBase database
> ----------------------------------------
>
> Key: FLINK-32714
> URL: https://issues.apache.org/jira/browse/FLINK-32714
> Project: Flink
> Issue Type: New Feature
> Reporter: He Wang
> Priority: Minor
> Labels: auto-deprioritized-major, pull-request-available
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> OceanBase is a distributed relational database. Its community edition is
> open source and available at [https://github.com/oceanbase/oceanbase].
> The enterprise edition of OceanBase is compatible with MySQL and Oracle,
> which means we can reuse almost all of the existing dialect rules.
> Unlike with other databases, the compatibility mode must be provided first
> so that the connector can determine which dialect to use. A startup option
> such as 'compatible-mode' is therefore needed.
> A dialect implementation for OceanBase could look like this:
> {code:java}
> package org.apache.flink.connector.jdbc.databases.oceanbase;
>
> import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
> import org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect;
> import org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialect;
> import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
> import org.apache.flink.table.types.logical.LogicalTypeRoot;
> import org.apache.flink.table.types.logical.RowType;
>
> import javax.annotation.Nonnull;
>
> import java.util.Optional;
> import java.util.Set;
>
> /** JDBC dialect for OceanBase. */
> public class OceanBaseDialect extends AbstractDialect {
>
>     private static final long serialVersionUID = 1L;
>
>     private final AbstractDialect dialect;
>
>     public OceanBaseDialect(@Nonnull String compatibleMode) {
>         switch (compatibleMode.toLowerCase()) {
>             case "mysql":
>                 this.dialect = new MySqlDialect();
>                 break;
>             case "oracle":
>                 this.dialect = new OracleDialect();
>                 break;
>             default:
>                 throw new IllegalArgumentException(
>                         "Unsupported compatible mode: " + compatibleMode);
>         }
>     }
>
>     @Override
>     public String dialectName() {
>         return "OceanBase";
>     }
>
>     @Override
>     public Optional<String> defaultDriverName() {
>         return Optional.of("com.oceanbase.jdbc.Driver");
>     }
>
>     @Override
>     public Set<LogicalTypeRoot> supportedTypes() {
>         return dialect.supportedTypes();
>     }
>
>     @Override
>     public JdbcRowConverter getRowConverter(RowType rowType) {
>         return dialect.getRowConverter(rowType);
>     }
>
>     @Override
>     public String getLimitClause(long limit) {
>         return dialect.getLimitClause(limit);
>     }
>
>     @Override
>     public String quoteIdentifier(String identifier) {
>         return dialect.quoteIdentifier(identifier);
>     }
>
>     @Override
>     public Optional<String> getUpsertStatement(
>             String tableName, String[] fieldNames, String[] conditionFields) {
>         return dialect.getUpsertStatement(tableName, fieldNames, conditionFields);
>     }
>
>     @Override
>     public Optional<Range> timestampPrecisionRange() {
>         return dialect.timestampPrecisionRange();
>     }
>
>     @Override
>     public Optional<Range> decimalPrecisionRange() {
>         return dialect.decimalPrecisionRange();
>     }
>
>     @Override
>     public String appendDefaultUrlProperties(String url) {
>         return dialect.appendDefaultUrlProperties(url);
>     }
> }
> {code}
>
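For illustration only, here is a minimal standalone sketch of how a 'compatible-mode' value like the one described above could be validated and mapped to a delegate dialect before the dialect is constructed. The class CompatibleModeSketch, the Mode enum, and the helper methods are hypothetical and not part of the Flink JDBC connector. The delegate is represented by its class name only, so the snippet runs without Flink on the classpath.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper; not part of the Flink JDBC connector. */
final class CompatibleModeSketch {

    /** The two compatibility modes named in the issue description. */
    public enum Mode {
        MYSQL,
        ORACLE
    }

    /** Option key taken from the issue text; the real key may differ. */
    public static final String OPTION_KEY = "compatible-mode";

    /** Parses the option value case-insensitively, rejecting anything else. */
    public static Mode parseMode(String value) {
        if (value == null) {
            throw new IllegalArgumentException(
                    "Option '" + OPTION_KEY + "' is required for OceanBase");
        }
        // Locale.ROOT keeps lower-casing independent of the JVM default locale.
        switch (value.trim().toLowerCase(Locale.ROOT)) {
            case "mysql":
                return Mode.MYSQL;
            case "oracle":
                return Mode.ORACLE;
            default:
                throw new IllegalArgumentException(
                        "Unsupported compatible mode: " + value);
        }
    }

    /** Returns the simple class name of the dialect the proposal delegates to. */
    public static String delegateDialectFor(Map<String, String> options) {
        Mode mode = parseMode(options.get(OPTION_KEY));
        return mode == Mode.MYSQL ? "MySqlDialect" : "OracleDialect";
    }

    public static void main(String[] args) {
        Map<String, String> options = Collections.singletonMap(OPTION_KEY, "MySQL");
        System.out.println(delegateDialectFor(options));
    }
}
```

Validating the option before constructing the dialect would report a missing or unsupported mode as an IllegalArgumentException with a descriptive message, rather than as a NullPointerException from calling toLowerCase() on a null value.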
--
This message was sent by Atlassian Jira
(v8.20.10#820010)