This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 3c37fb085cec1386c30ee78ba24bea46c027b829 Author: zy-kkk <[email protected]> AuthorDate: Wed Feb 28 22:24:18 2024 +0800 [refactor](jdbc catalog) split jdbc executor for different data sources (step-1) (#31406) --- be/src/vec/exec/vjdbc_connector.cpp | 35 +- be/src/vec/exec/vjdbc_connector.h | 3 + .../docker-compose/mysql/init/04-insert.sql | 4 +- .../org/apache/doris/jdbc/BaseJdbcExecutor.java | 520 ++++++++++ ...{JdbcExecutor.java => DefaultJdbcExecutor.java} | 6 +- .../java/org/apache/doris/jdbc/JdbcExecutor.java | 1085 +------------------- .../org/apache/doris/jdbc/JdbcExecutorFactory.java | 33 + .../org/apache/doris/jdbc/MySQLJdbcExecutor.java | 291 ++++++ .../org/apache/doris/jdbc/OracleJdbcExecutor.java | 131 +++ .../jdbc/test_mysql_jdbc_catalog.out | 2 + 10 files changed, 1028 insertions(+), 1082 deletions(-) diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp index e6419ec95e7..4020278e84d 100644 --- a/be/src/vec/exec/vjdbc_connector.cpp +++ b/be/src/vec/exec/vjdbc_connector.cpp @@ -49,7 +49,7 @@ #include "vec/io/reader_buffer.h" namespace doris::vectorized { -const char* JDBC_EXECUTOR_CLASS = "org/apache/doris/jdbc/JdbcExecutor"; +const char* JDBC_EXECUTOR_FACTORY_CLASS = "org/apache/doris/jdbc/JdbcExecutorFactory"; const char* JDBC_EXECUTOR_CTOR_SIGNATURE = "([B)V"; const char* JDBC_EXECUTOR_STMT_WRITE_SIGNATURE = "(Ljava/util/Map;)I"; const char* JDBC_EXECUTOR_HAS_NEXT_SIGNATURE = "()Z"; @@ -85,6 +85,7 @@ Status JdbcConnector::close(Status /*unused*/) { } JNIEnv* env; RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + env->DeleteGlobalRef(_executor_factory_clazz); env->DeleteGlobalRef(_executor_clazz); DELETE_BASIC_JAVA_CLAZZ_REF(object) DELETE_BASIC_JAVA_CLAZZ_REF(string) @@ -104,7 +105,29 @@ Status JdbcConnector::open(RuntimeState* state, bool read) { JNIEnv* env = nullptr; RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); - RETURN_IF_ERROR(JniUtil::get_jni_scanner_class(env, JDBC_EXECUTOR_CLASS, &_executor_clazz)); + RETURN_IF_ERROR(JniUtil::get_jni_scanner_class(env, JDBC_EXECUTOR_FACTORY_CLASS, + &_executor_factory_clazz)); + + _executor_factory_ctor_id = + env->GetStaticMethodID(_executor_factory_clazz, "getExecutorClass", + "(Lorg/apache/doris/thrift/TOdbcTableType;)Ljava/lang/String;"); + if (_executor_factory_ctor_id == nullptr) { + return Status::InternalError("Failed to find method ID for getExecutorClass"); + } + + jobject jtable_type = _get_java_table_type(env, _conn_param.table_type); + + jstring executor_name = (jstring)env->CallStaticObjectMethod( + _executor_factory_clazz, _executor_factory_ctor_id, jtable_type); + if (executor_name == nullptr) { + return Status::InternalError("getExecutorClass returned null"); + } + const char* executor_name_str = env->GetStringUTFChars(executor_name, nullptr); + + RETURN_IF_ERROR(JniUtil::get_jni_scanner_class(env, executor_name_str, &_executor_clazz)); + env->DeleteLocalRef(jtable_type); + env->ReleaseStringUTFChars(executor_name, executor_name_str); + env->DeleteLocalRef(executor_name); GET_BASIC_JAVA_CLAZZ("java/util/List", list) GET_BASIC_JAVA_CLAZZ("java/lang/Object", object) GET_BASIC_JAVA_CLAZZ("java/lang/String", string) @@ -747,4 +770,12 @@ Status JdbcConnector::_cast_string_to_json(const SlotDescriptor* slot_desc, Bloc return Status::OK(); } +jobject JdbcConnector::_get_java_table_type(JNIEnv* env, TOdbcTableType::type tableType) { + jclass enumClass = env->FindClass("org/apache/doris/thrift/TOdbcTableType"); + jmethodID findByValueMethod = env->GetStaticMethodID( + enumClass, "findByValue", "(I)Lorg/apache/doris/thrift/TOdbcTableType;"); + jobject javaEnumObj = + env->CallStaticObjectMethod(enumClass, findByValueMethod, static_cast<jint>(tableType)); + return javaEnumObj; +} } // namespace doris::vectorized diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h index 2ecdf210fac..ed2afdecdfd 100644 --- a/be/src/vec/exec/vjdbc_connector.h +++ b/be/src/vec/exec/vjdbc_connector.h @@ -131,13 +131,16 @@ private: int rows); Status _cast_string_to_json(const SlotDescriptor* slot_desc, Block* block, int column_index, int rows); + jobject _get_java_table_type(JNIEnv* env, TOdbcTableType::type tableType); bool _closed = false; + jclass _executor_factory_clazz; jclass _executor_clazz; jclass _executor_list_clazz; jclass _executor_object_clazz; jclass _executor_string_clazz; jobject _executor_obj; + jmethodID _executor_factory_ctor_id; jmethodID _executor_ctor_id; jmethodID _executor_stmt_write_id; jmethodID _executor_read_id; diff --git a/docker/thirdparties/docker-compose/mysql/init/04-insert.sql b/docker/thirdparties/docker-compose/mysql/init/04-insert.sql index bb1b0962b45..d71986b1354 100644 --- a/docker/thirdparties/docker-compose/mysql/init/04-insert.sql +++ b/docker/thirdparties/docker-compose/mysql/init/04-insert.sql @@ -1151,7 +1151,7 @@ VALUES ('2023-06-17 10:00:00', '2023-06-17 10:00:01.1', '2023-06-17 10:00:02.22' SET SESSION sql_mode=(SELECT REPLACE(@@sql_mode,'STRICT_TRANS_TABLES','')); INSERT INTO doris_test.dt_null -VALUES ('2023-06-17 10:00:00'),('0000-00-00 00:00:00'); +VALUES ('2023-06-17 10:00:00'),('0000-00-00 00:00:00'),('0000-01-01 00:00:00'); insert into doris_test.test_key_word values (1, 1), (2, 2); @@ -1159,7 +1159,7 @@ insert into doris_test.test_key_word values (1, 1), (2, 2); SET SESSION sql_mode=(SELECT REPLACE(@@sql_mode,'NO_ZERO_DATE','')); SET SESSION sql_mode=(SELECT REPLACE(@@sql_mode,'NO_ZERO_IN_DATE','')); -insert into doris_test.test_zd (id,d_z) VALUES (1,'0000-00-00'),(2,'2022-01-01'); +insert into doris_test.test_zd (id,d_z) VALUES (1,'0000-00-00'),(2,'2022-01-01'),(3,'0000-01-01'); insert into Doris.DORIS values ('DORIS'); insert into Doris.Doris values ('Doris'); diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java new file mode 100644 index 00000000000..87f67807826 --- /dev/null +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java @@ -0,0 +1,520 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.jdbc; + +import org.apache.doris.common.exception.InternalException; +import org.apache.doris.common.exception.UdfRuntimeException; +import org.apache.doris.common.jni.utils.UdfUtils; +import org.apache.doris.common.jni.vec.ColumnType; +import org.apache.doris.common.jni.vec.ColumnValueConverter; +import org.apache.doris.common.jni.vec.VectorColumn; +import org.apache.doris.common.jni.vec.VectorTable; +import org.apache.doris.thrift.TJdbcExecutorCtorParams; +import org.apache.doris.thrift.TJdbcOperation; +import org.apache.doris.thrift.TOdbcTableType; + +import com.alibaba.druid.pool.DruidDataSource; +import com.google.common.base.Preconditions; +import org.apache.log4j.Logger; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; + +import java.io.FileNotFoundException; +import java.lang.reflect.Array; +import java.net.MalformedURLException; +import java.sql.Connection; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +public abstract class BaseJdbcExecutor implements JdbcExecutor { + private static final Logger LOG = Logger.getLogger(BaseJdbcExecutor.class); + private static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new TBinaryProtocol.Factory(); + private DruidDataSource druidDataSource = null; + private final byte[] druidDataSourceLock = new byte[0]; + private TOdbcTableType tableType; + private JdbcDataSourceConfig config; + private Connection conn = null; + protected PreparedStatement preparedStatement = null; + protected Statement stmt = null; + protected ResultSet resultSet = null; + protected ResultSetMetaData resultSetMetaData = null; + protected List<String> resultColumnTypeNames = null; + protected List<Object[]> block = null; + protected VectorTable outputTable = null; + protected int batchSizeNum = 0; + protected int curBlockRows = 0; + + public BaseJdbcExecutor(byte[] thriftParams) throws Exception { + TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams(); + TDeserializer deserializer = new TDeserializer(PROTOCOL_FACTORY); + try { + deserializer.deserialize(request, thriftParams); + } catch (TException e) { + throw new InternalException(e.getMessage()); + } + tableType = request.table_type; + this.config = new JdbcDataSourceConfig() + .setCatalogId(request.catalog_id) + .setJdbcUser(request.jdbc_user) + .setJdbcPassword(request.jdbc_password) + .setJdbcUrl(request.jdbc_url) + .setJdbcDriverUrl(request.driver_path) + .setJdbcDriverClass(request.jdbc_driver_class) + .setBatchSize(request.batch_size) + .setOp(request.op) + .setTableType(request.table_type) + .setConnectionPoolMinSize(request.connection_pool_min_size) + .setConnectionPoolMaxSize(request.connection_pool_max_size) + .setConnectionPoolMaxWaitTime(request.connection_pool_max_wait_time) + .setConnectionPoolMaxLifeTime(request.connection_pool_max_life_time) + .setConnectionPoolKeepAlive(request.connection_pool_keep_alive); + JdbcDataSource.getDataSource().setCleanupInterval(request.connection_pool_cache_clear_time); + init(config, request.statement); + } + + public void close() throws Exception { + try { + if (stmt != null) { + try { + stmt.cancel(); + } catch (SQLException e) { + LOG.error("Error cancelling statement", e); + } + } + + boolean shouldAbort = conn != null && resultSet != null; + boolean aborted = false; // Used to record whether the abort operation is performed + if (shouldAbort) { + aborted = abortReadConnection(conn, resultSet); + } + + // If no abort operation is performed, the resource needs to be closed manually + if (!aborted) { + closeResources(resultSet, stmt, conn); + } + } finally { + if (config.getConnectionPoolMinSize() == 0 && druidDataSource != null) { + druidDataSource.close(); + JdbcDataSource.getDataSource().getSourcesMap().remove(config.createCacheKey()); + druidDataSource = null; + } + } + } + + private void closeResources(AutoCloseable... closeables) { + for (AutoCloseable closeable : closeables) { + if (closeable != null) { + try { + if (closeable instanceof Connection) { + if (!((Connection) closeable).isClosed()) { + closeable.close(); + } + } else { + closeable.close(); + } + } catch (Exception e) { + LOG.error("Cannot close resource: ", e); + } + } + } + } + + protected boolean abortReadConnection(Connection connection, ResultSet resultSet) + throws SQLException { + return false; + } + + public int read() throws UdfRuntimeException { + try { + resultSet = ((PreparedStatement) stmt).executeQuery(); + resultSetMetaData = resultSet.getMetaData(); + int columnCount = resultSetMetaData.getColumnCount(); + resultColumnTypeNames = new ArrayList<>(columnCount); + block = new ArrayList<>(columnCount); + for (int i = 0; i < columnCount; ++i) { + resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1)); + } + return columnCount; + } catch (SQLException e) { + throw new UdfRuntimeException("JDBC executor sql has error: ", e); + } + } + + public long getBlockAddress(int batchSize, Map<String, String> outputParams) throws UdfRuntimeException { + try { + if (outputTable != null) { + outputTable.close(); + } + + outputTable = VectorTable.createWritableTable(outputParams, 0); + + String isNullableString = outputParams.get("is_nullable"); + String replaceString = outputParams.get("replace_string"); + + if (isNullableString == null || replaceString == null) { + throw new IllegalArgumentException( + "Output parameters 'is_nullable' and 'replace_string' are required."); + } + + String[] nullableList = isNullableString.split(","); + String[] replaceStringList = replaceString.split(","); + curBlockRows = 0; + int columnCount = resultSetMetaData.getColumnCount(); + + initializeBlock(columnCount, replaceStringList, batchSize, outputTable); + + do { + for (int i = 0; i < columnCount; ++i) { + ColumnType type = outputTable.getColumnType(i); + block.get(i)[curBlockRows] = getColumnValue(i, type, replaceStringList); + } + curBlockRows++; + } while (curBlockRows < batchSize && resultSet.next()); + + for (int i = 0; i < columnCount; ++i) { + ColumnType type = outputTable.getColumnType(i); + Object[] columnData = block.get(i); + Class<?> componentType = columnData.getClass().getComponentType(); + Object[] newColumn = (Object[]) Array.newInstance(componentType, curBlockRows); + System.arraycopy(columnData, 0, newColumn, 0, curBlockRows); + boolean isNullable = Boolean.parseBoolean(nullableList[i]); + outputTable.appendData(i, newColumn, getOutputConverter(type, replaceStringList[i]), isNullable); + } + } catch (Exception e) { + LOG.warn("jdbc get block address exception: ", e); + throw new UdfRuntimeException("jdbc get block address: ", e); + } + return outputTable.getMetaAddress(); + } + + protected void initializeBlock(int columnCount, String[] replaceStringList, int batchSizeNum, + VectorTable outputTable) { + for (int i = 0; i < columnCount; ++i) { + block.add(outputTable.getColumn(i).newObjectContainerArray(batchSizeNum)); + } + } + + public int write(Map<String, String> params) throws UdfRuntimeException { + VectorTable batchTable = VectorTable.createReadableTable(params); + // Can't release or close batchTable, it's released by c++ + try { + insert(batchTable); + } catch (SQLException e) { + throw new UdfRuntimeException("JDBC executor sql has error: ", e); + } + return batchTable.getNumRows(); + } + + public void openTrans() throws UdfRuntimeException { + try { + if (conn != null) { + conn.setAutoCommit(false); + } + } catch (SQLException e) { + throw new UdfRuntimeException("JDBC executor open transaction has error: ", e); + } + } + + public void commitTrans() throws UdfRuntimeException { + try { + if (conn != null) { + conn.commit(); + } + } catch (SQLException e) { + throw new UdfRuntimeException("JDBC executor commit transaction has error: ", e); + } + } + + public void rollbackTrans() throws UdfRuntimeException { + try { + if (conn != null) { + conn.rollback(); + } + } catch (SQLException e) { + throw new UdfRuntimeException("JDBC executor rollback transaction has error: ", e); + } + } + + public List<String> getResultColumnTypeNames() { + return resultColumnTypeNames; + } + + public int getCurBlockRows() { + return curBlockRows; + } + + public boolean hasNext() throws UdfRuntimeException { + try { + if (resultSet == null) { + return false; + } + return resultSet.next(); + } catch (SQLException e) { + throw new UdfRuntimeException("resultSet to get next error: ", e); + } + } + + private void init(JdbcDataSourceConfig config, String sql) throws UdfRuntimeException { + String druidDataSourceKey = config.createCacheKey(); + try { + ClassLoader parent = getClass().getClassLoader(); + ClassLoader classLoader = UdfUtils.getClassLoader(config.getJdbcDriverUrl(), parent); + druidDataSource = JdbcDataSource.getDataSource().getSource(druidDataSourceKey); + if (druidDataSource == null) { + synchronized (druidDataSourceLock) { + druidDataSource = JdbcDataSource.getDataSource().getSource(druidDataSourceKey); + if (druidDataSource == null) { + long start = System.currentTimeMillis(); + DruidDataSource ds = new DruidDataSource(); + ds.setDriverClassLoader(classLoader); + ds.setDriverClassName(config.getJdbcDriverClass()); + ds.setUrl(config.getJdbcUrl()); + ds.setUsername(config.getJdbcUser()); + ds.setPassword(config.getJdbcPassword()); + ds.setMinIdle(config.getConnectionPoolMinSize()); // default 1 + ds.setInitialSize(config.getConnectionPoolMinSize()); // default 1 + ds.setMaxActive(config.getConnectionPoolMaxSize()); // default 10 + ds.setMaxWait(config.getConnectionPoolMaxWaitTime()); // default 5000 + ds.setTestWhileIdle(true); + ds.setTestOnBorrow(false); + setValidationQuery(ds); + // default 3 min + ds.setTimeBetweenEvictionRunsMillis(config.getConnectionPoolMaxLifeTime() / 10L); + // default 15 min + ds.setMinEvictableIdleTimeMillis(config.getConnectionPoolMaxLifeTime() / 2L); + // default 30 min + ds.setMaxEvictableIdleTimeMillis(config.getConnectionPoolMaxLifeTime()); + ds.setKeepAlive(config.isConnectionPoolKeepAlive()); + // default 6 min + ds.setKeepAliveBetweenTimeMillis(config.getConnectionPoolMaxLifeTime() / 5L); + druidDataSource = ds; + JdbcDataSource.getDataSource().putSource(druidDataSourceKey, ds); + LOG.info("JdbcClient set" + + " ConnectionPoolMinSize = " + config.getConnectionPoolMinSize() + + ", ConnectionPoolMaxSize = " + config.getConnectionPoolMaxSize() + + ", ConnectionPoolMaxWaitTime = " + config.getConnectionPoolMaxWaitTime() + + ", ConnectionPoolMaxLifeTime = " + config.getConnectionPoolMaxLifeTime() + + ", ConnectionPoolKeepAlive = " + config.isConnectionPoolKeepAlive()); + LOG.info("init datasource [" + (config.getJdbcUrl() + config.getJdbcUser()) + "] cost: " + ( + System.currentTimeMillis() - start) + " ms"); + } + } + } + + long start = System.currentTimeMillis(); + conn = druidDataSource.getConnection(); + LOG.info("get connection [" + (config.getJdbcUrl() + config.getJdbcUser()) + "] cost: " + ( + System.currentTimeMillis() - start) + + " ms"); + + initializeStatement(conn, config, sql); + + } catch (MalformedURLException e) { + throw new UdfRuntimeException("MalformedURLException to load class about " + config.getJdbcDriverUrl(), e); + } catch (SQLException e) { + throw new UdfRuntimeException("Initialize datasource failed: ", e); + } catch (FileNotFoundException e) { + throw new UdfRuntimeException("FileNotFoundException failed: ", e); + } catch (Exception e) { + throw new UdfRuntimeException("Initialize datasource failed: ", e); + } + } + + protected void setValidationQuery(DruidDataSource ds) { + ds.setValidationQuery("SELECT 1"); + } + + protected void initializeStatement(Connection conn, JdbcDataSourceConfig config, String sql) throws SQLException { + if (config.getOp() == TJdbcOperation.READ) { + conn.setAutoCommit(false); + Preconditions.checkArgument(sql != null, "SQL statement cannot be null for READ operation."); + stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + stmt.setFetchSize(config.getBatchSize()); // set fetch size to batch size + batchSizeNum = config.getBatchSize(); + } else { + Preconditions.checkArgument(sql != null, "SQL statement cannot be null for WRITE operation."); + LOG.info("Insert SQL: " + sql); + preparedStatement = conn.prepareStatement(sql); + } + } + + protected abstract Object getColumnValue(int columnIndex, ColumnType type, String[] replaceStringList) + throws SQLException; + + /* + | Type | Java Array Type | + |---------------------------------------------|----------------------------| + | BOOLEAN | Boolean[] | + | TINYINT | Byte[] | + | SMALLINT | Short[] | + | INT | Integer[] | + | BIGINT | Long[] | + | LARGEINT | BigInteger[] | + | FLOAT | Float[] | + | DOUBLE | Double[] | + | DECIMALV2, DECIMAL32, DECIMAL64, DECIMAL128 | BigDecimal[] | + | DATE, DATEV2 | LocalDate[] | + | DATETIME, DATETIMEV2 | LocalDateTime[] | + | CHAR, VARCHAR, STRING | String[] | + | ARRAY | List<Object>[] | + | MAP | Map<Object, Object>[] | + | STRUCT | Map<String, Object>[] | + */ + + protected abstract ColumnValueConverter getOutputConverter(ColumnType columnType, String replaceString); + + protected ColumnValueConverter createConverter( + Function<Object, ?> converterFunction, Class<?> type) { + return (Object[] columnData) -> { + Object[] result = (Object[]) Array.newInstance(type, columnData.length); + for (int i = 0; i < columnData.length; i++) { + result[i] = columnData[i] != null ? converterFunction.apply(columnData[i]) : null; + } + return result; + }; + } + + private int insert(VectorTable data) throws SQLException { + for (int i = 0; i < data.getNumRows(); ++i) { + for (int j = 0; j < data.getColumns().length; ++j) { + insertColumn(i, j, data.getColumns()[j]); + } + preparedStatement.addBatch(); + } + preparedStatement.executeBatch(); + preparedStatement.clearBatch(); + return data.getNumRows(); + } + + private void insertColumn(int rowIdx, int colIdx, VectorColumn column) throws SQLException { + int parameterIndex = colIdx + 1; + ColumnType.Type dorisType = column.getColumnTyp(); + if (column.isNullAt(rowIdx)) { + insertNullColumn(parameterIndex, dorisType); + return; + } + switch (dorisType) { + case BOOLEAN: + preparedStatement.setBoolean(parameterIndex, column.getBoolean(rowIdx)); + break; + case TINYINT: + preparedStatement.setByte(parameterIndex, column.getByte(rowIdx)); + break; + case SMALLINT: + preparedStatement.setShort(parameterIndex, column.getShort(rowIdx)); + break; + case INT: + preparedStatement.setInt(parameterIndex, column.getInt(rowIdx)); + break; + case BIGINT: + preparedStatement.setLong(parameterIndex, column.getLong(rowIdx)); + break; + case LARGEINT: + preparedStatement.setObject(parameterIndex, column.getBigInteger(rowIdx)); + break; + case FLOAT: + preparedStatement.setFloat(parameterIndex, column.getFloat(rowIdx)); + break; + case DOUBLE: + preparedStatement.setDouble(parameterIndex, column.getDouble(rowIdx)); + break; + case DECIMALV2: + case DECIMAL32: + case DECIMAL64: + case DECIMAL128: + preparedStatement.setBigDecimal(parameterIndex, column.getDecimal(rowIdx)); + break; + case DATEV2: + preparedStatement.setDate(parameterIndex, Date.valueOf(column.getDate(rowIdx))); + break; + case DATETIMEV2: + preparedStatement.setTimestamp( + parameterIndex, Timestamp.valueOf(column.getDateTime(rowIdx))); + break; + case CHAR: + case VARCHAR: + case STRING: + case BINARY: + preparedStatement.setString(parameterIndex, column.getStringWithOffset(rowIdx)); + break; + default: + throw new RuntimeException("Unknown type value: " + dorisType); + } + } + + private void insertNullColumn(int parameterIndex, ColumnType.Type dorisType) + throws SQLException { + switch (dorisType) { + case BOOLEAN: + preparedStatement.setNull(parameterIndex, Types.BOOLEAN); + break; + case TINYINT: + preparedStatement.setNull(parameterIndex, Types.TINYINT); + break; + case SMALLINT: + preparedStatement.setNull(parameterIndex, Types.SMALLINT); + break; + case INT: + preparedStatement.setNull(parameterIndex, Types.INTEGER); + break; + case BIGINT: + preparedStatement.setNull(parameterIndex, Types.BIGINT); + break; + case LARGEINT: + preparedStatement.setNull(parameterIndex, Types.JAVA_OBJECT); + break; + case FLOAT: + preparedStatement.setNull(parameterIndex, Types.FLOAT); + break; + case DOUBLE: + preparedStatement.setNull(parameterIndex, Types.DOUBLE); + break; + case DECIMALV2: + case DECIMAL32: + case DECIMAL64: + case DECIMAL128: + preparedStatement.setNull(parameterIndex, Types.DECIMAL); + break; + case DATEV2: + preparedStatement.setNull(parameterIndex, Types.DATE); + break; + case DATETIMEV2: + preparedStatement.setNull(parameterIndex, Types.TIMESTAMP); + break; + case CHAR: + case VARCHAR: + case STRING: + case BINARY: + preparedStatement.setNull(parameterIndex, Types.VARCHAR); + break; + default: + throw new RuntimeException("Unknown type value: " + dorisType); + } + } +} diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/DefaultJdbcExecutor.java similarity index 99% copy from fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java copy to fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/DefaultJdbcExecutor.java index 3f00643d126..94d4304db38 100644 --- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/DefaultJdbcExecutor.java @@ -78,8 +78,8 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; -public class JdbcExecutor { - private static final Logger LOG = Logger.getLogger(JdbcExecutor.class); +public class DefaultJdbcExecutor { + private static final Logger LOG = Logger.getLogger(DefaultJdbcExecutor.class); private static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new TBinaryProtocol.Factory(); private Connection conn = null; private PreparedStatement preparedStatement = null; @@ -97,7 +97,7 @@ public class JdbcExecutor { private TOdbcTableType tableType; private JdbcDataSourceConfig config; - public JdbcExecutor(byte[] thriftParams) throws Exception { + public DefaultJdbcExecutor(byte[] thriftParams) throws Exception { TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams(); TDeserializer deserializer = new TDeserializer(PROTOCOL_FACTORY); try { diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java index 3f00643d126..85c40e21d16 100644 --- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java @@ -17,1091 +17,26 @@ package org.apache.doris.jdbc; -import org.apache.doris.common.exception.InternalException; import org.apache.doris.common.exception.UdfRuntimeException; -import org.apache.doris.common.jni.utils.UdfUtils; -import org.apache.doris.common.jni.vec.ColumnType; -import org.apache.doris.common.jni.vec.ColumnValueConverter; -import org.apache.doris.common.jni.vec.VectorColumn; -import org.apache.doris.common.jni.vec.VectorTable; -import org.apache.doris.thrift.TJdbcExecutorCtorParams; -import org.apache.doris.thrift.TJdbcOperation; -import org.apache.doris.thrift.TOdbcTableType; -import com.alibaba.druid.pool.DruidDataSource; -import com.clickhouse.data.value.UnsignedByte; -import com.clickhouse.data.value.UnsignedInteger; -import com.clickhouse.data.value.UnsignedLong; -import com.clickhouse.data.value.UnsignedShort; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.MoreExecutors; -import com.vesoft.nebula.client.graph.data.ValueWrapper; -import org.apache.log4j.Logger; -import org.apache.thrift.TDeserializer; -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TBinaryProtocol; - -import java.io.FileNotFoundException; -import java.lang.reflect.Array; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.net.Inet4Address; -import java.net.Inet6Address; -import java.net.InetAddress; -import java.net.MalformedURLException; -import java.sql.Connection; -import java.sql.Date; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.sql.Statement; -import java.sql.Time; -import java.sql.Timestamp; -import java.sql.Types; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.OffsetDateTime; -import java.time.format.DateTimeFormatter; -import java.time.format.DateTimeFormatterBuilder; -import java.time.temporal.ChronoField; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.UUID; -import java.util.function.Function; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - -public class JdbcExecutor { - private static final Logger LOG = Logger.getLogger(JdbcExecutor.class); - private static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new TBinaryProtocol.Factory(); - private Connection conn = null; - private PreparedStatement preparedStatement = null; - private Statement stmt = null; - private ResultSet resultSet = null; - private ResultSetMetaData resultSetMetaData = null; - private List<String> resultColumnTypeNames = null; - private List<Object[]> block = null; - private VectorTable outputTable = null; - private int batchSizeNum = 0; - private int curBlockRows = 0; - private static final byte[] emptyBytes = new byte[0]; - private DruidDataSource druidDataSource = null; - private final byte[] druidDataSourceLock = new byte[0]; - private TOdbcTableType tableType; - private JdbcDataSourceConfig config; - - public JdbcExecutor(byte[] thriftParams) throws Exception { - TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams(); - TDeserializer deserializer = new TDeserializer(PROTOCOL_FACTORY); - try { - deserializer.deserialize(request, thriftParams); - } catch (TException e) { - throw new InternalException(e.getMessage()); - } - tableType = request.table_type; - this.config = new JdbcDataSourceConfig() - .setCatalogId(request.catalog_id) - .setJdbcUser(request.jdbc_user) - .setJdbcPassword(request.jdbc_password) - .setJdbcUrl(request.jdbc_url) - .setJdbcDriverUrl(request.driver_path) - .setJdbcDriverClass(request.jdbc_driver_class) - .setBatchSize(request.batch_size) - .setOp(request.op) - .setTableType(request.table_type) - .setConnectionPoolMinSize(request.connection_pool_min_size) - .setConnectionPoolMaxSize(request.connection_pool_max_size) - .setConnectionPoolMaxWaitTime(request.connection_pool_max_wait_time) - .setConnectionPoolMaxLifeTime(request.connection_pool_max_life_time) - .setConnectionPoolKeepAlive(request.connection_pool_keep_alive); - JdbcDataSource.getDataSource().setCleanupInterval(request.connection_pool_cache_clear_time); - init(config, request.statement); - } - - public void close() throws Exception { - try { - if (stmt != null) { - try { - stmt.cancel(); - } catch (SQLException e) { - LOG.error("Error cancelling statement", e); - } - } - - boolean shouldAbort = conn != null && resultSet != null - && (tableType == TOdbcTableType.MYSQL || tableType == TOdbcTableType.SQLSERVER); - boolean aborted = false; // Used to record whether the abort operation is performed - if (shouldAbort) { - aborted = abortReadConnection(conn, resultSet, tableType); - } - - // If no abort operation is performed, the resource needs to be closed manually - if (!aborted) { - closeResources(resultSet, stmt, conn); - } - } finally { - if (config.getConnectionPoolMinSize() == 0 && druidDataSource != null) { - druidDataSource.close(); - JdbcDataSource.getDataSource().getSourcesMap().remove(config.createCacheKey()); - druidDataSource = null; - } - } - } - - private void closeResources(AutoCloseable... closeables) { - for (AutoCloseable closeable : closeables) { - if (closeable != null) { - try { - if (closeable instanceof Connection) { - if (!((Connection) closeable).isClosed()) { - closeable.close(); - } - } else { - closeable.close(); - } - } catch (Exception e) { - LOG.error("Cannot close resource: ", e); - } - } - } - } - - public boolean abortReadConnection(Connection connection, ResultSet resultSet, TOdbcTableType tableType) - throws SQLException { - if (!resultSet.isAfterLast() && (tableType == TOdbcTableType.MYSQL || tableType == TOdbcTableType.SQLSERVER)) { - // Abort connection before closing. Without this, the MySQL/SQLServer driver - // attempts to drain the connection by reading all the results. - connection.abort(MoreExecutors.directExecutor()); - return true; - } - return false; - } - - public int read() throws UdfRuntimeException { - try { - resultSet = ((PreparedStatement) stmt).executeQuery(); - resultSetMetaData = resultSet.getMetaData(); - int columnCount = resultSetMetaData.getColumnCount(); - resultColumnTypeNames = new ArrayList<>(columnCount); - block = new ArrayList<>(columnCount); - if (isNebula()) { - for (int i = 0; i < columnCount; ++i) { - block.add((Object[]) Array.newInstance(Object.class, batchSizeNum)); - } - } else { - for (int i = 0; i < columnCount; ++i) { - resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1)); - block.add((Object[]) Array.newInstance(Object.class, batchSizeNum)); - } - } - return columnCount; - } catch (SQLException e) { - throw new UdfRuntimeException("JDBC executor sql has error: ", e); - } - } - - public long getBlockAddress(int batchSize, Map<String, String> outputParams) - throws UdfRuntimeException { - try { - if (outputTable != null) { - outputTable.close(); - } - - String isNullableString = outputParams.get("is_nullable"); - String replaceString = outputParams.get("replace_string"); - - if (isNullableString == null || replaceString == null) { - throw new IllegalArgumentException( - "Output parameters 'is_nullable' and 'replace_string' are required."); - } - - String[] nullableList = isNullableString.split(","); - String[] replaceStringList = replaceString.split(","); - curBlockRows = 0; - int columnCount = resultSetMetaData.getColumnCount(); - - do { - for (int i = 0; i < columnCount; ++i) { - boolean isBitmapOrHll = - replaceStringList[i].equals("bitmap") - || replaceStringList[i].equals("hll"); - block.get(i)[curBlockRows] = getColumnValue(tableType, i, isBitmapOrHll); - } - curBlockRows++; - } while (curBlockRows < batchSize && resultSet.next()); - - outputTable = VectorTable.createWritableTable(outputParams, curBlockRows); - - for (int i = 0; i < columnCount; ++i) { - Object[] columnData = block.get(i); - ColumnType type = outputTable.getColumnType(i); - Class<?> clz = findNonNullClass(columnData, type); - Object[] newColumn = (Object[]) Array.newInstance(clz, curBlockRows); - System.arraycopy(columnData, 0, newColumn, 0, curBlockRows); - boolean isNullable = Boolean.parseBoolean(nullableList[i]); - outputTable.appendData( - i, - newColumn, - getOutputConverter(type, clz, replaceStringList[i]), - isNullable); - } - } catch (Exception e) { - LOG.warn("jdbc get block address exception: ", e); - throw new UdfRuntimeException("jdbc get block address: ", e); - } - return outputTable.getMetaAddress(); - } - - public int write(Map<String, String> params) throws UdfRuntimeException { - VectorTable batchTable = VectorTable.createReadableTable(params); - // Can't release or close batchTable, it's released by c++ - try { - insert(batchTable); - } catch (SQLException e) { - throw new UdfRuntimeException("JDBC executor sql has error: ", e); - } - return batchTable.getNumRows(); - } - - public void openTrans() throws UdfRuntimeException { - try { - if (conn != null) { - conn.setAutoCommit(false); - } - } catch (SQLException e) { - throw new UdfRuntimeException("JDBC executor open transaction has error: ", e); - } - } - - public void commitTrans() throws UdfRuntimeException { - try { - if (conn != null) { - conn.commit(); - } - } catch (SQLException e) { - throw new UdfRuntimeException("JDBC executor commit transaction has error: ", e); - } - } - - public void rollbackTrans() throws UdfRuntimeException { - try { - if (conn != null) { - conn.rollback(); - } - } catch (SQLException e) { - throw new UdfRuntimeException("JDBC executor rollback transaction has error: ", e); - } - } - - public List<String> getResultColumnTypeNames() { - return resultColumnTypeNames; - } - - public int getCurBlockRows() { - return curBlockRows; - } - - public boolean hasNext() throws UdfRuntimeException { - try { - if (resultSet == null) { - return false; - } - return resultSet.next(); - } catch (SQLException e) { - throw new UdfRuntimeException("resultSet to get next error: ", e); - } - } - - private void init(JdbcDataSourceConfig config, String sql) throws UdfRuntimeException { - String druidDataSourceKey = config.createCacheKey(); - try { - if (isNebula()) { - batchSizeNum = config.getBatchSize(); - Class.forName(config.getJdbcDriverClass()); - conn = DriverManager.getConnection(config.getJdbcDriverClass(), config.getJdbcUser(), - config.getJdbcPassword()); - stmt = conn.prepareStatement(sql); - } else { - ClassLoader parent = getClass().getClassLoader(); - ClassLoader classLoader = UdfUtils.getClassLoader(config.getJdbcDriverUrl(), parent); - druidDataSource = JdbcDataSource.getDataSource().getSource(druidDataSourceKey); - if (druidDataSource == null) { - synchronized (druidDataSourceLock) { - druidDataSource = JdbcDataSource.getDataSource().getSource(druidDataSourceKey); - if (druidDataSource == null) { - long start = System.currentTimeMillis(); - DruidDataSource ds = new DruidDataSource(); - ds.setDriverClassLoader(classLoader); - ds.setDriverClassName(config.getJdbcDriverClass()); - ds.setUrl(config.getJdbcUrl()); - ds.setUsername(config.getJdbcUser()); - ds.setPassword(config.getJdbcPassword()); - ds.setMinIdle(config.getConnectionPoolMinSize()); // default 1 - ds.setInitialSize(config.getConnectionPoolMinSize()); // default 1 - ds.setMaxActive(config.getConnectionPoolMaxSize()); // default 10 - ds.setMaxWait(config.getConnectionPoolMaxWaitTime()); // default 5000 - ds.setTestWhileIdle(true); - ds.setTestOnBorrow(false); - setValidationQuery(ds, config.getTableType()); - // default 3 min - ds.setTimeBetweenEvictionRunsMillis(config.getConnectionPoolMaxLifeTime() / 10L); - // default 15 min - ds.setMinEvictableIdleTimeMillis(config.getConnectionPoolMaxLifeTime() / 2L); - // default 30 min - ds.setMaxEvictableIdleTimeMillis(config.getConnectionPoolMaxLifeTime()); - ds.setKeepAlive(config.isConnectionPoolKeepAlive()); - // default 6 min - ds.setKeepAliveBetweenTimeMillis(config.getConnectionPoolMaxLifeTime() / 5L); - druidDataSource = ds; - JdbcDataSource.getDataSource().putSource(druidDataSourceKey, ds); - LOG.info("JdbcClient set" - + " ConnectionPoolMinSize = " + config.getConnectionPoolMinSize() - + ", ConnectionPoolMaxSize = " + config.getConnectionPoolMaxSize() - + ", ConnectionPoolMaxWaitTime = " + config.getConnectionPoolMaxWaitTime() - + ", ConnectionPoolMaxLifeTime = " + config.getConnectionPoolMaxLifeTime() - + ", ConnectionPoolKeepAlive = " + config.isConnectionPoolKeepAlive()); - LOG.info("init datasource [" + (config.getJdbcUrl() + config.getJdbcUser()) + "] cost: " + ( - System.currentTimeMillis() - start) + " ms"); - } - } - } - - long start = System.currentTimeMillis(); - conn = druidDataSource.getConnection(); - LOG.info("get connection [" + (config.getJdbcUrl() + config.getJdbcUser()) + "] cost: " + ( - System.currentTimeMillis() - start) - + " ms"); - if (config.getOp() == TJdbcOperation.READ) { - conn.setAutoCommit(false); - Preconditions.checkArgument(sql != null); - stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); - if (tableType == TOdbcTableType.MYSQL) { - stmt.setFetchSize(Integer.MIN_VALUE); - } else { - stmt.setFetchSize(config.getBatchSize()); - } - batchSizeNum = config.getBatchSize(); - } else { - LOG.info("insert sql: " + sql); - preparedStatement = conn.prepareStatement(sql); - } - } - } catch (MalformedURLException e) { - throw new UdfRuntimeException("MalformedURLException to load class about " + config.getJdbcDriverUrl(), e); - } catch (SQLException e) { - throw new UdfRuntimeException("Initialize datasource failed: ", e); - } catch (FileNotFoundException e) { - throw new UdfRuntimeException("FileNotFoundException failed: ", e); - } catch (Exception e) { - throw new UdfRuntimeException("Initialize datasource failed: ", e); - } - } - - private void setValidationQuery(DruidDataSource ds, TOdbcTableType tableType) { - if (tableType == TOdbcTableType.ORACLE || tableType == TOdbcTableType.OCEANBASE_ORACLE) { - ds.setValidationQuery("SELECT 1 FROM dual"); - } else if (tableType == TOdbcTableType.SAP_HANA) { - ds.setValidationQuery("SELECT 1 FROM DUMMY"); - } else { - ds.setValidationQuery("SELECT 1"); - } - } - - public boolean isNebula() { - return tableType == TOdbcTableType.NEBULA; - } - - private Class<?> findNonNullClass(Object[] columnData, ColumnType type) { - for (Object data : columnData) { - if (data != null) { - return data.getClass(); - } - } - switch (type.getType()) { - case BOOLEAN: - return Boolean.class; - case TINYINT: - return Byte.class; - case SMALLINT: - return Short.class; - case INT: - return Integer.class; - case BIGINT: - return Long.class; - case LARGEINT: - return BigInteger.class; - case FLOAT: - return Float.class; - case DOUBLE: - return Double.class; - case DECIMALV2: - case DECIMAL32: - case DECIMAL64: - case DECIMAL128: - return BigDecimal.class; - case DATE: - case DATEV2: - return LocalDate.class; - case DATETIME: - case DATETIMEV2: - return LocalDateTime.class; - case CHAR: - case VARCHAR: - case STRING: - return String.class; - case ARRAY: - return List.class; - default: - throw new IllegalArgumentException( - "Unsupported column type: " + type.getType()); - } - } - - public Object getColumnValue(TOdbcTableType tableType, int columnIndex, boolean isBitmapOrHll) - throws SQLException { - Object result; - if (tableType == TOdbcTableType.NEBULA) { - result = UdfUtils.convertObject((ValueWrapper) resultSet.getObject(columnIndex + 1)); - } else { - result = - isBitmapOrHll - ? resultSet.getBytes(columnIndex + 1) - : resultSet.getObject(columnIndex + 1); - } - return result; - } - - /* - | Type | Java Array Type | - |---------------------------------------------|----------------------------| - | BOOLEAN | Boolean[] | - | TINYINT | Byte[] | - | SMALLINT | Short[] | - | INT | Integer[] | - | BIGINT | Long[] | - | LARGEINT | BigInteger[] | - | FLOAT | Float[] | - | DOUBLE | Double[] | - | DECIMALV2, DECIMAL32, DECIMAL64, DECIMAL128 | BigDecimal[] | - | DATE, DATEV2 | LocalDate[] | - | DATETIME, DATETIMEV2 | LocalDateTime[] | - | CHAR, VARCHAR, STRING | String[] | - | ARRAY | List<Object>[] | - | MAP | Map<Object, Object>[] | - | STRUCT | Map<String, Object>[] | - */ - - private ColumnValueConverter getOutputConverter( - ColumnType columnType, Class clz, String replaceString) { - switch (columnType.getType()) { - case BOOLEAN: - if (Integer.class.equals(clz)) { - return createConverter(input -> ((Integer) input) != 0, Boolean.class); - } - if (Byte.class.equals(clz)) { - return createConverter(input -> ((Byte) input) != 0, Boolean.class); - } - if (String.class.equals(clz)) { - return createConverter( - input -> - Boolean.parseBoolean( - String.valueOf(input).equals("1") ? "true" : "false"), - Boolean.class); - } - break; - case TINYINT: - if (Integer.class.equals(clz)) { - return createConverter(input -> ((Integer) input).byteValue(), Byte.class); - } - if (Short.class.equals(clz)) { - return createConverter(input -> ((Short) input).byteValue(), Byte.class); - } - if (Object.class.equals(clz)) { - return createConverter( - input -> (byte) Integer.parseInt(String.valueOf(input)), Byte.class); - } - if (BigDecimal.class.equals(clz)) { - return createConverter(input -> ((BigDecimal) input).byteValue(), Byte.class); - } - if (String.class.equals(clz)) { - return createConverter( - input -> Byte.parseByte(String.valueOf(input)), Byte.class); - } - break; - case SMALLINT: - if (Integer.class.equals(clz)) { - return createConverter(input -> ((Integer) input).shortValue(), Short.class); - } - if (BigDecimal.class.equals(clz)) { - return createConverter(input -> ((BigDecimal) input).shortValue(), Short.class); - } - if (String.class.equals(clz)) { - return createConverter( - input -> Short.parseShort(String.valueOf(input)), Short.class); - } - if (Byte.class.equals(clz)) { - return createConverter(input -> ((Byte) input).shortValue(), Short.class); - } - if (com.clickhouse.data.value.UnsignedByte.class.equals(clz)) { - return createConverter( - input -> ((UnsignedByte) input).shortValue(), Short.class); - } - break; - case INT: - if (Long.class.equals(clz)) { - return createConverter(input -> ((Long) input).intValue(), Integer.class); - } - if (BigDecimal.class.equals(clz)) { - return createConverter(input -> ((BigDecimal) input).intValue(), Integer.class); - } - if (String.class.equals(clz)) { - return createConverter( - input -> Integer.parseInt(String.valueOf(input)), Integer.class); - } - if (Short.class.equals(clz)) { - return createConverter(input -> ((Short) input).intValue(), Integer.class); - } - if (com.clickhouse.data.value.UnsignedShort.class.equals(clz)) { - return createConverter( - input -> ((UnsignedShort) input).intValue(), Integer.class); - } - break; - case BIGINT: - if (BigDecimal.class.equals(clz)) { - return createConverter(input -> ((BigDecimal) input).longValue(), Long.class); - } - if (String.class.equals(clz)) { - return createConverter( - input -> Long.parseLong(String.valueOf(input)), Long.class); - } - if (Integer.class.equals(clz)) { - return createConverter(input -> ((Integer) input).longValue(), Long.class); - } - if (com.clickhouse.data.value.UnsignedInteger.class.equals(clz)) { - return createConverter( - input -> ((UnsignedInteger) input).longValue(), Long.class); - } - break; - case LARGEINT: - if (BigDecimal.class.equals(clz)) { - return createConverter( - input -> ((BigDecimal) input).toBigInteger(), BigInteger.class); - } - if (String.class.equals(clz)) { - return createConverter( - input -> new BigInteger(String.valueOf(input)), BigInteger.class); - } - if (Long.class.equals(clz)) { - return createConverter( - input -> BigInteger.valueOf((Long) input), BigInteger.class); - } - if (com.clickhouse.data.value.UnsignedLong.class.equals(clz)) { - return createConverter( - input -> ((UnsignedLong) input).bigIntegerValue(), BigInteger.class); - } - break; - case DOUBLE: - if (BigDecimal.class.equals(clz)) { - return createConverter( - input -> ((BigDecimal) input).doubleValue(), Double.class); - } - if (String.class.equals(clz)) { - return createConverter( - input -> Double.parseDouble(String.valueOf(input)), Double.class); - } - break; - case FLOAT: - return createConverter( - input -> Float.parseFloat(String.valueOf(input)), Float.class); - case DECIMALV2: - case DECIMAL32: - case DECIMAL64: - case DECIMAL128: - return createConverter( - input -> new BigDecimal(String.valueOf(input)), BigDecimal.class); - case DATE: - case DATEV2: - if (Date.class.equals(clz)) { - return createConverter(input -> ((Date) input).toLocalDate(), LocalDate.class); - } - if (Timestamp.class.equals(clz)) { - return createConverter( - input -> ((Timestamp) input).toLocalDateTime().toLocalDate(), - LocalDate.class); - } - if (String.class.equals(clz)) { - return createConverter( - input -> LocalDate.parse(String.valueOf(input)), LocalDate.class); - } - break; - case DATETIME: - case DATETIMEV2: - if (Timestamp.class.equals(clz)) { - return createConverter( - input -> ((Timestamp) input).toLocalDateTime(), LocalDateTime.class); - } - if (OffsetDateTime.class.equals(clz)) { - return createConverter( - input -> ((OffsetDateTime) input).toLocalDateTime(), - LocalDateTime.class); - } - if (oracle.sql.TIMESTAMP.class.equals(clz)) { - return createConverter( - input -> { - try { - return ((oracle.sql.TIMESTAMP) input) - .timestampValue() - .toLocalDateTime(); - } catch (SQLException e) { - throw new RuntimeException(e); - } - }, - LocalDateTime.class); - } - if (String.class.equals(clz)) { - return createConverter( - input -> - LocalDateTime.parse( - String.valueOf(input), - getDateTimeFormatter(String.valueOf(input))), - LocalDateTime.class); - } - break; - case CHAR: - return createConverter( - input -> trimSpaces(tableType, input.toString()), String.class); - case VARCHAR: - case STRING: - if (byte[].class.equals(clz)) { - if (replaceString.equals("bitmap") || replaceString.equals("hll")) { - break; - } else { - return createConverter( - input -> byteArrayToHexString(tableType, (byte[]) input), - String.class); - } - } - if (Time.class.equals(clz)) { - return createConverter( - input -> timeToString((java.sql.Time) input), String.class); - } - if (oracle.sql.CLOB.class.equals(clz)) { - return createConverter( - input -> { - try { - oracle.sql.CLOB clob = (oracle.sql.CLOB) input; - return clob.getSubString(1, (int) clob.length()); - } catch (SQLException e) { - throw new RuntimeException(e); - } - }, - String.class); - } - if (java.net.Inet4Address.class.equals(clz)) { - return createConverter( - input -> ((InetAddress) input).getHostAddress(), String.class); - } - if (java.net.Inet6Address.class.equals(clz)) { - return createConverter( - input -> { - String inetAddress = ((InetAddress) input).getHostAddress(); - return simplifyIPv6Address(inetAddress); - }, - String.class); - } else { - return createConverter(Object::toString, String.class); - } - case ARRAY: - if (java.sql.Array.class.equals(clz)) { - return createConverter( - input -> { - try { - return Arrays.asList( - (Object[]) ((java.sql.Array) input).getArray()); - } catch (SQLException e) { - throw new RuntimeException(e); - } - }, - List.class); - } - if (String.class.equals(clz)) { - return createConverter( - input -> { - List<Object> list = parseArray(String.valueOf(input)); - return convertArray(list, columnType.getChildTypes().get(0)); - }, - List.class); - } - if (tableType == TOdbcTableType.CLICKHOUSE) { - return createConverter( - input -> { - List<Object> list = convertClickHouseArray(input); - return convertArray(list, columnType.getChildTypes().get(0)); - }, - List.class); - } - break; - default: - throw new IllegalArgumentException( - "Unsupported column type: " + columnType.getType()); - } - return null; - } - - private ColumnValueConverter createConverter( - Function<Object, ?> converterFunction, Class<?> type) { - return (Object[] columnData) -> { - Object[] result = (Object[]) Array.newInstance(type, columnData.length); - for (int i = 0; i < columnData.length; i++) { - result[i] = columnData[i] != null ? converterFunction.apply(columnData[i]) : null; - } - return result; - }; - } - - private String byteArrayToHexString(TOdbcTableType tableType, byte[] columnData) { - if (tableType == TOdbcTableType.MYSQL || tableType == TOdbcTableType.OCEANBASE) { - return mysqlByteArrayToHexString(columnData); - } else if (tableType == TOdbcTableType.POSTGRESQL) { - return pgByteArrayToHexString(columnData); - } else { - return defaultByteArrayToHexString(columnData); - } - } - - private String mysqlByteArrayToHexString(byte[] bytes) { - StringBuilder hexString = new StringBuilder("0x"); - for (byte b : bytes) { - String hex = Integer.toHexString(0xFF & b); - if (hex.length() == 1) { - hexString.append('0'); - } - hexString.append(hex.toUpperCase()); - } - return hexString.toString(); - } - - private static String pgByteArrayToHexString(byte[] bytes) { - StringBuilder hexString = new StringBuilder("\\x"); - for (byte b : bytes) { - hexString.append(String.format("%02x", b & 0xff)); - } - return hexString.toString(); - } - - private String defaultByteArrayToHexString(byte[] bytes) { - StringBuilder hexString = new StringBuilder(); - for (byte b : bytes) { - String hex = Integer.toHexString(0xFF & b); - if (hex.length() == 1) { - hexString.append('0'); - } - hexString.append(hex.toUpperCase()); - } - return hexString.toString(); - } - - private String trimSpaces(TOdbcTableType tableType, String str) { - if (tableType == TOdbcTableType.POSTGRESQL || tableType == TOdbcTableType.ORACLE) { - int end = str.length() - 1; - while (end >= 0 && str.charAt(end) == ' ') { - end--; - } - return str.substring(0, end + 1); - } else { - return str; - } - } - - public String timeToString(java.sql.Time time) { - long milliseconds = time.getTime() % 1000L; - if (milliseconds > 0) { - return String.format("%s.%03d", time, milliseconds); - } else { - return time.toString(); - } - } - - private List<Object> convertArray(List<Object> list, ColumnType childType) { - Class<?> clz = Object.class; - for (Object data : list) { - if (data != null) { - clz = data.getClass(); - break; - } - } - List<Object> convertedList = new ArrayList<>(list.size()); - ColumnValueConverter converter = getOutputConverter(childType, clz, "not_replace"); - for (Object element : list) { - if (childType.isComplexType()) { - convertedList.add(convertArray((List<Object>) element, childType)); - } else { - if (converter != null) { - convertedList.add(converter.convert(new Object[] {element})[0]); - } else { - convertedList.add(element); - } - } - } - return convertedList; - } - - private static String simplifyIPv6Address(String address) { - // Replace longest sequence of zeros with "::" - String[] parts = address.split(":"); - int longestSeqStart = -1; - int longestSeqLen = 0; - int curSeqStart = -1; - int curSeqLen = 0; - for (int i = 0; i < parts.length; i++) { - if (parts[i].equals("0")) { - if (curSeqStart == -1) { - curSeqStart = i; - } - curSeqLen++; - if (curSeqLen > longestSeqLen) { - longestSeqStart = curSeqStart; - longestSeqLen = curSeqLen; - } - } else { - curSeqStart = -1; - curSeqLen = 0; - } - } - if (longestSeqLen <= 1) { - return address; // No sequences of zeros to replace - } - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < longestSeqStart; i++) { - sb.append(parts[i]).append(':'); - } - sb.append(':'); - for (int i = longestSeqStart + longestSeqLen; i < parts.length; i++) { - sb.append(parts[i]); - if (i < parts.length - 1) { - sb.append(':'); - } - } - return sb.toString(); - } - - private static final Pattern MILLIS_PATTERN = Pattern.compile("(\\.\\d+)"); - - public static DateTimeFormatter getDateTimeFormatter(String dateTimeString) { - Matcher matcher = MILLIS_PATTERN.matcher(dateTimeString); - int fractionDigits = 0; - if (matcher.find()) { - fractionDigits = matcher.group(1).length() - 1; // Subtract 1 to exclude the dot - } - fractionDigits = Math.min(fractionDigits, 6); // Limit the fraction digits to 6 - - return new DateTimeFormatterBuilder() - .appendPattern("yyyy-MM-dd HH:mm:ss") - .appendFraction(ChronoField.MILLI_OF_SECOND, fractionDigits, fractionDigits, true) - .toFormatter(); - } - private static final Map<Class<?>, Function<Object, List<Object>>> CK_ARRAY_CONVERTERS = - new HashMap<>(); +public interface JdbcExecutor { + int read() throws UdfRuntimeException; - static { - CK_ARRAY_CONVERTERS.put(String[].class, res -> Arrays.asList((String[]) res)); - CK_ARRAY_CONVERTERS.put(boolean[].class, res -> toList((boolean[]) res)); - CK_ARRAY_CONVERTERS.put(Boolean[].class, res -> Arrays.asList((Boolean[]) res)); - CK_ARRAY_CONVERTERS.put(byte[].class, res -> toList((byte[]) res)); - CK_ARRAY_CONVERTERS.put(Byte[].class, res -> Arrays.asList((Byte[]) res)); - CK_ARRAY_CONVERTERS.put(LocalDate[].class, res -> Arrays.asList((LocalDate[]) res)); - CK_ARRAY_CONVERTERS.put(LocalDateTime[].class, res -> Arrays.asList((LocalDateTime[]) res)); - CK_ARRAY_CONVERTERS.put(float[].class, res -> toList((float[]) res)); - CK_ARRAY_CONVERTERS.put(Float[].class, res -> Arrays.asList((Float[]) res)); - CK_ARRAY_CONVERTERS.put(double[].class, res -> toList((double[]) res)); - CK_ARRAY_CONVERTERS.put(Double[].class, res -> Arrays.asList((Double[]) res)); - CK_ARRAY_CONVERTERS.put(short[].class, res -> toList((short[]) res)); - CK_ARRAY_CONVERTERS.put(Short[].class, res -> Arrays.asList((Short[]) res)); - CK_ARRAY_CONVERTERS.put(int[].class, res -> toList((int[]) res)); - CK_ARRAY_CONVERTERS.put(Integer[].class, res -> Arrays.asList((Integer[]) res)); - CK_ARRAY_CONVERTERS.put(long[].class, res -> toList((long[]) res)); - CK_ARRAY_CONVERTERS.put(Long[].class, res -> Arrays.asList((Long[]) res)); - CK_ARRAY_CONVERTERS.put(BigInteger[].class, res -> Arrays.asList((BigInteger[]) res)); - CK_ARRAY_CONVERTERS.put(BigDecimal[].class, res -> Arrays.asList((BigDecimal[]) res)); - CK_ARRAY_CONVERTERS.put( - Inet4Address[].class, - res -> - Arrays.stream((Inet4Address[]) res) - .map(addr -> addr == null ? null : addr.getHostAddress()) - .collect(Collectors.toList())); - CK_ARRAY_CONVERTERS.put( - Inet6Address[].class, - res -> - Arrays.stream((Inet6Address[]) res) - .map(addr -> addr == null ? null : simplifyIPv6Address(addr.getHostAddress())) - .collect(Collectors.toList())); - CK_ARRAY_CONVERTERS.put(UUID[].class, res -> Arrays.asList((UUID[]) res)); - CK_ARRAY_CONVERTERS.put(com.clickhouse.data.value.UnsignedByte[].class, - res -> Arrays.asList((com.clickhouse.data.value.UnsignedByte[]) res)); - CK_ARRAY_CONVERTERS.put(com.clickhouse.data.value.UnsignedShort[].class, - res -> Arrays.asList((com.clickhouse.data.value.UnsignedShort[]) res)); - CK_ARRAY_CONVERTERS.put(com.clickhouse.data.value.UnsignedInteger[].class, - res -> Arrays.asList((com.clickhouse.data.value.UnsignedInteger[]) res)); - CK_ARRAY_CONVERTERS.put(com.clickhouse.data.value.UnsignedLong[].class, - res -> Arrays.asList((com.clickhouse.data.value.UnsignedLong[]) res)); - } + int write(Map<String, String> params) throws UdfRuntimeException; - public static List<Object> convertClickHouseArray(Object obj) { - Function<Object, List<Object>> converter = CK_ARRAY_CONVERTERS.get(obj.getClass()); - return converter != null ? converter.apply(obj) : Collections.singletonList(obj); - } + long getBlockAddress(int batchSize, Map<String, String> outputParams) throws UdfRuntimeException; - private static <T> List<Object> toList(T array) { - if (array instanceof Object[]) { - return Arrays.asList((Object[]) array); - } - int length = Array.getLength(array); - List<Object> list = new ArrayList<>(length); - for (int i = 0; i < length; i++) { - list.add(Array.get(array, i)); - } - return list; - } + void close() throws UdfRuntimeException, Exception; - private static final Pattern ARRAY_PATTERN = Pattern.compile("\"([^\"]*)\"|([^,]+)"); + void openTrans() throws UdfRuntimeException; - private static List<Object> parseArray(String input) { - String trimmedInput = input.substring(1, input.length() - 1); - List<Object> list = new ArrayList<>(); - Matcher matcher = ARRAY_PATTERN.matcher(trimmedInput); - while (matcher.find()) { - if (matcher.group(1) != null) { - list.add(matcher.group(1)); - } else { - list.add(matcher.group(2)); - } - } - return list; - } + void commitTrans() throws UdfRuntimeException; - private int insert(VectorTable data) throws SQLException { - for (int i = 0; i < data.getNumRows(); ++i) { - for (int j = 0; j < data.getColumns().length; ++j) { - insertColumn(i, j, data.getColumns()[j]); - } - preparedStatement.addBatch(); - } - preparedStatement.executeBatch(); - preparedStatement.clearBatch(); - return data.getNumRows(); - } + void rollbackTrans() throws UdfRuntimeException; - private void insertColumn(int rowIdx, int colIdx, VectorColumn column) throws SQLException { - int parameterIndex = colIdx + 1; - ColumnType.Type dorisType = column.getColumnTyp(); - if (column.isNullAt(rowIdx)) { - insertNullColumn(parameterIndex, dorisType); - return; - } - switch (dorisType) { - case BOOLEAN: - preparedStatement.setBoolean(parameterIndex, column.getBoolean(rowIdx)); - break; - case TINYINT: - preparedStatement.setByte(parameterIndex, column.getByte(rowIdx)); - break; - case SMALLINT: - preparedStatement.setShort(parameterIndex, column.getShort(rowIdx)); - break; - case INT: - preparedStatement.setInt(parameterIndex, column.getInt(rowIdx)); - break; - case BIGINT: - preparedStatement.setLong(parameterIndex, column.getLong(rowIdx)); - break; - case LARGEINT: - preparedStatement.setObject(parameterIndex, column.getBigInteger(rowIdx)); - break; - case FLOAT: - preparedStatement.setFloat(parameterIndex, column.getFloat(rowIdx)); - break; - case DOUBLE: - preparedStatement.setDouble(parameterIndex, column.getDouble(rowIdx)); - break; - case DECIMALV2: - case DECIMAL32: - case DECIMAL64: - case DECIMAL128: - preparedStatement.setBigDecimal(parameterIndex, column.getDecimal(rowIdx)); - break; - case DATEV2: - preparedStatement.setDate(parameterIndex, Date.valueOf(column.getDate(rowIdx))); - break; - case DATETIMEV2: - preparedStatement.setTimestamp( - parameterIndex, Timestamp.valueOf(column.getDateTime(rowIdx))); - break; - case CHAR: - case VARCHAR: - case STRING: - case BINARY: - preparedStatement.setString(parameterIndex, column.getStringWithOffset(rowIdx)); - break; - default: - throw new RuntimeException("Unknown type value: " + dorisType); - } - } + int getCurBlockRows(); - private void insertNullColumn(int parameterIndex, ColumnType.Type dorisType) - throws SQLException { - switch (dorisType) { - case BOOLEAN: - preparedStatement.setNull(parameterIndex, Types.BOOLEAN); - break; - case TINYINT: - preparedStatement.setNull(parameterIndex, Types.TINYINT); - break; - case SMALLINT: - preparedStatement.setNull(parameterIndex, Types.SMALLINT); - break; - case INT: - preparedStatement.setNull(parameterIndex, Types.INTEGER); - break; - case BIGINT: - preparedStatement.setNull(parameterIndex, Types.BIGINT); - break; - case LARGEINT: - preparedStatement.setNull(parameterIndex, Types.JAVA_OBJECT); - break; - case FLOAT: - preparedStatement.setNull(parameterIndex, Types.FLOAT); - break; - case DOUBLE: - preparedStatement.setNull(parameterIndex, Types.DOUBLE); - break; - case DECIMALV2: - case DECIMAL32: - case DECIMAL64: - case DECIMAL128: - preparedStatement.setNull(parameterIndex, Types.DECIMAL); - break; - case DATEV2: - preparedStatement.setNull(parameterIndex, Types.DATE); - break; - case DATETIMEV2: - preparedStatement.setNull(parameterIndex, Types.TIMESTAMP); - break; - case CHAR: - case VARCHAR: - case STRING: - case BINARY: - preparedStatement.setNull(parameterIndex, Types.VARCHAR); - break; - default: - throw new RuntimeException("Unknown type value: " + dorisType); - } - } + boolean hasNext() throws UdfRuntimeException; } diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutorFactory.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutorFactory.java new file mode 100644 index 00000000000..d67c58bb6b4 --- /dev/null +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutorFactory.java @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.jdbc; + +import org.apache.doris.thrift.TOdbcTableType; + +public class JdbcExecutorFactory { + public static String getExecutorClass(TOdbcTableType type) { + switch (type) { + case MYSQL: + return "org/apache/doris/jdbc/MySQLJdbcExecutor"; + case ORACLE: + return "org/apache/doris/jdbc/OracleJdbcExecutor"; + default: + return "org/apache/doris/jdbc/DefaultJdbcExecutor"; + } + } +} diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/MySQLJdbcExecutor.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/MySQLJdbcExecutor.java new file mode 100644 index 00000000000..30b84ffbcde --- /dev/null +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/MySQLJdbcExecutor.java @@ -0,0 +1,291 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.jdbc; + +import org.apache.doris.common.jni.vec.ColumnType; +import org.apache.doris.common.jni.vec.ColumnType.Type; +import org.apache.doris.common.jni.vec.ColumnValueConverter; +import org.apache.doris.common.jni.vec.VectorTable; +import org.apache.doris.thrift.TJdbcOperation; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import org.apache.log4j.Logger; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatterBuilder; +import java.time.temporal.ChronoField; +import java.util.List; +import java.util.stream.Collectors; + +public class MySQLJdbcExecutor extends BaseJdbcExecutor { + private static final Logger LOG = Logger.getLogger(MySQLJdbcExecutor.class); + + private static final Gson gson = new Gson(); + + public MySQLJdbcExecutor(byte[] thriftParams) throws Exception { + super(thriftParams); + } + + @Override + protected boolean abortReadConnection(Connection connection, ResultSet resultSet) + throws SQLException { + if (!resultSet.isAfterLast()) { + // Abort connection before closing. Without this, the MySQL driver + // attempts to drain the connection by reading all the results. + connection.abort(MoreExecutors.directExecutor()); + return true; + } + return false; + } + + @Override + protected void initializeStatement(Connection conn, JdbcDataSourceConfig config, String sql) throws SQLException { + if (config.getOp() == TJdbcOperation.READ) { + conn.setAutoCommit(false); + Preconditions.checkArgument(sql != null, "SQL statement cannot be null for READ operation."); + stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + stmt.setFetchSize(Integer.MIN_VALUE); // MySQL: signal streaming results with Integer.MIN_VALUE + batchSizeNum = config.getBatchSize(); + } else { + LOG.info("Insert SQL: " + sql); + preparedStatement = conn.prepareStatement(sql); + } + } + + @Override + protected void initializeBlock(int columnCount, String[] replaceStringList, int batchSizeNum, + VectorTable outputTable) { + for (int i = 0; i < columnCount; ++i) { + if (replaceStringList[i].equals("bitmap") || replaceStringList[i].equals("hll")) { + block.add(new byte[batchSizeNum][]); + } else if (outputTable.getColumnType(i).getType() == Type.ARRAY) { + block.add(new String[batchSizeNum]); + } else if (outputTable.getColumnType(i).getType() == Type.STRING) { + block.add(new Object[batchSizeNum]); + } else { + block.add(outputTable.getColumn(i).newObjectContainerArray(batchSizeNum)); + } + } + } + + @Override + protected Object getColumnValue(int columnIndex, ColumnType type, String[] replaceStringList) throws SQLException { + if (replaceStringList[columnIndex].equals("bitmap") || replaceStringList[columnIndex].equals("hll")) { + byte[] data = resultSet.getBytes(columnIndex + 1); + if (resultSet.wasNull()) { + return null; + } + return data; + } else { + switch (type.getType()) { + case BOOLEAN: + return resultSet.getObject(columnIndex + 1, Boolean.class); + case TINYINT: + return resultSet.getObject(columnIndex + 1, Byte.class); + case SMALLINT: + return resultSet.getObject(columnIndex + 1, Short.class); + case INT: + return resultSet.getObject(columnIndex + 1, Integer.class); + case BIGINT: + return resultSet.getObject(columnIndex + 1, Long.class); + case LARGEINT: + return resultSet.getObject(columnIndex + 1, BigInteger.class); + case FLOAT: + return resultSet.getObject(columnIndex + 1, Float.class); + case DOUBLE: + return resultSet.getObject(columnIndex + 1, Double.class); + case DECIMALV2: + case DECIMAL32: + case DECIMAL64: + case DECIMAL128: + return resultSet.getObject(columnIndex + 1, BigDecimal.class); + case DATE: + case DATEV2: + return resultSet.getObject(columnIndex + 1, LocalDate.class); + case DATETIME: + case DATETIMEV2: + return resultSet.getObject(columnIndex + 1, LocalDateTime.class); + case CHAR: + case VARCHAR: + case ARRAY: + return resultSet.getObject(columnIndex + 1, String.class); + case STRING: + return resultSet.getObject(columnIndex + 1); + default: + throw new IllegalArgumentException("Unsupported column type: " + type.getType()); + } + } + } + + @Override + protected ColumnValueConverter getOutputConverter(ColumnType columnType, String replaceString) { + switch (columnType.getType()) { + case STRING: + if (replaceString.equals("bitmap") || replaceString.equals("hll")) { + return null; + } else { + return createConverter(input -> { + if (input instanceof byte[]) { + return mysqlByteArrayToHexString((byte[]) input); + } else if (input instanceof java.sql.Time) { + return timeToString((java.sql.Time) input); + } else { + return input.toString(); + } + }, String.class); + } + case ARRAY: + return createConverter( + (Object input) -> convertArray(input, columnType.getChildTypes().get(0)), + List.class); + default: + return null; + } + } + + private Object convertArray(Object input, ColumnType columnType) { + java.lang.reflect.Type listType = getListTypeForArray(columnType); + if (columnType.getType() == Type.BOOLEAN) { + List<?> list = gson.fromJson((String) input, List.class); + return list.stream().map(item -> { + if (item instanceof Boolean) { + return item; + } else if (item instanceof Number) { + return ((Number) item).intValue() != 0; + } else { + throw new IllegalArgumentException("Cannot convert " + item + " to Boolean."); + } + }).collect(Collectors.toList()); + } else if (columnType.getType() == Type.DATE || columnType.getType() == Type.DATEV2) { + List<?> list = gson.fromJson((String) input, List.class); + return list.stream().map(item -> { + if (item instanceof String) { + return LocalDate.parse((String) item); + } else { + throw new IllegalArgumentException("Cannot convert " + item + " to LocalDate."); + } + }).collect(Collectors.toList()); + } else if (columnType.getType() == Type.DATETIME || columnType.getType() == Type.DATETIMEV2) { + List<?> list = gson.fromJson((String) input, List.class); + return list.stream().map(item -> { + if (item instanceof String) { + return LocalDateTime.parse( + (String) item, + new DateTimeFormatterBuilder() + .appendPattern("yyyy-MM-dd HH:mm:ss") + .appendFraction(ChronoField.MILLI_OF_SECOND, columnType.getPrecision(), + columnType.getPrecision(), true) + .toFormatter()); + } else { + throw new IllegalArgumentException("Cannot convert " + item + " to LocalDateTime."); + } + }).collect(Collectors.toList()); + } else if (columnType.getType() == Type.ARRAY) { + List<?> list = gson.fromJson((String) input, listType); + return list.stream() + .map(item -> convertArray(gson.toJson(item), columnType.getChildTypes().get(0))) + .collect(Collectors.toList()); + } else { + return gson.fromJson((String) input, listType); + } + } + + private java.lang.reflect.Type getListTypeForArray(ColumnType type) { + switch (type.getType()) { + case BOOLEAN: + return new TypeToken<List<Boolean>>() { + }.getType(); + case TINYINT: + return new TypeToken<List<Byte>>() { + }.getType(); + case SMALLINT: + return new TypeToken<List<Short>>() { + }.getType(); + case INT: + return new TypeToken<List<Integer>>() { + }.getType(); + case BIGINT: + return new TypeToken<List<Long>>() { + }.getType(); + case LARGEINT: + return new TypeToken<List<BigInteger>>() { + }.getType(); + case FLOAT: + return new TypeToken<List<Float>>() { + }.getType(); + case DOUBLE: + return new TypeToken<List<Double>>() { + }.getType(); + case DECIMALV2: + case DECIMAL32: + case DECIMAL64: + case DECIMAL128: + return new TypeToken<List<BigDecimal>>() { + }.getType(); + case DATE: + case DATEV2: + return new TypeToken<List<LocalDate>>() { + }.getType(); + case DATETIME: + case DATETIMEV2: + return new TypeToken<List<LocalDateTime>>() { + }.getType(); + case CHAR: + case VARCHAR: + case STRING: + return new TypeToken<List<String>>() { + }.getType(); + case ARRAY: + java.lang.reflect.Type childType = getListTypeForArray(type.getChildTypes().get(0)); + TypeToken<?> token = TypeToken.getParameterized(List.class, childType); + return token.getType(); + default: + throw new IllegalArgumentException("Unsupported column type: " + type.getType()); + } + } + + private String mysqlByteArrayToHexString(byte[] bytes) { + StringBuilder hexString = new StringBuilder("0x"); + for (byte b : bytes) { + String hex = Integer.toHexString(0xFF & b); + if (hex.length() == 1) { + hexString.append('0'); + } + hexString.append(hex.toUpperCase()); + } + return hexString.toString(); + } + + private String timeToString(java.sql.Time time) { + long milliseconds = time.getTime() % 1000L; + if (milliseconds > 0) { + return String.format("%s.%03d", time, milliseconds); + } else { + return time.toString(); + } + } +} diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/OracleJdbcExecutor.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/OracleJdbcExecutor.java new file mode 100644 index 00000000000..bfb941939f5 --- /dev/null +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/OracleJdbcExecutor.java @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.jdbc; + +import org.apache.doris.common.jni.vec.ColumnType; +import org.apache.doris.common.jni.vec.ColumnType.Type; +import org.apache.doris.common.jni.vec.ColumnValueConverter; +import org.apache.doris.common.jni.vec.VectorTable; + +import com.alibaba.druid.pool.DruidDataSource; +import org.apache.log4j.Logger; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Clob; +import java.sql.SQLException; +import java.time.LocalDate; +import java.time.LocalDateTime; + +public class OracleJdbcExecutor extends BaseJdbcExecutor { + private static final Logger LOG = Logger.getLogger(OracleJdbcExecutor.class); + + public OracleJdbcExecutor(byte[] thriftParams) throws Exception { + super(thriftParams); + } + + @Override + protected void setValidationQuery(DruidDataSource ds) { + ds.setValidationQuery("SELECT 1 FROM dual"); + } + + @Override + protected void initializeBlock(int columnCount, String[] replaceStringList, int batchSizeNum, + VectorTable outputTable) { + for (int i = 0; i < columnCount; ++i) { + if (outputTable.getColumnType(i).getType() == Type.LARGEINT) { + block.add(new BigDecimal[batchSizeNum]); + } else if (outputTable.getColumnType(i).getType() == Type.STRING) { + block.add(new Object[batchSizeNum]); + } else { + block.add(outputTable.getColumn(i).newObjectContainerArray(batchSizeNum)); + } + } + } + + @Override + protected Object getColumnValue(int columnIndex, ColumnType type, String[] replaceStringList) throws SQLException { + switch (type.getType()) { + case TINYINT: + return resultSet.getObject(columnIndex + 1, Byte.class); + case SMALLINT: + return resultSet.getObject(columnIndex + 1, Short.class); + case INT: + return resultSet.getObject(columnIndex + 1, Integer.class); + case BIGINT: + return resultSet.getObject(columnIndex + 1, Long.class); + case FLOAT: + return resultSet.getObject(columnIndex + 1, Float.class); + case DOUBLE: + return resultSet.getObject(columnIndex + 1, Double.class); + case LARGEINT: + case DECIMALV2: + case DECIMAL32: + case DECIMAL64: + case DECIMAL128: + return resultSet.getObject(columnIndex + 1, BigDecimal.class); + case DATE: + case DATEV2: + return resultSet.getObject(columnIndex + 1, LocalDate.class); + case DATETIME: + case DATETIMEV2: + return resultSet.getObject(columnIndex + 1, LocalDateTime.class); + case CHAR: + case VARCHAR: + case STRING: + return resultSet.getObject(columnIndex + 1); + default: + throw new IllegalArgumentException("Unsupported column type: " + type.getType()); + } + } + + @Override + protected ColumnValueConverter getOutputConverter(ColumnType columnType, String replaceString) { + switch (columnType.getType()) { + case CHAR: + return createConverter( + input -> trimSpaces(input.toString()), String.class); + case LARGEINT: + return createConverter( + input -> ((BigDecimal) input).toBigInteger(), BigInteger.class); + case STRING: + return createConverter(input -> { + if (input instanceof Clob) { + try { + return ((Clob) input).getSubString(1, (int) ((Clob) input).length()); + } catch (SQLException e) { + LOG.error("Failed to get string from clob", e); + return null; + } + } else { + return input.toString(); + } + }, String.class); + default: + return null; + } + } + + private String trimSpaces(String str) { + int end = str.length() - 1; + while (end >= 0 && str.charAt(end) == ' ') { + end--; + } + return str.substring(0, end + 1); + } +} diff --git a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out index 2a636532069..3520a11d8bc 100644 --- a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out +++ b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out @@ -225,11 +225,13 @@ views -- !dt_null -- \N +0000-01-01T00:00 2023-06-17T10:00 -- !test_dz -- 1 \N 2 2022-01-01 +3 0000-01-01 -- !test_filter_not -- 张三1 11 12345678 123 321312 1999-02-13T00:00 中国 男 0 --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
