This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8b8e8e88de7 [refactor](jdbc catalog) split jdbc executor for different
data sources (step-1) (#31406)
8b8e8e88de7 is described below
commit 8b8e8e88de79f817eeaae3c8c3e0c81a8eba688b
Author: zy-kkk <[email protected]>
AuthorDate: Wed Feb 28 22:24:18 2024 +0800
[refactor](jdbc catalog) split jdbc executor for different data sources
(step-1) (#31406)
---
be/src/vec/exec/vjdbc_connector.cpp | 35 +-
be/src/vec/exec/vjdbc_connector.h | 3 +
.../docker-compose/mysql/init/04-insert.sql | 4 +-
.../org/apache/doris/jdbc/BaseJdbcExecutor.java | 520 ++++++++++
...{JdbcExecutor.java => DefaultJdbcExecutor.java} | 6 +-
.../java/org/apache/doris/jdbc/JdbcExecutor.java | 1085 +-------------------
.../org/apache/doris/jdbc/JdbcExecutorFactory.java | 33 +
.../org/apache/doris/jdbc/MySQLJdbcExecutor.java | 291 ++++++
.../org/apache/doris/jdbc/OracleJdbcExecutor.java | 131 +++
.../jdbc/test_mysql_jdbc_catalog.out | 2 +
10 files changed, 1028 insertions(+), 1082 deletions(-)
diff --git a/be/src/vec/exec/vjdbc_connector.cpp
b/be/src/vec/exec/vjdbc_connector.cpp
index e6419ec95e7..4020278e84d 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -49,7 +49,7 @@
#include "vec/io/reader_buffer.h"
namespace doris::vectorized {
-const char* JDBC_EXECUTOR_CLASS = "org/apache/doris/jdbc/JdbcExecutor";
+const char* JDBC_EXECUTOR_FACTORY_CLASS =
"org/apache/doris/jdbc/JdbcExecutorFactory";
const char* JDBC_EXECUTOR_CTOR_SIGNATURE = "([B)V";
const char* JDBC_EXECUTOR_STMT_WRITE_SIGNATURE = "(Ljava/util/Map;)I";
const char* JDBC_EXECUTOR_HAS_NEXT_SIGNATURE = "()Z";
@@ -85,6 +85,7 @@ Status JdbcConnector::close(Status /*unused*/) {
}
JNIEnv* env;
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
+ env->DeleteGlobalRef(_executor_factory_clazz);
env->DeleteGlobalRef(_executor_clazz);
DELETE_BASIC_JAVA_CLAZZ_REF(object)
DELETE_BASIC_JAVA_CLAZZ_REF(string)
@@ -104,7 +105,29 @@ Status JdbcConnector::open(RuntimeState* state, bool read)
{
JNIEnv* env = nullptr;
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
- RETURN_IF_ERROR(JniUtil::get_jni_scanner_class(env, JDBC_EXECUTOR_CLASS,
&_executor_clazz));
+ RETURN_IF_ERROR(JniUtil::get_jni_scanner_class(env,
JDBC_EXECUTOR_FACTORY_CLASS,
+ &_executor_factory_clazz));
+
+ _executor_factory_ctor_id =
+ env->GetStaticMethodID(_executor_factory_clazz, "getExecutorClass",
+
"(Lorg/apache/doris/thrift/TOdbcTableType;)Ljava/lang/String;");
+ if (_executor_factory_ctor_id == nullptr) {
+ return Status::InternalError("Failed to find method ID for
getExecutorClass");
+ }
+
+ jobject jtable_type = _get_java_table_type(env, _conn_param.table_type);
+
+ jstring executor_name = (jstring)env->CallStaticObjectMethod(
+ _executor_factory_clazz, _executor_factory_ctor_id, jtable_type);
+ if (executor_name == nullptr) {
+ return Status::InternalError("getExecutorClass returned null");
+ }
+ const char* executor_name_str = env->GetStringUTFChars(executor_name,
nullptr);
+
+ RETURN_IF_ERROR(JniUtil::get_jni_scanner_class(env, executor_name_str,
&_executor_clazz));
+ env->DeleteLocalRef(jtable_type);
+ env->ReleaseStringUTFChars(executor_name, executor_name_str);
+ env->DeleteLocalRef(executor_name);
GET_BASIC_JAVA_CLAZZ("java/util/List", list)
GET_BASIC_JAVA_CLAZZ("java/lang/Object", object)
GET_BASIC_JAVA_CLAZZ("java/lang/String", string)
@@ -747,4 +770,12 @@ Status JdbcConnector::_cast_string_to_json(const
SlotDescriptor* slot_desc, Bloc
return Status::OK();
}
+// Maps the Thrift enum value `tableType` to the matching Java
+// org.apache.doris.thrift.TOdbcTableType object by calling its static
+// findByValue(int).
+//
+// Returns a JNI local reference that the caller releases with DeleteLocalRef,
+// or nullptr when the class or method cannot be resolved, the Java call
+// throws, or findByValue itself returns null. On failure the pending Java
+// exception is described and cleared so that `env` stays usable by the caller.
+jobject JdbcConnector::_get_java_table_type(JNIEnv* env, TOdbcTableType::type tableType) {
+    jclass enum_class = env->FindClass("org/apache/doris/thrift/TOdbcTableType");
+    if (enum_class == nullptr) {
+        // FindClass leaves an exception pending when it fails.
+        env->ExceptionDescribe();
+        env->ExceptionClear();
+        return nullptr;
+    }
+
+    jobject java_enum_obj = nullptr;
+    jmethodID find_by_value = env->GetStaticMethodID(
+            enum_class, "findByValue", "(I)Lorg/apache/doris/thrift/TOdbcTableType;");
+    if (find_by_value != nullptr) {
+        java_enum_obj = env->CallStaticObjectMethod(enum_class, find_by_value,
+                                                    static_cast<jint>(tableType));
+    }
+    if (env->ExceptionCheck()) {
+        // Either the lookup or the call itself threw; do not hand back a bogus reference.
+        env->ExceptionDescribe();
+        env->ExceptionClear();
+        java_enum_obj = nullptr;
+    }
+
+    // The class reference is only needed for the lookup above.
+    env->DeleteLocalRef(enum_class);
+    return java_enum_obj;
+}
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vjdbc_connector.h
b/be/src/vec/exec/vjdbc_connector.h
index 2ecdf210fac..ed2afdecdfd 100644
--- a/be/src/vec/exec/vjdbc_connector.h
+++ b/be/src/vec/exec/vjdbc_connector.h
@@ -131,13 +131,16 @@ private:
int rows);
Status _cast_string_to_json(const SlotDescriptor* slot_desc, Block* block,
int column_index,
int rows);
+ jobject _get_java_table_type(JNIEnv* env, TOdbcTableType::type tableType);
bool _closed = false;
+ jclass _executor_factory_clazz;
jclass _executor_clazz;
jclass _executor_list_clazz;
jclass _executor_object_clazz;
jclass _executor_string_clazz;
jobject _executor_obj;
+ jmethodID _executor_factory_ctor_id;
jmethodID _executor_ctor_id;
jmethodID _executor_stmt_write_id;
jmethodID _executor_read_id;
diff --git a/docker/thirdparties/docker-compose/mysql/init/04-insert.sql
b/docker/thirdparties/docker-compose/mysql/init/04-insert.sql
index bb1b0962b45..d71986b1354 100644
--- a/docker/thirdparties/docker-compose/mysql/init/04-insert.sql
+++ b/docker/thirdparties/docker-compose/mysql/init/04-insert.sql
@@ -1151,7 +1151,7 @@ VALUES ('2023-06-17 10:00:00', '2023-06-17 10:00:01.1',
'2023-06-17 10:00:02.22'
SET SESSION sql_mode=(SELECT REPLACE(@@sql_mode,'STRICT_TRANS_TABLES',''));
INSERT INTO doris_test.dt_null
-VALUES ('2023-06-17 10:00:00'),('0000-00-00 00:00:00');
+VALUES ('2023-06-17 10:00:00'),('0000-00-00 00:00:00'),('0000-01-01 00:00:00');
insert into doris_test.test_key_word values (1, 1), (2, 2);
@@ -1159,7 +1159,7 @@ insert into doris_test.test_key_word values (1, 1), (2,
2);
SET SESSION sql_mode=(SELECT REPLACE(@@sql_mode,'NO_ZERO_DATE',''));
SET SESSION sql_mode=(SELECT REPLACE(@@sql_mode,'NO_ZERO_IN_DATE',''));
-insert into doris_test.test_zd (id,d_z) VALUES
(1,'0000-00-00'),(2,'2022-01-01');
+insert into doris_test.test_zd (id,d_z) VALUES
(1,'0000-00-00'),(2,'2022-01-01'),(3,'0000-01-01');
insert into Doris.DORIS values ('DORIS');
insert into Doris.Doris values ('Doris');
diff --git
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java
new file mode 100644
index 00000000000..87f67807826
--- /dev/null
+++
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java
@@ -0,0 +1,520 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jdbc;
+
+import org.apache.doris.common.exception.InternalException;
+import org.apache.doris.common.exception.UdfRuntimeException;
+import org.apache.doris.common.jni.utils.UdfUtils;
+import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.jni.vec.ColumnValueConverter;
+import org.apache.doris.common.jni.vec.VectorColumn;
+import org.apache.doris.common.jni.vec.VectorTable;
+import org.apache.doris.thrift.TJdbcExecutorCtorParams;
+import org.apache.doris.thrift.TJdbcOperation;
+import org.apache.doris.thrift.TOdbcTableType;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.google.common.base.Preconditions;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+
+import java.io.FileNotFoundException;
+import java.lang.reflect.Array;
+import java.net.MalformedURLException;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+public abstract class BaseJdbcExecutor implements JdbcExecutor {
+    private static final Logger LOG = Logger.getLogger(BaseJdbcExecutor.class);
+    // Protocol factory used by the constructor to deserialize the Thrift parameters.
+    private static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new TBinaryProtocol.Factory();
+
+    // Connection pool used by this executor; assigned in init().
+    private DruidDataSource druidDataSource;
+    // Monitor held in init() while the pool is created.
+    // NOTE(review): this lock is per instance, so it does not exclude other executor instances - confirm intended.
+    private final byte[] druidDataSourceLock = new byte[0];
+    // Table type and data source settings taken from the constructor parameters.
+    private TOdbcTableType tableType;
+    private JdbcDataSourceConfig config;
+    // Connection borrowed from the pool in init().
+    private Connection conn;
+
+    // Statement prepared for non-READ operations (see initializeStatement()).
+    protected PreparedStatement preparedStatement;
+    // Statement prepared for READ operations (see initializeStatement()).
+    protected Statement stmt;
+    // Result set and metadata produced by read().
+    protected ResultSet resultSet;
+    protected ResultSetMetaData resultSetMetaData;
+    // Java class name of every result column, filled by read().
+    protected List<String> resultColumnTypeNames;
+    // One value array per column for the batch being assembled by getBlockAddress().
+    protected List<Object[]> block;
+    // Output table of the most recent getBlockAddress() call.
+    protected VectorTable outputTable;
+    // Batch size recorded by initializeStatement() for READ operations.
+    protected int batchSizeNum;
+    // Number of rows placed in the current block by getBlockAddress().
+    protected int curBlockRows;
+
+    /**
+     * Creates an executor from Thrift-serialized {@code TJdbcExecutorCtorParams}.
+     *
+     * <p>The parameters are deserialized with the binary protocol, copied into a
+     * {@code JdbcDataSourceConfig}, the shared data source cache is given its cleanup interval,
+     * and {@code init} is called with the statement carried in the parameters.
+     *
+     * @param thriftParams serialized constructor parameters
+     * @throws Exception an {@code InternalException} when deserialization fails, or whatever
+     *         {@code init} throws
+     */
+    public BaseJdbcExecutor(byte[] thriftParams) throws Exception {
+        TJdbcExecutorCtorParams ctorParams = new TJdbcExecutorCtorParams();
+        TDeserializer thriftReader = new TDeserializer(PROTOCOL_FACTORY);
+        try {
+            thriftReader.deserialize(ctorParams, thriftParams);
+        } catch (TException e) {
+            throw new InternalException(e.getMessage());
+        }
+
+        tableType = ctorParams.table_type;
+        this.config = new JdbcDataSourceConfig()
+                .setCatalogId(ctorParams.catalog_id)
+                .setJdbcUser(ctorParams.jdbc_user)
+                .setJdbcPassword(ctorParams.jdbc_password)
+                .setJdbcUrl(ctorParams.jdbc_url)
+                .setJdbcDriverUrl(ctorParams.driver_path)
+                .setJdbcDriverClass(ctorParams.jdbc_driver_class)
+                .setBatchSize(ctorParams.batch_size)
+                .setOp(ctorParams.op)
+                .setTableType(ctorParams.table_type)
+                .setConnectionPoolMinSize(ctorParams.connection_pool_min_size)
+                .setConnectionPoolMaxSize(ctorParams.connection_pool_max_size)
+                .setConnectionPoolMaxWaitTime(ctorParams.connection_pool_max_wait_time)
+                .setConnectionPoolMaxLifeTime(ctorParams.connection_pool_max_life_time)
+                .setConnectionPoolKeepAlive(ctorParams.connection_pool_keep_alive);
+
+        JdbcDataSource.getDataSource().setCleanupInterval(ctorParams.connection_pool_cache_clear_time);
+        init(config, ctorParams.statement);
+    }
+
+    /**
+     * Releases the JDBC resources held by this executor.
+     *
+     * <p>The read statement is cancelled first (a failure is only logged). When both a connection
+     * and a result set exist, {@link #abortReadConnection} gets the chance to abort the connection;
+     * if it reports that it did not, the result set, both statements and the connection are closed
+     * individually. Finally, when the configured minimum pool size is 0, the Druid pool itself is
+     * closed and removed from the shared cache.
+     *
+     * @throws Exception if aborting the connection or shutting down the pool fails
+     */
+    public void close() throws Exception {
+        try {
+            if (stmt != null) {
+                try {
+                    stmt.cancel();
+                } catch (SQLException e) {
+                    LOG.error("Error cancelling statement", e);
+                }
+            }
+
+            // Records whether the subclass hook actually aborted the connection.
+            boolean aborted = false;
+            if (conn != null && resultSet != null) {
+                aborted = abortReadConnection(conn, resultSet);
+            }
+
+            // Without an abort, every resource has to be closed explicitly. The write-path
+            // preparedStatement is included so it is not left to the connection to clean up.
+            if (!aborted) {
+                closeResources(resultSet, stmt, preparedStatement, conn);
+            }
+        } finally {
+            if (config.getConnectionPoolMinSize() == 0 && druidDataSource != null) {
+                druidDataSource.close();
+                JdbcDataSource.getDataSource().getSourcesMap().remove(config.createCacheKey());
+                druidDataSource = null;
+            }
+        }
+    }
+
+    /**
+     * Closes each non-null resource in the given order, logging (not propagating) any failure.
+     * A {@link Connection} that already reports itself closed is left alone.
+     *
+     * @param closeables resources to close; null entries are skipped
+     */
+    private void closeResources(AutoCloseable... closeables) {
+        for (AutoCloseable resource : closeables) {
+            if (resource == null) {
+                continue;
+            }
+            try {
+                boolean alreadyClosed = resource instanceof Connection && ((Connection) resource).isClosed();
+                if (!alreadyClosed) {
+                    resource.close();
+                }
+            } catch (Exception e) {
+                LOG.error("Cannot close resource: ", e);
+            }
+        }
+    }
+
+    /**
+     * Hook called by {@link #close()} when both a connection and a result set exist.
+     * This base implementation does nothing and reports that no abort took place, which makes
+     * {@code close()} close the resources one by one.
+     *
+     * @param connection the connection in use
+     * @param resultSet the open result set
+     * @return {@code true} if the connection was aborted; always {@code false} here
+     * @throws SQLException declared for overriding implementations
+     */
+    protected boolean abortReadConnection(Connection connection, ResultSet resultSet)
+            throws SQLException {
+        return false;
+    }
+
+ public int read() throws UdfRuntimeException {
+ try {
+ resultSet = ((PreparedStatement) stmt).executeQuery();
+ resultSetMetaData = resultSet.getMetaData();
+ int columnCount = resultSetMetaData.getColumnCount();
+ resultColumnTypeNames = new ArrayList<>(columnCount);
+ block = new ArrayList<>(columnCount);
+ for (int i = 0; i < columnCount; ++i) {
+
resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1));
+ }
+ return columnCount;
+ } catch (SQLException e) {
+ throw new UdfRuntimeException("JDBC executor sql has error: ", e);
+ }
+ }
+
+    /**
+     * Reads up to {@code batchSize} rows from the open result set into a new output table and
+     * returns that table's meta address.
+     *
+     * <p>The row the cursor is currently positioned on is consumed first and
+     * {@code resultSet.next()} is only called afterwards, so the cursor has to be on a valid row
+     * when this method is entered; {@link #hasNext()} advances it.
+     *
+     * @param batchSize maximum number of rows to place in the block
+     * @param outputParams table description; must contain the comma-separated entries
+     *        {@code is_nullable} and {@code replace_string}
+     * @return the meta address of the filled output table
+     * @throws UdfRuntimeException wrapping any failure while building the block
+     */
+    public long getBlockAddress(int batchSize, Map<String, String> outputParams) throws UdfRuntimeException {
+        try {
+            if (outputTable != null) {
+                outputTable.close();
+            }
+
+            outputTable = VectorTable.createWritableTable(outputParams, 0);
+
+            String isNullableString = outputParams.get("is_nullable");
+            String replaceString = outputParams.get("replace_string");
+
+            if (isNullableString == null || replaceString == null) {
+                throw new IllegalArgumentException(
+                        "Output parameters 'is_nullable' and 'replace_string' are required.");
+            }
+
+            String[] nullableList = isNullableString.split(",");
+            String[] replaceStringList = replaceString.split(",");
+            curBlockRows = 0;
+            int columnCount = resultSetMetaData.getColumnCount();
+
+            // initializeBlock() appends one array per column. Without clearing first, the list
+            // would grow on every batch while block.get(i) kept returning the arrays of the very
+            // first batch (sized for that batch's batchSize).
+            block.clear();
+            initializeBlock(columnCount, replaceStringList, batchSize, outputTable);
+
+            do {
+                for (int i = 0; i < columnCount; ++i) {
+                    ColumnType type = outputTable.getColumnType(i);
+                    block.get(i)[curBlockRows] = getColumnValue(i, type, replaceStringList);
+                }
+                curBlockRows++;
+            } while (curBlockRows < batchSize && resultSet.next());
+
+            for (int i = 0; i < columnCount; ++i) {
+                ColumnType type = outputTable.getColumnType(i);
+                Object[] columnData = block.get(i);
+                // Trim the column to the rows actually read, keeping the array's component type.
+                Class<?> componentType = columnData.getClass().getComponentType();
+                Object[] newColumn = (Object[]) Array.newInstance(componentType, curBlockRows);
+                System.arraycopy(columnData, 0, newColumn, 0, curBlockRows);
+                boolean isNullable = Boolean.parseBoolean(nullableList[i]);
+                outputTable.appendData(i, newColumn, getOutputConverter(type, replaceStringList[i]), isNullable);
+            }
+        } catch (Exception e) {
+            LOG.warn("jdbc get block address exception: ", e);
+            throw new UdfRuntimeException("jdbc get block address: ", e);
+        }
+        return outputTable.getMetaAddress();
+    }
+
+    /**
+     * Appends one value array per column to {@code block}, each created by the corresponding
+     * output column with room for {@code batchSizeNum} entries.
+     *
+     * @param columnCount number of columns to allocate for
+     * @param replaceStringList per-column replace strings; not used by this base implementation
+     * @param batchSizeNum capacity of every column array
+     * @param outputTable table whose columns create the arrays
+     */
+    protected void initializeBlock(int columnCount, String[] replaceStringList, int batchSizeNum,
+            VectorTable outputTable) {
+        int col = 0;
+        while (col < columnCount) {
+            block.add(outputTable.getColumn(col).newObjectContainerArray(batchSizeNum));
+            col++;
+        }
+    }
+
+    /**
+     * Inserts the rows of the table described by {@code params} through the prepared statement.
+     *
+     * @param params description of the readable input table
+     * @return the number of rows in the input table
+     * @throws UdfRuntimeException if the insert fails with an SQL error
+     */
+    public int write(Map<String, String> params) throws UdfRuntimeException {
+        // Can't release or close the input table here, it's released by c++
+        final VectorTable input = VectorTable.createReadableTable(params);
+        try {
+            insert(input);
+        } catch (SQLException e) {
+            throw new UdfRuntimeException("JDBC executor sql has error: ", e);
+        }
+        return input.getNumRows();
+    }
+
+ public void openTrans() throws UdfRuntimeException {
+ try {
+ if (conn != null) {
+ conn.setAutoCommit(false);
+ }
+ } catch (SQLException e) {
+ throw new UdfRuntimeException("JDBC executor open transaction has
error: ", e);
+ }
+ }
+
+ public void commitTrans() throws UdfRuntimeException {
+ try {
+ if (conn != null) {
+ conn.commit();
+ }
+ } catch (SQLException e) {
+ throw new UdfRuntimeException("JDBC executor commit transaction
has error: ", e);
+ }
+ }
+
+ public void rollbackTrans() throws UdfRuntimeException {
+ try {
+ if (conn != null) {
+ conn.rollback();
+ }
+ } catch (SQLException e) {
+ throw new UdfRuntimeException("JDBC executor rollback transaction
has error: ", e);
+ }
+ }
+
+    /**
+     * @return the Java class names of the result columns collected by {@link #read()},
+     *         or {@code null} if {@code read()} has not run yet
+     */
+    public List<String> getResultColumnTypeNames() {
+        return this.resultColumnTypeNames;
+    }
+
+    /**
+     * @return the number of rows placed in the block by the latest {@code getBlockAddress} call
+     */
+    public int getCurBlockRows() {
+        return this.curBlockRows;
+    }
+
+    /**
+     * Advances the result set cursor by one row.
+     *
+     * @return {@code false} when no result set exists or the cursor moved past the last row,
+     *         {@code true} when the cursor now points at a row
+     * @throws UdfRuntimeException if advancing the cursor fails
+     */
+    public boolean hasNext() throws UdfRuntimeException {
+        try {
+            return resultSet != null && resultSet.next();
+        } catch (SQLException e) {
+            throw new UdfRuntimeException("resultSet to get next error: ", e);
+        }
+    }
+
+ private void init(JdbcDataSourceConfig config, String sql) throws
UdfRuntimeException {
+ String druidDataSourceKey = config.createCacheKey();
+ try {
+ ClassLoader parent = getClass().getClassLoader();
+ ClassLoader classLoader =
UdfUtils.getClassLoader(config.getJdbcDriverUrl(), parent);
+ druidDataSource =
JdbcDataSource.getDataSource().getSource(druidDataSourceKey);
+ if (druidDataSource == null) {
+ synchronized (druidDataSourceLock) {
+ druidDataSource =
JdbcDataSource.getDataSource().getSource(druidDataSourceKey);
+ if (druidDataSource == null) {
+ long start = System.currentTimeMillis();
+ DruidDataSource ds = new DruidDataSource();
+ ds.setDriverClassLoader(classLoader);
+ ds.setDriverClassName(config.getJdbcDriverClass());
+ ds.setUrl(config.getJdbcUrl());
+ ds.setUsername(config.getJdbcUser());
+ ds.setPassword(config.getJdbcPassword());
+ ds.setMinIdle(config.getConnectionPoolMinSize()); //
default 1
+ ds.setInitialSize(config.getConnectionPoolMinSize());
// default 1
+ ds.setMaxActive(config.getConnectionPoolMaxSize()); //
default 10
+ ds.setMaxWait(config.getConnectionPoolMaxWaitTime());
// default 5000
+ ds.setTestWhileIdle(true);
+ ds.setTestOnBorrow(false);
+ setValidationQuery(ds);
+ // default 3 min
+
ds.setTimeBetweenEvictionRunsMillis(config.getConnectionPoolMaxLifeTime() /
10L);
+ // default 15 min
+
ds.setMinEvictableIdleTimeMillis(config.getConnectionPoolMaxLifeTime() / 2L);
+ // default 30 min
+
ds.setMaxEvictableIdleTimeMillis(config.getConnectionPoolMaxLifeTime());
+ ds.setKeepAlive(config.isConnectionPoolKeepAlive());
+ // default 6 min
+
ds.setKeepAliveBetweenTimeMillis(config.getConnectionPoolMaxLifeTime() / 5L);
+ druidDataSource = ds;
+
JdbcDataSource.getDataSource().putSource(druidDataSourceKey, ds);
+ LOG.info("JdbcClient set"
+ + " ConnectionPoolMinSize = " +
config.getConnectionPoolMinSize()
+ + ", ConnectionPoolMaxSize = " +
config.getConnectionPoolMaxSize()
+ + ", ConnectionPoolMaxWaitTime = " +
config.getConnectionPoolMaxWaitTime()
+ + ", ConnectionPoolMaxLifeTime = " +
config.getConnectionPoolMaxLifeTime()
+ + ", ConnectionPoolKeepAlive = " +
config.isConnectionPoolKeepAlive());
+ LOG.info("init datasource [" + (config.getJdbcUrl() +
config.getJdbcUser()) + "] cost: " + (
+ System.currentTimeMillis() - start) + " ms");
+ }
+ }
+ }
+
+ long start = System.currentTimeMillis();
+ conn = druidDataSource.getConnection();
+ LOG.info("get connection [" + (config.getJdbcUrl() +
config.getJdbcUser()) + "] cost: " + (
+ System.currentTimeMillis() - start)
+ + " ms");
+
+ initializeStatement(conn, config, sql);
+
+ } catch (MalformedURLException e) {
+ throw new UdfRuntimeException("MalformedURLException to load class
about " + config.getJdbcDriverUrl(), e);
+ } catch (SQLException e) {
+ throw new UdfRuntimeException("Initialize datasource failed: ", e);
+ } catch (FileNotFoundException e) {
+ throw new UdfRuntimeException("FileNotFoundException failed: ", e);
+ } catch (Exception e) {
+ throw new UdfRuntimeException("Initialize datasource failed: ", e);
+ }
+ }
+
+ protected void setValidationQuery(DruidDataSource ds) {
+ ds.setValidationQuery("SELECT 1");
+ }
+
+    /**
+     * Prepares the statement for {@code sql} according to the configured operation.
+     *
+     * <p>For any operation other than READ the SQL is logged and stored in
+     * {@code preparedStatement}. For READ, auto-commit is switched off and a forward-only,
+     * read-only statement is stored in {@code stmt} with its fetch size set to the configured
+     * batch size, which is also recorded in {@code batchSizeNum}.
+     *
+     * @param conn connection to prepare the statement on
+     * @param config supplies the operation kind and the batch size
+     * @param sql statement text; must not be null
+     * @throws SQLException if the connection rejects a setting or the statement cannot be prepared
+     */
+    protected void initializeStatement(Connection conn, JdbcDataSourceConfig config, String sql) throws SQLException {
+        boolean isRead = config.getOp() == TJdbcOperation.READ;
+        if (!isRead) {
+            Preconditions.checkArgument(sql != null, "SQL statement cannot be null for WRITE operation.");
+            LOG.info("Insert SQL: " + sql);
+            preparedStatement = conn.prepareStatement(sql);
+            return;
+        }
+
+        conn.setAutoCommit(false);
+        Preconditions.checkArgument(sql != null, "SQL statement cannot be null for READ operation.");
+        stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+        // set fetch size to batch size
+        stmt.setFetchSize(config.getBatchSize());
+        batchSizeNum = config.getBatchSize();
+    }
+
+    /**
+     * Reads the value of one column from the current row of {@code resultSet}.
+     * Called by {@code getBlockAddress} once per column and row.
+     *
+     * @param columnIndex zero-based column index, as used by {@code getBlockAddress}
+     * @param type output column type for that index
+     * @param replaceStringList per-column replace strings from the output parameters
+     * @return the value to store in the block for this cell
+     * @throws SQLException if the value cannot be read
+     */
+    protected abstract Object getColumnValue(int columnIndex, ColumnType type, String[] replaceStringList)
+            throws SQLException;
+
+    /*
+    | Type                                        | Java Array Type            |
+    |---------------------------------------------|----------------------------|
+    | BOOLEAN                                     | Boolean[]                  |
+    | TINYINT                                     | Byte[]                     |
+    | SMALLINT                                    | Short[]                    |
+    | INT                                         | Integer[]                  |
+    | BIGINT                                      | Long[]                     |
+    | LARGEINT                                    | BigInteger[]               |
+    | FLOAT                                       | Float[]                    |
+    | DOUBLE                                      | Double[]                   |
+    | DECIMALV2, DECIMAL32, DECIMAL64, DECIMAL128 | BigDecimal[]               |
+    | DATE, DATEV2                                | LocalDate[]                |
+    | DATETIME, DATETIMEV2                        | LocalDateTime[]            |
+    | CHAR, VARCHAR, STRING                       | String[]                   |
+    | ARRAY                                       | List<Object>[]             |
+    | MAP                                         | Map<Object, Object>[]      |
+    | STRUCT                                      | Map<String, Object>[]      |
+     */
+
+ protected abstract ColumnValueConverter getOutputConverter(ColumnType
columnType, String replaceString);
+
+ protected ColumnValueConverter createConverter(
+ Function<Object, ?> converterFunction, Class<?> type) {
+ return (Object[] columnData) -> {
+ Object[] result = (Object[]) Array.newInstance(type,
columnData.length);
+ for (int i = 0; i < columnData.length; i++) {
+ result[i] = columnData[i] != null ?
converterFunction.apply(columnData[i]) : null;
+ }
+ return result;
+ };
+ }
+
+    /**
+     * Binds every row of {@code data} to {@code preparedStatement}, adds it to the batch, then
+     * executes and clears the batch.
+     *
+     * @param data table whose rows are inserted
+     * @return the number of rows in {@code data}
+     * @throws SQLException if binding or executing the batch fails
+     */
+    private int insert(VectorTable data) throws SQLException {
+        int row = 0;
+        while (row < data.getNumRows()) {
+            int col = 0;
+            while (col < data.getColumns().length) {
+                insertColumn(row, col, data.getColumns()[col]);
+                col++;
+            }
+            preparedStatement.addBatch();
+            row++;
+        }
+        preparedStatement.executeBatch();
+        preparedStatement.clearBatch();
+        return data.getNumRows();
+    }
+
+ private void insertColumn(int rowIdx, int colIdx, VectorColumn column)
throws SQLException {
+ int parameterIndex = colIdx + 1;
+ ColumnType.Type dorisType = column.getColumnTyp();
+ if (column.isNullAt(rowIdx)) {
+ insertNullColumn(parameterIndex, dorisType);
+ return;
+ }
+ switch (dorisType) {
+ case BOOLEAN:
+ preparedStatement.setBoolean(parameterIndex,
column.getBoolean(rowIdx));
+ break;
+ case TINYINT:
+ preparedStatement.setByte(parameterIndex,
column.getByte(rowIdx));
+ break;
+ case SMALLINT:
+ preparedStatement.setShort(parameterIndex,
column.getShort(rowIdx));
+ break;
+ case INT:
+ preparedStatement.setInt(parameterIndex,
column.getInt(rowIdx));
+ break;
+ case BIGINT:
+ preparedStatement.setLong(parameterIndex,
column.getLong(rowIdx));
+ break;
+ case LARGEINT:
+ preparedStatement.setObject(parameterIndex,
column.getBigInteger(rowIdx));
+ break;
+ case FLOAT:
+ preparedStatement.setFloat(parameterIndex,
column.getFloat(rowIdx));
+ break;
+ case DOUBLE:
+ preparedStatement.setDouble(parameterIndex,
column.getDouble(rowIdx));
+ break;
+ case DECIMALV2:
+ case DECIMAL32:
+ case DECIMAL64:
+ case DECIMAL128:
+ preparedStatement.setBigDecimal(parameterIndex,
column.getDecimal(rowIdx));
+ break;
+ case DATEV2:
+ preparedStatement.setDate(parameterIndex,
Date.valueOf(column.getDate(rowIdx)));
+ break;
+ case DATETIMEV2:
+ preparedStatement.setTimestamp(
+ parameterIndex,
Timestamp.valueOf(column.getDateTime(rowIdx)));
+ break;
+ case CHAR:
+ case VARCHAR:
+ case STRING:
+ case BINARY:
+ preparedStatement.setString(parameterIndex,
column.getStringWithOffset(rowIdx));
+ break;
+ default:
+ throw new RuntimeException("Unknown type value: " + dorisType);
+ }
+ }
+
+ private void insertNullColumn(int parameterIndex, ColumnType.Type
dorisType)
+ throws SQLException {
+ switch (dorisType) {
+ case BOOLEAN:
+ preparedStatement.setNull(parameterIndex, Types.BOOLEAN);
+ break;
+ case TINYINT:
+ preparedStatement.setNull(parameterIndex, Types.TINYINT);
+ break;
+ case SMALLINT:
+ preparedStatement.setNull(parameterIndex, Types.SMALLINT);
+ break;
+ case INT:
+ preparedStatement.setNull(parameterIndex, Types.INTEGER);
+ break;
+ case BIGINT:
+ preparedStatement.setNull(parameterIndex, Types.BIGINT);
+ break;
+ case LARGEINT:
+ preparedStatement.setNull(parameterIndex, Types.JAVA_OBJECT);
+ break;
+ case FLOAT:
+ preparedStatement.setNull(parameterIndex, Types.FLOAT);
+ break;
+ case DOUBLE:
+ preparedStatement.setNull(parameterIndex, Types.DOUBLE);
+ break;
+ case DECIMALV2:
+ case DECIMAL32:
+ case DECIMAL64:
+ case DECIMAL128:
+ preparedStatement.setNull(parameterIndex, Types.DECIMAL);
+ break;
+ case DATEV2:
+ preparedStatement.setNull(parameterIndex, Types.DATE);
+ break;
+ case DATETIMEV2:
+ preparedStatement.setNull(parameterIndex, Types.TIMESTAMP);
+ break;
+ case CHAR:
+ case VARCHAR:
+ case STRING:
+ case BINARY:
+ preparedStatement.setNull(parameterIndex, Types.VARCHAR);
+ break;
+ default:
+ throw new RuntimeException("Unknown type value: " + dorisType);
+ }
+ }
+}
diff --git
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/DefaultJdbcExecutor.java
similarity index 99%
copy from
fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java
copy to
fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/DefaultJdbcExecutor.java
index 3f00643d126..94d4304db38 100644
---
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java
+++
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/DefaultJdbcExecutor.java
@@ -78,8 +78,8 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-public class JdbcExecutor {
- private static final Logger LOG = Logger.getLogger(JdbcExecutor.class);
+public class DefaultJdbcExecutor {
+ private static final Logger LOG =
Logger.getLogger(DefaultJdbcExecutor.class);
private static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new
TBinaryProtocol.Factory();
private Connection conn = null;
private PreparedStatement preparedStatement = null;
@@ -97,7 +97,7 @@ public class JdbcExecutor {
private TOdbcTableType tableType;
private JdbcDataSourceConfig config;
- public JdbcExecutor(byte[] thriftParams) throws Exception {
+ public DefaultJdbcExecutor(byte[] thriftParams) throws Exception {
TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams();
TDeserializer deserializer = new TDeserializer(PROTOCOL_FACTORY);
try {
diff --git
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java
index 3f00643d126..85c40e21d16 100644
---
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java
+++
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java
@@ -17,1091 +17,26 @@
package org.apache.doris.jdbc;
-import org.apache.doris.common.exception.InternalException;
import org.apache.doris.common.exception.UdfRuntimeException;
-import org.apache.doris.common.jni.utils.UdfUtils;
-import org.apache.doris.common.jni.vec.ColumnType;
-import org.apache.doris.common.jni.vec.ColumnValueConverter;
-import org.apache.doris.common.jni.vec.VectorColumn;
-import org.apache.doris.common.jni.vec.VectorTable;
-import org.apache.doris.thrift.TJdbcExecutorCtorParams;
-import org.apache.doris.thrift.TJdbcOperation;
-import org.apache.doris.thrift.TOdbcTableType;
-import com.alibaba.druid.pool.DruidDataSource;
-import com.clickhouse.data.value.UnsignedByte;
-import com.clickhouse.data.value.UnsignedInteger;
-import com.clickhouse.data.value.UnsignedLong;
-import com.clickhouse.data.value.UnsignedShort;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.vesoft.nebula.client.graph.data.ValueWrapper;
-import org.apache.log4j.Logger;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-
-import java.io.FileNotFoundException;
-import java.lang.reflect.Array;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.net.Inet4Address;
-import java.net.Inet6Address;
-import java.net.InetAddress;
-import java.net.MalformedURLException;
-import java.sql.Connection;
-import java.sql.Date;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.sql.Types;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.OffsetDateTime;
-import java.time.format.DateTimeFormatter;
-import java.time.format.DateTimeFormatterBuilder;
-import java.time.temporal.ChronoField;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.UUID;
-import java.util.function.Function;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-public class JdbcExecutor {
- private static final Logger LOG = Logger.getLogger(JdbcExecutor.class);
- private static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new
TBinaryProtocol.Factory();
- private Connection conn = null;
- private PreparedStatement preparedStatement = null;
- private Statement stmt = null;
- private ResultSet resultSet = null;
- private ResultSetMetaData resultSetMetaData = null;
- private List<String> resultColumnTypeNames = null;
- private List<Object[]> block = null;
- private VectorTable outputTable = null;
- private int batchSizeNum = 0;
- private int curBlockRows = 0;
- private static final byte[] emptyBytes = new byte[0];
- private DruidDataSource druidDataSource = null;
- private final byte[] druidDataSourceLock = new byte[0];
- private TOdbcTableType tableType;
- private JdbcDataSourceConfig config;
-
- public JdbcExecutor(byte[] thriftParams) throws Exception {
- TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams();
- TDeserializer deserializer = new TDeserializer(PROTOCOL_FACTORY);
- try {
- deserializer.deserialize(request, thriftParams);
- } catch (TException e) {
- throw new InternalException(e.getMessage());
- }
- tableType = request.table_type;
- this.config = new JdbcDataSourceConfig()
- .setCatalogId(request.catalog_id)
- .setJdbcUser(request.jdbc_user)
- .setJdbcPassword(request.jdbc_password)
- .setJdbcUrl(request.jdbc_url)
- .setJdbcDriverUrl(request.driver_path)
- .setJdbcDriverClass(request.jdbc_driver_class)
- .setBatchSize(request.batch_size)
- .setOp(request.op)
- .setTableType(request.table_type)
- .setConnectionPoolMinSize(request.connection_pool_min_size)
- .setConnectionPoolMaxSize(request.connection_pool_max_size)
-
.setConnectionPoolMaxWaitTime(request.connection_pool_max_wait_time)
-
.setConnectionPoolMaxLifeTime(request.connection_pool_max_life_time)
-
.setConnectionPoolKeepAlive(request.connection_pool_keep_alive);
-
JdbcDataSource.getDataSource().setCleanupInterval(request.connection_pool_cache_clear_time);
- init(config, request.statement);
- }
-
- public void close() throws Exception {
- try {
- if (stmt != null) {
- try {
- stmt.cancel();
- } catch (SQLException e) {
- LOG.error("Error cancelling statement", e);
- }
- }
-
- boolean shouldAbort = conn != null && resultSet != null
- && (tableType == TOdbcTableType.MYSQL || tableType ==
TOdbcTableType.SQLSERVER);
- boolean aborted = false; // Used to record whether the abort
operation is performed
- if (shouldAbort) {
- aborted = abortReadConnection(conn, resultSet, tableType);
- }
-
- // If no abort operation is performed, the resource needs to be
closed manually
- if (!aborted) {
- closeResources(resultSet, stmt, conn);
- }
- } finally {
- if (config.getConnectionPoolMinSize() == 0 && druidDataSource !=
null) {
- druidDataSource.close();
-
JdbcDataSource.getDataSource().getSourcesMap().remove(config.createCacheKey());
- druidDataSource = null;
- }
- }
- }
-
- private void closeResources(AutoCloseable... closeables) {
- for (AutoCloseable closeable : closeables) {
- if (closeable != null) {
- try {
- if (closeable instanceof Connection) {
- if (!((Connection) closeable).isClosed()) {
- closeable.close();
- }
- } else {
- closeable.close();
- }
- } catch (Exception e) {
- LOG.error("Cannot close resource: ", e);
- }
- }
- }
- }
-
- public boolean abortReadConnection(Connection connection, ResultSet
resultSet, TOdbcTableType tableType)
- throws SQLException {
- if (!resultSet.isAfterLast() && (tableType == TOdbcTableType.MYSQL ||
tableType == TOdbcTableType.SQLSERVER)) {
- // Abort connection before closing. Without this, the
MySQL/SQLServer driver
- // attempts to drain the connection by reading all the results.
- connection.abort(MoreExecutors.directExecutor());
- return true;
- }
- return false;
- }
-
- public int read() throws UdfRuntimeException {
- try {
- resultSet = ((PreparedStatement) stmt).executeQuery();
- resultSetMetaData = resultSet.getMetaData();
- int columnCount = resultSetMetaData.getColumnCount();
- resultColumnTypeNames = new ArrayList<>(columnCount);
- block = new ArrayList<>(columnCount);
- if (isNebula()) {
- for (int i = 0; i < columnCount; ++i) {
- block.add((Object[]) Array.newInstance(Object.class,
batchSizeNum));
- }
- } else {
- for (int i = 0; i < columnCount; ++i) {
-
resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1));
- block.add((Object[]) Array.newInstance(Object.class,
batchSizeNum));
- }
- }
- return columnCount;
- } catch (SQLException e) {
- throw new UdfRuntimeException("JDBC executor sql has error: ", e);
- }
- }
-
- public long getBlockAddress(int batchSize, Map<String, String>
outputParams)
- throws UdfRuntimeException {
- try {
- if (outputTable != null) {
- outputTable.close();
- }
-
- String isNullableString = outputParams.get("is_nullable");
- String replaceString = outputParams.get("replace_string");
-
- if (isNullableString == null || replaceString == null) {
- throw new IllegalArgumentException(
- "Output parameters 'is_nullable' and 'replace_string'
are required.");
- }
-
- String[] nullableList = isNullableString.split(",");
- String[] replaceStringList = replaceString.split(",");
- curBlockRows = 0;
- int columnCount = resultSetMetaData.getColumnCount();
-
- do {
- for (int i = 0; i < columnCount; ++i) {
- boolean isBitmapOrHll =
- replaceStringList[i].equals("bitmap")
- || replaceStringList[i].equals("hll");
- block.get(i)[curBlockRows] = getColumnValue(tableType, i,
isBitmapOrHll);
- }
- curBlockRows++;
- } while (curBlockRows < batchSize && resultSet.next());
-
- outputTable = VectorTable.createWritableTable(outputParams,
curBlockRows);
-
- for (int i = 0; i < columnCount; ++i) {
- Object[] columnData = block.get(i);
- ColumnType type = outputTable.getColumnType(i);
- Class<?> clz = findNonNullClass(columnData, type);
- Object[] newColumn = (Object[]) Array.newInstance(clz,
curBlockRows);
- System.arraycopy(columnData, 0, newColumn, 0, curBlockRows);
- boolean isNullable = Boolean.parseBoolean(nullableList[i]);
- outputTable.appendData(
- i,
- newColumn,
- getOutputConverter(type, clz, replaceStringList[i]),
- isNullable);
- }
- } catch (Exception e) {
- LOG.warn("jdbc get block address exception: ", e);
- throw new UdfRuntimeException("jdbc get block address: ", e);
- }
- return outputTable.getMetaAddress();
- }
-
- public int write(Map<String, String> params) throws UdfRuntimeException {
- VectorTable batchTable = VectorTable.createReadableTable(params);
- // Can't release or close batchTable, it's released by c++
- try {
- insert(batchTable);
- } catch (SQLException e) {
- throw new UdfRuntimeException("JDBC executor sql has error: ", e);
- }
- return batchTable.getNumRows();
- }
-
- public void openTrans() throws UdfRuntimeException {
- try {
- if (conn != null) {
- conn.setAutoCommit(false);
- }
- } catch (SQLException e) {
- throw new UdfRuntimeException("JDBC executor open transaction has
error: ", e);
- }
- }
-
- public void commitTrans() throws UdfRuntimeException {
- try {
- if (conn != null) {
- conn.commit();
- }
- } catch (SQLException e) {
- throw new UdfRuntimeException("JDBC executor commit transaction
has error: ", e);
- }
- }
-
- public void rollbackTrans() throws UdfRuntimeException {
- try {
- if (conn != null) {
- conn.rollback();
- }
- } catch (SQLException e) {
- throw new UdfRuntimeException("JDBC executor rollback transaction
has error: ", e);
- }
- }
-
- public List<String> getResultColumnTypeNames() {
- return resultColumnTypeNames;
- }
-
- public int getCurBlockRows() {
- return curBlockRows;
- }
-
- public boolean hasNext() throws UdfRuntimeException {
- try {
- if (resultSet == null) {
- return false;
- }
- return resultSet.next();
- } catch (SQLException e) {
- throw new UdfRuntimeException("resultSet to get next error: ", e);
- }
- }
-
- private void init(JdbcDataSourceConfig config, String sql) throws
UdfRuntimeException {
- String druidDataSourceKey = config.createCacheKey();
- try {
- if (isNebula()) {
- batchSizeNum = config.getBatchSize();
- Class.forName(config.getJdbcDriverClass());
- conn =
DriverManager.getConnection(config.getJdbcDriverClass(), config.getJdbcUser(),
- config.getJdbcPassword());
- stmt = conn.prepareStatement(sql);
- } else {
- ClassLoader parent = getClass().getClassLoader();
- ClassLoader classLoader =
UdfUtils.getClassLoader(config.getJdbcDriverUrl(), parent);
- druidDataSource =
JdbcDataSource.getDataSource().getSource(druidDataSourceKey);
- if (druidDataSource == null) {
- synchronized (druidDataSourceLock) {
- druidDataSource =
JdbcDataSource.getDataSource().getSource(druidDataSourceKey);
- if (druidDataSource == null) {
- long start = System.currentTimeMillis();
- DruidDataSource ds = new DruidDataSource();
- ds.setDriverClassLoader(classLoader);
- ds.setDriverClassName(config.getJdbcDriverClass());
- ds.setUrl(config.getJdbcUrl());
- ds.setUsername(config.getJdbcUser());
- ds.setPassword(config.getJdbcPassword());
- ds.setMinIdle(config.getConnectionPoolMinSize());
// default 1
-
ds.setInitialSize(config.getConnectionPoolMinSize()); // default 1
-
ds.setMaxActive(config.getConnectionPoolMaxSize()); // default 10
-
ds.setMaxWait(config.getConnectionPoolMaxWaitTime()); // default 5000
- ds.setTestWhileIdle(true);
- ds.setTestOnBorrow(false);
- setValidationQuery(ds, config.getTableType());
- // default 3 min
-
ds.setTimeBetweenEvictionRunsMillis(config.getConnectionPoolMaxLifeTime() /
10L);
- // default 15 min
-
ds.setMinEvictableIdleTimeMillis(config.getConnectionPoolMaxLifeTime() / 2L);
- // default 30 min
-
ds.setMaxEvictableIdleTimeMillis(config.getConnectionPoolMaxLifeTime());
-
ds.setKeepAlive(config.isConnectionPoolKeepAlive());
- // default 6 min
-
ds.setKeepAliveBetweenTimeMillis(config.getConnectionPoolMaxLifeTime() / 5L);
- druidDataSource = ds;
-
JdbcDataSource.getDataSource().putSource(druidDataSourceKey, ds);
- LOG.info("JdbcClient set"
- + " ConnectionPoolMinSize = " +
config.getConnectionPoolMinSize()
- + ", ConnectionPoolMaxSize = " +
config.getConnectionPoolMaxSize()
- + ", ConnectionPoolMaxWaitTime = " +
config.getConnectionPoolMaxWaitTime()
- + ", ConnectionPoolMaxLifeTime = " +
config.getConnectionPoolMaxLifeTime()
- + ", ConnectionPoolKeepAlive = " +
config.isConnectionPoolKeepAlive());
- LOG.info("init datasource [" +
(config.getJdbcUrl() + config.getJdbcUser()) + "] cost: " + (
- System.currentTimeMillis() - start) + "
ms");
- }
- }
- }
-
- long start = System.currentTimeMillis();
- conn = druidDataSource.getConnection();
- LOG.info("get connection [" + (config.getJdbcUrl() +
config.getJdbcUser()) + "] cost: " + (
- System.currentTimeMillis() - start)
- + " ms");
- if (config.getOp() == TJdbcOperation.READ) {
- conn.setAutoCommit(false);
- Preconditions.checkArgument(sql != null);
- stmt = conn.prepareStatement(sql,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
- if (tableType == TOdbcTableType.MYSQL) {
- stmt.setFetchSize(Integer.MIN_VALUE);
- } else {
- stmt.setFetchSize(config.getBatchSize());
- }
- batchSizeNum = config.getBatchSize();
- } else {
- LOG.info("insert sql: " + sql);
- preparedStatement = conn.prepareStatement(sql);
- }
- }
- } catch (MalformedURLException e) {
- throw new UdfRuntimeException("MalformedURLException to load class
about " + config.getJdbcDriverUrl(), e);
- } catch (SQLException e) {
- throw new UdfRuntimeException("Initialize datasource failed: ", e);
- } catch (FileNotFoundException e) {
- throw new UdfRuntimeException("FileNotFoundException failed: ", e);
- } catch (Exception e) {
- throw new UdfRuntimeException("Initialize datasource failed: ", e);
- }
- }
-
- private void setValidationQuery(DruidDataSource ds, TOdbcTableType
tableType) {
- if (tableType == TOdbcTableType.ORACLE || tableType ==
TOdbcTableType.OCEANBASE_ORACLE) {
- ds.setValidationQuery("SELECT 1 FROM dual");
- } else if (tableType == TOdbcTableType.SAP_HANA) {
- ds.setValidationQuery("SELECT 1 FROM DUMMY");
- } else {
- ds.setValidationQuery("SELECT 1");
- }
- }
-
- public boolean isNebula() {
- return tableType == TOdbcTableType.NEBULA;
- }
-
- private Class<?> findNonNullClass(Object[] columnData, ColumnType type) {
- for (Object data : columnData) {
- if (data != null) {
- return data.getClass();
- }
- }
- switch (type.getType()) {
- case BOOLEAN:
- return Boolean.class;
- case TINYINT:
- return Byte.class;
- case SMALLINT:
- return Short.class;
- case INT:
- return Integer.class;
- case BIGINT:
- return Long.class;
- case LARGEINT:
- return BigInteger.class;
- case FLOAT:
- return Float.class;
- case DOUBLE:
- return Double.class;
- case DECIMALV2:
- case DECIMAL32:
- case DECIMAL64:
- case DECIMAL128:
- return BigDecimal.class;
- case DATE:
- case DATEV2:
- return LocalDate.class;
- case DATETIME:
- case DATETIMEV2:
- return LocalDateTime.class;
- case CHAR:
- case VARCHAR:
- case STRING:
- return String.class;
- case ARRAY:
- return List.class;
- default:
- throw new IllegalArgumentException(
- "Unsupported column type: " + type.getType());
- }
- }
-
- public Object getColumnValue(TOdbcTableType tableType, int columnIndex,
boolean isBitmapOrHll)
- throws SQLException {
- Object result;
- if (tableType == TOdbcTableType.NEBULA) {
- result = UdfUtils.convertObject((ValueWrapper)
resultSet.getObject(columnIndex + 1));
- } else {
- result =
- isBitmapOrHll
- ? resultSet.getBytes(columnIndex + 1)
- : resultSet.getObject(columnIndex + 1);
- }
- return result;
- }
-
- /*
- | Type | Java Array Type
|
-
|---------------------------------------------|----------------------------|
- | BOOLEAN | Boolean[]
|
- | TINYINT | Byte[]
|
- | SMALLINT | Short[]
|
- | INT | Integer[]
|
- | BIGINT | Long[]
|
- | LARGEINT | BigInteger[]
|
- | FLOAT | Float[]
|
- | DOUBLE | Double[]
|
- | DECIMALV2, DECIMAL32, DECIMAL64, DECIMAL128 | BigDecimal[]
|
- | DATE, DATEV2 | LocalDate[]
|
- | DATETIME, DATETIMEV2 | LocalDateTime[]
|
- | CHAR, VARCHAR, STRING | String[]
|
- | ARRAY | List<Object>[]
|
- | MAP | Map<Object, Object>[]
|
- | STRUCT | Map<String, Object>[]
|
- */
-
- private ColumnValueConverter getOutputConverter(
- ColumnType columnType, Class clz, String replaceString) {
- switch (columnType.getType()) {
- case BOOLEAN:
- if (Integer.class.equals(clz)) {
- return createConverter(input -> ((Integer) input) != 0,
Boolean.class);
- }
- if (Byte.class.equals(clz)) {
- return createConverter(input -> ((Byte) input) != 0,
Boolean.class);
- }
- if (String.class.equals(clz)) {
- return createConverter(
- input ->
- Boolean.parseBoolean(
- String.valueOf(input).equals("1")
? "true" : "false"),
- Boolean.class);
- }
- break;
- case TINYINT:
- if (Integer.class.equals(clz)) {
- return createConverter(input -> ((Integer)
input).byteValue(), Byte.class);
- }
- if (Short.class.equals(clz)) {
- return createConverter(input -> ((Short)
input).byteValue(), Byte.class);
- }
- if (Object.class.equals(clz)) {
- return createConverter(
- input -> (byte)
Integer.parseInt(String.valueOf(input)), Byte.class);
- }
- if (BigDecimal.class.equals(clz)) {
- return createConverter(input -> ((BigDecimal)
input).byteValue(), Byte.class);
- }
- if (String.class.equals(clz)) {
- return createConverter(
- input -> Byte.parseByte(String.valueOf(input)),
Byte.class);
- }
- break;
- case SMALLINT:
- if (Integer.class.equals(clz)) {
- return createConverter(input -> ((Integer)
input).shortValue(), Short.class);
- }
- if (BigDecimal.class.equals(clz)) {
- return createConverter(input -> ((BigDecimal)
input).shortValue(), Short.class);
- }
- if (String.class.equals(clz)) {
- return createConverter(
- input -> Short.parseShort(String.valueOf(input)),
Short.class);
- }
- if (Byte.class.equals(clz)) {
- return createConverter(input -> ((Byte)
input).shortValue(), Short.class);
- }
- if (com.clickhouse.data.value.UnsignedByte.class.equals(clz)) {
- return createConverter(
- input -> ((UnsignedByte) input).shortValue(),
Short.class);
- }
- break;
- case INT:
- if (Long.class.equals(clz)) {
- return createConverter(input -> ((Long) input).intValue(),
Integer.class);
- }
- if (BigDecimal.class.equals(clz)) {
- return createConverter(input -> ((BigDecimal)
input).intValue(), Integer.class);
- }
- if (String.class.equals(clz)) {
- return createConverter(
- input -> Integer.parseInt(String.valueOf(input)),
Integer.class);
- }
- if (Short.class.equals(clz)) {
- return createConverter(input -> ((Short)
input).intValue(), Integer.class);
- }
- if (com.clickhouse.data.value.UnsignedShort.class.equals(clz))
{
- return createConverter(
- input -> ((UnsignedShort) input).intValue(),
Integer.class);
- }
- break;
- case BIGINT:
- if (BigDecimal.class.equals(clz)) {
- return createConverter(input -> ((BigDecimal)
input).longValue(), Long.class);
- }
- if (String.class.equals(clz)) {
- return createConverter(
- input -> Long.parseLong(String.valueOf(input)),
Long.class);
- }
- if (Integer.class.equals(clz)) {
- return createConverter(input -> ((Integer)
input).longValue(), Long.class);
- }
- if
(com.clickhouse.data.value.UnsignedInteger.class.equals(clz)) {
- return createConverter(
- input -> ((UnsignedInteger) input).longValue(),
Long.class);
- }
- break;
- case LARGEINT:
- if (BigDecimal.class.equals(clz)) {
- return createConverter(
- input -> ((BigDecimal) input).toBigInteger(),
BigInteger.class);
- }
- if (String.class.equals(clz)) {
- return createConverter(
- input -> new BigInteger(String.valueOf(input)),
BigInteger.class);
- }
- if (Long.class.equals(clz)) {
- return createConverter(
- input -> BigInteger.valueOf((Long) input),
BigInteger.class);
- }
- if (com.clickhouse.data.value.UnsignedLong.class.equals(clz)) {
- return createConverter(
- input -> ((UnsignedLong) input).bigIntegerValue(),
BigInteger.class);
- }
- break;
- case DOUBLE:
- if (BigDecimal.class.equals(clz)) {
- return createConverter(
- input -> ((BigDecimal) input).doubleValue(),
Double.class);
- }
- if (String.class.equals(clz)) {
- return createConverter(
- input ->
Double.parseDouble(String.valueOf(input)), Double.class);
- }
- break;
- case FLOAT:
- return createConverter(
- input -> Float.parseFloat(String.valueOf(input)),
Float.class);
- case DECIMALV2:
- case DECIMAL32:
- case DECIMAL64:
- case DECIMAL128:
- return createConverter(
- input -> new BigDecimal(String.valueOf(input)),
BigDecimal.class);
- case DATE:
- case DATEV2:
- if (Date.class.equals(clz)) {
- return createConverter(input -> ((Date)
input).toLocalDate(), LocalDate.class);
- }
- if (Timestamp.class.equals(clz)) {
- return createConverter(
- input -> ((Timestamp)
input).toLocalDateTime().toLocalDate(),
- LocalDate.class);
- }
- if (String.class.equals(clz)) {
- return createConverter(
- input -> LocalDate.parse(String.valueOf(input)),
LocalDate.class);
- }
- break;
- case DATETIME:
- case DATETIMEV2:
- if (Timestamp.class.equals(clz)) {
- return createConverter(
- input -> ((Timestamp) input).toLocalDateTime(),
LocalDateTime.class);
- }
- if (OffsetDateTime.class.equals(clz)) {
- return createConverter(
- input -> ((OffsetDateTime)
input).toLocalDateTime(),
- LocalDateTime.class);
- }
- if (oracle.sql.TIMESTAMP.class.equals(clz)) {
- return createConverter(
- input -> {
- try {
- return ((oracle.sql.TIMESTAMP) input)
- .timestampValue()
- .toLocalDateTime();
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- },
- LocalDateTime.class);
- }
- if (String.class.equals(clz)) {
- return createConverter(
- input ->
- LocalDateTime.parse(
- String.valueOf(input),
-
getDateTimeFormatter(String.valueOf(input))),
- LocalDateTime.class);
- }
- break;
- case CHAR:
- return createConverter(
- input -> trimSpaces(tableType, input.toString()),
String.class);
- case VARCHAR:
- case STRING:
- if (byte[].class.equals(clz)) {
- if (replaceString.equals("bitmap") ||
replaceString.equals("hll")) {
- break;
- } else {
- return createConverter(
- input -> byteArrayToHexString(tableType,
(byte[]) input),
- String.class);
- }
- }
- if (Time.class.equals(clz)) {
- return createConverter(
- input -> timeToString((java.sql.Time) input),
String.class);
- }
- if (oracle.sql.CLOB.class.equals(clz)) {
- return createConverter(
- input -> {
- try {
- oracle.sql.CLOB clob = (oracle.sql.CLOB)
input;
- return clob.getSubString(1, (int)
clob.length());
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- },
- String.class);
- }
- if (java.net.Inet4Address.class.equals(clz)) {
- return createConverter(
- input -> ((InetAddress) input).getHostAddress(),
String.class);
- }
- if (java.net.Inet6Address.class.equals(clz)) {
- return createConverter(
- input -> {
- String inetAddress = ((InetAddress)
input).getHostAddress();
- return simplifyIPv6Address(inetAddress);
- },
- String.class);
- } else {
- return createConverter(Object::toString, String.class);
- }
- case ARRAY:
- if (java.sql.Array.class.equals(clz)) {
- return createConverter(
- input -> {
- try {
- return Arrays.asList(
- (Object[]) ((java.sql.Array)
input).getArray());
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- },
- List.class);
- }
- if (String.class.equals(clz)) {
- return createConverter(
- input -> {
- List<Object> list =
parseArray(String.valueOf(input));
- return convertArray(list,
columnType.getChildTypes().get(0));
- },
- List.class);
- }
- if (tableType == TOdbcTableType.CLICKHOUSE) {
- return createConverter(
- input -> {
- List<Object> list =
convertClickHouseArray(input);
- return convertArray(list,
columnType.getChildTypes().get(0));
- },
- List.class);
- }
- break;
- default:
- throw new IllegalArgumentException(
- "Unsupported column type: " + columnType.getType());
- }
- return null;
- }
-
- private ColumnValueConverter createConverter(
- Function<Object, ?> converterFunction, Class<?> type) {
- return (Object[] columnData) -> {
- Object[] result = (Object[]) Array.newInstance(type,
columnData.length);
- for (int i = 0; i < columnData.length; i++) {
- result[i] = columnData[i] != null ?
converterFunction.apply(columnData[i]) : null;
- }
- return result;
- };
- }
-
- private String byteArrayToHexString(TOdbcTableType tableType, byte[]
columnData) {
- if (tableType == TOdbcTableType.MYSQL || tableType ==
TOdbcTableType.OCEANBASE) {
- return mysqlByteArrayToHexString(columnData);
- } else if (tableType == TOdbcTableType.POSTGRESQL) {
- return pgByteArrayToHexString(columnData);
- } else {
- return defaultByteArrayToHexString(columnData);
- }
- }
-
- private String mysqlByteArrayToHexString(byte[] bytes) {
- StringBuilder hexString = new StringBuilder("0x");
- for (byte b : bytes) {
- String hex = Integer.toHexString(0xFF & b);
- if (hex.length() == 1) {
- hexString.append('0');
- }
- hexString.append(hex.toUpperCase());
- }
- return hexString.toString();
- }
-
- private static String pgByteArrayToHexString(byte[] bytes) {
- StringBuilder hexString = new StringBuilder("\\x");
- for (byte b : bytes) {
- hexString.append(String.format("%02x", b & 0xff));
- }
- return hexString.toString();
- }
-
- private String defaultByteArrayToHexString(byte[] bytes) {
- StringBuilder hexString = new StringBuilder();
- for (byte b : bytes) {
- String hex = Integer.toHexString(0xFF & b);
- if (hex.length() == 1) {
- hexString.append('0');
- }
- hexString.append(hex.toUpperCase());
- }
- return hexString.toString();
- }
-
- private String trimSpaces(TOdbcTableType tableType, String str) {
- if (tableType == TOdbcTableType.POSTGRESQL || tableType ==
TOdbcTableType.ORACLE) {
- int end = str.length() - 1;
- while (end >= 0 && str.charAt(end) == ' ') {
- end--;
- }
- return str.substring(0, end + 1);
- } else {
- return str;
- }
- }
-
- public String timeToString(java.sql.Time time) {
- long milliseconds = time.getTime() % 1000L;
- if (milliseconds > 0) {
- return String.format("%s.%03d", time, milliseconds);
- } else {
- return time.toString();
- }
- }
-
- private List<Object> convertArray(List<Object> list, ColumnType childType)
{
- Class<?> clz = Object.class;
- for (Object data : list) {
- if (data != null) {
- clz = data.getClass();
- break;
- }
- }
- List<Object> convertedList = new ArrayList<>(list.size());
- ColumnValueConverter converter = getOutputConverter(childType, clz,
"not_replace");
- for (Object element : list) {
- if (childType.isComplexType()) {
- convertedList.add(convertArray((List<Object>) element,
childType));
- } else {
- if (converter != null) {
- convertedList.add(converter.convert(new Object[]
{element})[0]);
- } else {
- convertedList.add(element);
- }
- }
- }
- return convertedList;
- }
-
- private static String simplifyIPv6Address(String address) {
- // Replace longest sequence of zeros with "::"
- String[] parts = address.split(":");
- int longestSeqStart = -1;
- int longestSeqLen = 0;
- int curSeqStart = -1;
- int curSeqLen = 0;
- for (int i = 0; i < parts.length; i++) {
- if (parts[i].equals("0")) {
- if (curSeqStart == -1) {
- curSeqStart = i;
- }
- curSeqLen++;
- if (curSeqLen > longestSeqLen) {
- longestSeqStart = curSeqStart;
- longestSeqLen = curSeqLen;
- }
- } else {
- curSeqStart = -1;
- curSeqLen = 0;
- }
- }
- if (longestSeqLen <= 1) {
- return address; // No sequences of zeros to replace
- }
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < longestSeqStart; i++) {
- sb.append(parts[i]).append(':');
- }
- sb.append(':');
- for (int i = longestSeqStart + longestSeqLen; i < parts.length; i++) {
- sb.append(parts[i]);
- if (i < parts.length - 1) {
- sb.append(':');
- }
- }
- return sb.toString();
- }
-
- private static final Pattern MILLIS_PATTERN = Pattern.compile("(\\.\\d+)");
-
- public static DateTimeFormatter getDateTimeFormatter(String
dateTimeString) {
- Matcher matcher = MILLIS_PATTERN.matcher(dateTimeString);
- int fractionDigits = 0;
- if (matcher.find()) {
- fractionDigits = matcher.group(1).length() - 1; // Subtract 1 to
exclude the dot
- }
- fractionDigits = Math.min(fractionDigits, 6); // Limit the fraction
digits to 6
-
- return new DateTimeFormatterBuilder()
- .appendPattern("yyyy-MM-dd HH:mm:ss")
- .appendFraction(ChronoField.MILLI_OF_SECOND, fractionDigits,
fractionDigits, true)
- .toFormatter();
- }
- private static final Map<Class<?>, Function<Object, List<Object>>>
CK_ARRAY_CONVERTERS =
- new HashMap<>();
+public interface JdbcExecutor {
+ int read() throws UdfRuntimeException;
- static {
- CK_ARRAY_CONVERTERS.put(String[].class, res ->
Arrays.asList((String[]) res));
- CK_ARRAY_CONVERTERS.put(boolean[].class, res -> toList((boolean[])
res));
- CK_ARRAY_CONVERTERS.put(Boolean[].class, res ->
Arrays.asList((Boolean[]) res));
- CK_ARRAY_CONVERTERS.put(byte[].class, res -> toList((byte[]) res));
- CK_ARRAY_CONVERTERS.put(Byte[].class, res -> Arrays.asList((Byte[])
res));
- CK_ARRAY_CONVERTERS.put(LocalDate[].class, res ->
Arrays.asList((LocalDate[]) res));
- CK_ARRAY_CONVERTERS.put(LocalDateTime[].class, res ->
Arrays.asList((LocalDateTime[]) res));
- CK_ARRAY_CONVERTERS.put(float[].class, res -> toList((float[]) res));
- CK_ARRAY_CONVERTERS.put(Float[].class, res -> Arrays.asList((Float[])
res));
- CK_ARRAY_CONVERTERS.put(double[].class, res -> toList((double[]) res));
- CK_ARRAY_CONVERTERS.put(Double[].class, res ->
Arrays.asList((Double[]) res));
- CK_ARRAY_CONVERTERS.put(short[].class, res -> toList((short[]) res));
- CK_ARRAY_CONVERTERS.put(Short[].class, res -> Arrays.asList((Short[])
res));
- CK_ARRAY_CONVERTERS.put(int[].class, res -> toList((int[]) res));
- CK_ARRAY_CONVERTERS.put(Integer[].class, res ->
Arrays.asList((Integer[]) res));
- CK_ARRAY_CONVERTERS.put(long[].class, res -> toList((long[]) res));
- CK_ARRAY_CONVERTERS.put(Long[].class, res -> Arrays.asList((Long[])
res));
- CK_ARRAY_CONVERTERS.put(BigInteger[].class, res ->
Arrays.asList((BigInteger[]) res));
- CK_ARRAY_CONVERTERS.put(BigDecimal[].class, res ->
Arrays.asList((BigDecimal[]) res));
- CK_ARRAY_CONVERTERS.put(
- Inet4Address[].class,
- res ->
- Arrays.stream((Inet4Address[]) res)
- .map(addr -> addr == null ? null :
addr.getHostAddress())
- .collect(Collectors.toList()));
- CK_ARRAY_CONVERTERS.put(
- Inet6Address[].class,
- res ->
- Arrays.stream((Inet6Address[]) res)
- .map(addr -> addr == null ? null :
simplifyIPv6Address(addr.getHostAddress()))
- .collect(Collectors.toList()));
- CK_ARRAY_CONVERTERS.put(UUID[].class, res -> Arrays.asList((UUID[])
res));
- CK_ARRAY_CONVERTERS.put(com.clickhouse.data.value.UnsignedByte[].class,
- res ->
Arrays.asList((com.clickhouse.data.value.UnsignedByte[]) res));
-
CK_ARRAY_CONVERTERS.put(com.clickhouse.data.value.UnsignedShort[].class,
- res ->
Arrays.asList((com.clickhouse.data.value.UnsignedShort[]) res));
-
CK_ARRAY_CONVERTERS.put(com.clickhouse.data.value.UnsignedInteger[].class,
- res ->
Arrays.asList((com.clickhouse.data.value.UnsignedInteger[]) res));
- CK_ARRAY_CONVERTERS.put(com.clickhouse.data.value.UnsignedLong[].class,
- res ->
Arrays.asList((com.clickhouse.data.value.UnsignedLong[]) res));
- }
+ int write(Map<String, String> params) throws UdfRuntimeException;
- public static List<Object> convertClickHouseArray(Object obj) {
- Function<Object, List<Object>> converter =
CK_ARRAY_CONVERTERS.get(obj.getClass());
- return converter != null ? converter.apply(obj) :
Collections.singletonList(obj);
- }
+ long getBlockAddress(int batchSize, Map<String, String> outputParams)
throws UdfRuntimeException;
- private static <T> List<Object> toList(T array) {
- if (array instanceof Object[]) {
- return Arrays.asList((Object[]) array);
- }
- int length = Array.getLength(array);
- List<Object> list = new ArrayList<>(length);
- for (int i = 0; i < length; i++) {
- list.add(Array.get(array, i));
- }
- return list;
- }
+ void close() throws UdfRuntimeException, Exception;
- private static final Pattern ARRAY_PATTERN =
Pattern.compile("\"([^\"]*)\"|([^,]+)");
+ void openTrans() throws UdfRuntimeException;
- private static List<Object> parseArray(String input) {
- String trimmedInput = input.substring(1, input.length() - 1);
- List<Object> list = new ArrayList<>();
- Matcher matcher = ARRAY_PATTERN.matcher(trimmedInput);
- while (matcher.find()) {
- if (matcher.group(1) != null) {
- list.add(matcher.group(1));
- } else {
- list.add(matcher.group(2));
- }
- }
- return list;
- }
+ void commitTrans() throws UdfRuntimeException;
- private int insert(VectorTable data) throws SQLException {
- for (int i = 0; i < data.getNumRows(); ++i) {
- for (int j = 0; j < data.getColumns().length; ++j) {
- insertColumn(i, j, data.getColumns()[j]);
- }
- preparedStatement.addBatch();
- }
- preparedStatement.executeBatch();
- preparedStatement.clearBatch();
- return data.getNumRows();
- }
+ void rollbackTrans() throws UdfRuntimeException;
- private void insertColumn(int rowIdx, int colIdx, VectorColumn column)
throws SQLException {
- int parameterIndex = colIdx + 1;
- ColumnType.Type dorisType = column.getColumnTyp();
- if (column.isNullAt(rowIdx)) {
- insertNullColumn(parameterIndex, dorisType);
- return;
- }
- switch (dorisType) {
- case BOOLEAN:
- preparedStatement.setBoolean(parameterIndex,
column.getBoolean(rowIdx));
- break;
- case TINYINT:
- preparedStatement.setByte(parameterIndex,
column.getByte(rowIdx));
- break;
- case SMALLINT:
- preparedStatement.setShort(parameterIndex,
column.getShort(rowIdx));
- break;
- case INT:
- preparedStatement.setInt(parameterIndex,
column.getInt(rowIdx));
- break;
- case BIGINT:
- preparedStatement.setLong(parameterIndex,
column.getLong(rowIdx));
- break;
- case LARGEINT:
- preparedStatement.setObject(parameterIndex,
column.getBigInteger(rowIdx));
- break;
- case FLOAT:
- preparedStatement.setFloat(parameterIndex,
column.getFloat(rowIdx));
- break;
- case DOUBLE:
- preparedStatement.setDouble(parameterIndex,
column.getDouble(rowIdx));
- break;
- case DECIMALV2:
- case DECIMAL32:
- case DECIMAL64:
- case DECIMAL128:
- preparedStatement.setBigDecimal(parameterIndex,
column.getDecimal(rowIdx));
- break;
- case DATEV2:
- preparedStatement.setDate(parameterIndex,
Date.valueOf(column.getDate(rowIdx)));
- break;
- case DATETIMEV2:
- preparedStatement.setTimestamp(
- parameterIndex,
Timestamp.valueOf(column.getDateTime(rowIdx)));
- break;
- case CHAR:
- case VARCHAR:
- case STRING:
- case BINARY:
- preparedStatement.setString(parameterIndex,
column.getStringWithOffset(rowIdx));
- break;
- default:
- throw new RuntimeException("Unknown type value: " + dorisType);
- }
- }
+ int getCurBlockRows();
- private void insertNullColumn(int parameterIndex, ColumnType.Type
dorisType)
- throws SQLException {
- switch (dorisType) {
- case BOOLEAN:
- preparedStatement.setNull(parameterIndex, Types.BOOLEAN);
- break;
- case TINYINT:
- preparedStatement.setNull(parameterIndex, Types.TINYINT);
- break;
- case SMALLINT:
- preparedStatement.setNull(parameterIndex, Types.SMALLINT);
- break;
- case INT:
- preparedStatement.setNull(parameterIndex, Types.INTEGER);
- break;
- case BIGINT:
- preparedStatement.setNull(parameterIndex, Types.BIGINT);
- break;
- case LARGEINT:
- preparedStatement.setNull(parameterIndex, Types.JAVA_OBJECT);
- break;
- case FLOAT:
- preparedStatement.setNull(parameterIndex, Types.FLOAT);
- break;
- case DOUBLE:
- preparedStatement.setNull(parameterIndex, Types.DOUBLE);
- break;
- case DECIMALV2:
- case DECIMAL32:
- case DECIMAL64:
- case DECIMAL128:
- preparedStatement.setNull(parameterIndex, Types.DECIMAL);
- break;
- case DATEV2:
- preparedStatement.setNull(parameterIndex, Types.DATE);
- break;
- case DATETIMEV2:
- preparedStatement.setNull(parameterIndex, Types.TIMESTAMP);
- break;
- case CHAR:
- case VARCHAR:
- case STRING:
- case BINARY:
- preparedStatement.setNull(parameterIndex, Types.VARCHAR);
- break;
- default:
- throw new RuntimeException("Unknown type value: " + dorisType);
- }
- }
+ boolean hasNext() throws UdfRuntimeException;
}
diff --git
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutorFactory.java
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutorFactory.java
new file mode 100644
index 00000000000..d67c58bb6b4
--- /dev/null
+++
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutorFactory.java
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jdbc;
+
+import org.apache.doris.thrift.TOdbcTableType;
+
+public class JdbcExecutorFactory {
+ public static String getExecutorClass(TOdbcTableType type) {
+ switch (type) {
+ case MYSQL:
+ return "org/apache/doris/jdbc/MySQLJdbcExecutor";
+ case ORACLE:
+ return "org/apache/doris/jdbc/OracleJdbcExecutor";
+ default:
+ return "org/apache/doris/jdbc/DefaultJdbcExecutor";
+ }
+ }
+}
diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/MySQLJdbcExecutor.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/MySQLJdbcExecutor.java
new file mode 100644
index 00000000000..30b84ffbcde
--- /dev/null
+++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/MySQLJdbcExecutor.java
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jdbc;
+
+import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.jni.vec.ColumnType.Type;
+import org.apache.doris.common.jni.vec.ColumnValueConverter;
+import org.apache.doris.common.jni.vec.VectorTable;
+import org.apache.doris.thrift.TJdbcOperation;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.log4j.Logger;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class MySQLJdbcExecutor extends BaseJdbcExecutor {
+ private static final Logger LOG =
Logger.getLogger(MySQLJdbcExecutor.class);
+
+ private static final Gson gson = new Gson();
+
+ public MySQLJdbcExecutor(byte[] thriftParams) throws Exception {
+ super(thriftParams);
+ }
+
+ @Override
+ protected boolean abortReadConnection(Connection connection, ResultSet
resultSet)
+ throws SQLException {
+ if (!resultSet.isAfterLast()) {
+ // Abort connection before closing. Without this, the MySQL driver
+ // attempts to drain the connection by reading all the results.
+ connection.abort(MoreExecutors.directExecutor());
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ protected void initializeStatement(Connection conn, JdbcDataSourceConfig
config, String sql) throws SQLException {
+ if (config.getOp() == TJdbcOperation.READ) {
+ conn.setAutoCommit(false);
+ Preconditions.checkArgument(sql != null, "SQL statement cannot be
null for READ operation.");
+ stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY);
+ stmt.setFetchSize(Integer.MIN_VALUE); // MySQL: signal streaming
results with Integer.MIN_VALUE
+ batchSizeNum = config.getBatchSize();
+ } else {
+ LOG.info("Insert SQL: " + sql);
+ preparedStatement = conn.prepareStatement(sql);
+ }
+ }
+
+ @Override
+ protected void initializeBlock(int columnCount, String[]
replaceStringList, int batchSizeNum,
+ VectorTable outputTable) {
+ for (int i = 0; i < columnCount; ++i) {
+ if (replaceStringList[i].equals("bitmap") ||
replaceStringList[i].equals("hll")) {
+ block.add(new byte[batchSizeNum][]);
+ } else if (outputTable.getColumnType(i).getType() == Type.ARRAY) {
+ block.add(new String[batchSizeNum]);
+ } else if (outputTable.getColumnType(i).getType() == Type.STRING) {
+ block.add(new Object[batchSizeNum]);
+ } else {
+
block.add(outputTable.getColumn(i).newObjectContainerArray(batchSizeNum));
+ }
+ }
+ }
+
+ @Override
+ protected Object getColumnValue(int columnIndex, ColumnType type, String[]
replaceStringList) throws SQLException {
+ if (replaceStringList[columnIndex].equals("bitmap") ||
replaceStringList[columnIndex].equals("hll")) {
+ byte[] data = resultSet.getBytes(columnIndex + 1);
+ if (resultSet.wasNull()) {
+ return null;
+ }
+ return data;
+ } else {
+ switch (type.getType()) {
+ case BOOLEAN:
+ return resultSet.getObject(columnIndex + 1, Boolean.class);
+ case TINYINT:
+ return resultSet.getObject(columnIndex + 1, Byte.class);
+ case SMALLINT:
+ return resultSet.getObject(columnIndex + 1, Short.class);
+ case INT:
+ return resultSet.getObject(columnIndex + 1, Integer.class);
+ case BIGINT:
+ return resultSet.getObject(columnIndex + 1, Long.class);
+ case LARGEINT:
+ return resultSet.getObject(columnIndex + 1,
BigInteger.class);
+ case FLOAT:
+ return resultSet.getObject(columnIndex + 1, Float.class);
+ case DOUBLE:
+ return resultSet.getObject(columnIndex + 1, Double.class);
+ case DECIMALV2:
+ case DECIMAL32:
+ case DECIMAL64:
+ case DECIMAL128:
+ return resultSet.getObject(columnIndex + 1,
BigDecimal.class);
+ case DATE:
+ case DATEV2:
+ return resultSet.getObject(columnIndex + 1,
LocalDate.class);
+ case DATETIME:
+ case DATETIMEV2:
+ return resultSet.getObject(columnIndex + 1,
LocalDateTime.class);
+ case CHAR:
+ case VARCHAR:
+ case ARRAY:
+ return resultSet.getObject(columnIndex + 1, String.class);
+ case STRING:
+ return resultSet.getObject(columnIndex + 1);
+ default:
+ throw new IllegalArgumentException("Unsupported column
type: " + type.getType());
+ }
+ }
+ }
+
+ @Override
+ protected ColumnValueConverter getOutputConverter(ColumnType columnType,
String replaceString) {
+ switch (columnType.getType()) {
+ case STRING:
+ if (replaceString.equals("bitmap") ||
replaceString.equals("hll")) {
+ return null;
+ } else {
+ return createConverter(input -> {
+ if (input instanceof byte[]) {
+ return mysqlByteArrayToHexString((byte[]) input);
+ } else if (input instanceof java.sql.Time) {
+ return timeToString((java.sql.Time) input);
+ } else {
+ return input.toString();
+ }
+ }, String.class);
+ }
+ case ARRAY:
+ return createConverter(
+ (Object input) -> convertArray(input,
columnType.getChildTypes().get(0)),
+ List.class);
+ default:
+ return null;
+ }
+ }
+
+ private Object convertArray(Object input, ColumnType columnType) {
+ java.lang.reflect.Type listType = getListTypeForArray(columnType);
+ if (columnType.getType() == Type.BOOLEAN) {
+ List<?> list = gson.fromJson((String) input, List.class);
+ return list.stream().map(item -> {
+ if (item instanceof Boolean) {
+ return item;
+ } else if (item instanceof Number) {
+ return ((Number) item).intValue() != 0;
+ } else {
+ throw new IllegalArgumentException("Cannot convert " +
item + " to Boolean.");
+ }
+ }).collect(Collectors.toList());
+ } else if (columnType.getType() == Type.DATE || columnType.getType()
== Type.DATEV2) {
+ List<?> list = gson.fromJson((String) input, List.class);
+ return list.stream().map(item -> {
+ if (item instanceof String) {
+ return LocalDate.parse((String) item);
+ } else {
+ throw new IllegalArgumentException("Cannot convert " +
item + " to LocalDate.");
+ }
+ }).collect(Collectors.toList());
+ } else if (columnType.getType() == Type.DATETIME ||
columnType.getType() == Type.DATETIMEV2) {
+ List<?> list = gson.fromJson((String) input, List.class);
+ return list.stream().map(item -> {
+ if (item instanceof String) {
+ return LocalDateTime.parse(
+ (String) item,
+ new DateTimeFormatterBuilder()
+ .appendPattern("yyyy-MM-dd HH:mm:ss")
+
.appendFraction(ChronoField.MILLI_OF_SECOND, columnType.getPrecision(),
+ columnType.getPrecision(), true)
+ .toFormatter());
+ } else {
+ throw new IllegalArgumentException("Cannot convert " +
item + " to LocalDateTime.");
+ }
+ }).collect(Collectors.toList());
+ } else if (columnType.getType() == Type.ARRAY) {
+ List<?> list = gson.fromJson((String) input, listType);
+ return list.stream()
+ .map(item -> convertArray(gson.toJson(item),
columnType.getChildTypes().get(0)))
+ .collect(Collectors.toList());
+ } else {
+ return gson.fromJson((String) input, listType);
+ }
+ }
+
+ private java.lang.reflect.Type getListTypeForArray(ColumnType type) {
+ switch (type.getType()) {
+ case BOOLEAN:
+ return new TypeToken<List<Boolean>>() {
+ }.getType();
+ case TINYINT:
+ return new TypeToken<List<Byte>>() {
+ }.getType();
+ case SMALLINT:
+ return new TypeToken<List<Short>>() {
+ }.getType();
+ case INT:
+ return new TypeToken<List<Integer>>() {
+ }.getType();
+ case BIGINT:
+ return new TypeToken<List<Long>>() {
+ }.getType();
+ case LARGEINT:
+ return new TypeToken<List<BigInteger>>() {
+ }.getType();
+ case FLOAT:
+ return new TypeToken<List<Float>>() {
+ }.getType();
+ case DOUBLE:
+ return new TypeToken<List<Double>>() {
+ }.getType();
+ case DECIMALV2:
+ case DECIMAL32:
+ case DECIMAL64:
+ case DECIMAL128:
+ return new TypeToken<List<BigDecimal>>() {
+ }.getType();
+ case DATE:
+ case DATEV2:
+ return new TypeToken<List<LocalDate>>() {
+ }.getType();
+ case DATETIME:
+ case DATETIMEV2:
+ return new TypeToken<List<LocalDateTime>>() {
+ }.getType();
+ case CHAR:
+ case VARCHAR:
+ case STRING:
+ return new TypeToken<List<String>>() {
+ }.getType();
+ case ARRAY:
+ java.lang.reflect.Type childType =
getListTypeForArray(type.getChildTypes().get(0));
+ TypeToken<?> token = TypeToken.getParameterized(List.class,
childType);
+ return token.getType();
+ default:
+ throw new IllegalArgumentException("Unsupported column type: "
+ type.getType());
+ }
+ }
+
+ private String mysqlByteArrayToHexString(byte[] bytes) {
+ StringBuilder hexString = new StringBuilder("0x");
+ for (byte b : bytes) {
+ String hex = Integer.toHexString(0xFF & b);
+ if (hex.length() == 1) {
+ hexString.append('0');
+ }
+ hexString.append(hex.toUpperCase());
+ }
+ return hexString.toString();
+ }
+
+ private String timeToString(java.sql.Time time) {
+ long milliseconds = time.getTime() % 1000L;
+ if (milliseconds > 0) {
+ return String.format("%s.%03d", time, milliseconds);
+ } else {
+ return time.toString();
+ }
+ }
+}
diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/OracleJdbcExecutor.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/OracleJdbcExecutor.java
new file mode 100644
index 00000000000..bfb941939f5
--- /dev/null
+++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/OracleJdbcExecutor.java
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jdbc;
+
+import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.jni.vec.ColumnType.Type;
+import org.apache.doris.common.jni.vec.ColumnValueConverter;
+import org.apache.doris.common.jni.vec.VectorTable;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import org.apache.log4j.Logger;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Clob;
+import java.sql.SQLException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+
+public class OracleJdbcExecutor extends BaseJdbcExecutor {
+ private static final Logger LOG =
Logger.getLogger(OracleJdbcExecutor.class);
+
+ public OracleJdbcExecutor(byte[] thriftParams) throws Exception {
+ super(thriftParams);
+ }
+
+ @Override
+ protected void setValidationQuery(DruidDataSource ds) {
+ ds.setValidationQuery("SELECT 1 FROM dual");
+ }
+
+ @Override
+ protected void initializeBlock(int columnCount, String[]
replaceStringList, int batchSizeNum,
+ VectorTable outputTable) {
+ for (int i = 0; i < columnCount; ++i) {
+ if (outputTable.getColumnType(i).getType() == Type.LARGEINT) {
+ block.add(new BigDecimal[batchSizeNum]);
+ } else if (outputTable.getColumnType(i).getType() == Type.STRING) {
+ block.add(new Object[batchSizeNum]);
+ } else {
+
block.add(outputTable.getColumn(i).newObjectContainerArray(batchSizeNum));
+ }
+ }
+ }
+
+ @Override
+ protected Object getColumnValue(int columnIndex, ColumnType type, String[]
replaceStringList) throws SQLException {
+ switch (type.getType()) {
+ case TINYINT:
+ return resultSet.getObject(columnIndex + 1, Byte.class);
+ case SMALLINT:
+ return resultSet.getObject(columnIndex + 1, Short.class);
+ case INT:
+ return resultSet.getObject(columnIndex + 1, Integer.class);
+ case BIGINT:
+ return resultSet.getObject(columnIndex + 1, Long.class);
+ case FLOAT:
+ return resultSet.getObject(columnIndex + 1, Float.class);
+ case DOUBLE:
+ return resultSet.getObject(columnIndex + 1, Double.class);
+ case LARGEINT:
+ case DECIMALV2:
+ case DECIMAL32:
+ case DECIMAL64:
+ case DECIMAL128:
+ return resultSet.getObject(columnIndex + 1, BigDecimal.class);
+ case DATE:
+ case DATEV2:
+ return resultSet.getObject(columnIndex + 1, LocalDate.class);
+ case DATETIME:
+ case DATETIMEV2:
+ return resultSet.getObject(columnIndex + 1,
LocalDateTime.class);
+ case CHAR:
+ case VARCHAR:
+ case STRING:
+ return resultSet.getObject(columnIndex + 1);
+ default:
+ throw new IllegalArgumentException("Unsupported column type: "
+ type.getType());
+ }
+ }
+
+ @Override
+ protected ColumnValueConverter getOutputConverter(ColumnType columnType,
String replaceString) {
+ switch (columnType.getType()) {
+ case CHAR:
+ return createConverter(
+ input -> trimSpaces(input.toString()), String.class);
+ case LARGEINT:
+ return createConverter(
+ input -> ((BigDecimal) input).toBigInteger(),
BigInteger.class);
+ case STRING:
+ return createConverter(input -> {
+ if (input instanceof Clob) {
+ try {
+ return ((Clob) input).getSubString(1, (int)
((Clob) input).length());
+ } catch (SQLException e) {
+ LOG.error("Failed to get string from clob", e);
+ return null;
+ }
+ } else {
+ return input.toString();
+ }
+ }, String.class);
+ default:
+ return null;
+ }
+ }
+
+ private String trimSpaces(String str) {
+ int end = str.length() - 1;
+ while (end >= 0 && str.charAt(end) == ' ') {
+ end--;
+ }
+ return str.substring(0, end + 1);
+ }
+}
diff --git a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out
index 2a636532069..3520a11d8bc 100644
--- a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out
+++ b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out
@@ -225,11 +225,13 @@ views
-- !dt_null --
\N
+0000-01-01T00:00
2023-06-17T10:00
-- !test_dz --
1 \N
2 2022-01-01
+3 0000-01-01
-- !test_filter_not --
张三1 11 12345678 123 321312 1999-02-13T00:00 中国
男 0
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]