This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 3c37fb085cec1386c30ee78ba24bea46c027b829
Author: zy-kkk <[email protected]>
AuthorDate: Wed Feb 28 22:24:18 2024 +0800

    [refactor](jdbc catalog) split jdbc executor for different data sources 
(step-1) (#31406)
---
 be/src/vec/exec/vjdbc_connector.cpp                |   35 +-
 be/src/vec/exec/vjdbc_connector.h                  |    3 +
 .../docker-compose/mysql/init/04-insert.sql        |    4 +-
 .../org/apache/doris/jdbc/BaseJdbcExecutor.java    |  520 ++++++++++
 ...{JdbcExecutor.java => DefaultJdbcExecutor.java} |    6 +-
 .../java/org/apache/doris/jdbc/JdbcExecutor.java   | 1085 +-------------------
 .../org/apache/doris/jdbc/JdbcExecutorFactory.java |   33 +
 .../org/apache/doris/jdbc/MySQLJdbcExecutor.java   |  291 ++++++
 .../org/apache/doris/jdbc/OracleJdbcExecutor.java  |  131 +++
 .../jdbc/test_mysql_jdbc_catalog.out               |    2 +
 10 files changed, 1028 insertions(+), 1082 deletions(-)

diff --git a/be/src/vec/exec/vjdbc_connector.cpp 
b/be/src/vec/exec/vjdbc_connector.cpp
index e6419ec95e7..4020278e84d 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -49,7 +49,7 @@
 #include "vec/io/reader_buffer.h"
 
 namespace doris::vectorized {
-const char* JDBC_EXECUTOR_CLASS = "org/apache/doris/jdbc/JdbcExecutor";
+const char* JDBC_EXECUTOR_FACTORY_CLASS = 
"org/apache/doris/jdbc/JdbcExecutorFactory";
 const char* JDBC_EXECUTOR_CTOR_SIGNATURE = "([B)V";
 const char* JDBC_EXECUTOR_STMT_WRITE_SIGNATURE = "(Ljava/util/Map;)I";
 const char* JDBC_EXECUTOR_HAS_NEXT_SIGNATURE = "()Z";
@@ -85,6 +85,7 @@ Status JdbcConnector::close(Status /*unused*/) {
     }
     JNIEnv* env;
     RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
+    env->DeleteGlobalRef(_executor_factory_clazz);
     env->DeleteGlobalRef(_executor_clazz);
     DELETE_BASIC_JAVA_CLAZZ_REF(object)
     DELETE_BASIC_JAVA_CLAZZ_REF(string)
@@ -104,7 +105,29 @@ Status JdbcConnector::open(RuntimeState* state, bool read) 
{
 
     JNIEnv* env = nullptr;
     RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
-    RETURN_IF_ERROR(JniUtil::get_jni_scanner_class(env, JDBC_EXECUTOR_CLASS, 
&_executor_clazz));
+    RETURN_IF_ERROR(JniUtil::get_jni_scanner_class(env, 
JDBC_EXECUTOR_FACTORY_CLASS,
+                                                   &_executor_factory_clazz));
+
+    _executor_factory_ctor_id =
+            env->GetStaticMethodID(_executor_factory_clazz, "getExecutorClass",
+                                   
"(Lorg/apache/doris/thrift/TOdbcTableType;)Ljava/lang/String;");
+    if (_executor_factory_ctor_id == nullptr) {
+        return Status::InternalError("Failed to find method ID for 
getExecutorClass");
+    }
+
+    jobject jtable_type = _get_java_table_type(env, _conn_param.table_type);
+
+    jstring executor_name = (jstring)env->CallStaticObjectMethod(
+            _executor_factory_clazz, _executor_factory_ctor_id, jtable_type);
+    if (executor_name == nullptr) {
+        return Status::InternalError("getExecutorClass returned null");
+    }
+    const char* executor_name_str = env->GetStringUTFChars(executor_name, 
nullptr);
+
+    RETURN_IF_ERROR(JniUtil::get_jni_scanner_class(env, executor_name_str, 
&_executor_clazz));
+    env->DeleteLocalRef(jtable_type);
+    env->ReleaseStringUTFChars(executor_name, executor_name_str);
+    env->DeleteLocalRef(executor_name);
     GET_BASIC_JAVA_CLAZZ("java/util/List", list)
     GET_BASIC_JAVA_CLAZZ("java/lang/Object", object)
     GET_BASIC_JAVA_CLAZZ("java/lang/String", string)
@@ -747,4 +770,12 @@ Status JdbcConnector::_cast_string_to_json(const 
SlotDescriptor* slot_desc, Bloc
     return Status::OK();
 }
 
+jobject JdbcConnector::_get_java_table_type(JNIEnv* env, TOdbcTableType::type 
tableType) {
+    jclass enumClass = 
env->FindClass("org/apache/doris/thrift/TOdbcTableType");
+    jmethodID findByValueMethod = env->GetStaticMethodID(
+            enumClass, "findByValue", 
"(I)Lorg/apache/doris/thrift/TOdbcTableType;");
+    jobject javaEnumObj =
+            env->CallStaticObjectMethod(enumClass, findByValueMethod, 
static_cast<jint>(tableType));
+    return javaEnumObj;
+}
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/vjdbc_connector.h 
b/be/src/vec/exec/vjdbc_connector.h
index 2ecdf210fac..ed2afdecdfd 100644
--- a/be/src/vec/exec/vjdbc_connector.h
+++ b/be/src/vec/exec/vjdbc_connector.h
@@ -131,13 +131,16 @@ private:
                                   int rows);
     Status _cast_string_to_json(const SlotDescriptor* slot_desc, Block* block, 
int column_index,
                                 int rows);
+    jobject _get_java_table_type(JNIEnv* env, TOdbcTableType::type tableType);
 
     bool _closed = false;
+    jclass _executor_factory_clazz;
     jclass _executor_clazz;
     jclass _executor_list_clazz;
     jclass _executor_object_clazz;
     jclass _executor_string_clazz;
     jobject _executor_obj;
+    jmethodID _executor_factory_ctor_id;
     jmethodID _executor_ctor_id;
     jmethodID _executor_stmt_write_id;
     jmethodID _executor_read_id;
diff --git a/docker/thirdparties/docker-compose/mysql/init/04-insert.sql 
b/docker/thirdparties/docker-compose/mysql/init/04-insert.sql
index bb1b0962b45..d71986b1354 100644
--- a/docker/thirdparties/docker-compose/mysql/init/04-insert.sql
+++ b/docker/thirdparties/docker-compose/mysql/init/04-insert.sql
@@ -1151,7 +1151,7 @@ VALUES ('2023-06-17 10:00:00', '2023-06-17 10:00:01.1', 
'2023-06-17 10:00:02.22'
 
 SET SESSION sql_mode=(SELECT REPLACE(@@sql_mode,'STRICT_TRANS_TABLES',''));
 INSERT INTO doris_test.dt_null
-VALUES ('2023-06-17 10:00:00'),('0000-00-00 00:00:00');
+VALUES ('2023-06-17 10:00:00'),('0000-00-00 00:00:00'),('0000-01-01 00:00:00');
 
 
 insert into doris_test.test_key_word values (1, 1), (2, 2);
@@ -1159,7 +1159,7 @@ insert into doris_test.test_key_word values (1, 1), (2, 
2);
 SET SESSION sql_mode=(SELECT REPLACE(@@sql_mode,'NO_ZERO_DATE',''));
 SET SESSION sql_mode=(SELECT REPLACE(@@sql_mode,'NO_ZERO_IN_DATE',''));
 
-insert into doris_test.test_zd (id,d_z) VALUES 
(1,'0000-00-00'),(2,'2022-01-01');
+insert into doris_test.test_zd (id,d_z) VALUES 
(1,'0000-00-00'),(2,'2022-01-01'),(3,'0000-01-01');
 
 insert into Doris.DORIS values ('DORIS');
 insert into Doris.Doris values ('Doris');
diff --git 
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java
 
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java
new file mode 100644
index 00000000000..87f67807826
--- /dev/null
+++ 
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java
@@ -0,0 +1,520 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jdbc;
+
+import org.apache.doris.common.exception.InternalException;
+import org.apache.doris.common.exception.UdfRuntimeException;
+import org.apache.doris.common.jni.utils.UdfUtils;
+import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.jni.vec.ColumnValueConverter;
+import org.apache.doris.common.jni.vec.VectorColumn;
+import org.apache.doris.common.jni.vec.VectorTable;
+import org.apache.doris.thrift.TJdbcExecutorCtorParams;
+import org.apache.doris.thrift.TJdbcOperation;
+import org.apache.doris.thrift.TOdbcTableType;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.google.common.base.Preconditions;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+
+import java.io.FileNotFoundException;
+import java.lang.reflect.Array;
+import java.net.MalformedURLException;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+public abstract class BaseJdbcExecutor implements JdbcExecutor {
+    private static final Logger LOG = Logger.getLogger(BaseJdbcExecutor.class);
+    private static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new 
TBinaryProtocol.Factory();
+    private DruidDataSource druidDataSource = null;
+    private final byte[] druidDataSourceLock = new byte[0];
+    private TOdbcTableType tableType;
+    private JdbcDataSourceConfig config;
+    private Connection conn = null;
+    protected PreparedStatement preparedStatement = null;
+    protected Statement stmt = null;
+    protected ResultSet resultSet = null;
+    protected ResultSetMetaData resultSetMetaData = null;
+    protected List<String> resultColumnTypeNames = null;
+    protected List<Object[]> block = null;
+    protected VectorTable outputTable = null;
+    protected int batchSizeNum = 0;
+    protected int curBlockRows = 0;
+
+    public BaseJdbcExecutor(byte[] thriftParams) throws Exception {
+        TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams();
+        TDeserializer deserializer = new TDeserializer(PROTOCOL_FACTORY);
+        try {
+            deserializer.deserialize(request, thriftParams);
+        } catch (TException e) {
+            throw new InternalException(e.getMessage());
+        }
+        tableType = request.table_type;
+        this.config = new JdbcDataSourceConfig()
+                .setCatalogId(request.catalog_id)
+                .setJdbcUser(request.jdbc_user)
+                .setJdbcPassword(request.jdbc_password)
+                .setJdbcUrl(request.jdbc_url)
+                .setJdbcDriverUrl(request.driver_path)
+                .setJdbcDriverClass(request.jdbc_driver_class)
+                .setBatchSize(request.batch_size)
+                .setOp(request.op)
+                .setTableType(request.table_type)
+                .setConnectionPoolMinSize(request.connection_pool_min_size)
+                .setConnectionPoolMaxSize(request.connection_pool_max_size)
+                
.setConnectionPoolMaxWaitTime(request.connection_pool_max_wait_time)
+                
.setConnectionPoolMaxLifeTime(request.connection_pool_max_life_time)
+                
.setConnectionPoolKeepAlive(request.connection_pool_keep_alive);
+        
JdbcDataSource.getDataSource().setCleanupInterval(request.connection_pool_cache_clear_time);
+        init(config, request.statement);
+    }
+
+    public void close() throws Exception {
+        try {
+            if (stmt != null) {
+                try {
+                    stmt.cancel();
+                } catch (SQLException e) {
+                    LOG.error("Error cancelling statement", e);
+                }
+            }
+
+            boolean shouldAbort = conn != null && resultSet != null;
+            boolean aborted = false; // Used to record whether the abort 
operation is performed
+            if (shouldAbort) {
+                aborted = abortReadConnection(conn, resultSet);
+            }
+
+            // If no abort operation is performed, the resource needs to be 
closed manually
+            if (!aborted) {
+                closeResources(resultSet, stmt, conn);
+            }
+        } finally {
+            if (config.getConnectionPoolMinSize() == 0 && druidDataSource != 
null) {
+                druidDataSource.close();
+                
JdbcDataSource.getDataSource().getSourcesMap().remove(config.createCacheKey());
+                druidDataSource = null;
+            }
+        }
+    }
+
+    private void closeResources(AutoCloseable... closeables) {
+        for (AutoCloseable closeable : closeables) {
+            if (closeable != null) {
+                try {
+                    if (closeable instanceof Connection) {
+                        if (!((Connection) closeable).isClosed()) {
+                            closeable.close();
+                        }
+                    } else {
+                        closeable.close();
+                    }
+                } catch (Exception e) {
+                    LOG.error("Cannot close resource: ", e);
+                }
+            }
+        }
+    }
+
+    protected boolean abortReadConnection(Connection connection, ResultSet 
resultSet)
+            throws SQLException {
+        return false;
+    }
+
+    public int read() throws UdfRuntimeException {
+        try {
+            resultSet = ((PreparedStatement) stmt).executeQuery();
+            resultSetMetaData = resultSet.getMetaData();
+            int columnCount = resultSetMetaData.getColumnCount();
+            resultColumnTypeNames = new ArrayList<>(columnCount);
+            block = new ArrayList<>(columnCount);
+            for (int i = 0; i < columnCount; ++i) {
+                
resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1));
+            }
+            return columnCount;
+        } catch (SQLException e) {
+            throw new UdfRuntimeException("JDBC executor sql has error: ", e);
+        }
+    }
+
+    public long getBlockAddress(int batchSize, Map<String, String> 
outputParams) throws UdfRuntimeException {
+        try {
+            if (outputTable != null) {
+                outputTable.close();
+            }
+
+            outputTable = VectorTable.createWritableTable(outputParams, 0);
+
+            String isNullableString = outputParams.get("is_nullable");
+            String replaceString = outputParams.get("replace_string");
+
+            if (isNullableString == null || replaceString == null) {
+                throw new IllegalArgumentException(
+                        "Output parameters 'is_nullable' and 'replace_string' 
are required.");
+            }
+
+            String[] nullableList = isNullableString.split(",");
+            String[] replaceStringList = replaceString.split(",");
+            curBlockRows = 0;
+            int columnCount = resultSetMetaData.getColumnCount();
+
+            initializeBlock(columnCount, replaceStringList, batchSize, 
outputTable);
+
+            do {
+                for (int i = 0; i < columnCount; ++i) {
+                    ColumnType type = outputTable.getColumnType(i);
+                    block.get(i)[curBlockRows] = getColumnValue(i, type, 
replaceStringList);
+                }
+                curBlockRows++;
+            } while (curBlockRows < batchSize && resultSet.next());
+
+            for (int i = 0; i < columnCount; ++i) {
+                ColumnType type = outputTable.getColumnType(i);
+                Object[] columnData = block.get(i);
+                Class<?> componentType = 
columnData.getClass().getComponentType();
+                Object[] newColumn = (Object[]) 
Array.newInstance(componentType, curBlockRows);
+                System.arraycopy(columnData, 0, newColumn, 0, curBlockRows);
+                boolean isNullable = Boolean.parseBoolean(nullableList[i]);
+                outputTable.appendData(i, newColumn, getOutputConverter(type, 
replaceStringList[i]), isNullable);
+            }
+        } catch (Exception e) {
+            LOG.warn("jdbc get block address exception: ", e);
+            throw new UdfRuntimeException("jdbc get block address: ", e);
+        }
+        return outputTable.getMetaAddress();
+    }
+
+    protected void initializeBlock(int columnCount, String[] 
replaceStringList, int batchSizeNum,
+            VectorTable outputTable) {
+        for (int i = 0; i < columnCount; ++i) {
+            
block.add(outputTable.getColumn(i).newObjectContainerArray(batchSizeNum));
+        }
+    }
+
+    public int write(Map<String, String> params) throws UdfRuntimeException {
+        VectorTable batchTable = VectorTable.createReadableTable(params);
+        // Can't release or close batchTable, it's released by c++
+        try {
+            insert(batchTable);
+        } catch (SQLException e) {
+            throw new UdfRuntimeException("JDBC executor sql has error: ", e);
+        }
+        return batchTable.getNumRows();
+    }
+
+    public void openTrans() throws UdfRuntimeException {
+        try {
+            if (conn != null) {
+                conn.setAutoCommit(false);
+            }
+        } catch (SQLException e) {
+            throw new UdfRuntimeException("JDBC executor open transaction has 
error: ", e);
+        }
+    }
+
+    public void commitTrans() throws UdfRuntimeException {
+        try {
+            if (conn != null) {
+                conn.commit();
+            }
+        } catch (SQLException e) {
+            throw new UdfRuntimeException("JDBC executor commit transaction 
has error: ", e);
+        }
+    }
+
+    public void rollbackTrans() throws UdfRuntimeException {
+        try {
+            if (conn != null) {
+                conn.rollback();
+            }
+        } catch (SQLException e) {
+            throw new UdfRuntimeException("JDBC executor rollback transaction 
has error: ", e);
+        }
+    }
+
+    public List<String> getResultColumnTypeNames() {
+        return resultColumnTypeNames;
+    }
+
+    public int getCurBlockRows() {
+        return curBlockRows;
+    }
+
+    public boolean hasNext() throws UdfRuntimeException {
+        try {
+            if (resultSet == null) {
+                return false;
+            }
+            return resultSet.next();
+        } catch (SQLException e) {
+            throw new UdfRuntimeException("resultSet to get next error: ", e);
+        }
+    }
+
+    private void init(JdbcDataSourceConfig config, String sql) throws 
UdfRuntimeException {
+        String druidDataSourceKey = config.createCacheKey();
+        try {
+            ClassLoader parent = getClass().getClassLoader();
+            ClassLoader classLoader = 
UdfUtils.getClassLoader(config.getJdbcDriverUrl(), parent);
+            druidDataSource = 
JdbcDataSource.getDataSource().getSource(druidDataSourceKey);
+            if (druidDataSource == null) {
+                synchronized (druidDataSourceLock) {
+                    druidDataSource = 
JdbcDataSource.getDataSource().getSource(druidDataSourceKey);
+                    if (druidDataSource == null) {
+                        long start = System.currentTimeMillis();
+                        DruidDataSource ds = new DruidDataSource();
+                        ds.setDriverClassLoader(classLoader);
+                        ds.setDriverClassName(config.getJdbcDriverClass());
+                        ds.setUrl(config.getJdbcUrl());
+                        ds.setUsername(config.getJdbcUser());
+                        ds.setPassword(config.getJdbcPassword());
+                        ds.setMinIdle(config.getConnectionPoolMinSize()); // 
default 1
+                        ds.setInitialSize(config.getConnectionPoolMinSize()); 
// default 1
+                        ds.setMaxActive(config.getConnectionPoolMaxSize()); // 
default 10
+                        ds.setMaxWait(config.getConnectionPoolMaxWaitTime()); 
// default 5000
+                        ds.setTestWhileIdle(true);
+                        ds.setTestOnBorrow(false);
+                        setValidationQuery(ds);
+                        // default 3 min
+                        
ds.setTimeBetweenEvictionRunsMillis(config.getConnectionPoolMaxLifeTime() / 
10L);
+                        // default 15 min
+                        
ds.setMinEvictableIdleTimeMillis(config.getConnectionPoolMaxLifeTime() / 2L);
+                        // default 30 min
+                        
ds.setMaxEvictableIdleTimeMillis(config.getConnectionPoolMaxLifeTime());
+                        ds.setKeepAlive(config.isConnectionPoolKeepAlive());
+                        // default 6 min
+                        
ds.setKeepAliveBetweenTimeMillis(config.getConnectionPoolMaxLifeTime() / 5L);
+                        druidDataSource = ds;
+                        
JdbcDataSource.getDataSource().putSource(druidDataSourceKey, ds);
+                        LOG.info("JdbcClient set"
+                                + " ConnectionPoolMinSize = " + 
config.getConnectionPoolMinSize()
+                                + ", ConnectionPoolMaxSize = " + 
config.getConnectionPoolMaxSize()
+                                + ", ConnectionPoolMaxWaitTime = " + 
config.getConnectionPoolMaxWaitTime()
+                                + ", ConnectionPoolMaxLifeTime = " + 
config.getConnectionPoolMaxLifeTime()
+                                + ", ConnectionPoolKeepAlive = " + 
config.isConnectionPoolKeepAlive());
+                        LOG.info("init datasource [" + (config.getJdbcUrl() + 
config.getJdbcUser()) + "] cost: " + (
+                                System.currentTimeMillis() - start) + " ms");
+                    }
+                }
+            }
+
+            long start = System.currentTimeMillis();
+            conn = druidDataSource.getConnection();
+            LOG.info("get connection [" + (config.getJdbcUrl() + 
config.getJdbcUser()) + "] cost: " + (
+                    System.currentTimeMillis() - start)
+                    + " ms");
+
+            initializeStatement(conn, config, sql);
+
+        } catch (MalformedURLException e) {
+            throw new UdfRuntimeException("MalformedURLException to load class 
about " + config.getJdbcDriverUrl(), e);
+        } catch (SQLException e) {
+            throw new UdfRuntimeException("Initialize datasource failed: ", e);
+        } catch (FileNotFoundException e) {
+            throw new UdfRuntimeException("FileNotFoundException failed: ", e);
+        } catch (Exception e) {
+            throw new UdfRuntimeException("Initialize datasource failed: ", e);
+        }
+    }
+
+    protected void setValidationQuery(DruidDataSource ds) {
+        ds.setValidationQuery("SELECT 1");
+    }
+
+    protected void initializeStatement(Connection conn, JdbcDataSourceConfig 
config, String sql) throws SQLException {
+        if (config.getOp() == TJdbcOperation.READ) {
+            conn.setAutoCommit(false);
+            Preconditions.checkArgument(sql != null, "SQL statement cannot be 
null for READ operation.");
+            stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, 
ResultSet.CONCUR_READ_ONLY);
+            stmt.setFetchSize(config.getBatchSize()); // set fetch size to 
batch size
+            batchSizeNum = config.getBatchSize();
+        } else {
+            Preconditions.checkArgument(sql != null, "SQL statement cannot be 
null for WRITE operation.");
+            LOG.info("Insert SQL: " + sql);
+            preparedStatement = conn.prepareStatement(sql);
+        }
+    }
+
+    protected abstract Object getColumnValue(int columnIndex, ColumnType type, 
String[] replaceStringList)
+            throws SQLException;
+
+    /*
+    | Type                                        | Java Array Type            
|
+    
|---------------------------------------------|----------------------------|
+    | BOOLEAN                                     | Boolean[]                  
|
+    | TINYINT                                     | Byte[]                     
|
+    | SMALLINT                                    | Short[]                    
|
+    | INT                                         | Integer[]                  
|
+    | BIGINT                                      | Long[]                     
|
+    | LARGEINT                                    | BigInteger[]               
|
+    | FLOAT                                       | Float[]                    
|
+    | DOUBLE                                      | Double[]                   
|
+    | DECIMALV2, DECIMAL32, DECIMAL64, DECIMAL128 | BigDecimal[]               
|
+    | DATE, DATEV2                                | LocalDate[]                
|
+    | DATETIME, DATETIMEV2                        | LocalDateTime[]            
|
+    | CHAR, VARCHAR, STRING                       | String[]                   
|
+    | ARRAY                                       | List<Object>[]             
|
+    | MAP                                         | Map<Object, Object>[]      
|
+    | STRUCT                                      | Map<String, Object>[]      
|
+    */
+
+    protected abstract ColumnValueConverter getOutputConverter(ColumnType 
columnType, String replaceString);
+
+    protected ColumnValueConverter createConverter(
+            Function<Object, ?> converterFunction, Class<?> type) {
+        return (Object[] columnData) -> {
+            Object[] result = (Object[]) Array.newInstance(type, 
columnData.length);
+            for (int i = 0; i < columnData.length; i++) {
+                result[i] = columnData[i] != null ? 
converterFunction.apply(columnData[i]) : null;
+            }
+            return result;
+        };
+    }
+
+    private int insert(VectorTable data) throws SQLException {
+        for (int i = 0; i < data.getNumRows(); ++i) {
+            for (int j = 0; j < data.getColumns().length; ++j) {
+                insertColumn(i, j, data.getColumns()[j]);
+            }
+            preparedStatement.addBatch();
+        }
+        preparedStatement.executeBatch();
+        preparedStatement.clearBatch();
+        return data.getNumRows();
+    }
+
+    private void insertColumn(int rowIdx, int colIdx, VectorColumn column) 
throws SQLException {
+        int parameterIndex = colIdx + 1;
+        ColumnType.Type dorisType = column.getColumnTyp();
+        if (column.isNullAt(rowIdx)) {
+            insertNullColumn(parameterIndex, dorisType);
+            return;
+        }
+        switch (dorisType) {
+            case BOOLEAN:
+                preparedStatement.setBoolean(parameterIndex, 
column.getBoolean(rowIdx));
+                break;
+            case TINYINT:
+                preparedStatement.setByte(parameterIndex, 
column.getByte(rowIdx));
+                break;
+            case SMALLINT:
+                preparedStatement.setShort(parameterIndex, 
column.getShort(rowIdx));
+                break;
+            case INT:
+                preparedStatement.setInt(parameterIndex, 
column.getInt(rowIdx));
+                break;
+            case BIGINT:
+                preparedStatement.setLong(parameterIndex, 
column.getLong(rowIdx));
+                break;
+            case LARGEINT:
+                preparedStatement.setObject(parameterIndex, 
column.getBigInteger(rowIdx));
+                break;
+            case FLOAT:
+                preparedStatement.setFloat(parameterIndex, 
column.getFloat(rowIdx));
+                break;
+            case DOUBLE:
+                preparedStatement.setDouble(parameterIndex, 
column.getDouble(rowIdx));
+                break;
+            case DECIMALV2:
+            case DECIMAL32:
+            case DECIMAL64:
+            case DECIMAL128:
+                preparedStatement.setBigDecimal(parameterIndex, 
column.getDecimal(rowIdx));
+                break;
+            case DATEV2:
+                preparedStatement.setDate(parameterIndex, 
Date.valueOf(column.getDate(rowIdx)));
+                break;
+            case DATETIMEV2:
+                preparedStatement.setTimestamp(
+                        parameterIndex, 
Timestamp.valueOf(column.getDateTime(rowIdx)));
+                break;
+            case CHAR:
+            case VARCHAR:
+            case STRING:
+            case BINARY:
+                preparedStatement.setString(parameterIndex, 
column.getStringWithOffset(rowIdx));
+                break;
+            default:
+                throw new RuntimeException("Unknown type value: " + dorisType);
+        }
+    }
+
+    private void insertNullColumn(int parameterIndex, ColumnType.Type 
dorisType)
+            throws SQLException {
+        switch (dorisType) {
+            case BOOLEAN:
+                preparedStatement.setNull(parameterIndex, Types.BOOLEAN);
+                break;
+            case TINYINT:
+                preparedStatement.setNull(parameterIndex, Types.TINYINT);
+                break;
+            case SMALLINT:
+                preparedStatement.setNull(parameterIndex, Types.SMALLINT);
+                break;
+            case INT:
+                preparedStatement.setNull(parameterIndex, Types.INTEGER);
+                break;
+            case BIGINT:
+                preparedStatement.setNull(parameterIndex, Types.BIGINT);
+                break;
+            case LARGEINT:
+                preparedStatement.setNull(parameterIndex, Types.JAVA_OBJECT);
+                break;
+            case FLOAT:
+                preparedStatement.setNull(parameterIndex, Types.FLOAT);
+                break;
+            case DOUBLE:
+                preparedStatement.setNull(parameterIndex, Types.DOUBLE);
+                break;
+            case DECIMALV2:
+            case DECIMAL32:
+            case DECIMAL64:
+            case DECIMAL128:
+                preparedStatement.setNull(parameterIndex, Types.DECIMAL);
+                break;
+            case DATEV2:
+                preparedStatement.setNull(parameterIndex, Types.DATE);
+                break;
+            case DATETIMEV2:
+                preparedStatement.setNull(parameterIndex, Types.TIMESTAMP);
+                break;
+            case CHAR:
+            case VARCHAR:
+            case STRING:
+            case BINARY:
+                preparedStatement.setNull(parameterIndex, Types.VARCHAR);
+                break;
+            default:
+                throw new RuntimeException("Unknown type value: " + dorisType);
+        }
+    }
+}
diff --git 
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java
 
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/DefaultJdbcExecutor.java
similarity index 99%
copy from 
fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java
copy to 
fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/DefaultJdbcExecutor.java
index 3f00643d126..94d4304db38 100644
--- 
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java
+++ 
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/DefaultJdbcExecutor.java
@@ -78,8 +78,8 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-public class JdbcExecutor {
-    private static final Logger LOG = Logger.getLogger(JdbcExecutor.class);
+public class DefaultJdbcExecutor {
+    private static final Logger LOG = 
Logger.getLogger(DefaultJdbcExecutor.class);
     private static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new 
TBinaryProtocol.Factory();
     private Connection conn = null;
     private PreparedStatement preparedStatement = null;
@@ -97,7 +97,7 @@ public class JdbcExecutor {
     private TOdbcTableType tableType;
     private JdbcDataSourceConfig config;
 
-    public JdbcExecutor(byte[] thriftParams) throws Exception {
+    public DefaultJdbcExecutor(byte[] thriftParams) throws Exception {
         TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams();
         TDeserializer deserializer = new TDeserializer(PROTOCOL_FACTORY);
         try {
diff --git 
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java
 
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java
index 3f00643d126..85c40e21d16 100644
--- 
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java
+++ 
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java
@@ -17,1091 +17,26 @@
 
 package org.apache.doris.jdbc;
 
-import org.apache.doris.common.exception.InternalException;
 import org.apache.doris.common.exception.UdfRuntimeException;
-import org.apache.doris.common.jni.utils.UdfUtils;
-import org.apache.doris.common.jni.vec.ColumnType;
-import org.apache.doris.common.jni.vec.ColumnValueConverter;
-import org.apache.doris.common.jni.vec.VectorColumn;
-import org.apache.doris.common.jni.vec.VectorTable;
-import org.apache.doris.thrift.TJdbcExecutorCtorParams;
-import org.apache.doris.thrift.TJdbcOperation;
-import org.apache.doris.thrift.TOdbcTableType;
 
-import com.alibaba.druid.pool.DruidDataSource;
-import com.clickhouse.data.value.UnsignedByte;
-import com.clickhouse.data.value.UnsignedInteger;
-import com.clickhouse.data.value.UnsignedLong;
-import com.clickhouse.data.value.UnsignedShort;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.vesoft.nebula.client.graph.data.ValueWrapper;
-import org.apache.log4j.Logger;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-
-import java.io.FileNotFoundException;
-import java.lang.reflect.Array;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.net.Inet4Address;
-import java.net.Inet6Address;
-import java.net.InetAddress;
-import java.net.MalformedURLException;
-import java.sql.Connection;
-import java.sql.Date;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.sql.Types;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.OffsetDateTime;
-import java.time.format.DateTimeFormatter;
-import java.time.format.DateTimeFormatterBuilder;
-import java.time.temporal.ChronoField;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.UUID;
-import java.util.function.Function;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-public class JdbcExecutor {
-    private static final Logger LOG = Logger.getLogger(JdbcExecutor.class);
-    private static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new 
TBinaryProtocol.Factory();
-    private Connection conn = null;
-    private PreparedStatement preparedStatement = null;
-    private Statement stmt = null;
-    private ResultSet resultSet = null;
-    private ResultSetMetaData resultSetMetaData = null;
-    private List<String> resultColumnTypeNames = null;
-    private List<Object[]> block = null;
-    private VectorTable outputTable = null;
-    private int batchSizeNum = 0;
-    private int curBlockRows = 0;
-    private static final byte[] emptyBytes = new byte[0];
-    private DruidDataSource druidDataSource = null;
-    private final byte[] druidDataSourceLock = new byte[0];
-    private TOdbcTableType tableType;
-    private JdbcDataSourceConfig config;
-
-    public JdbcExecutor(byte[] thriftParams) throws Exception {
-        TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams();
-        TDeserializer deserializer = new TDeserializer(PROTOCOL_FACTORY);
-        try {
-            deserializer.deserialize(request, thriftParams);
-        } catch (TException e) {
-            throw new InternalException(e.getMessage());
-        }
-        tableType = request.table_type;
-        this.config = new JdbcDataSourceConfig()
-                .setCatalogId(request.catalog_id)
-                .setJdbcUser(request.jdbc_user)
-                .setJdbcPassword(request.jdbc_password)
-                .setJdbcUrl(request.jdbc_url)
-                .setJdbcDriverUrl(request.driver_path)
-                .setJdbcDriverClass(request.jdbc_driver_class)
-                .setBatchSize(request.batch_size)
-                .setOp(request.op)
-                .setTableType(request.table_type)
-                .setConnectionPoolMinSize(request.connection_pool_min_size)
-                .setConnectionPoolMaxSize(request.connection_pool_max_size)
-                
.setConnectionPoolMaxWaitTime(request.connection_pool_max_wait_time)
-                
.setConnectionPoolMaxLifeTime(request.connection_pool_max_life_time)
-                
.setConnectionPoolKeepAlive(request.connection_pool_keep_alive);
-        
JdbcDataSource.getDataSource().setCleanupInterval(request.connection_pool_cache_clear_time);
-        init(config, request.statement);
-    }
-
-    public void close() throws Exception {
-        try {
-            if (stmt != null) {
-                try {
-                    stmt.cancel();
-                } catch (SQLException e) {
-                    LOG.error("Error cancelling statement", e);
-                }
-            }
-
-            boolean shouldAbort = conn != null && resultSet != null
-                    && (tableType == TOdbcTableType.MYSQL || tableType == 
TOdbcTableType.SQLSERVER);
-            boolean aborted = false; // Used to record whether the abort 
operation is performed
-            if (shouldAbort) {
-                aborted = abortReadConnection(conn, resultSet, tableType);
-            }
-
-            // If no abort operation is performed, the resource needs to be 
closed manually
-            if (!aborted) {
-                closeResources(resultSet, stmt, conn);
-            }
-        } finally {
-            if (config.getConnectionPoolMinSize() == 0 && druidDataSource != 
null) {
-                druidDataSource.close();
-                
JdbcDataSource.getDataSource().getSourcesMap().remove(config.createCacheKey());
-                druidDataSource = null;
-            }
-        }
-    }
-
-    private void closeResources(AutoCloseable... closeables) {
-        for (AutoCloseable closeable : closeables) {
-            if (closeable != null) {
-                try {
-                    if (closeable instanceof Connection) {
-                        if (!((Connection) closeable).isClosed()) {
-                            closeable.close();
-                        }
-                    } else {
-                        closeable.close();
-                    }
-                } catch (Exception e) {
-                    LOG.error("Cannot close resource: ", e);
-                }
-            }
-        }
-    }
-
-    public boolean abortReadConnection(Connection connection, ResultSet 
resultSet, TOdbcTableType tableType)
-            throws SQLException {
-        if (!resultSet.isAfterLast() && (tableType == TOdbcTableType.MYSQL || 
tableType == TOdbcTableType.SQLSERVER)) {
-            // Abort connection before closing. Without this, the 
MySQL/SQLServer driver
-            // attempts to drain the connection by reading all the results.
-            connection.abort(MoreExecutors.directExecutor());
-            return true;
-        }
-        return false;
-    }
-
-    public int read() throws UdfRuntimeException {
-        try {
-            resultSet = ((PreparedStatement) stmt).executeQuery();
-            resultSetMetaData = resultSet.getMetaData();
-            int columnCount = resultSetMetaData.getColumnCount();
-            resultColumnTypeNames = new ArrayList<>(columnCount);
-            block = new ArrayList<>(columnCount);
-            if (isNebula()) {
-                for (int i = 0; i < columnCount; ++i) {
-                    block.add((Object[]) Array.newInstance(Object.class, 
batchSizeNum));
-                }
-            } else {
-                for (int i = 0; i < columnCount; ++i) {
-                    
resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1));
-                    block.add((Object[]) Array.newInstance(Object.class, 
batchSizeNum));
-                }
-            }
-            return columnCount;
-        } catch (SQLException e) {
-            throw new UdfRuntimeException("JDBC executor sql has error: ", e);
-        }
-    }
-
-    public long getBlockAddress(int batchSize, Map<String, String> 
outputParams)
-            throws UdfRuntimeException {
-        try {
-            if (outputTable != null) {
-                outputTable.close();
-            }
-
-            String isNullableString = outputParams.get("is_nullable");
-            String replaceString = outputParams.get("replace_string");
-
-            if (isNullableString == null || replaceString == null) {
-                throw new IllegalArgumentException(
-                        "Output parameters 'is_nullable' and 'replace_string' 
are required.");
-            }
-
-            String[] nullableList = isNullableString.split(",");
-            String[] replaceStringList = replaceString.split(",");
-            curBlockRows = 0;
-            int columnCount = resultSetMetaData.getColumnCount();
-
-            do {
-                for (int i = 0; i < columnCount; ++i) {
-                    boolean isBitmapOrHll =
-                            replaceStringList[i].equals("bitmap")
-                                    || replaceStringList[i].equals("hll");
-                    block.get(i)[curBlockRows] = getColumnValue(tableType, i, 
isBitmapOrHll);
-                }
-                curBlockRows++;
-            } while (curBlockRows < batchSize && resultSet.next());
-
-            outputTable = VectorTable.createWritableTable(outputParams, 
curBlockRows);
-
-            for (int i = 0; i < columnCount; ++i) {
-                Object[] columnData = block.get(i);
-                ColumnType type = outputTable.getColumnType(i);
-                Class<?> clz = findNonNullClass(columnData, type);
-                Object[] newColumn = (Object[]) Array.newInstance(clz, 
curBlockRows);
-                System.arraycopy(columnData, 0, newColumn, 0, curBlockRows);
-                boolean isNullable = Boolean.parseBoolean(nullableList[i]);
-                outputTable.appendData(
-                        i,
-                        newColumn,
-                        getOutputConverter(type, clz, replaceStringList[i]),
-                        isNullable);
-            }
-        } catch (Exception e) {
-            LOG.warn("jdbc get block address exception: ", e);
-            throw new UdfRuntimeException("jdbc get block address: ", e);
-        }
-        return outputTable.getMetaAddress();
-    }
-
-    public int write(Map<String, String> params) throws UdfRuntimeException {
-        VectorTable batchTable = VectorTable.createReadableTable(params);
-        // Can't release or close batchTable, it's released by c++
-        try {
-            insert(batchTable);
-        } catch (SQLException e) {
-            throw new UdfRuntimeException("JDBC executor sql has error: ", e);
-        }
-        return batchTable.getNumRows();
-    }
-
-    public void openTrans() throws UdfRuntimeException {
-        try {
-            if (conn != null) {
-                conn.setAutoCommit(false);
-            }
-        } catch (SQLException e) {
-            throw new UdfRuntimeException("JDBC executor open transaction has 
error: ", e);
-        }
-    }
-
-    public void commitTrans() throws UdfRuntimeException {
-        try {
-            if (conn != null) {
-                conn.commit();
-            }
-        } catch (SQLException e) {
-            throw new UdfRuntimeException("JDBC executor commit transaction 
has error: ", e);
-        }
-    }
-
-    public void rollbackTrans() throws UdfRuntimeException {
-        try {
-            if (conn != null) {
-                conn.rollback();
-            }
-        } catch (SQLException e) {
-            throw new UdfRuntimeException("JDBC executor rollback transaction 
has error: ", e);
-        }
-    }
-
-    public List<String> getResultColumnTypeNames() {
-        return resultColumnTypeNames;
-    }
-
-    public int getCurBlockRows() {
-        return curBlockRows;
-    }
-
-    public boolean hasNext() throws UdfRuntimeException {
-        try {
-            if (resultSet == null) {
-                return false;
-            }
-            return resultSet.next();
-        } catch (SQLException e) {
-            throw new UdfRuntimeException("resultSet to get next error: ", e);
-        }
-    }
-
-    private void init(JdbcDataSourceConfig config, String sql) throws 
UdfRuntimeException {
-        String druidDataSourceKey = config.createCacheKey();
-        try {
-            if (isNebula()) {
-                batchSizeNum = config.getBatchSize();
-                Class.forName(config.getJdbcDriverClass());
-                conn = 
DriverManager.getConnection(config.getJdbcDriverClass(), config.getJdbcUser(),
-                        config.getJdbcPassword());
-                stmt = conn.prepareStatement(sql);
-            } else {
-                ClassLoader parent = getClass().getClassLoader();
-                ClassLoader classLoader = 
UdfUtils.getClassLoader(config.getJdbcDriverUrl(), parent);
-                druidDataSource = 
JdbcDataSource.getDataSource().getSource(druidDataSourceKey);
-                if (druidDataSource == null) {
-                    synchronized (druidDataSourceLock) {
-                        druidDataSource = 
JdbcDataSource.getDataSource().getSource(druidDataSourceKey);
-                        if (druidDataSource == null) {
-                            long start = System.currentTimeMillis();
-                            DruidDataSource ds = new DruidDataSource();
-                            ds.setDriverClassLoader(classLoader);
-                            ds.setDriverClassName(config.getJdbcDriverClass());
-                            ds.setUrl(config.getJdbcUrl());
-                            ds.setUsername(config.getJdbcUser());
-                            ds.setPassword(config.getJdbcPassword());
-                            ds.setMinIdle(config.getConnectionPoolMinSize()); 
// default 1
-                            
ds.setInitialSize(config.getConnectionPoolMinSize()); // default 1
-                            
ds.setMaxActive(config.getConnectionPoolMaxSize()); // default 10
-                            
ds.setMaxWait(config.getConnectionPoolMaxWaitTime()); // default 5000
-                            ds.setTestWhileIdle(true);
-                            ds.setTestOnBorrow(false);
-                            setValidationQuery(ds, config.getTableType());
-                            // default 3 min
-                            
ds.setTimeBetweenEvictionRunsMillis(config.getConnectionPoolMaxLifeTime() / 
10L);
-                            // default 15 min
-                            
ds.setMinEvictableIdleTimeMillis(config.getConnectionPoolMaxLifeTime() / 2L);
-                            // default 30 min
-                            
ds.setMaxEvictableIdleTimeMillis(config.getConnectionPoolMaxLifeTime());
-                            
ds.setKeepAlive(config.isConnectionPoolKeepAlive());
-                            // default 6 min
-                            
ds.setKeepAliveBetweenTimeMillis(config.getConnectionPoolMaxLifeTime() / 5L);
-                            druidDataSource = ds;
-                            
JdbcDataSource.getDataSource().putSource(druidDataSourceKey, ds);
-                            LOG.info("JdbcClient set"
-                                    + " ConnectionPoolMinSize = " + 
config.getConnectionPoolMinSize()
-                                    + ", ConnectionPoolMaxSize = " + 
config.getConnectionPoolMaxSize()
-                                    + ", ConnectionPoolMaxWaitTime = " + 
config.getConnectionPoolMaxWaitTime()
-                                    + ", ConnectionPoolMaxLifeTime = " + 
config.getConnectionPoolMaxLifeTime()
-                                    + ", ConnectionPoolKeepAlive = " + 
config.isConnectionPoolKeepAlive());
-                            LOG.info("init datasource [" + 
(config.getJdbcUrl() + config.getJdbcUser()) + "] cost: " + (
-                                    System.currentTimeMillis() - start) + " 
ms");
-                        }
-                    }
-                }
-
-                long start = System.currentTimeMillis();
-                conn = druidDataSource.getConnection();
-                LOG.info("get connection [" + (config.getJdbcUrl() + 
config.getJdbcUser()) + "] cost: " + (
-                        System.currentTimeMillis() - start)
-                        + " ms");
-                if (config.getOp() == TJdbcOperation.READ) {
-                    conn.setAutoCommit(false);
-                    Preconditions.checkArgument(sql != null);
-                    stmt = conn.prepareStatement(sql, 
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-                    if (tableType == TOdbcTableType.MYSQL) {
-                        stmt.setFetchSize(Integer.MIN_VALUE);
-                    } else {
-                        stmt.setFetchSize(config.getBatchSize());
-                    }
-                    batchSizeNum = config.getBatchSize();
-                } else {
-                    LOG.info("insert sql: " + sql);
-                    preparedStatement = conn.prepareStatement(sql);
-                }
-            }
-        } catch (MalformedURLException e) {
-            throw new UdfRuntimeException("MalformedURLException to load class 
about " + config.getJdbcDriverUrl(), e);
-        } catch (SQLException e) {
-            throw new UdfRuntimeException("Initialize datasource failed: ", e);
-        } catch (FileNotFoundException e) {
-            throw new UdfRuntimeException("FileNotFoundException failed: ", e);
-        } catch (Exception e) {
-            throw new UdfRuntimeException("Initialize datasource failed: ", e);
-        }
-    }
-
-    private void setValidationQuery(DruidDataSource ds, TOdbcTableType 
tableType) {
-        if (tableType == TOdbcTableType.ORACLE || tableType == 
TOdbcTableType.OCEANBASE_ORACLE) {
-            ds.setValidationQuery("SELECT 1 FROM dual");
-        } else if (tableType == TOdbcTableType.SAP_HANA) {
-            ds.setValidationQuery("SELECT 1 FROM DUMMY");
-        } else {
-            ds.setValidationQuery("SELECT 1");
-        }
-    }
-
-    public boolean isNebula() {
-        return tableType == TOdbcTableType.NEBULA;
-    }
-
-    private Class<?> findNonNullClass(Object[] columnData, ColumnType type) {
-        for (Object data : columnData) {
-            if (data != null) {
-                return data.getClass();
-            }
-        }
-        switch (type.getType()) {
-            case BOOLEAN:
-                return Boolean.class;
-            case TINYINT:
-                return Byte.class;
-            case SMALLINT:
-                return Short.class;
-            case INT:
-                return Integer.class;
-            case BIGINT:
-                return Long.class;
-            case LARGEINT:
-                return BigInteger.class;
-            case FLOAT:
-                return Float.class;
-            case DOUBLE:
-                return Double.class;
-            case DECIMALV2:
-            case DECIMAL32:
-            case DECIMAL64:
-            case DECIMAL128:
-                return BigDecimal.class;
-            case DATE:
-            case DATEV2:
-                return LocalDate.class;
-            case DATETIME:
-            case DATETIMEV2:
-                return LocalDateTime.class;
-            case CHAR:
-            case VARCHAR:
-            case STRING:
-                return String.class;
-            case ARRAY:
-                return List.class;
-            default:
-                throw new IllegalArgumentException(
-                        "Unsupported column type: " + type.getType());
-        }
-    }
-
-    public Object getColumnValue(TOdbcTableType tableType, int columnIndex, 
boolean isBitmapOrHll)
-            throws SQLException {
-        Object result;
-        if (tableType == TOdbcTableType.NEBULA) {
-            result = UdfUtils.convertObject((ValueWrapper) 
resultSet.getObject(columnIndex + 1));
-        } else {
-            result =
-                    isBitmapOrHll
-                            ? resultSet.getBytes(columnIndex + 1)
-                            : resultSet.getObject(columnIndex + 1);
-        }
-        return result;
-    }
-
-    /*
-    | Type                                        | Java Array Type            
|
-    
|---------------------------------------------|----------------------------|
-    | BOOLEAN                                     | Boolean[]                  
|
-    | TINYINT                                     | Byte[]                     
|
-    | SMALLINT                                    | Short[]                    
|
-    | INT                                         | Integer[]                  
|
-    | BIGINT                                      | Long[]                     
|
-    | LARGEINT                                    | BigInteger[]               
|
-    | FLOAT                                       | Float[]                    
|
-    | DOUBLE                                      | Double[]                   
|
-    | DECIMALV2, DECIMAL32, DECIMAL64, DECIMAL128 | BigDecimal[]               
|
-    | DATE, DATEV2                                | LocalDate[]                
|
-    | DATETIME, DATETIMEV2                        | LocalDateTime[]            
|
-    | CHAR, VARCHAR, STRING                       | String[]                   
|
-    | ARRAY                                       | List<Object>[]             
|
-    | MAP                                         | Map<Object, Object>[]      
|
-    | STRUCT                                      | Map<String, Object>[]      
|
-    */
-
-    private ColumnValueConverter getOutputConverter(
-            ColumnType columnType, Class clz, String replaceString) {
-        switch (columnType.getType()) {
-            case BOOLEAN:
-                if (Integer.class.equals(clz)) {
-                    return createConverter(input -> ((Integer) input) != 0, 
Boolean.class);
-                }
-                if (Byte.class.equals(clz)) {
-                    return createConverter(input -> ((Byte) input) != 0, 
Boolean.class);
-                }
-                if (String.class.equals(clz)) {
-                    return createConverter(
-                            input ->
-                                    Boolean.parseBoolean(
-                                            String.valueOf(input).equals("1") 
? "true" : "false"),
-                            Boolean.class);
-                }
-                break;
-            case TINYINT:
-                if (Integer.class.equals(clz)) {
-                    return createConverter(input -> ((Integer) 
input).byteValue(), Byte.class);
-                }
-                if (Short.class.equals(clz)) {
-                    return createConverter(input -> ((Short) 
input).byteValue(), Byte.class);
-                }
-                if (Object.class.equals(clz)) {
-                    return createConverter(
-                            input -> (byte) 
Integer.parseInt(String.valueOf(input)), Byte.class);
-                }
-                if (BigDecimal.class.equals(clz)) {
-                    return createConverter(input -> ((BigDecimal) 
input).byteValue(), Byte.class);
-                }
-                if (String.class.equals(clz)) {
-                    return createConverter(
-                            input -> Byte.parseByte(String.valueOf(input)), 
Byte.class);
-                }
-                break;
-            case SMALLINT:
-                if (Integer.class.equals(clz)) {
-                    return createConverter(input -> ((Integer) 
input).shortValue(), Short.class);
-                }
-                if (BigDecimal.class.equals(clz)) {
-                    return createConverter(input -> ((BigDecimal) 
input).shortValue(), Short.class);
-                }
-                if (String.class.equals(clz)) {
-                    return createConverter(
-                            input -> Short.parseShort(String.valueOf(input)), 
Short.class);
-                }
-                if (Byte.class.equals(clz)) {
-                    return createConverter(input -> ((Byte) 
input).shortValue(), Short.class);
-                }
-                if (com.clickhouse.data.value.UnsignedByte.class.equals(clz)) {
-                    return createConverter(
-                            input -> ((UnsignedByte) input).shortValue(), 
Short.class);
-                }
-                break;
-            case INT:
-                if (Long.class.equals(clz)) {
-                    return createConverter(input -> ((Long) input).intValue(), 
Integer.class);
-                }
-                if (BigDecimal.class.equals(clz)) {
-                    return createConverter(input -> ((BigDecimal) 
input).intValue(), Integer.class);
-                }
-                if (String.class.equals(clz)) {
-                    return createConverter(
-                            input -> Integer.parseInt(String.valueOf(input)), 
Integer.class);
-                }
-                if (Short.class.equals(clz)) {
-                    return createConverter(input -> ((Short) 
input).intValue(), Integer.class);
-                }
-                if (com.clickhouse.data.value.UnsignedShort.class.equals(clz)) 
{
-                    return createConverter(
-                            input -> ((UnsignedShort) input).intValue(), 
Integer.class);
-                }
-                break;
-            case BIGINT:
-                if (BigDecimal.class.equals(clz)) {
-                    return createConverter(input -> ((BigDecimal) 
input).longValue(), Long.class);
-                }
-                if (String.class.equals(clz)) {
-                    return createConverter(
-                            input -> Long.parseLong(String.valueOf(input)), 
Long.class);
-                }
-                if (Integer.class.equals(clz)) {
-                    return createConverter(input -> ((Integer) 
input).longValue(), Long.class);
-                }
-                if 
(com.clickhouse.data.value.UnsignedInteger.class.equals(clz)) {
-                    return createConverter(
-                            input -> ((UnsignedInteger) input).longValue(), 
Long.class);
-                }
-                break;
-            case LARGEINT:
-                if (BigDecimal.class.equals(clz)) {
-                    return createConverter(
-                            input -> ((BigDecimal) input).toBigInteger(), 
BigInteger.class);
-                }
-                if (String.class.equals(clz)) {
-                    return createConverter(
-                            input -> new BigInteger(String.valueOf(input)), 
BigInteger.class);
-                }
-                if (Long.class.equals(clz)) {
-                    return createConverter(
-                            input -> BigInteger.valueOf((Long) input), 
BigInteger.class);
-                }
-                if (com.clickhouse.data.value.UnsignedLong.class.equals(clz)) {
-                    return createConverter(
-                            input -> ((UnsignedLong) input).bigIntegerValue(), 
BigInteger.class);
-                }
-                break;
-            case DOUBLE:
-                if (BigDecimal.class.equals(clz)) {
-                    return createConverter(
-                            input -> ((BigDecimal) input).doubleValue(), 
Double.class);
-                }
-                if (String.class.equals(clz)) {
-                    return createConverter(
-                            input -> 
Double.parseDouble(String.valueOf(input)), Double.class);
-                }
-                break;
-            case FLOAT:
-                return createConverter(
-                        input -> Float.parseFloat(String.valueOf(input)), 
Float.class);
-            case DECIMALV2:
-            case DECIMAL32:
-            case DECIMAL64:
-            case DECIMAL128:
-                return createConverter(
-                        input -> new BigDecimal(String.valueOf(input)), 
BigDecimal.class);
-            case DATE:
-            case DATEV2:
-                if (Date.class.equals(clz)) {
-                    return createConverter(input -> ((Date) 
input).toLocalDate(), LocalDate.class);
-                }
-                if (Timestamp.class.equals(clz)) {
-                    return createConverter(
-                            input -> ((Timestamp) 
input).toLocalDateTime().toLocalDate(),
-                            LocalDate.class);
-                }
-                if (String.class.equals(clz)) {
-                    return createConverter(
-                            input -> LocalDate.parse(String.valueOf(input)), 
LocalDate.class);
-                }
-                break;
-            case DATETIME:
-            case DATETIMEV2:
-                if (Timestamp.class.equals(clz)) {
-                    return createConverter(
-                            input -> ((Timestamp) input).toLocalDateTime(), 
LocalDateTime.class);
-                }
-                if (OffsetDateTime.class.equals(clz)) {
-                    return createConverter(
-                            input -> ((OffsetDateTime) 
input).toLocalDateTime(),
-                            LocalDateTime.class);
-                }
-                if (oracle.sql.TIMESTAMP.class.equals(clz)) {
-                    return createConverter(
-                            input -> {
-                                try {
-                                    return ((oracle.sql.TIMESTAMP) input)
-                                            .timestampValue()
-                                            .toLocalDateTime();
-                                } catch (SQLException e) {
-                                    throw new RuntimeException(e);
-                                }
-                            },
-                            LocalDateTime.class);
-                }
-                if (String.class.equals(clz)) {
-                    return createConverter(
-                            input ->
-                                    LocalDateTime.parse(
-                                            String.valueOf(input),
-                                            
getDateTimeFormatter(String.valueOf(input))),
-                            LocalDateTime.class);
-                }
-                break;
-            case CHAR:
-                return createConverter(
-                        input -> trimSpaces(tableType, input.toString()), 
String.class);
-            case VARCHAR:
-            case STRING:
-                if (byte[].class.equals(clz)) {
-                    if (replaceString.equals("bitmap") || 
replaceString.equals("hll")) {
-                        break;
-                    } else {
-                        return createConverter(
-                                input -> byteArrayToHexString(tableType, 
(byte[]) input),
-                                String.class);
-                    }
-                }
-                if (Time.class.equals(clz)) {
-                    return createConverter(
-                            input -> timeToString((java.sql.Time) input), 
String.class);
-                }
-                if (oracle.sql.CLOB.class.equals(clz)) {
-                    return createConverter(
-                            input -> {
-                                try {
-                                    oracle.sql.CLOB clob = (oracle.sql.CLOB) 
input;
-                                    return clob.getSubString(1, (int) 
clob.length());
-                                } catch (SQLException e) {
-                                    throw new RuntimeException(e);
-                                }
-                            },
-                            String.class);
-                }
-                if (java.net.Inet4Address.class.equals(clz)) {
-                    return createConverter(
-                            input -> ((InetAddress) input).getHostAddress(), 
String.class);
-                }
-                if (java.net.Inet6Address.class.equals(clz)) {
-                    return createConverter(
-                            input -> {
-                                String inetAddress = ((InetAddress) 
input).getHostAddress();
-                                return simplifyIPv6Address(inetAddress);
-                            },
-                            String.class);
-                } else {
-                    return createConverter(Object::toString, String.class);
-                }
-            case ARRAY:
-                if (java.sql.Array.class.equals(clz)) {
-                    return createConverter(
-                            input -> {
-                                try {
-                                    return Arrays.asList(
-                                            (Object[]) ((java.sql.Array) 
input).getArray());
-                                } catch (SQLException e) {
-                                    throw new RuntimeException(e);
-                                }
-                            },
-                            List.class);
-                }
-                if (String.class.equals(clz)) {
-                    return createConverter(
-                            input -> {
-                                List<Object> list = 
parseArray(String.valueOf(input));
-                                return convertArray(list, 
columnType.getChildTypes().get(0));
-                            },
-                            List.class);
-                }
-                if (tableType == TOdbcTableType.CLICKHOUSE) {
-                    return createConverter(
-                            input -> {
-                                List<Object> list = 
convertClickHouseArray(input);
-                                return convertArray(list, 
columnType.getChildTypes().get(0));
-                            },
-                            List.class);
-                }
-                break;
-            default:
-                throw new IllegalArgumentException(
-                        "Unsupported column type: " + columnType.getType());
-        }
-        return null;
-    }
-
-    private ColumnValueConverter createConverter(
-            Function<Object, ?> converterFunction, Class<?> type) {
-        return (Object[] columnData) -> {
-            Object[] result = (Object[]) Array.newInstance(type, 
columnData.length);
-            for (int i = 0; i < columnData.length; i++) {
-                result[i] = columnData[i] != null ? 
converterFunction.apply(columnData[i]) : null;
-            }
-            return result;
-        };
-    }
-
-    private String byteArrayToHexString(TOdbcTableType tableType, byte[] 
columnData) {
-        if (tableType == TOdbcTableType.MYSQL || tableType == 
TOdbcTableType.OCEANBASE) {
-            return mysqlByteArrayToHexString(columnData);
-        } else if (tableType == TOdbcTableType.POSTGRESQL) {
-            return pgByteArrayToHexString(columnData);
-        } else {
-            return defaultByteArrayToHexString(columnData);
-        }
-    }
-
-    private String mysqlByteArrayToHexString(byte[] bytes) {
-        StringBuilder hexString = new StringBuilder("0x");
-        for (byte b : bytes) {
-            String hex = Integer.toHexString(0xFF & b);
-            if (hex.length() == 1) {
-                hexString.append('0');
-            }
-            hexString.append(hex.toUpperCase());
-        }
-        return hexString.toString();
-    }
-
-    private static String pgByteArrayToHexString(byte[] bytes) {
-        StringBuilder hexString = new StringBuilder("\\x");
-        for (byte b : bytes) {
-            hexString.append(String.format("%02x", b & 0xff));
-        }
-        return hexString.toString();
-    }
-
-    private String defaultByteArrayToHexString(byte[] bytes) {
-        StringBuilder hexString = new StringBuilder();
-        for (byte b : bytes) {
-            String hex = Integer.toHexString(0xFF & b);
-            if (hex.length() == 1) {
-                hexString.append('0');
-            }
-            hexString.append(hex.toUpperCase());
-        }
-        return hexString.toString();
-    }
-
-    private String trimSpaces(TOdbcTableType tableType, String str) {
-        if (tableType == TOdbcTableType.POSTGRESQL || tableType == 
TOdbcTableType.ORACLE) {
-            int end = str.length() - 1;
-            while (end >= 0 && str.charAt(end) == ' ') {
-                end--;
-            }
-            return str.substring(0, end + 1);
-        } else {
-            return str;
-        }
-    }
-
-    public String timeToString(java.sql.Time time) {
-        long milliseconds = time.getTime() % 1000L;
-        if (milliseconds > 0) {
-            return String.format("%s.%03d", time, milliseconds);
-        } else {
-            return time.toString();
-        }
-    }
-
-    private List<Object> convertArray(List<Object> list, ColumnType childType) 
{
-        Class<?> clz = Object.class;
-        for (Object data : list) {
-            if (data != null) {
-                clz = data.getClass();
-                break;
-            }
-        }
-        List<Object> convertedList = new ArrayList<>(list.size());
-        ColumnValueConverter converter = getOutputConverter(childType, clz, 
"not_replace");
-        for (Object element : list) {
-            if (childType.isComplexType()) {
-                convertedList.add(convertArray((List<Object>) element, 
childType));
-            } else {
-                if (converter != null) {
-                    convertedList.add(converter.convert(new Object[] 
{element})[0]);
-                } else {
-                    convertedList.add(element);
-                }
-            }
-        }
-        return convertedList;
-    }
-
-    private static String simplifyIPv6Address(String address) {
-        // Replace longest sequence of zeros with "::"
-        String[] parts = address.split(":");
-        int longestSeqStart = -1;
-        int longestSeqLen = 0;
-        int curSeqStart = -1;
-        int curSeqLen = 0;
-        for (int i = 0; i < parts.length; i++) {
-            if (parts[i].equals("0")) {
-                if (curSeqStart == -1) {
-                    curSeqStart = i;
-                }
-                curSeqLen++;
-                if (curSeqLen > longestSeqLen) {
-                    longestSeqStart = curSeqStart;
-                    longestSeqLen = curSeqLen;
-                }
-            } else {
-                curSeqStart = -1;
-                curSeqLen = 0;
-            }
-        }
-        if (longestSeqLen <= 1) {
-            return address; // No sequences of zeros to replace
-        }
-        StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < longestSeqStart; i++) {
-            sb.append(parts[i]).append(':');
-        }
-        sb.append(':');
-        for (int i = longestSeqStart + longestSeqLen; i < parts.length; i++) {
-            sb.append(parts[i]);
-            if (i < parts.length - 1) {
-                sb.append(':');
-            }
-        }
-        return sb.toString();
-    }
-
-    private static final Pattern MILLIS_PATTERN = Pattern.compile("(\\.\\d+)");
-
-    public static DateTimeFormatter getDateTimeFormatter(String 
dateTimeString) {
-        Matcher matcher = MILLIS_PATTERN.matcher(dateTimeString);
-        int fractionDigits = 0;
-        if (matcher.find()) {
-            fractionDigits = matcher.group(1).length() - 1; // Subtract 1 to 
exclude the dot
-        }
-        fractionDigits = Math.min(fractionDigits, 6); // Limit the fraction 
digits to 6
-
-        return new DateTimeFormatterBuilder()
-                .appendPattern("yyyy-MM-dd HH:mm:ss")
-                .appendFraction(ChronoField.MILLI_OF_SECOND, fractionDigits, 
fractionDigits, true)
-                .toFormatter();
-    }
 
-    private static final Map<Class<?>, Function<Object, List<Object>>> 
CK_ARRAY_CONVERTERS =
-            new HashMap<>();
+public interface JdbcExecutor {
+    int read() throws UdfRuntimeException;
 
-    static {
-        CK_ARRAY_CONVERTERS.put(String[].class, res -> 
Arrays.asList((String[]) res));
-        CK_ARRAY_CONVERTERS.put(boolean[].class, res -> toList((boolean[]) 
res));
-        CK_ARRAY_CONVERTERS.put(Boolean[].class, res -> 
Arrays.asList((Boolean[]) res));
-        CK_ARRAY_CONVERTERS.put(byte[].class, res -> toList((byte[]) res));
-        CK_ARRAY_CONVERTERS.put(Byte[].class, res -> Arrays.asList((Byte[]) 
res));
-        CK_ARRAY_CONVERTERS.put(LocalDate[].class, res -> 
Arrays.asList((LocalDate[]) res));
-        CK_ARRAY_CONVERTERS.put(LocalDateTime[].class, res -> 
Arrays.asList((LocalDateTime[]) res));
-        CK_ARRAY_CONVERTERS.put(float[].class, res -> toList((float[]) res));
-        CK_ARRAY_CONVERTERS.put(Float[].class, res -> Arrays.asList((Float[]) 
res));
-        CK_ARRAY_CONVERTERS.put(double[].class, res -> toList((double[]) res));
-        CK_ARRAY_CONVERTERS.put(Double[].class, res -> 
Arrays.asList((Double[]) res));
-        CK_ARRAY_CONVERTERS.put(short[].class, res -> toList((short[]) res));
-        CK_ARRAY_CONVERTERS.put(Short[].class, res -> Arrays.asList((Short[]) 
res));
-        CK_ARRAY_CONVERTERS.put(int[].class, res -> toList((int[]) res));
-        CK_ARRAY_CONVERTERS.put(Integer[].class, res -> 
Arrays.asList((Integer[]) res));
-        CK_ARRAY_CONVERTERS.put(long[].class, res -> toList((long[]) res));
-        CK_ARRAY_CONVERTERS.put(Long[].class, res -> Arrays.asList((Long[]) 
res));
-        CK_ARRAY_CONVERTERS.put(BigInteger[].class, res -> 
Arrays.asList((BigInteger[]) res));
-        CK_ARRAY_CONVERTERS.put(BigDecimal[].class, res -> 
Arrays.asList((BigDecimal[]) res));
-        CK_ARRAY_CONVERTERS.put(
-                Inet4Address[].class,
-                res ->
-                        Arrays.stream((Inet4Address[]) res)
-                                .map(addr -> addr == null ? null : 
addr.getHostAddress())
-                                .collect(Collectors.toList()));
-        CK_ARRAY_CONVERTERS.put(
-                Inet6Address[].class,
-                res ->
-                        Arrays.stream((Inet6Address[]) res)
-                                .map(addr -> addr == null ? null : 
simplifyIPv6Address(addr.getHostAddress()))
-                                .collect(Collectors.toList()));
-        CK_ARRAY_CONVERTERS.put(UUID[].class, res -> Arrays.asList((UUID[]) 
res));
-        CK_ARRAY_CONVERTERS.put(com.clickhouse.data.value.UnsignedByte[].class,
-                res -> 
Arrays.asList((com.clickhouse.data.value.UnsignedByte[]) res));
-        
CK_ARRAY_CONVERTERS.put(com.clickhouse.data.value.UnsignedShort[].class,
-                res -> 
Arrays.asList((com.clickhouse.data.value.UnsignedShort[]) res));
-        
CK_ARRAY_CONVERTERS.put(com.clickhouse.data.value.UnsignedInteger[].class,
-                res -> 
Arrays.asList((com.clickhouse.data.value.UnsignedInteger[]) res));
-        CK_ARRAY_CONVERTERS.put(com.clickhouse.data.value.UnsignedLong[].class,
-                res -> 
Arrays.asList((com.clickhouse.data.value.UnsignedLong[]) res));
-    }
+    int write(Map<String, String> params) throws UdfRuntimeException;
 
-    public static List<Object> convertClickHouseArray(Object obj) {
-        Function<Object, List<Object>> converter = 
CK_ARRAY_CONVERTERS.get(obj.getClass());
-        return converter != null ? converter.apply(obj) : 
Collections.singletonList(obj);
-    }
+    long getBlockAddress(int batchSize, Map<String, String> outputParams) 
throws UdfRuntimeException;
 
-    private static <T> List<Object> toList(T array) {
-        if (array instanceof Object[]) {
-            return Arrays.asList((Object[]) array);
-        }
-        int length = Array.getLength(array);
-        List<Object> list = new ArrayList<>(length);
-        for (int i = 0; i < length; i++) {
-            list.add(Array.get(array, i));
-        }
-        return list;
-    }
+    void close() throws UdfRuntimeException, Exception;
 
-    private static final Pattern ARRAY_PATTERN = 
Pattern.compile("\"([^\"]*)\"|([^,]+)");
+    void openTrans() throws UdfRuntimeException;
 
-    private static List<Object> parseArray(String input) {
-        String trimmedInput = input.substring(1, input.length() - 1);
-        List<Object> list = new ArrayList<>();
-        Matcher matcher = ARRAY_PATTERN.matcher(trimmedInput);
-        while (matcher.find()) {
-            if (matcher.group(1) != null) {
-                list.add(matcher.group(1));
-            } else {
-                list.add(matcher.group(2));
-            }
-        }
-        return list;
-    }
+    void commitTrans() throws UdfRuntimeException;
 
-    private int insert(VectorTable data) throws SQLException {
-        for (int i = 0; i < data.getNumRows(); ++i) {
-            for (int j = 0; j < data.getColumns().length; ++j) {
-                insertColumn(i, j, data.getColumns()[j]);
-            }
-            preparedStatement.addBatch();
-        }
-        preparedStatement.executeBatch();
-        preparedStatement.clearBatch();
-        return data.getNumRows();
-    }
+    void rollbackTrans() throws UdfRuntimeException;
 
-    private void insertColumn(int rowIdx, int colIdx, VectorColumn column) 
throws SQLException {
-        int parameterIndex = colIdx + 1;
-        ColumnType.Type dorisType = column.getColumnTyp();
-        if (column.isNullAt(rowIdx)) {
-            insertNullColumn(parameterIndex, dorisType);
-            return;
-        }
-        switch (dorisType) {
-            case BOOLEAN:
-                preparedStatement.setBoolean(parameterIndex, 
column.getBoolean(rowIdx));
-                break;
-            case TINYINT:
-                preparedStatement.setByte(parameterIndex, 
column.getByte(rowIdx));
-                break;
-            case SMALLINT:
-                preparedStatement.setShort(parameterIndex, 
column.getShort(rowIdx));
-                break;
-            case INT:
-                preparedStatement.setInt(parameterIndex, 
column.getInt(rowIdx));
-                break;
-            case BIGINT:
-                preparedStatement.setLong(parameterIndex, 
column.getLong(rowIdx));
-                break;
-            case LARGEINT:
-                preparedStatement.setObject(parameterIndex, 
column.getBigInteger(rowIdx));
-                break;
-            case FLOAT:
-                preparedStatement.setFloat(parameterIndex, 
column.getFloat(rowIdx));
-                break;
-            case DOUBLE:
-                preparedStatement.setDouble(parameterIndex, 
column.getDouble(rowIdx));
-                break;
-            case DECIMALV2:
-            case DECIMAL32:
-            case DECIMAL64:
-            case DECIMAL128:
-                preparedStatement.setBigDecimal(parameterIndex, 
column.getDecimal(rowIdx));
-                break;
-            case DATEV2:
-                preparedStatement.setDate(parameterIndex, 
Date.valueOf(column.getDate(rowIdx)));
-                break;
-            case DATETIMEV2:
-                preparedStatement.setTimestamp(
-                        parameterIndex, 
Timestamp.valueOf(column.getDateTime(rowIdx)));
-                break;
-            case CHAR:
-            case VARCHAR:
-            case STRING:
-            case BINARY:
-                preparedStatement.setString(parameterIndex, 
column.getStringWithOffset(rowIdx));
-                break;
-            default:
-                throw new RuntimeException("Unknown type value: " + dorisType);
-        }
-    }
+    int getCurBlockRows();
 
-    private void insertNullColumn(int parameterIndex, ColumnType.Type 
dorisType)
-            throws SQLException {
-        switch (dorisType) {
-            case BOOLEAN:
-                preparedStatement.setNull(parameterIndex, Types.BOOLEAN);
-                break;
-            case TINYINT:
-                preparedStatement.setNull(parameterIndex, Types.TINYINT);
-                break;
-            case SMALLINT:
-                preparedStatement.setNull(parameterIndex, Types.SMALLINT);
-                break;
-            case INT:
-                preparedStatement.setNull(parameterIndex, Types.INTEGER);
-                break;
-            case BIGINT:
-                preparedStatement.setNull(parameterIndex, Types.BIGINT);
-                break;
-            case LARGEINT:
-                preparedStatement.setNull(parameterIndex, Types.JAVA_OBJECT);
-                break;
-            case FLOAT:
-                preparedStatement.setNull(parameterIndex, Types.FLOAT);
-                break;
-            case DOUBLE:
-                preparedStatement.setNull(parameterIndex, Types.DOUBLE);
-                break;
-            case DECIMALV2:
-            case DECIMAL32:
-            case DECIMAL64:
-            case DECIMAL128:
-                preparedStatement.setNull(parameterIndex, Types.DECIMAL);
-                break;
-            case DATEV2:
-                preparedStatement.setNull(parameterIndex, Types.DATE);
-                break;
-            case DATETIMEV2:
-                preparedStatement.setNull(parameterIndex, Types.TIMESTAMP);
-                break;
-            case CHAR:
-            case VARCHAR:
-            case STRING:
-            case BINARY:
-                preparedStatement.setNull(parameterIndex, Types.VARCHAR);
-                break;
-            default:
-                throw new RuntimeException("Unknown type value: " + dorisType);
-        }
-    }
+    boolean hasNext() throws UdfRuntimeException;
 }
diff --git 
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutorFactory.java
 
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutorFactory.java
new file mode 100644
index 00000000000..d67c58bb6b4
--- /dev/null
+++ 
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutorFactory.java
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jdbc;
+
+import org.apache.doris.thrift.TOdbcTableType;
+
+public class JdbcExecutorFactory {
+    public static String getExecutorClass(TOdbcTableType type) {
+        switch (type) {
+            case MYSQL:
+                return "org/apache/doris/jdbc/MySQLJdbcExecutor";
+            case ORACLE:
+                return "org/apache/doris/jdbc/OracleJdbcExecutor";
+            default:
+                return "org/apache/doris/jdbc/DefaultJdbcExecutor";
+        }
+    }
+}
diff --git 
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/MySQLJdbcExecutor.java
 
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/MySQLJdbcExecutor.java
new file mode 100644
index 00000000000..30b84ffbcde
--- /dev/null
+++ 
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/MySQLJdbcExecutor.java
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jdbc;
+
+import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.jni.vec.ColumnType.Type;
+import org.apache.doris.common.jni.vec.ColumnValueConverter;
+import org.apache.doris.common.jni.vec.VectorTable;
+import org.apache.doris.thrift.TJdbcOperation;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.log4j.Logger;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class MySQLJdbcExecutor extends BaseJdbcExecutor {
+    private static final Logger LOG = 
Logger.getLogger(MySQLJdbcExecutor.class);
+
+    private static final Gson gson = new Gson();
+
+    public MySQLJdbcExecutor(byte[] thriftParams) throws Exception {
+        super(thriftParams);
+    }
+
+    @Override
+    protected boolean abortReadConnection(Connection connection, ResultSet 
resultSet)
+            throws SQLException {
+        if (!resultSet.isAfterLast()) {
+            // Abort connection before closing. Without this, the MySQL driver
+            // attempts to drain the connection by reading all the results.
+            connection.abort(MoreExecutors.directExecutor());
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    protected void initializeStatement(Connection conn, JdbcDataSourceConfig 
config, String sql) throws SQLException {
+        if (config.getOp() == TJdbcOperation.READ) {
+            conn.setAutoCommit(false);
+            Preconditions.checkArgument(sql != null, "SQL statement cannot be 
null for READ operation.");
+            stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, 
ResultSet.CONCUR_READ_ONLY);
+            stmt.setFetchSize(Integer.MIN_VALUE);  // MySQL: signal streaming 
results with Integer.MIN_VALUE
+            batchSizeNum = config.getBatchSize();
+        } else {
+            LOG.info("Insert SQL: " + sql);
+            preparedStatement = conn.prepareStatement(sql);
+        }
+    }
+
+    @Override
+    protected void initializeBlock(int columnCount, String[] 
replaceStringList, int batchSizeNum,
+            VectorTable outputTable) {
+        for (int i = 0; i < columnCount; ++i) {
+            if (replaceStringList[i].equals("bitmap") || 
replaceStringList[i].equals("hll")) {
+                block.add(new byte[batchSizeNum][]);
+            } else if (outputTable.getColumnType(i).getType() == Type.ARRAY) {
+                block.add(new String[batchSizeNum]);
+            } else if (outputTable.getColumnType(i).getType() == Type.STRING) {
+                block.add(new Object[batchSizeNum]);
+            } else {
+                
block.add(outputTable.getColumn(i).newObjectContainerArray(batchSizeNum));
+            }
+        }
+    }
+
+    @Override
+    protected Object getColumnValue(int columnIndex, ColumnType type, String[] 
replaceStringList) throws SQLException {
+        if (replaceStringList[columnIndex].equals("bitmap") || 
replaceStringList[columnIndex].equals("hll")) {
+            byte[] data = resultSet.getBytes(columnIndex + 1);
+            if (resultSet.wasNull()) {
+                return null;
+            }
+            return data;
+        } else {
+            switch (type.getType()) {
+                case BOOLEAN:
+                    return resultSet.getObject(columnIndex + 1, Boolean.class);
+                case TINYINT:
+                    return resultSet.getObject(columnIndex + 1, Byte.class);
+                case SMALLINT:
+                    return resultSet.getObject(columnIndex + 1, Short.class);
+                case INT:
+                    return resultSet.getObject(columnIndex + 1, Integer.class);
+                case BIGINT:
+                    return resultSet.getObject(columnIndex + 1, Long.class);
+                case LARGEINT:
+                    return resultSet.getObject(columnIndex + 1, 
BigInteger.class);
+                case FLOAT:
+                    return resultSet.getObject(columnIndex + 1, Float.class);
+                case DOUBLE:
+                    return resultSet.getObject(columnIndex + 1, Double.class);
+                case DECIMALV2:
+                case DECIMAL32:
+                case DECIMAL64:
+                case DECIMAL128:
+                    return resultSet.getObject(columnIndex + 1, 
BigDecimal.class);
+                case DATE:
+                case DATEV2:
+                    return resultSet.getObject(columnIndex + 1, 
LocalDate.class);
+                case DATETIME:
+                case DATETIMEV2:
+                    return resultSet.getObject(columnIndex + 1, 
LocalDateTime.class);
+                case CHAR:
+                case VARCHAR:
+                case ARRAY:
+                    return resultSet.getObject(columnIndex + 1, String.class);
+                case STRING:
+                    return resultSet.getObject(columnIndex + 1);
+                default:
+                    throw new IllegalArgumentException("Unsupported column 
type: " + type.getType());
+            }
+        }
+    }
+
+    @Override
+    protected ColumnValueConverter getOutputConverter(ColumnType columnType, 
String replaceString) {
+        switch (columnType.getType()) {
+            case STRING:
+                if (replaceString.equals("bitmap") || 
replaceString.equals("hll")) {
+                    return null;
+                } else {
+                    return createConverter(input -> {
+                        if (input instanceof byte[]) {
+                            return mysqlByteArrayToHexString((byte[]) input);
+                        } else if (input instanceof java.sql.Time) {
+                            return timeToString((java.sql.Time) input);
+                        } else {
+                            return input.toString();
+                        }
+                    }, String.class);
+                }
+            case ARRAY:
+                return createConverter(
+                        (Object input) -> convertArray(input, 
columnType.getChildTypes().get(0)),
+                        List.class);
+            default:
+                return null;
+        }
+    }
+
+    private Object convertArray(Object input, ColumnType columnType) {
+        java.lang.reflect.Type listType = getListTypeForArray(columnType);
+        if (columnType.getType() == Type.BOOLEAN) {
+            List<?> list = gson.fromJson((String) input, List.class);
+            return list.stream().map(item -> {
+                if (item instanceof Boolean) {
+                    return item;
+                } else if (item instanceof Number) {
+                    return ((Number) item).intValue() != 0;
+                } else {
+                    throw new IllegalArgumentException("Cannot convert " + 
item + " to Boolean.");
+                }
+            }).collect(Collectors.toList());
+        } else if (columnType.getType() == Type.DATE || columnType.getType() 
== Type.DATEV2) {
+            List<?> list = gson.fromJson((String) input, List.class);
+            return list.stream().map(item -> {
+                if (item instanceof String) {
+                    return LocalDate.parse((String) item);
+                } else {
+                    throw new IllegalArgumentException("Cannot convert " + 
item + " to LocalDate.");
+                }
+            }).collect(Collectors.toList());
+        } else if (columnType.getType() == Type.DATETIME || 
columnType.getType() == Type.DATETIMEV2) {
+            List<?> list = gson.fromJson((String) input, List.class);
+            return list.stream().map(item -> {
+                if (item instanceof String) {
+                    return LocalDateTime.parse(
+                            (String) item,
+                            new DateTimeFormatterBuilder()
+                                    .appendPattern("yyyy-MM-dd HH:mm:ss")
+                                    
.appendFraction(ChronoField.MILLI_OF_SECOND, columnType.getPrecision(),
+                                            columnType.getPrecision(), true)
+                                    .toFormatter());
+                } else {
+                    throw new IllegalArgumentException("Cannot convert " + 
item + " to LocalDateTime.");
+                }
+            }).collect(Collectors.toList());
+        } else if (columnType.getType() == Type.ARRAY) {
+            List<?> list = gson.fromJson((String) input, listType);
+            return list.stream()
+                    .map(item -> convertArray(gson.toJson(item), 
columnType.getChildTypes().get(0)))
+                    .collect(Collectors.toList());
+        } else {
+            return gson.fromJson((String) input, listType);
+        }
+    }
+
+    private java.lang.reflect.Type getListTypeForArray(ColumnType type) {
+        switch (type.getType()) {
+            case BOOLEAN:
+                return new TypeToken<List<Boolean>>() {
+                }.getType();
+            case TINYINT:
+                return new TypeToken<List<Byte>>() {
+                }.getType();
+            case SMALLINT:
+                return new TypeToken<List<Short>>() {
+                }.getType();
+            case INT:
+                return new TypeToken<List<Integer>>() {
+                }.getType();
+            case BIGINT:
+                return new TypeToken<List<Long>>() {
+                }.getType();
+            case LARGEINT:
+                return new TypeToken<List<BigInteger>>() {
+                }.getType();
+            case FLOAT:
+                return new TypeToken<List<Float>>() {
+                }.getType();
+            case DOUBLE:
+                return new TypeToken<List<Double>>() {
+                }.getType();
+            case DECIMALV2:
+            case DECIMAL32:
+            case DECIMAL64:
+            case DECIMAL128:
+                return new TypeToken<List<BigDecimal>>() {
+                }.getType();
+            case DATE:
+            case DATEV2:
+                return new TypeToken<List<LocalDate>>() {
+                }.getType();
+            case DATETIME:
+            case DATETIMEV2:
+                return new TypeToken<List<LocalDateTime>>() {
+                }.getType();
+            case CHAR:
+            case VARCHAR:
+            case STRING:
+                return new TypeToken<List<String>>() {
+                }.getType();
+            case ARRAY:
+                java.lang.reflect.Type childType = 
getListTypeForArray(type.getChildTypes().get(0));
+                TypeToken<?> token = TypeToken.getParameterized(List.class, 
childType);
+                return token.getType();
+            default:
+                throw new IllegalArgumentException("Unsupported column type: " 
+ type.getType());
+        }
+    }
+
+    private String mysqlByteArrayToHexString(byte[] bytes) {
+        StringBuilder hexString = new StringBuilder("0x");
+        for (byte b : bytes) {
+            String hex = Integer.toHexString(0xFF & b);
+            if (hex.length() == 1) {
+                hexString.append('0');
+            }
+            hexString.append(hex.toUpperCase());
+        }
+        return hexString.toString();
+    }
+
+    private String timeToString(java.sql.Time time) {
+        long milliseconds = time.getTime() % 1000L;
+        if (milliseconds > 0) {
+            return String.format("%s.%03d", time, milliseconds);
+        } else {
+            return time.toString();
+        }
+    }
+}
diff --git 
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/OracleJdbcExecutor.java
 
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/OracleJdbcExecutor.java
new file mode 100644
index 00000000000..bfb941939f5
--- /dev/null
+++ 
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/OracleJdbcExecutor.java
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jdbc;
+
+import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.jni.vec.ColumnType.Type;
+import org.apache.doris.common.jni.vec.ColumnValueConverter;
+import org.apache.doris.common.jni.vec.VectorTable;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import org.apache.log4j.Logger;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Clob;
+import java.sql.SQLException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+
+public class OracleJdbcExecutor extends BaseJdbcExecutor {
+    private static final Logger LOG = 
Logger.getLogger(OracleJdbcExecutor.class);
+
+    public OracleJdbcExecutor(byte[] thriftParams) throws Exception {
+        super(thriftParams);
+    }
+
+    @Override
+    protected void setValidationQuery(DruidDataSource ds) {
+        ds.setValidationQuery("SELECT 1 FROM dual");
+    }
+
+    @Override
+    protected void initializeBlock(int columnCount, String[] 
replaceStringList, int batchSizeNum,
+            VectorTable outputTable) {
+        for (int i = 0; i < columnCount; ++i) {
+            if (outputTable.getColumnType(i).getType() == Type.LARGEINT) {
+                block.add(new BigDecimal[batchSizeNum]);
+            } else if (outputTable.getColumnType(i).getType() == Type.STRING) {
+                block.add(new Object[batchSizeNum]);
+            } else {
+                
block.add(outputTable.getColumn(i).newObjectContainerArray(batchSizeNum));
+            }
+        }
+    }
+
+    @Override
+    protected Object getColumnValue(int columnIndex, ColumnType type, String[] 
replaceStringList) throws SQLException {
+        switch (type.getType()) {
+            case TINYINT:
+                return resultSet.getObject(columnIndex + 1, Byte.class);
+            case SMALLINT:
+                return resultSet.getObject(columnIndex + 1, Short.class);
+            case INT:
+                return resultSet.getObject(columnIndex + 1, Integer.class);
+            case BIGINT:
+                return resultSet.getObject(columnIndex + 1, Long.class);
+            case FLOAT:
+                return resultSet.getObject(columnIndex + 1, Float.class);
+            case DOUBLE:
+                return resultSet.getObject(columnIndex + 1, Double.class);
+            case LARGEINT:
+            case DECIMALV2:
+            case DECIMAL32:
+            case DECIMAL64:
+            case DECIMAL128:
+                return resultSet.getObject(columnIndex + 1, BigDecimal.class);
+            case DATE:
+            case DATEV2:
+                return resultSet.getObject(columnIndex + 1, LocalDate.class);
+            case DATETIME:
+            case DATETIMEV2:
+                return resultSet.getObject(columnIndex + 1, 
LocalDateTime.class);
+            case CHAR:
+            case VARCHAR:
+            case STRING:
+                return resultSet.getObject(columnIndex + 1);
+            default:
+                throw new IllegalArgumentException("Unsupported column type: " 
+ type.getType());
+        }
+    }
+
+    @Override
+    protected ColumnValueConverter getOutputConverter(ColumnType columnType, 
String replaceString) {
+        switch (columnType.getType()) {
+            case CHAR:
+                return createConverter(
+                        input -> trimSpaces(input.toString()), String.class);
+            case LARGEINT:
+                return createConverter(
+                        input -> ((BigDecimal) input).toBigInteger(), 
BigInteger.class);
+            case STRING:
+                return createConverter(input -> {
+                    if (input instanceof Clob) {
+                        try {
+                            return ((Clob) input).getSubString(1, (int) 
((Clob) input).length());
+                        } catch (SQLException e) {
+                            LOG.error("Failed to get string from clob", e);
+                            return null;
+                        }
+                    } else {
+                        return input.toString();
+                    }
+                }, String.class);
+            default:
+                return null;
+        }
+    }
+
+    private String trimSpaces(String str) {
+        int end = str.length() - 1;
+        while (end >= 0 && str.charAt(end) == ' ') {
+            end--;
+        }
+        return str.substring(0, end + 1);
+    }
+}
diff --git 
a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out 
b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out
index 2a636532069..3520a11d8bc 100644
--- a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out
+++ b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out
@@ -225,11 +225,13 @@ views
 
 -- !dt_null --
 \N
+0000-01-01T00:00
 2023-06-17T10:00
 
 -- !test_dz --
 1      \N
 2      2022-01-01
+3      0000-01-01
 
 -- !test_filter_not --
 张三1    11      12345678        123     321312  1999-02-13T00:00        中国      
男       0


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to