This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f7e6f7b8638bc7dce29adcd0181119dba662e6bd
Author: Tiewei Fang <[email protected]>
AuthorDate: Sun Mar 26 20:02:03 2023 +0800

    [BugFix](jdbc catalog) fix OOM when jdbc catalog queries large data from 
doris #18067
    
    When JDBC Catalog is used to query data from Doris, Doris does not 
provide a cursor-based reading method (that is, fetchBatchSize has no effect), 
so Doris sends the whole result to the client at once, causing client OOM.
    
    The MySQL protocol provides a streaming read method, which Doris can use 
to avoid the OOM. Streaming requires setting the statement fetch size to 
Integer.MIN_VALUE and creating the statement with ResultSet.TYPE_FORWARD_ONLY 
and ResultSet.CONCUR_READ_ONLY.
---
 be/src/vec/exec/vjdbc_connector.cpp                |  1 +
 .../org/apache/doris/catalog/JdbcResource.java     |  2 +-
 .../java/org/apache/doris/udf/JdbcExecutor.java    | 11 +++++++---
 gensrc/thrift/Types.thrift                         | 24 ++++++++++++----------
 4 files changed, 23 insertions(+), 15 deletions(-)

diff --git a/be/src/vec/exec/vjdbc_connector.cpp 
b/be/src/vec/exec/vjdbc_connector.cpp
index e9caf93647..8887ad2223 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -151,6 +151,7 @@ Status JdbcConnector::open(RuntimeState* state, bool read) {
         ctor_params.__set_driver_path(local_location);
         ctor_params.__set_batch_size(read ? state->batch_size() : 0);
         ctor_params.__set_op(read ? TJdbcOperation::READ : 
TJdbcOperation::WRITE);
+        ctor_params.__set_table_type(_conn_param.table_type);
 
         jbyteArray ctor_params_bytes;
         // Pushed frame will be popped when jni_frame goes out-of-scope.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
index 7cac35d627..35e655526c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
@@ -283,7 +283,7 @@ public class JdbcResource extends Resource {
             newJdbcUrl = checkAndSetJdbcBoolParam(newJdbcUrl, "useUnicode", 
"false", "true");
             newJdbcUrl = checkAndSetJdbcParam(newJdbcUrl, "characterEncoding", 
"utf-8");
         }
-        if (dbType.equals(MYSQL) || dbType.equals(POSTGRESQL)) {
+        if (dbType.equals(POSTGRESQL)) {
             newJdbcUrl = checkAndSetJdbcBoolParam(newJdbcUrl, 
"useCursorFetch", "false", "true");
         }
         return newJdbcUrl;
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java 
b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
index df30b8e1b5..7918cd1142 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
@@ -19,6 +19,7 @@ package org.apache.doris.udf;
 
 import org.apache.doris.thrift.TJdbcExecutorCtorParams;
 import org.apache.doris.thrift.TJdbcOperation;
+import org.apache.doris.thrift.TOdbcTableType;
 
 import com.alibaba.druid.pool.DruidDataSource;
 import com.google.common.base.Preconditions;
@@ -71,7 +72,7 @@ public class JdbcExecutor {
             throw new InternalException(e.getMessage());
         }
         init(request.driver_path, request.statement, request.batch_size, 
request.jdbc_driver_class,
-                request.jdbc_url, request.jdbc_user, request.jdbc_password, 
request.op);
+                request.jdbc_url, request.jdbc_user, request.jdbc_password, 
request.op, request.table_type);
     }
 
     public void close() throws Exception {
@@ -203,7 +204,7 @@ public class JdbcExecutor {
     }
 
     private void init(String driverUrl, String sql, int batchSize, String 
driverClass, String jdbcUrl, String jdbcUser,
-            String jdbcPassword, TJdbcOperation op) throws UdfRuntimeException 
{
+            String jdbcPassword, TJdbcOperation op, TOdbcTableType tableType) 
throws UdfRuntimeException {
         try {
             ClassLoader parent = getClass().getClassLoader();
             ClassLoader classLoader = UdfUtils.getClassLoader(driverUrl, 
parent);
@@ -226,7 +227,11 @@ public class JdbcExecutor {
                 conn.setAutoCommit(false);
                 Preconditions.checkArgument(sql != null);
                 stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, 
ResultSet.CONCUR_READ_ONLY);
-                stmt.setFetchSize(batchSize);
+                if (tableType == TOdbcTableType.MYSQL) {
+                    stmt.setFetchSize(Integer.MIN_VALUE);
+                } else {
+                    stmt.setFetchSize(batchSize);
+                }
                 batchSizeNum = batchSize;
             } else {
                 stmt = conn.createStatement();
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 622822ec95..f55f747126 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -364,6 +364,17 @@ enum TJdbcOperation {
     WRITE
 }
 
+enum TOdbcTableType {
+    MYSQL,
+    ORACLE,
+    POSTGRESQL,
+    SQLSERVER,
+    REDIS,
+    MONGODB,
+    CLICKHOUSE,
+    SAP_HANA
+}
+
 struct TJdbcExecutorCtorParams {
   1: optional string statement
 
@@ -385,6 +396,8 @@ struct TJdbcExecutorCtorParams {
 
   // "/home/user/mysql-connector-java-5.1.47.jar"
   8: optional string driver_path
+
+  9: optional TOdbcTableType table_type
 }
 
 struct TJavaUdfExecutorCtorParams {
@@ -552,17 +565,6 @@ enum TTableType {
     JDBC_TABLE
 }
 
-enum TOdbcTableType {
-    MYSQL,
-    ORACLE,
-    POSTGRESQL,
-    SQLSERVER,
-    REDIS,
-    MONGODB,
-    CLICKHOUSE,
-    SAP_HANA
-}
-
 enum TKeysType {
     PRIMARY_KEYS,
     DUP_KEYS,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to