This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9135b676a9d [improvement](iceberg/paimon)support estimate row count
(#31204)
9135b676a9d is described below
commit 9135b676a9d0686841e2f5a116967951d9479ec4
Author: wuwenchi <[email protected]>
AuthorDate: Mon Feb 26 11:05:09 2024 +0800
[improvement](iceberg/paimon)support estimate row count (#31204)
Estimate the row count of Iceberg and Paimon tables.
---
.../doris/datasource/hive/HMSExternalTable.java | 2 +-
.../datasource/iceberg/IcebergExternalTable.java | 6 ++++
.../doris/datasource/iceberg/IcebergUtils.java | 34 ++++++++++++++++++++++
.../datasource/iceberg/source/IcebergScanNode.java | 8 ++---
.../datasource/paimon/PaimonExternalTable.java | 17 +++++++++++
.../doris/statistics/util/StatisticsUtil.java | 26 -----------------
6 files changed, 61 insertions(+), 32 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index 0e11267829c..d095a959e90 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -321,7 +321,7 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
rowCount = StatisticsUtil.getHiveRowCount(this);
break;
case ICEBERG:
- rowCount = StatisticsUtil.getIcebergRowCount(this);
+ rowCount = IcebergUtils.getIcebergRowCount(getCatalog(),
getDbName(), getName());
break;
default:
if (LOG.isDebugEnabled()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
index 21f7c1d3d21..dfc78f44944 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
@@ -87,4 +87,10 @@ public class IcebergExternalTable extends ExternalTable {
makeSureInitialized();
return new ExternalAnalysisTask(info);
}
+
+ @Override
+ public long fetchRowCount() { // row count estimate for this Iceberg table
+ makeSureInitialized(); // delegates to IcebergUtils.getIcebergRowCount, which returns -1 on failure
+ return IcebergUtils.getIcebergRowCount(getCatalog(), getDbName(),
getName());
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index f66babfe03e..1102527fa3a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -45,6 +45,8 @@ import org.apache.doris.thrift.TExprOpcode;
import com.google.common.collect.Lists;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.types.Types;
@@ -54,6 +56,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
/**
* Iceberg utils
@@ -65,6 +68,10 @@ public class IcebergUtils {
// All time and timestamp values are stored with microsecond precision
private static final int ICEBERG_DATETIME_SCALE_MS = 6;
+ public static final String TOTAL_RECORDS = "total-records";
+ public static final String TOTAL_POSITION_DELETES =
"total-position-deletes";
+ public static final String TOTAL_EQUALITY_DELETES =
"total-equality-deletes";
+
public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
if (expr == null) {
return null;
@@ -314,4 +321,42 @@ public class IcebergUtils {
return tmpSchema;
});
}
+
+    /**
+     * Estimate the row count of an Iceberg table from its current snapshot summary.
+     *
+     * <p>The estimate is {@code total-records - total-position-deletes}. Rows removed by
+     * equality deletes cannot be derived from the summary alone, so when the snapshot reports
+     * any equality deletes (or a required summary entry is missing) the count is unknown.
+     *
+     * @param catalog catalog that owns the table
+     * @param dbName database name
+     * @param tbName table name
+     * @return estimated row count, 0 for a table without snapshots, or -1 if unknown
+     */
+    public static long getIcebergRowCount(ExternalCatalog catalog, String dbName, String tbName) {
+        try {
+            Table icebergTable = Env.getCurrentEnv()
+                    .getExtMetaCacheMgr()
+                    .getIcebergMetadataCache()
+                    .getIcebergTable(catalog, dbName, tbName);
+            Snapshot snapshot = icebergTable.currentSnapshot();
+            if (snapshot == null) {
+                // empty table
+                return 0;
+            }
+            Map<String, String> summary = snapshot.summary();
+            String totalRecords = summary.get(TOTAL_RECORDS);
+            String positionDeletes = summary.get(TOTAL_POSITION_DELETES);
+            String equalityDeletes = summary.get(TOTAL_EQUALITY_DELETES);
+            if (totalRecords == null || positionDeletes == null || !"0".equals(equalityDeletes)) {
+                // Rows removed by equality deletes cannot be counted from the summary.
+                return -1;
+            }
+            return Long.parseLong(totalRecords) - Long.parseLong(positionDeletes);
+        } catch (Exception e) {
+            LOG.warn("Fail to collect row count for db {} table {}", dbName, tbName, e);
+        }
+        return -1;
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index f8b72208ea4..e2564eae527 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -87,9 +87,6 @@ import java.util.stream.Collectors;
public class IcebergScanNode extends FileQueryScanNode {
public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
- private static final String TOTAL_RECORDS = "total-records";
- private static final String TOTAL_POSITION_DELETES =
"total-position-deletes";
- private static final String TOTAL_EQUALITY_DELETES =
"total-equality-deletes";
private IcebergSource source;
private Table icebergTable;
@@ -424,8 +421,9 @@ public class IcebergScanNode extends FileQueryScanNode {
}
Map<String, String> summary = snapshot.summary();
- if (summary.get(TOTAL_EQUALITY_DELETES).equals("0")) {
- return Long.parseLong(summary.get(TOTAL_RECORDS)) -
Long.parseLong(summary.get(TOTAL_POSITION_DELETES));
+ if (summary.get(IcebergUtils.TOTAL_EQUALITY_DELETES).equals("0")) {
+ return Long.parseLong(summary.get(IcebergUtils.TOTAL_RECORDS))
+ -
Long.parseLong(summary.get(IcebergUtils.TOTAL_POSITION_DELETES));
} else {
return -1;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
index f921fcd681a..41440c3f4cf 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
@@ -34,6 +34,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DecimalType;
@@ -163,4 +164,20 @@ public class PaimonExternalTable extends ExternalTable {
makeSureInitialized();
return new ExternalAnalysisTask(info);
}
+
+ @Override
+ public long fetchRowCount() { // sums Split.rowCount() over a scan plan of originTable; -1 on failure
+ makeSureInitialized();
+ try {
+ long rowCount = 0;
+ List<Split> splits =
originTable.newReadBuilder().newScan().plan().splits();
+ for (Split split : splits) { // NOTE(review): plans every split on each call; may be costly for large tables — confirm acceptable
+ rowCount += split.rowCount();
+ }
+ return rowCount;
+ } catch (Exception e) { // best-effort: log the failure and fall through to -1
+ LOG.warn("Fail to collect row count for db {} table {}", dbName,
name, e);
+ }
+ return -1; // -1 signals that the row count could not be determined
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 8688447dcb9..8ee08d57e69 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -84,7 +84,6 @@ import org.apache.commons.text.StringSubstitutor;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Types;
@@ -596,31 +595,6 @@ public class StatisticsUtil {
return parameters.containsKey(TOTAL_SIZE) ?
Long.parseLong(parameters.get(TOTAL_SIZE)) : 0;
}
- /**
- * Estimate iceberg table row count.
- * Get the row count by adding all task file recordCount.
- *
- * @param table Iceberg HMSExternalTable to estimate row count.
- * @return estimated row count
- */
- public static long getIcebergRowCount(HMSExternalTable table) {
- long rowCount = 0;
- try {
- Table icebergTable = Env.getCurrentEnv()
- .getExtMetaCacheMgr()
- .getIcebergMetadataCache()
- .getIcebergTable(table.getCatalog(), table.getDbName(),
table.getName());
- TableScan tableScan = icebergTable.newScan().includeColumnStats();
- for (FileScanTask task : tableScan.planFiles()) {
- rowCount += task.file().recordCount();
- }
- return rowCount;
- } catch (Exception e) {
- LOG.warn("Fail to collect row count for db {} table {}",
table.getDbName(), table.getName(), e);
- }
- return -1;
- }
-
/**
* Estimate hive table row count : totalFileSize/estimatedRowSize
*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]