This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 5d4c0305ef6 [improvement](iceberg/paimon)support estimate row count for 2.0 (#31927)
5d4c0305ef6 is described below

commit 5d4c0305ef69db28e9d5176ecfcdfdc2e619cdf9
Author: wuwenchi <[email protected]>
AuthorDate: Thu Mar 7 18:00:16 2024 +0800

    [improvement](iceberg/paimon)support estimate row count for 2.0 (#31927)
    
    mirror: #31204 #31473
---
 .../doris/catalog/external/HMSExternalTable.java   |  3 +-
 .../catalog/external/IcebergExternalTable.java     | 10 ++--
 .../catalog/external/PaimonExternalTable.java      | 17 +++++++
 .../doris/external/iceberg/util/IcebergUtils.java  | 33 ++++++++++++
 .../external/iceberg/IcebergMetadataCache.java     |  5 +-
 .../planner/external/iceberg/IcebergScanNode.java  |  8 ++-
 .../java/org/apache/doris/qe/ShowExecutor.java     |  2 +-
 .../doris/statistics/util/StatisticsUtil.java      | 26 ----------
 .../paimon/test_paimon_table_stats.groovy          | 58 ++++++++++++++++++++++
 9 files changed, 120 insertions(+), 42 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index 4baee416a9a..2dc1c0d6e48 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -28,6 +28,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache;
 import org.apache.doris.datasource.hive.PooledHiveMetaStoreClient;
+import org.apache.doris.external.iceberg.util.IcebergUtils;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
 import org.apache.doris.statistics.AnalysisInfo;
 import org.apache.doris.statistics.BaseAnalysisTask;
@@ -308,7 +309,7 @@ public class HMSExternalTable extends ExternalTable {
                 rowCount = StatisticsUtil.getHiveRowCount(this);
                 break;
             case ICEBERG:
-                rowCount = StatisticsUtil.getIcebergRowCount(this);
+                rowCount = IcebergUtils.getIcebergRowCount(getCatalog(), 
getDbName(), getName());
                 break;
             default:
                 LOG.warn("getRowCount for dlaType {} is not supported.", 
dlaType);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
index 7398ff19c9e..907ba1d4d71 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
@@ -23,8 +23,7 @@ import org.apache.doris.catalog.HiveMetaStoreClientHelper;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
-import org.apache.doris.statistics.ColumnStatistic;
-import org.apache.doris.statistics.util.StatisticsUtil;
+import org.apache.doris.external.iceberg.util.IcebergUtils;
 import org.apache.doris.thrift.THiveTable;
 import org.apache.doris.thrift.TIcebergTable;
 import org.apache.doris.thrift.TTableDescriptor;
@@ -37,7 +36,6 @@ import org.apache.iceberg.types.Types;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
-import java.util.Optional;
 
 public class IcebergExternalTable extends ExternalTable {
 
@@ -143,10 +141,8 @@ public class IcebergExternalTable extends ExternalTable {
     }
 
     @Override
-    public Optional<ColumnStatistic> getColumnStatistic(String colName) {
+    public long fetchRowCount() {
         makeSureInitialized();
-        return HiveMetaStoreClientHelper.ugiDoAs(catalog.getConfiguration(),
-                () -> StatisticsUtil.getIcebergColumnStats(colName,
-                        ((IcebergExternalCatalog) 
catalog).getIcebergTable(dbName, name)));
+        return IcebergUtils.getIcebergRowCount(getCatalog(), getDbName(), 
getName());
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java
index c8ea253671d..0d53ff87b1a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java
@@ -31,6 +31,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DecimalType;
@@ -154,4 +155,20 @@ public class PaimonExternalTable extends ExternalTable {
                     + getPaimonCatalogType());
         }
     }
+
+    @Override
+    public long fetchRowCount() {
+        makeSureInitialized();
+        try {
+            long rowCount = 0;
+            List<Split> splits = 
originTable.newReadBuilder().newScan().plan().splits();
+            for (Split split : splits) {
+                rowCount += split.rowCount();
+            }
+            return rowCount;
+        } catch (Exception e) {
+            LOG.warn("Fail to collect row count for db {} table {}", dbName, 
name, e);
+        }
+        return -1;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
index 45deff98225..119a0b30fb6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
@@ -35,11 +35,13 @@ import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.analysis.Subquery;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.thrift.TExprOpcode;
 
 import com.google.common.base.Preconditions;
@@ -50,6 +52,8 @@ import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.expressions.Expression;
@@ -80,6 +84,9 @@ public class IcebergUtils {
         }
     };
     static long MILLIS_TO_NANO_TIME = 1000;
+    public static final String TOTAL_RECORDS = "total-records";
+    public static final String TOTAL_POSITION_DELETES = 
"total-position-deletes";
+    public static final String TOTAL_EQUALITY_DELETES = 
"total-equality-deletes";
 
     /**
      * Create Iceberg schema from Doris ColumnDef.
@@ -442,4 +449,30 @@ public class IcebergUtils {
         return builder.build();
     }
 
+
+    /**
+     * Estimate iceberg table row count.
+     * Get the row count by adding all task file recordCount.
+     *
+     * @return estimated row count
+     */
+    public static long getIcebergRowCount(ExternalCatalog catalog, String 
dbName, String tbName) {
+        try {
+            Table icebergTable = Env.getCurrentEnv()
+                    .getExtMetaCacheMgr()
+                    .getIcebergMetadataCache()
+                    .getIcebergTable(catalog, dbName, tbName);
+            Snapshot snapshot = icebergTable.currentSnapshot();
+            if (snapshot == null) {
+                // empty table
+                return 0;
+            }
+            Map<String, String> summary = snapshot.summary();
+            return Long.parseLong(summary.get(TOTAL_RECORDS)) - 
Long.parseLong(summary.get(TOTAL_POSITION_DELETES));
+        } catch (Exception e) {
+            LOG.warn("Fail to collect row count for db {} table {}", dbName, 
tbName, e);
+        }
+        return -1;
+    }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
index 5f79623ff4a..e6b2575ee3b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
@@ -72,15 +72,16 @@ public class IcebergMetadataCache {
             return ifPresent;
         }
 
-        Table icebergTable = getIcebergTable(key, catalog, 
params.getDatabase(), params.getTable());
+        Table icebergTable = getIcebergTable(catalog, params.getDatabase(), 
params.getTable());
         List<Snapshot> snaps = Lists.newArrayList();
         Iterables.addAll(snaps, icebergTable.snapshots());
         snapshotListCache.put(key, snaps);
         return snaps;
     }
 
-    public Table getIcebergTable(IcebergMetadataCacheKey key, CatalogIf 
catalog, String dbName, String tbName)
+    public Table getIcebergTable(CatalogIf catalog, String dbName, String 
tbName)
             throws UserException {
+        IcebergMetadataCacheKey key = 
IcebergMetadataCacheKey.of(catalog.getId(), dbName, tbName);
         Table cacheTable = tableCache.getIfPresent(key);
         if (cacheTable != null) {
             return cacheTable;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index dbd1aab2392..e7ca3484ae0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -89,9 +89,6 @@ import java.util.stream.Collectors;
 public class IcebergScanNode extends FileQueryScanNode {
 
     public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
-    private static final String TOTAL_RECORDS = "total-records";
-    private static final String TOTAL_POSITION_DELETES = 
"total-position-deletes";
-    private static final String TOTAL_EQUALITY_DELETES = 
"total-equality-deletes";
 
     private IcebergSource source;
     private Table icebergTable;
@@ -431,8 +428,9 @@ public class IcebergScanNode extends FileQueryScanNode {
         }
 
         Map<String, String> summary = snapshot.summary();
-        if (summary.get(TOTAL_EQUALITY_DELETES).equals("0")) {
-            return Long.parseLong(summary.get(TOTAL_RECORDS)) - 
Long.parseLong(summary.get(TOTAL_POSITION_DELETES));
+        if (summary.get(IcebergUtils.TOTAL_EQUALITY_DELETES).equals("0")) {
+            return Long.parseLong(summary.get(IcebergUtils.TOTAL_RECORDS))
+                - 
Long.parseLong(summary.get(IcebergUtils.TOTAL_POSITION_DELETES));
         } else {
             return -1;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index af2ee141626..ac8e4f5cd98 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -2469,7 +2469,7 @@ public class ShowExecutor {
            or estimate with file size and schema if it's not analyzed.
            tableStats == null means it's not analyzed, in this case show the 
estimated row count.
          */
-        if (tableStats == null && tableIf instanceof HMSExternalTable) {
+        if (tableStats == null) {
             resultSet = 
showTableStatsStmt.constructResultSet(tableIf.getRowCount());
         } else {
             resultSet = showTableStatsStmt.constructResultSet(tableStats);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 7752cfcc0e8..55be0fed915 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -84,7 +84,6 @@ import org.apache.commons.text.StringSubstitutor;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Table;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.types.Types;
 import org.apache.logging.log4j.LogManager;
@@ -592,31 +591,6 @@ public class StatisticsUtil {
         return parameters.containsKey(TOTAL_SIZE) ? 
Long.parseLong(parameters.get(TOTAL_SIZE)) : 0;
     }
 
-    /**
-     * Estimate iceberg table row count.
-     * Get the row count by adding all task file recordCount.
-     *
-     * @param table Iceberg HMSExternalTable to estimate row count.
-     * @return estimated row count
-     */
-    public static long getIcebergRowCount(HMSExternalTable table) {
-        long rowCount = 0;
-        try {
-            Table icebergTable = Env.getCurrentEnv()
-                    .getExtMetaCacheMgr()
-                    .getIcebergMetadataCache()
-                    .getIcebergTable(table);
-            TableScan tableScan = icebergTable.newScan().includeColumnStats();
-            for (FileScanTask task : tableScan.planFiles()) {
-                rowCount += task.file().recordCount();
-            }
-            return rowCount;
-        } catch (Exception e) {
-            LOG.warn("Fail to collect row count for db {} table {}", 
table.getDbName(), table.getName(), e);
-        }
-        return -1;
-    }
-
     /**
      * Estimate hive table row count : totalFileSize/estimatedRowSize
      *
diff --git 
a/regression-test/suites/external_table_p0/paimon/test_paimon_table_stats.groovy
 
b/regression-test/suites/external_table_p0/paimon/test_paimon_table_stats.groovy
new file mode 100644
index 00000000000..88494d1b1bf
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/paimon/test_paimon_table_stats.groovy
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_paimon_table_stats", 
"p0,external,doris,external_docker,external_docker_doris") {
+    String enabled = context.config.otherConfigs.get("enablePaimonTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        try {
+            String hdfs_port = context.config.otherConfigs.get("hdfs_port")
+            String catalog_name = "paimon1"
+            String externalEnvIp = 
context.config.otherConfigs.get("externalEnvIp")
+
+            sql """drop catalog if exists ${catalog_name}"""
+            sql """create catalog if not exists ${catalog_name} properties (
+                "type" = "paimon",
+                "paimon.catalog.type"="filesystem",
+                "warehouse" = 
"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/paimon1"
+            );"""
+
+            def assert_stats = { table_name, cnt -> 
+                def retry = 0
+                def act = ""
+                while (retry < 10) {
+                    def result = sql """ show table stats ${table_name} """
+                    act = result[0][2]
+                    if (act != "0") {
+                        break;
+                    }
+                    Thread.sleep(2000)
+                    retry++
+                }
+                assertEquals(act, cnt)
+            }
+
+            // select
+            sql """ switch ${catalog_name} """
+            sql """ use db1 """
+            assert_stats("all_table", "2")
+            assert_stats("auto_bucket", "2")
+            assert_stats("complex_all", "3")
+            assert_stats("complex_tab", "3")
+        } finally {
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to