This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f7e6f7b8638bc7dce29adcd0181119dba662e6bd
Author: Tiewei Fang <[email protected]>
AuthorDate: Sun Mar 26 20:02:03 2023 +0800

    [BugFix](jdbc catalog) fix OOM when jdbc catalog queries large data from doris #18067

    When the JDBC Catalog queries data from Doris, Doris does not support
    cursor-based reads (fetchBatchSize has no effect), so it sends the whole
    result set to the client at once, which can cause a client-side OOM. The
    MySQL protocol offers a streaming read mode that avoids this. To enable
    streaming, the fetch size must be set to Integer.MIN_VALUE and the
    statement must be created with ResultSet.TYPE_FORWARD_ONLY and
    ResultSet.CONCUR_READ_ONLY.
---
 be/src/vec/exec/vjdbc_connector.cpp              |  1 +
 .../org/apache/doris/catalog/JdbcResource.java   |  2 +-
 .../java/org/apache/doris/udf/JdbcExecutor.java  | 11 +++++++---
 gensrc/thrift/Types.thrift                       | 24 ++++++++++++----------
 4 files changed, 23 insertions(+), 15 deletions(-)

diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp
index e9caf93647..8887ad2223 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -151,6 +151,7 @@ Status JdbcConnector::open(RuntimeState* state, bool read) {
     ctor_params.__set_driver_path(local_location);
     ctor_params.__set_batch_size(read ? state->batch_size() : 0);
     ctor_params.__set_op(read ? TJdbcOperation::READ : TJdbcOperation::WRITE);
+    ctor_params.__set_table_type(_conn_param.table_type);
     jbyteArray ctor_params_bytes;
 
     // Pushed frame will be popped when jni_frame goes out-of-scope.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
index 7cac35d627..35e655526c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
@@ -283,7 +283,7 @@ public class JdbcResource extends Resource {
             newJdbcUrl = checkAndSetJdbcBoolParam(newJdbcUrl, "useUnicode", "false", "true");
             newJdbcUrl = checkAndSetJdbcParam(newJdbcUrl, "characterEncoding", "utf-8");
         }
-        if (dbType.equals(MYSQL) || dbType.equals(POSTGRESQL)) {
+        if (dbType.equals(POSTGRESQL)) {
             newJdbcUrl = checkAndSetJdbcBoolParam(newJdbcUrl, "useCursorFetch", "false", "true");
         }
         return newJdbcUrl;
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
index df30b8e1b5..7918cd1142 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
@@ -19,6 +19,7 @@ package org.apache.doris.udf;
 
 import org.apache.doris.thrift.TJdbcExecutorCtorParams;
 import org.apache.doris.thrift.TJdbcOperation;
+import org.apache.doris.thrift.TOdbcTableType;
 
 import com.alibaba.druid.pool.DruidDataSource;
 import com.google.common.base.Preconditions;
@@ -71,7 +72,7 @@ public class JdbcExecutor {
             throw new InternalException(e.getMessage());
         }
         init(request.driver_path, request.statement, request.batch_size, request.jdbc_driver_class,
-                request.jdbc_url, request.jdbc_user, request.jdbc_password, request.op);
+                request.jdbc_url, request.jdbc_user, request.jdbc_password, request.op, request.table_type);
     }
 
     public void close() throws Exception {
@@ -203,7 +204,7 @@ public class JdbcExecutor {
     }
 
     private void init(String driverUrl, String sql, int batchSize, String driverClass, String jdbcUrl, String jdbcUser,
-            String jdbcPassword, TJdbcOperation op) throws UdfRuntimeException {
+            String jdbcPassword, TJdbcOperation op, TOdbcTableType tableType) throws UdfRuntimeException {
         try {
             ClassLoader parent = getClass().getClassLoader();
             ClassLoader classLoader = UdfUtils.getClassLoader(driverUrl, parent);
@@ -226,7 +227,11 @@
             conn.setAutoCommit(false);
             Preconditions.checkArgument(sql != null);
             stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-            stmt.setFetchSize(batchSize);
+            if (tableType == TOdbcTableType.MYSQL) {
+                stmt.setFetchSize(Integer.MIN_VALUE);
+            } else {
+                stmt.setFetchSize(batchSize);
+            }
             batchSizeNum = batchSize;
         } else {
             stmt = conn.createStatement();
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 622822ec95..f55f747126 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -364,6 +364,17 @@ enum TJdbcOperation {
     WRITE
 }
 
+enum TOdbcTableType {
+    MYSQL,
+    ORACLE,
+    POSTGRESQL,
+    SQLSERVER,
+    REDIS,
+    MONGODB,
+    CLICKHOUSE,
+    SAP_HANA
+}
+
 struct TJdbcExecutorCtorParams {
 
     1: optional string statement
@@ -385,6 +396,8 @@ struct TJdbcExecutorCtorParams {
 
     // "/home/user/mysql-connector-java-5.1.47.jar"
     8: optional string driver_path
+
+    9: optional TOdbcTableType table_type
 }
 
 struct TJavaUdfExecutorCtorParams {
@@ -552,17 +565,6 @@ enum TTableType {
     JDBC_TABLE
 }
 
-enum TOdbcTableType {
-    MYSQL,
-    ORACLE,
-    POSTGRESQL,
-    SQLSERVER,
-    REDIS,
-    MONGODB,
-    CLICKHOUSE,
-    SAP_HANA
-}
-
 enum TKeysType {
     PRIMARY_KEYS,
     DUP_KEYS,
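
For reference, below is a minimal standalone sketch of the streaming-read
pattern this patch switches MySQL-type tables to. It is not part of the
commit; the class name, JDBC URL, credentials, and table name are
placeholders.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    // Hypothetical demo class; URL, credentials, and table are placeholders.
    public class MysqlStreamingReadSketch {
        public static void main(String[] args) throws Exception {
            String url = "jdbc:mysql://127.0.0.1:3306/demo_db";
            try (Connection conn = DriverManager.getConnection(url, "demo_user", "demo_pass")) {
                conn.setAutoCommit(false);
                // TYPE_FORWARD_ONLY + CONCUR_READ_ONLY + fetchSize = Integer.MIN_VALUE is the
                // MySQL Connector/J convention for row-by-row streaming; other drivers may
                // reject a negative fetch size, which is why the patch keys this behavior
                // on TOdbcTableType.MYSQL.
                try (PreparedStatement stmt = conn.prepareStatement(
                        "SELECT * FROM demo_table",
                        ResultSet.TYPE_FORWARD_ONLY,
                        ResultSet.CONCUR_READ_ONLY)) {
                    stmt.setFetchSize(Integer.MIN_VALUE);
                    try (ResultSet rs = stmt.executeQuery()) {
                        while (rs.next()) {
                            // Rows are pulled from the server lazily instead of being
                            // buffered in client memory all at once.
                            System.out.println(rs.getString(1));
                        }
                    }
                }
            }
        }
    }

Note that while a result set is streaming, Connector/J requires it to be
fully read or closed before other statements can run on the same connection.
This is also consistent with the JdbcResource change above, which no longer
appends useCursorFetch for MySQL URLs.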
