This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 253445ca46 [vectorzied](jdbc) fix jdbc executor for get result by
batch and memo… (#15843)
253445ca46 is described below
commit 253445ca466cc105bf6ebfce5524cfeae894767c
Author: zhangstar333 <[email protected]>
AuthorDate: Sat Jan 21 08:22:22 2023 +0800
[vectorzied](jdbc) fix jdbc executor for get result by batch and memo…
(#15843)
1. Fetch the result set in batches of the configured batch size.
2. Fix a memory leak.
---
be/src/vec/exec/vjdbc_connector.cpp | 6 +-
be/src/vec/exec/vjdbc_connector.h | 1 +
docs/zh-CN/docs/lakehouse/external-table/jdbc.md | 6 +-
docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md | 9 ++
fe/java-udf/pom.xml | 5 --
.../main/java/org/apache/doris/udf/FakeDriver.java | 70 ++++++++++++++++
.../java/org/apache/doris/udf/JdbcExecutor.java | 96 ++++++++++++----------
7 files changed, 141 insertions(+), 52 deletions(-)
diff --git a/be/src/vec/exec/vjdbc_connector.cpp
b/be/src/vec/exec/vjdbc_connector.cpp
index adc1ac4cd2..d9e6838925 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -370,8 +370,8 @@ Status JdbcConnector::get_next(bool* eos,
std::vector<MutableColumnPtr>& columns
const std::string& column_name = slot_desc->col_name();
jobject column_data =
env->CallObjectMethod(block_obj, _executor_get_list_id,
materialized_column_index);
- jint num_rows = env->CallIntMethod(column_data,
_executor_get_list_size_id);
-
+ jint num_rows = env->CallNonvirtualIntMethod(_executor_obj,
_executor_clazz,
+ _executor_block_rows_id);
for (int row = 0; row < num_rows; ++row) {
jobject cur_data = env->CallObjectMethod(column_data,
_executor_get_list_id, row);
RETURN_IF_ERROR(_convert_column_data(env, cur_data, slot_desc,
@@ -412,6 +412,8 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) {
_executor_close_id));
RETURN_IF_ERROR(register_id(_executor_clazz, "hasNext",
JDBC_EXECUTOR_HAS_NEXT_SIGNATURE,
_executor_has_next_id));
+ RETURN_IF_ERROR(
+ register_id(_executor_clazz, "getCurBlockRows", "()I",
_executor_block_rows_id));
RETURN_IF_ERROR(register_id(_executor_clazz, "getBlock",
JDBC_EXECUTOR_GET_BLOCK_SIGNATURE,
_executor_get_blocks_id));
RETURN_IF_ERROR(register_id(_executor_clazz, "convertDateToLong",
diff --git a/be/src/vec/exec/vjdbc_connector.h
b/be/src/vec/exec/vjdbc_connector.h
index c1d416783c..4f05253d64 100644
--- a/be/src/vec/exec/vjdbc_connector.h
+++ b/be/src/vec/exec/vjdbc_connector.h
@@ -114,6 +114,7 @@ private:
jmethodID _executor_write_id;
jmethodID _executor_read_id;
jmethodID _executor_has_next_id;
+ jmethodID _executor_block_rows_id;
jmethodID _executor_get_blocks_id;
jmethodID _executor_get_types_id;
jmethodID _executor_get_arr_list_id;
diff --git a/docs/zh-CN/docs/lakehouse/external-table/jdbc.md
b/docs/zh-CN/docs/lakehouse/external-table/jdbc.md
index f87cfe3bf2..4c53d27243 100644
--- a/docs/zh-CN/docs/lakehouse/external-table/jdbc.md
+++ b/docs/zh-CN/docs/lakehouse/external-table/jdbc.md
@@ -55,7 +55,7 @@ properties (
"type"="jdbc",
"user"="root",
"password"="123456",
- "jdbc_url"="jdbc:mysql://192.168.0.1:3306/test",
+ "jdbc_url"="jdbc:mysql://192.168.0.1:3306/test?useCursorFetch=true",
"driver_url"="http://IP:port/mysql-connector-java-5.1.47.jar",
"driver_class"="com.mysql.jdbc.Driver"
);
@@ -81,7 +81,7 @@ PROPERTIES (
| **type** | "jdbc", 必填项标志资源类型 |
| **user** | 访问外表数据库所使的用户名 |
| **password** | 该用户对应的密码信息 |
-| **jdbc_url** | JDBC的URL协议,包括数据库类型,IP地址,端口号和数据库名,不同数据库协议格式不一样。例如mysql:
"jdbc:mysql://127.0.0.1:3306/test"。|
+| **jdbc_url** | JDBC的URL协议,包括数据库类型,IP地址,端口号和数据库名,不同数据库协议格式不一样。例如mysql:
"jdbc:mysql://127.0.0.1:3306/test?useCursorFetch=true"。|
| **driver_class** | 访问外表数据库的驱动包类名,例如mysql是:com.mysql.jdbc.Driver. |
| **driver_url** |
用于下载访问外部数据库的jar包驱动URL。http://IP:port/mysql-connector-java-5.1.47.jar。本地单机测试时,可将jar包放在本地路径下,"driver_url"="file:///home/disk1/pathTo/mysql-connector-java-5.1.47.jar",多机时需保证具有完全相同的路径信息。
|
| **resource** | 在Doris中建立外表时依赖的资源名,对应上步创建资源时的名字。|
@@ -266,4 +266,4 @@ PROPERTIES (
## Q&A
-请参考 [JDBC Catalog](../multi-catalog/jdbc) 中的 常见问题一节。
\ No newline at end of file
+请参考 [JDBC Catalog](../multi-catalog/jdbc) 中的 常见问题一节。
diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md
b/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md
index 136dfd41db..7d16eeace3 100644
--- a/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md
+++ b/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md
@@ -332,3 +332,12 @@ JDBC Catalog 通过标准 JDBC 协议,连接其他数据源。
```
可在创建 Catalog 的 `jdbc_url` 把JDBC连接串最后增加 `?useSSL=false` ,如 `"jdbc_url" =
"jdbc:mysql://127.0.0.1:3306/test?useSSL=false"`
+
+6. 查询MYSQL的数据库报OutOfMemoryError的错误
+
+   为减少内存的使用,在获取结果集时,每次仅获取batchSize大小的数据,一批一批地获取结果。而MYSQL默认是一次将结果全部加载到内存,设置的按批获取无法生效,需要在URL中主动显式指定:"jdbc_url"="jdbc:mysql://IP:PORT/doris_test?useCursorFetch=true"
+
+7. 在使用JDBC查询的过程中,如果出现"CAUSED BY: SQLException OutOfMemoryError"类似的错误
+
+   如果MYSQL已经主动设置useCursorFetch,可以在be.conf中修改jvm_max_heap_size的值,尝试增大JVM的内存,目前默认值为1024M。
\ No newline at end of file
diff --git a/fe/java-udf/pom.xml b/fe/java-udf/pom.xml
index 493015d49c..b37e9c4696 100644
--- a/fe/java-udf/pom.xml
+++ b/fe/java-udf/pom.xml
@@ -104,11 +104,6 @@ under the License.
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>com.zaxxer</groupId>
- <artifactId>HikariCP</artifactId>
- <version>${hikaricp.version}</version>
- </dependency>
</dependencies>
<build>
<finalName>java-udf</finalName>
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java
b/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java
new file mode 100644
index 0000000000..94fbde6217
--- /dev/null
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.udf;
+
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverPropertyInfo;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.Properties;
+import java.util.logging.Logger;
+
+
+/**
+ * A {@link Driver} that forwards every call to a wrapped driver instance.
+ *
+ * <p>JdbcExecutor loads the real JDBC driver through its own {@code URLClassLoader}. Per the
+ * comment in {@code JdbcExecutor#init}, {@link java.sql.DriverManager} does not find a driver
+ * loaded that way, so this wrapper (whose class comes from this package) is registered instead.
+ * {@code DriverManager.getConnection} then reaches the real driver through it.
+ * See https://www.kfu.com/~nsayer/Java/dyn-jdbc.html.
+ *
+ * <p>NOTE(review): a registered wrapper stays in {@code DriverManager} until
+ * {@code DriverManager.deregisterDriver} is called on it. Until then it keeps the wrapped driver,
+ * and the class loader that defined it, reachable. A caller that registers one wrapper per
+ * executor should deregister it when that executor is closed. Otherwise wrappers accumulate, and
+ * {@code getConnection} may pick an older wrapper that still accepts the URL.
+ */
+public class FakeDriver implements Driver {
+    /** The real driver that every method delegates to. */
+    private final Driver driver;
+
+    /**
+     * @param driver the real driver to delegate to; expected to be non-null
+     */
+    FakeDriver(Driver driver) {
+        this.driver = driver;
+    }
+
+    @Override
+    public Connection connect(String url, Properties info) throws SQLException {
+        return driver.connect(url, info);
+    }
+
+    @Override
+    public boolean acceptsURL(String url) throws SQLException {
+        return driver.acceptsURL(url);
+    }
+
+    @Override
+    public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException {
+        return driver.getPropertyInfo(url, info);
+    }
+
+    @Override
+    public int getMajorVersion() {
+        return driver.getMajorVersion();
+    }
+
+    @Override
+    public int getMinorVersion() {
+        return driver.getMinorVersion();
+    }
+
+    @Override
+    public boolean jdbcCompliant() {
+        return driver.jdbcCompliant();
+    }
+
+    /**
+     * Delegates to the wrapped driver. Callers get either its real parent logger or the
+     * {@link SQLFeatureNotSupportedException} it throws when it does not use java.util.logging.
+     * Returning {@code null} would violate the {@link Driver#getParentLogger()} contract.
+     */
+    @Override
+    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
+        return driver.getParentLogger();
+    }
+}
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
index d90aa4055a..e76e9e78e3 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
@@ -21,17 +21,20 @@ import org.apache.doris.thrift.TJdbcExecutorCtorParams;
import org.apache.doris.thrift.TJdbcOperation;
import com.google.common.base.Preconditions;
-import com.zaxxer.hikari.HikariConfig;
-import com.zaxxer.hikari.HikariDataSource;
import org.apache.log4j.Logger;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
-import java.io.FileNotFoundException;
+import java.io.File;
+import java.lang.reflect.InvocationTargetException;
import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
import java.sql.Connection;
import java.sql.Date;
+import java.sql.Driver;
+import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
@@ -50,10 +53,12 @@ public class JdbcExecutor {
private Statement stmt = null;
private ResultSet resultSet = null;
private ResultSetMetaData resultSetMetaData = null;
- // Use HikariDataSource to help us manage the JDBC connections.
- private HikariDataSource dataSource = null;
private List<String> resultColumnTypeNames = null;
private int baseTypeInt = 0;
+ private URLClassLoader classLoader = null;
+ private List<List<Object>> block = null;
+ private int bacthSizeNum = 0;
+ private int curBlockRows = 0;
public JdbcExecutor(byte[] thriftParams) throws Exception {
TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams();
@@ -75,22 +80,31 @@ public class JdbcExecutor {
stmt.close();
}
if (conn != null) {
+ conn.clearWarnings();
conn.close();
}
- if (dataSource != null) {
- dataSource.close();
+ if (classLoader != null) {
+ classLoader.clearAssertionStatus();
+ classLoader.close();
}
resultSet = null;
stmt = null;
conn = null;
- dataSource = null;
+ classLoader = null;
}
public int read() throws UdfRuntimeException {
try {
resultSet = ((PreparedStatement) stmt).executeQuery();
resultSetMetaData = resultSet.getMetaData();
- return resultSetMetaData.getColumnCount();
+ int columnCount = resultSetMetaData.getColumnCount();
+ resultColumnTypeNames = new ArrayList<>(columnCount);
+ block = new ArrayList<>(columnCount);
+ for (int i = 0; i < columnCount; ++i) {
+
resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1));
+ block.add(Arrays.asList(new Object[bacthSizeNum]));
+ }
+ return columnCount;
} catch (SQLException e) {
throw new UdfRuntimeException("JDBC executor sql has error: ", e);
}
@@ -111,17 +125,8 @@ public class JdbcExecutor {
}
}
- public List<String> getResultColumnTypeNames() throws UdfRuntimeException {
- try {
- int count = resultSetMetaData.getColumnCount();
- resultColumnTypeNames = new ArrayList<>(count);
- for (int i = 0; i < count; ++i) {
-
resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1));
- }
- return resultColumnTypeNames;
- } catch (SQLException e) {
- throw new UdfRuntimeException("JDBC executor
getResultColumnTypeNames has error: ", e);
- }
+ public List<String> getResultColumnTypeNames() {
+ return resultColumnTypeNames;
}
public List<Object> getArrayColumnData(Object object) throws
UdfRuntimeException {
@@ -169,20 +174,15 @@ public class JdbcExecutor {
}
public List<List<Object>> getBlock(int batchSize) throws
UdfRuntimeException {
- List<List<Object>> block = null;
try {
int columnCount = resultSetMetaData.getColumnCount();
- block = new ArrayList<>(columnCount);
- for (int i = 0; i < columnCount; ++i) {
- block.add(new ArrayList<>(batchSize));
- }
- int numRows = 0;
+ curBlockRows = 0;
do {
for (int i = 0; i < columnCount; ++i) {
- block.get(i).add(resultSet.getObject(i + 1));
+ block.get(i).set(curBlockRows, resultSet.getObject(i + 1));
}
- numRows++;
- } while (numRows < batchSize && resultSet.next());
+ curBlockRows++;
+ } while (curBlockRows < batchSize && resultSet.next());
} catch (SQLException e) {
throw new UdfRuntimeException("get next block failed: ", e);
} catch (Exception e) {
@@ -191,6 +191,10 @@ public class JdbcExecutor {
return block;
}
+ public int getCurBlockRows() {
+ return curBlockRows;
+ }
+
public boolean hasNext() throws UdfRuntimeException {
try {
if (resultSet == null) {
@@ -242,33 +246,41 @@ public class JdbcExecutor {
private void init(String driverUrl, String sql, int batchSize, String
driverClass, String jdbcUrl, String jdbcUser,
String jdbcPassword, TJdbcOperation op) throws UdfRuntimeException
{
try {
- ClassLoader parent = getClass().getClassLoader();
- ClassLoader classLoader = UdfUtils.getClassLoader(driverUrl,
parent);
- Thread.currentThread().setContextClassLoader(classLoader);
- HikariConfig config = new HikariConfig();
- config.setDriverClassName(driverClass);
- config.setJdbcUrl(jdbcUrl);
- config.setUsername(jdbcUser);
- config.setPassword(jdbcPassword);
- config.setMaximumPoolSize(1);
+ File file = new File(driverUrl);
+ URL url = file.toURI().toURL();
+ classLoader = new URLClassLoader(new URL[] {url});
+ Driver driver = (Driver) Class.forName(driverClass, true,
classLoader).getDeclaredConstructor()
+ .newInstance();
+ // in jdk11 cann't call addURL function by reflect to load class.
so use this way
+ // But DriverManager can't find the driverClass correctly, so add
a faker driver
+ // https://www.kfu.com/~nsayer/Java/dyn-jdbc.html
+ DriverManager.registerDriver(new FakeDriver(driver));
+ conn = DriverManager.getConnection(jdbcUrl, jdbcUser,
jdbcPassword);
- dataSource = new HikariDataSource(config);
- conn = dataSource.getConnection();
if (op == TJdbcOperation.READ) {
conn.setAutoCommit(false);
Preconditions.checkArgument(sql != null);
stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY,
ResultSet.FETCH_FORWARD);
stmt.setFetchSize(batchSize);
+ bacthSizeNum = batchSize;
} else {
stmt = conn.createStatement();
}
- } catch (FileNotFoundException e) {
- throw new UdfRuntimeException("Can not find driver file: " +
driverUrl, e);
+ } catch (ClassNotFoundException e) {
+ throw new UdfRuntimeException("ClassNotFoundException: " +
driverClass, e);
} catch (MalformedURLException e) {
throw new UdfRuntimeException("MalformedURLException to load class
about " + driverUrl, e);
} catch (SQLException e) {
throw new UdfRuntimeException("Initialize datasource failed: ", e);
+ } catch (InstantiationException e) {
+ throw new UdfRuntimeException("InstantiationException failed: ",
e);
+ } catch (IllegalAccessException e) {
+ throw new UdfRuntimeException("IllegalAccessException failed: ",
e);
+ } catch (InvocationTargetException e) {
+ throw new UdfRuntimeException("InvocationTargetException new
instance failed: ", e);
+ } catch (NoSuchMethodException e) {
+ throw new UdfRuntimeException("NoSuchMethodException Load class
failed: ", e);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]