This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8e44e50e592 [feature](statistics) Support get row count for JDBC
external table. (#38889)
8e44e50e592 is described below
commit 8e44e50e592476cf78487ccda7ed20fd14272cef
Author: Jibing-Li <[email protected]>
AuthorDate: Fri Aug 30 19:24:09 2024 +0800
[feature](statistics) Support get row count for JDBC external table.
(#38889)
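Support getting the row count for JDBC external tables. For MySQL catalogs,
the row count is read from the TABLE_ROWS column returned by
`show table status`, which is executed through the `query` table function;
other database types are not supported yet.
Return -1 when the external table row count is unknown.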
---
.../doris/datasource/jdbc/JdbcExternalTable.java | 58 ++++++++++++++++++++++
.../jdbc/test_mysql_jdbc_statistics.groovy | 47 +++++++++++-------
2 files changed, 88 insertions(+), 17 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
index 21d73499f2e..495311bc087 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
@@ -17,19 +17,28 @@
package org.apache.doris.datasource.jdbc;
+import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.JdbcResource;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ExternalAnalysisTask;
+import org.apache.doris.statistics.ResultRow;
+import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.thrift.TTableDescriptor;
+import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
/**
@@ -38,6 +47,9 @@ import java.util.Optional;
public class JdbcExternalTable extends ExternalTable {
private static final Logger LOG =
LogManager.getLogger(JdbcExternalTable.class);
+ public static final String MYSQL_ROW_COUNT_SQL = "SELECT * FROM QUERY"
+ + "(\"catalog\"=\"${ctlName}\", \"query\"=\"show table status from
`${dbName}` like '${tblName}'\");";
+
private JdbcTable jdbcTable;
/**
@@ -98,4 +110,50 @@ public class JdbcExternalTable extends ExternalTable {
makeSureInitialized();
return new ExternalAnalysisTask(info);
}
+
+ @Override
+ public long fetchRowCount() {
+ Map<String, String> params = new HashMap<>();
+ params.put("ctlName", catalog.getName());
+ params.put("dbName", dbName);
+ params.put("tblName", name);
+ switch (((JdbcExternalCatalog) catalog).getDatabaseTypeName()) {
+ case JdbcResource.MYSQL:
+ try (AutoCloseConnectContext r =
StatisticsUtil.buildConnectContext(false, false)) {
+ StringSubstitutor stringSubstitutor = new
StringSubstitutor(params);
+ String sql =
stringSubstitutor.replace(MYSQL_ROW_COUNT_SQL);
+ StmtExecutor stmtExecutor = new
StmtExecutor(r.connectContext, sql);
+ List<ResultRow> resultRows =
stmtExecutor.executeInternalQuery();
+ if (resultRows == null || resultRows.size() != 1) {
+ LOG.info("No mysql status found for table {}.{}.{}",
catalog.getName(), dbName, name);
+ return -1;
+ }
+ StatementBase parsedStmt = stmtExecutor.getParsedStmt();
+ if (parsedStmt == null || parsedStmt.getColLabels() ==
null) {
+ LOG.info("No column label found for table {}.{}.{}",
catalog.getName(), dbName, name);
+ return -1;
+ }
+ ResultRow resultRow = resultRows.get(0);
+ List<String> colLabels = parsedStmt.getColLabels();
+ int index = colLabels.indexOf("TABLE_ROWS");
+ if (index == -1) {
+ LOG.info("No TABLE_ROWS in status for table {}.{}.{}",
catalog.getName(), dbName, name);
+ return -1;
+ }
+ long rows = Long.parseLong(resultRow.get(index));
+ LOG.info("Get mysql table {}.{}.{} row count {}",
catalog.getName(), dbName, name, rows);
+ return rows;
+ } catch (Exception e) {
+ LOG.warn("Failed to fetch mysql row count for table
{}.{}.{}. Reason [{}]",
+ catalog.getName(), dbName, name, e.getMessage());
+ return -1;
+ }
+ case JdbcResource.ORACLE:
+ case JdbcResource.POSTGRESQL:
+ case JdbcResource.SQLSERVER:
+ default:
+ break;
+ }
+ return -1;
+ }
}
diff --git a/regression-test/suites/external_table_p0/jdbc/test_mysql_jdbc_statistics.groovy b/regression-test/suites/external_table_p0/jdbc/test_mysql_jdbc_statistics.groovy
index 66b04ebd513..e9bd59d8cb2 100644
--- a/regression-test/suites/external_table_p0/jdbc/test_mysql_jdbc_statistics.groovy
+++ b/regression-test/suites/external_table_p0/jdbc/test_mysql_jdbc_statistics.groovy
@@ -17,6 +17,7 @@
suite("test_mysql_jdbc_statistics",
"p0,external,mysql,external_docker,external_docker_mysql") {
String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ logger.info("enabled " + enabled)
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
String mysql_port = context.config.otherConfigs.get("mysql_57_port");
String s3_endpoint = getS3Endpoint()
@@ -35,28 +36,40 @@ suite("test_mysql_jdbc_statistics", "p0,external,mysql,external_docker,external_
);"""
sql """use ${catalog_name}.doris_test"""
+
+ def result = sql """show table stats ex_tb0"""
+ Thread.sleep(1000)
+ for (int i = 0; i < 20; i++) {
+ result = sql """show table stats ex_tb0""";
+ if (result[0][2] != "-1") {
+ assertEquals("5", result[0][2])
+ break;
+ }
+ logger.info("Table row count not ready yet. Wait 1 second.")
+ Thread.sleep(1000)
+ }
sql """analyze table ex_tb0 with sync"""
- def result = sql """show column stats ex_tb0 (name)"""
- assertTrue(result.size() == 1)
- assertTrue(result[0][0] == "name")
- assertTrue(result[0][2] == "5.0")
- assertTrue(result[0][3] == "5.0")
- assertTrue(result[0][4] == "0.0")
- assertTrue(result[0][5] == "15.0")
- assertTrue(result[0][6] == "3.0")
+ result = sql """show column stats ex_tb0 (name)"""
+ assertEquals(result.size(), 1)
+ assertEquals(result[0][0], "name")
+ assertEquals(result[0][2], "5.0")
+ assertEquals(result[0][3], "5.0")
+ assertEquals(result[0][4], "0.0")
+ assertEquals(result[0][5], "15.0")
+ assertEquals(result[0][6], "3.0")
assertEquals(result[0][7], "'abc'")
assertEquals(result[0][8], "'abg'")
result = sql """show column stats ex_tb0 (id)"""
- assertTrue(result.size() == 1)
- assertTrue(result[0][0] == "id")
- assertTrue(result[0][2] == "5.0")
- assertTrue(result[0][3] == "5.0")
- assertTrue(result[0][4] == "0.0")
- assertTrue(result[0][5] == "20.0")
- assertTrue(result[0][6] == "4.0")
- assertTrue(result[0][7] == "111")
- assertTrue(result[0][8] == "115")
+ assertEquals(result.size(), 1)
+ assertEquals(result[0][0], "id")
+ assertEquals(result[0][2], "5.0")
+ assertEquals(result[0][3], "5.0")
+ assertEquals(result[0][4], "0.0")
+ assertEquals(result[0][5], "20.0")
+ assertEquals(result[0][6], "4.0")
+ assertEquals(result[0][7], "111")
+ assertEquals(result[0][8], "115")
sql """drop catalog ${catalog_name}"""
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]