This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e44e50e592 [feature](statistics) Support get row count for JDBC external table. (#38889)
8e44e50e592 is described below

commit 8e44e50e592476cf78487ccda7ed20fd14272cef
Author: Jibing-Li <[email protected]>
AuthorDate: Fri Aug 30 19:24:09 2024 +0800

    [feature](statistics) Support get row count for JDBC external table. (#38889)
    
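    Support getting the row count of a JDBC external table. For MySQL
    catalogs, this runs `show table status from <db> like '<table>'` on the
    remote database through the `QUERY()` table function and reads the
    TABLE_ROWS column of the result; other database types are not handled yet.
    Return -1 when the external table row count is unknown.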
---
 .../doris/datasource/jdbc/JdbcExternalTable.java   | 58 ++++++++++++++++++++++
 .../jdbc/test_mysql_jdbc_statistics.groovy         | 47 +++++++++++-------
 2 files changed, 88 insertions(+), 17 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
index 21d73499f2e..495311bc087 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
@@ -17,19 +17,28 @@
 
 package org.apache.doris.datasource.jdbc;
 
+import org.apache.doris.analysis.StatementBase;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.JdbcResource;
 import org.apache.doris.catalog.JdbcTable;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.statistics.AnalysisInfo;
 import org.apache.doris.statistics.BaseAnalysisTask;
 import org.apache.doris.statistics.ExternalAnalysisTask;
+import org.apache.doris.statistics.ResultRow;
+import org.apache.doris.statistics.util.StatisticsUtil;
 import org.apache.doris.thrift.TTableDescriptor;
 
+import org.apache.commons.text.StringSubstitutor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -38,6 +47,9 @@ import java.util.Optional;
 public class JdbcExternalTable extends ExternalTable {
     private static final Logger LOG = 
LogManager.getLogger(JdbcExternalTable.class);
 
+    public static final String MYSQL_ROW_COUNT_SQL = "SELECT * FROM QUERY"
+            + "(\"catalog\"=\"${ctlName}\", \"query\"=\"show table status from 
`${dbName}` like '${tblName}'\");";
+
     private JdbcTable jdbcTable;
 
     /**
@@ -98,4 +110,50 @@ public class JdbcExternalTable extends ExternalTable {
         makeSureInitialized();
         return new ExternalAnalysisTask(info);
     }
+
+    @Override
+    public long fetchRowCount() {
+        Map<String, String> params = new HashMap<>();
+        params.put("ctlName", catalog.getName());
+        params.put("dbName", dbName);
+        params.put("tblName", name);
+        switch (((JdbcExternalCatalog) catalog).getDatabaseTypeName()) {
+            case JdbcResource.MYSQL:
+                try (AutoCloseConnectContext r = 
StatisticsUtil.buildConnectContext(false, false)) {
+                    StringSubstitutor stringSubstitutor = new 
StringSubstitutor(params);
+                    String sql = 
stringSubstitutor.replace(MYSQL_ROW_COUNT_SQL);
+                    StmtExecutor stmtExecutor = new 
StmtExecutor(r.connectContext, sql);
+                    List<ResultRow> resultRows = 
stmtExecutor.executeInternalQuery();
+                    if (resultRows == null || resultRows.size() != 1) {
+                        LOG.info("No mysql status found for table {}.{}.{}", 
catalog.getName(), dbName, name);
+                        return -1;
+                    }
+                    StatementBase parsedStmt = stmtExecutor.getParsedStmt();
+                    if (parsedStmt == null || parsedStmt.getColLabels() == 
null) {
+                        LOG.info("No column label found for table {}.{}.{}", 
catalog.getName(), dbName, name);
+                        return -1;
+                    }
+                    ResultRow resultRow = resultRows.get(0);
+                    List<String> colLabels = parsedStmt.getColLabels();
+                    int index = colLabels.indexOf("TABLE_ROWS");
+                    if (index == -1) {
+                        LOG.info("No TABLE_ROWS in status for table {}.{}.{}", 
catalog.getName(), dbName, name);
+                        return -1;
+                    }
+                    long rows = Long.parseLong(resultRow.get(index));
+                    LOG.info("Get mysql table {}.{}.{} row count {}", 
catalog.getName(), dbName, name, rows);
+                    return rows;
+                } catch (Exception e) {
+                    LOG.warn("Failed to fetch mysql row count for table 
{}.{}.{}. Reason [{}]",
+                            catalog.getName(), dbName, name, e.getMessage());
+                    return -1;
+                }
+            case JdbcResource.ORACLE:
+            case JdbcResource.POSTGRESQL:
+            case JdbcResource.SQLSERVER:
+            default:
+                break;
+        }
+        return -1;
+    }
 }
diff --git a/regression-test/suites/external_table_p0/jdbc/test_mysql_jdbc_statistics.groovy b/regression-test/suites/external_table_p0/jdbc/test_mysql_jdbc_statistics.groovy
index 66b04ebd513..e9bd59d8cb2 100644
--- a/regression-test/suites/external_table_p0/jdbc/test_mysql_jdbc_statistics.groovy
+++ b/regression-test/suites/external_table_p0/jdbc/test_mysql_jdbc_statistics.groovy
@@ -17,6 +17,7 @@
 
 suite("test_mysql_jdbc_statistics", 
"p0,external,mysql,external_docker,external_docker_mysql") {
     String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    logger.info("enabled " + enabled)
     String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
     String mysql_port = context.config.otherConfigs.get("mysql_57_port");
     String s3_endpoint = getS3Endpoint()
@@ -35,28 +36,40 @@ suite("test_mysql_jdbc_statistics", 
"p0,external,mysql,external_docker,external_
         );"""
 
         sql """use ${catalog_name}.doris_test"""
+
+        def result = sql """show table stats ex_tb0"""
+        Thread.sleep(1000)
+        for (int i = 0; i < 20; i++) {
+            result = sql """show table stats ex_tb0""";
+            if (result[0][2] != "-1") {
+                assertEquals("5", result[0][2])
+                break;
+            }
+            logger.info("Table row count not ready yet. Wait 1 second.")
+            Thread.sleep(1000)
+        }
         sql """analyze table ex_tb0 with sync"""
-        def result = sql """show column stats ex_tb0 (name)"""
-        assertTrue(result.size() == 1)
-        assertTrue(result[0][0] == "name")
-        assertTrue(result[0][2] == "5.0")
-        assertTrue(result[0][3] == "5.0")
-        assertTrue(result[0][4] == "0.0")
-        assertTrue(result[0][5] == "15.0")
-        assertTrue(result[0][6] == "3.0")
+        result = sql """show column stats ex_tb0 (name)"""
+        assertEquals(result.size(), 1)
+        assertEquals(result[0][0], "name")
+        assertEquals(result[0][2], "5.0")
+        assertEquals(result[0][3], "5.0")
+        assertEquals(result[0][4], "0.0")
+        assertEquals(result[0][5], "15.0")
+        assertEquals(result[0][6], "3.0")
         assertEquals(result[0][7], "'abc'")
         assertEquals(result[0][8], "'abg'")
 
         result = sql """show column stats ex_tb0 (id)"""
-        assertTrue(result.size() == 1)
-        assertTrue(result[0][0] == "id")
-        assertTrue(result[0][2] == "5.0")
-        assertTrue(result[0][3] == "5.0")
-        assertTrue(result[0][4] == "0.0")
-        assertTrue(result[0][5] == "20.0")
-        assertTrue(result[0][6] == "4.0")
-        assertTrue(result[0][7] == "111")
-        assertTrue(result[0][8] == "115")
+        assertEquals(result.size(), 1)
+        assertEquals(result[0][0], "id")
+        assertEquals(result[0][2], "5.0")
+        assertEquals(result[0][3], "5.0")
+        assertEquals(result[0][4], "0.0")
+        assertEquals(result[0][5], "20.0")
+        assertEquals(result[0][6], "4.0")
+        assertEquals(result[0][7], "111")
+        assertEquals(result[0][8], "115")
 
         sql """drop catalog ${catalog_name}"""
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to