He Wang created FLINK-32714:
-------------------------------
Summary: JDBC: Add dialect for OceanBase database
Key: FLINK-32714
URL: https://issues.apache.org/jira/browse/FLINK-32714
Project: Flink
Issue Type: New Feature
Reporter: He Wang
OceanBase is a distributed relational database, the community edition of
OceanBase is open sourced at [https://github.com/oceanbase/oceanbase.]
The enterprise edition of OceanBase is compatible with MySql and Oracle, which
means we can reuse almost all the dialect rules.
The difference from other databases is that we must provide the compatibility
mode firstly, then the connector can determine which dialect to use, so a
startup option like 'compatible-mode' is needed.
A dialect implementation for OceanBase is like below:
{code:java}
package org.apache.flink.connector.jdbc.databases.oceanbase;
import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
import org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect;
import org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialect;
import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import javax.annotation.Nonnull;
import java.util.Optional;
import java.util.Set;
/** JDBC dialect for OceanBase. */
public class OceanBaseDialect extends AbstractDialect {
private static final long serialVersionUID = 1L;
private final AbstractDialect dialect;
public OceanBaseDialect(@Nonnull String compatibleMode) {
switch (compatibleMode.toLowerCase()) {
case "mysql":
this.dialect = new MySqlDialect();
break;
case "oracle":
this.dialect = new OracleDialect();
break;
default:
throw new IllegalArgumentException(
"Unsupported compatible mode: " + compatibleMode);
}
}
@Override
public String dialectName() {
return "OceanBase";
}
@Override
public Optional<String> defaultDriverName() {
return Optional.of("com.oceanbase.jdbc.Driver");
}
@Override
public Set<LogicalTypeRoot> supportedTypes() {
return dialect.supportedTypes();
}
@Override
public JdbcRowConverter getRowConverter(RowType rowType) {
return dialect.getRowConverter(rowType);
}
@Override
public String getLimitClause(long limit) {
return dialect.getLimitClause(limit);
}
@Override
public String quoteIdentifier(String identifier) {
return dialect.quoteIdentifier(identifier);
}
@Override
public Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] conditionFields) {
return dialect.getUpsertStatement(tableName, fieldNames,
conditionFields);
}
@Override
public Optional<Range> timestampPrecisionRange() {
return dialect.timestampPrecisionRange();
}
@Override
public Optional<Range> decimalPrecisionRange() {
return dialect.decimalPrecisionRange();
}
@Override
public String appendDefaultUrlProperties(String url) {
return dialect.appendDefaultUrlProperties(url);
}
}
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)