This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 5d4c0305ef6 [improvement](iceberg/paimon)support estimate row count
for 2.0 (#31927)
5d4c0305ef6 is described below
commit 5d4c0305ef69db28e9d5176ecfcdfdc2e619cdf9
Author: wuwenchi <[email protected]>
AuthorDate: Thu Mar 7 18:00:16 2024 +0800
[improvement](iceberg/paimon)support estimate row count for 2.0 (#31927)
mirror: #31204 #31473
---
.../doris/catalog/external/HMSExternalTable.java | 3 +-
.../catalog/external/IcebergExternalTable.java | 10 ++--
.../catalog/external/PaimonExternalTable.java | 17 +++++++
.../doris/external/iceberg/util/IcebergUtils.java | 33 ++++++++++++
.../external/iceberg/IcebergMetadataCache.java | 5 +-
.../planner/external/iceberg/IcebergScanNode.java | 8 ++-
.../java/org/apache/doris/qe/ShowExecutor.java | 2 +-
.../doris/statistics/util/StatisticsUtil.java | 26 ----------
.../paimon/test_paimon_table_stats.groovy | 58 ++++++++++++++++++++++
9 files changed, 120 insertions(+), 42 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index 4baee416a9a..2dc1c0d6e48 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -28,6 +28,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hive.PooledHiveMetaStoreClient;
+import org.apache.doris.external.iceberg.util.IcebergUtils;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
@@ -308,7 +309,7 @@ public class HMSExternalTable extends ExternalTable {
rowCount = StatisticsUtil.getHiveRowCount(this);
break;
case ICEBERG:
- rowCount = StatisticsUtil.getIcebergRowCount(this);
+ rowCount = IcebergUtils.getIcebergRowCount(getCatalog(),
getDbName(), getName());
break;
default:
LOG.warn("getRowCount for dlaType {} is not supported.",
dlaType);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
index 7398ff19c9e..907ba1d4d71 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
@@ -23,8 +23,7 @@ import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
-import org.apache.doris.statistics.ColumnStatistic;
-import org.apache.doris.statistics.util.StatisticsUtil;
+import org.apache.doris.external.iceberg.util.IcebergUtils;
import org.apache.doris.thrift.THiveTable;
import org.apache.doris.thrift.TIcebergTable;
import org.apache.doris.thrift.TTableDescriptor;
@@ -37,7 +36,6 @@ import org.apache.iceberg.types.Types;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
-import java.util.Optional;
public class IcebergExternalTable extends ExternalTable {
@@ -143,10 +141,8 @@ public class IcebergExternalTable extends ExternalTable {
}
@Override
- public Optional<ColumnStatistic> getColumnStatistic(String colName) {
+ public long fetchRowCount() {
makeSureInitialized();
- return HiveMetaStoreClientHelper.ugiDoAs(catalog.getConfiguration(),
- () -> StatisticsUtil.getIcebergColumnStats(colName,
- ((IcebergExternalCatalog)
catalog).getIcebergTable(dbName, name)));
+ return IcebergUtils.getIcebergRowCount(getCatalog(), getDbName(),
getName());
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java
index c8ea253671d..0d53ff87b1a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java
@@ -31,6 +31,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DecimalType;
@@ -154,4 +155,20 @@ public class PaimonExternalTable extends ExternalTable {
+ getPaimonCatalogType());
}
}
+
+    /**
+     * Computes the row count of this Paimon table by planning a scan over the
+     * origin table and summing the row count reported by every split.
+     *
+     * @return the summed split row counts, or -1 if any exception is thrown
+     *         while planning the scan or reading the splits
+     */
+    @Override
+    public long fetchRowCount() {
+        makeSureInitialized();
+        try {
+            return originTable.newReadBuilder()
+                    .newScan()
+                    .plan()
+                    .splits()
+                    .stream()
+                    .mapToLong(Split::rowCount)
+                    .sum();
+        } catch (Exception e) {
+            LOG.warn("Fail to collect row count for db {} table {}", dbName, name, e);
+            return -1;
+        }
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
index 45deff98225..119a0b30fb6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
@@ -35,11 +35,13 @@ import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.Subquery;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.thrift.TExprOpcode;
import com.google.common.base.Preconditions;
@@ -50,6 +52,8 @@ import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
@@ -80,6 +84,9 @@ public class IcebergUtils {
}
};
static long MILLIS_TO_NANO_TIME = 1000;
+ public static final String TOTAL_RECORDS = "total-records";
+ public static final String TOTAL_POSITION_DELETES =
"total-position-deletes";
+ public static final String TOTAL_EQUALITY_DELETES =
"total-equality-deletes";
/**
* Create Iceberg schema from Doris ColumnDef.
@@ -442,4 +449,30 @@ public class IcebergUtils {
return builder.build();
}
+
+    /**
+     * Estimate iceberg table row count.
+     * The estimate is taken from the current snapshot's summary as
+     * {@code total-records - total-position-deletes}; no data files are scanned.
+     * Equality deletes are not subtracted here, so the value may overstate the
+     * live row count for tables that use them.
+     *
+     * @param catalog catalog used to look the table up in the iceberg metadata cache
+     * @param dbName database name of the table
+     * @param tbName table name
+     * @return estimated row count, 0 when the table has no snapshot yet,
+     *         or -1 when the count cannot be determined
+     */
+    public static long getIcebergRowCount(ExternalCatalog catalog, String dbName, String tbName) {
+        try {
+            Table icebergTable = Env.getCurrentEnv()
+                    .getExtMetaCacheMgr()
+                    .getIcebergMetadataCache()
+                    .getIcebergTable(catalog, dbName, tbName);
+            Snapshot snapshot = icebergTable.currentSnapshot();
+            if (snapshot == null) {
+                // empty table
+                return 0;
+            }
+            Map<String, String> summary = snapshot.summary();
+            String totalRecords = summary == null ? null : summary.get(TOTAL_RECORDS);
+            String positionDeletes = summary == null ? null : summary.get(TOTAL_POSITION_DELETES);
+            if (totalRecords == null || positionDeletes == null) {
+                // Check for absent summary entries explicitly instead of relying on
+                // Long.parseLong(null) throwing into the generic catch below.
+                LOG.warn("Snapshot summary has no row count info for db {} table {}", dbName, tbName);
+                return -1;
+            }
+            return Long.parseLong(totalRecords) - Long.parseLong(positionDeletes);
+        } catch (Exception e) {
+            LOG.warn("Fail to collect row count for db {} table {}", dbName, tbName, e);
+        }
+        return -1;
+    }
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
index 5f79623ff4a..e6b2575ee3b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
@@ -72,15 +72,16 @@ public class IcebergMetadataCache {
return ifPresent;
}
- Table icebergTable = getIcebergTable(key, catalog,
params.getDatabase(), params.getTable());
+ Table icebergTable = getIcebergTable(catalog, params.getDatabase(),
params.getTable());
List<Snapshot> snaps = Lists.newArrayList();
Iterables.addAll(snaps, icebergTable.snapshots());
snapshotListCache.put(key, snaps);
return snaps;
}
- public Table getIcebergTable(IcebergMetadataCacheKey key, CatalogIf
catalog, String dbName, String tbName)
+ public Table getIcebergTable(CatalogIf catalog, String dbName, String
tbName)
throws UserException {
+ IcebergMetadataCacheKey key =
IcebergMetadataCacheKey.of(catalog.getId(), dbName, tbName);
Table cacheTable = tableCache.getIfPresent(key);
if (cacheTable != null) {
return cacheTable;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index dbd1aab2392..e7ca3484ae0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -89,9 +89,6 @@ import java.util.stream.Collectors;
public class IcebergScanNode extends FileQueryScanNode {
public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
- private static final String TOTAL_RECORDS = "total-records";
- private static final String TOTAL_POSITION_DELETES =
"total-position-deletes";
- private static final String TOTAL_EQUALITY_DELETES =
"total-equality-deletes";
private IcebergSource source;
private Table icebergTable;
@@ -431,8 +428,9 @@ public class IcebergScanNode extends FileQueryScanNode {
}
Map<String, String> summary = snapshot.summary();
- if (summary.get(TOTAL_EQUALITY_DELETES).equals("0")) {
- return Long.parseLong(summary.get(TOTAL_RECORDS)) -
Long.parseLong(summary.get(TOTAL_POSITION_DELETES));
+ if (summary.get(IcebergUtils.TOTAL_EQUALITY_DELETES).equals("0")) {
+ return Long.parseLong(summary.get(IcebergUtils.TOTAL_RECORDS))
+ -
Long.parseLong(summary.get(IcebergUtils.TOTAL_POSITION_DELETES));
} else {
return -1;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index af2ee141626..ac8e4f5cd98 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -2469,7 +2469,7 @@ public class ShowExecutor {
or estimate with file size and schema if it's not analyzed.
tableStats == null means it's not analyzed, in this case show the
estimated row count.
*/
- if (tableStats == null && tableIf instanceof HMSExternalTable) {
+ if (tableStats == null) {
resultSet =
showTableStatsStmt.constructResultSet(tableIf.getRowCount());
} else {
resultSet = showTableStatsStmt.constructResultSet(tableStats);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 7752cfcc0e8..55be0fed915 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -84,7 +84,6 @@ import org.apache.commons.text.StringSubstitutor;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.types.Types;
import org.apache.logging.log4j.LogManager;
@@ -592,31 +591,6 @@ public class StatisticsUtil {
return parameters.containsKey(TOTAL_SIZE) ?
Long.parseLong(parameters.get(TOTAL_SIZE)) : 0;
}
- /**
- * Estimate iceberg table row count.
- * Get the row count by adding all task file recordCount.
- *
- * @param table Iceberg HMSExternalTable to estimate row count.
- * @return estimated row count
- */
- public static long getIcebergRowCount(HMSExternalTable table) {
- long rowCount = 0;
- try {
- Table icebergTable = Env.getCurrentEnv()
- .getExtMetaCacheMgr()
- .getIcebergMetadataCache()
- .getIcebergTable(table);
- TableScan tableScan = icebergTable.newScan().includeColumnStats();
- for (FileScanTask task : tableScan.planFiles()) {
- rowCount += task.file().recordCount();
- }
- return rowCount;
- } catch (Exception e) {
- LOG.warn("Fail to collect row count for db {} table {}",
table.getDbName(), table.getName(), e);
- }
- return -1;
- }
-
/**
* Estimate hive table row count : totalFileSize/estimatedRowSize
*
diff --git
a/regression-test/suites/external_table_p0/paimon/test_paimon_table_stats.groovy
b/regression-test/suites/external_table_p0/paimon/test_paimon_table_stats.groovy
new file mode 100644
index 00000000000..88494d1b1bf
--- /dev/null
+++
b/regression-test/suites/external_table_p0/paimon/test_paimon_table_stats.groovy
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Checks that `show table stats` reports the expected row count for tables
+// of a filesystem-based Paimon catalog. Runs only when enablePaimonTest is "true".
+suite("test_paimon_table_stats", "p0,external,doris,external_docker,external_docker_doris") {
+    String enabled = context.config.otherConfigs.get("enablePaimonTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String hdfs_port = context.config.otherConfigs.get("hdfs_port")
+        String catalog_name = "paimon1"
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+        sql """drop catalog if exists ${catalog_name}"""
+        sql """create catalog if not exists ${catalog_name} properties (
+            "type" = "paimon",
+            "paimon.catalog.type"="filesystem",
+            "warehouse" = "hdfs://${externalEnvIp}:${hdfs_port}/user/doris/paimon1"
+        );"""
+
+        // Polls `show table stats` up to 10 times, 2s apart, until the value in
+        // column index 2 is no longer "0", then compares it with the expected count.
+        // NOTE(review): column index 2 is presumably the row count column - confirm.
+        def assert_stats = { table_name, cnt ->
+            def retry = 0
+            def act = ""
+            while (retry < 10) {
+                def result = sql """ show table stats ${table_name} """
+                act = result[0][2]
+                if (act != "0") {
+                    break
+                }
+                Thread.sleep(2000)
+                retry++
+            }
+            // expected value first, actual second, so failure messages label them correctly
+            assertEquals(cnt, act)
+        }
+
+        // select
+        sql """ switch ${catalog_name} """
+        sql """ use db1 """
+        assert_stats("all_table", "2")
+        assert_stats("auto_bucket", "2")
+        assert_stats("complex_all", "3")
+        assert_stats("complex_tab", "3")
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]