This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9bcf79178e [Improvement](statistics, multi catalog) Support iceberg table stats collection (#21481)
9bcf79178e is described below
commit 9bcf79178ed4021bfb9c784f340c60b2c7428a8b
Author: Jibing-Li <[email protected]>
AuthorDate: Fri Jul 7 09:18:37 2023 +0800
[Improvement](statistics, multi catalog) Support iceberg table stats collection (#21481)

Fetch Iceberg table stats automatically while querying a table.
Collect accurate statistics for Iceberg tables by running analyze SQL in
Doris (the collect-by-meta option is removed).
---
.../main/java/org/apache/doris/common/Config.java | 7 -
.../doris/catalog/external/HMSExternalTable.java | 25 +-
.../catalog/external/IcebergExternalTable.java | 10 +
.../apache/doris/external/hive/util/HiveUtil.java | 2 +-
.../apache/doris/statistics/HMSAnalysisTask.java | 239 ++++++++++++-
.../apache/doris/statistics/HiveAnalysisTask.java | 370 ---------------------
.../doris/statistics/IcebergAnalysisTask.java | 121 -------
.../doris/statistics/util/StatisticsUtil.java | 49 +++
8 files changed, 298 insertions(+), 525 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index e4a61788ec..c683db9bc5 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1742,13 +1742,6 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = false)
public static boolean use_fuzzy_session_variable = false;
- /**
- * Collect external table statistic info by running sql when set to true.
- * Otherwise, use external catalog metadata.
- */
- @ConfField(mutable = true)
- public static boolean collect_external_table_stats_by_sql = true;
-
/**
* Max num of same name meta information in catalog recycle bin.
* Default is 3.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index 1aee1803ae..b985e4cddd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -29,8 +29,7 @@ import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.ColumnStatisticBuilder;
-import org.apache.doris.statistics.HiveAnalysisTask;
-import org.apache.doris.statistics.IcebergAnalysisTask;
+import org.apache.doris.statistics.HMSAnalysisTask;
import org.apache.doris.statistics.TableStatistic;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.thrift.THiveTable;
@@ -322,14 +321,7 @@ public class HMSExternalTable extends ExternalTable {
@Override
public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
makeSureInitialized();
- switch (dlaType) {
- case HIVE:
- return new HiveAnalysisTask(info);
- case ICEBERG:
- return new IcebergAnalysisTask(info);
- default:
-                throw new IllegalArgumentException("Analysis job for dlaType " + dlaType + " not supported.");
- }
+ return new HMSAnalysisTask(info);
}
public String getViewText() {
@@ -473,6 +465,19 @@ public class HMSExternalTable extends ExternalTable {
@Override
public Optional<ColumnStatistic> getColumnStatistic(String colName) {
+ makeSureInitialized();
+ switch (dlaType) {
+ case HIVE:
+ return getHiveColumnStats(colName);
+ case ICEBERG:
+                return StatisticsUtil.getIcebergColumnStats(colName, HiveMetaStoreClientHelper.getIcebergTable(this));
+            default:
+                LOG.warn("get column stats for dlaType {} is not supported.", dlaType);
+        }
+        return Optional.empty();
+    }
+
+    private Optional<ColumnStatistic> getHiveColumnStats(String colName) {
         List<ColumnStatisticsObj> tableStats = getHiveTableColumnStats(Lists.newArrayList(colName));
         if (tableStats == null || tableStats.isEmpty()) {
             LOG.debug(String.format("No table stats found in Hive metastore for column %s in table %s.",
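A hedged, caller-side sketch of the new dispatch above (not part of this patch): unsupported dlaTypes fall through to Optional.empty(), so callers must handle absence. The helper class below is hypothetical, and ColumnStatistic.ndv is assumed to be a public field per Doris FE convention.

    import org.apache.doris.catalog.external.HMSExternalTable;
    import org.apache.doris.statistics.ColumnStatistic;
    import java.util.Optional;

    class ColumnStatsConsumerSketch {
        // Returns the column's NDV, or -1.0 when stats are unavailable for this dlaType.
        static double ndvOrUnknown(HMSExternalTable table, String colName) {
            Optional<ColumnStatistic> stats = table.getColumnStatistic(colName);
            return stats.map(s -> s.ndv).orElse(-1.0);
        }
    }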
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
index bf29d93eed..c2be8b90a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
@@ -22,6 +22,8 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.statistics.ColumnStatistic;
+import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.thrift.THiveTable;
import org.apache.doris.thrift.TIcebergTable;
import org.apache.doris.thrift.TTableDescriptor;
@@ -33,6 +35,7 @@ import org.apache.iceberg.types.Types;
import java.util.HashMap;
import java.util.List;
+import java.util.Optional;
public class IcebergExternalTable extends ExternalTable {
@@ -134,4 +137,11 @@ public class IcebergExternalTable extends ExternalTable {
return tTableDescriptor;
}
}
+
+ @Override
+ public Optional<ColumnStatistic> getColumnStatistic(String colName) {
+ makeSureInitialized();
+ return StatisticsUtil.getIcebergColumnStats(colName,
+ ((IcebergExternalCatalog) catalog).getIcebergTable(dbName, name));
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
index d42d8c3d7d..704d0fadf8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
@@ -197,7 +197,7 @@ public final class HiveUtil {
                method = clazz.getDeclaredMethod("isSplitable", FileSystem.class, Path.class);
                break;
            } catch (NoSuchMethodException ignored) {
-                LOG.warn("Class {} doesn't contain isSplitable method.", clazz);
+                LOG.debug("Class {} doesn't contain isSplitable method.", clazz);
}
}
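The hunk above only demotes the log level, but the surrounding pattern is worth a standalone sketch: walk a class hierarchy until a declared isSplitable method is found, treating misses as expected rather than warning-worthy. A minimal version, assuming Hadoop on the classpath; the name findIsSplitable is illustrative.

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import java.lang.reflect.Method;

    final class SplitableLookupSketch {
        // Returns the first isSplitable(FileSystem, Path) declared on the class
        // or any of its superclasses, or null if none declares it.
        static Method findIsSplitable(Class<?> start) {
            for (Class<?> clazz = start; clazz != null; clazz = clazz.getSuperclass()) {
                try {
                    return clazz.getDeclaredMethod("isSplitable", FileSystem.class, Path.class);
                } catch (NoSuchMethodException ignored) {
                    // Expected for classes that don't declare it; keep climbing,
                    // which is why the patch lowers this log to debug level.
                }
            }
            return null;
        }
    }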
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
index b5bee03f0b..a7b45c13cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
@@ -17,41 +17,248 @@
package org.apache.doris.statistics;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.external.HMSExternalTable;
-import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.statistics.util.InternalQueryResult;
+import org.apache.doris.statistics.util.StatisticsUtil;
-import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.text.StringSubstitutor;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
public class HMSAnalysisTask extends BaseAnalysisTask {
+    private static final Logger LOG = LogManager.getLogger(HMSAnalysisTask.class);
+
+ public static final String TOTAL_SIZE = "totalSize";
+ public static final String NUM_ROWS = "numRows";
+ public static final String NUM_FILES = "numFiles";
+ public static final String TIMESTAMP = "transient_lastDdlTime";
+
+ private static final String ANALYZE_SQL_TABLE_TEMPLATE = "INSERT INTO "
+ + "${internalDB}.${columnStatTbl}"
+ + " SELECT "
+ + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
+ + "${catalogId} AS catalog_id, "
+ + "${dbId} AS db_id, "
+ + "${tblId} AS tbl_id, "
+ + "${idxId} AS idx_id, "
+ + "'${colId}' AS col_id, "
+ + "${partId} AS part_id, "
+ + "COUNT(1) AS row_count, "
+ + "NDV(`${colName}`) AS ndv, "
+            + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, "
+ + "MIN(`${colName}`) AS min, "
+ + "MAX(`${colName}`) AS max, "
+ + "${dataSizeFunction} AS data_size, "
+ + "NOW() "
+ + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
- protected HMSExternalTable table;
+    private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT COUNT(1) as rowCount "
+ + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
+
+ private final boolean isTableLevelTask;
+ private final boolean isSamplingPartition;
+ private final boolean isPartitionOnly;
+ private final Set<String> partitionNames;
+ private HMSExternalTable table;
public HMSAnalysisTask(AnalysisInfo info) {
super(info);
+ isTableLevelTask = info.externalTableLevelTask;
+ isSamplingPartition = info.samplingPartition;
+ isPartitionOnly = info.partitionOnly;
+ partitionNames = info.partitionNames;
table = (HMSExternalTable) tbl;
}
+ public void execute() throws Exception {
+ if (isTableLevelTask) {
+ getTableStats();
+ } else {
+ getTableColumnStats();
+ }
+ }
+
/**
- * Collect the column level stats for external table through metadata.
+     * Get table row count and insert the result to __internal_schema.table_statistics
*/
- protected void getStatsByMeta() throws Exception {
- throw new NotImplementedException("Code is not implemented");
+ private void getTableStats() throws Exception {
+ // Get table level information. An example sql for table stats:
+ // INSERT INTO __internal_schema.table_statistics VALUES
+        // ('13055', 13002, 13038, 13055, -1, 'NULL', 5, 1686111064658, NOW())
+        Map<String, String> parameters = table.getRemoteTable().getParameters();
+ if (isPartitionOnly) {
+ for (String partId : partitionNames) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(ANALYZE_TABLE_COUNT_TEMPLATE);
+ sb.append(" where ");
+ String[] splits = partId.split("/");
+ for (int i = 0; i < splits.length; i++) {
+ String value = splits[i].split("=")[1];
+ splits[i] = splits[i].replace(value, "\'" + value + "\'");
+ }
+ sb.append(StringUtils.join(splits, " and "));
+ Map<String, String> params = buildTableStatsParams(partId);
+ setParameterData(parameters, params);
+ List<InternalQueryResult.ResultRow> columnResult =
+                        StatisticsUtil.execStatisticQuery(new StringSubstitutor(params).replace(sb.toString()));
+                String rowCount = columnResult.get(0).getColumnValue("rowCount");
+ params.put("rowCount", rowCount);
+ StatisticsRepository.persistTableStats(params);
+ }
+ } else {
+ Map<String, String> params = buildTableStatsParams("NULL");
+ List<InternalQueryResult.ResultRow> columnResult =
+                    StatisticsUtil.execStatisticQuery(new StringSubstitutor(params).replace(ANALYZE_TABLE_COUNT_TEMPLATE));
+ String rowCount = columnResult.get(0).getColumnValue("rowCount");
+ params.put("rowCount", rowCount);
+ StatisticsRepository.persistTableStats(params);
+ }
}
/**
- * Collect the stats for external table through sql.
- * @return ColumnStatistics
+     * Get column statistics and insert the result to __internal_schema.column_statistics
*/
- protected void getStatsBySql() throws Exception {
-        throw new NotImplementedException("getColumnStatsBySql is not implemented");
+ private void getTableColumnStats() throws Exception {
+ // An example sql for a column stats:
+ // INSERT INTO __internal_schema.column_statistics
+ // SELECT CONCAT(13055, '-', -1, '-', 'r_regionkey') AS id,
+ // 13002 AS catalog_id,
+ // 13038 AS db_id,
+ // 13055 AS tbl_id,
+ // -1 AS idx_id,
+ // 'r_regionkey' AS col_id,
+ // 'NULL' AS part_id,
+ // COUNT(1) AS row_count,
+ // NDV(`r_regionkey`) AS ndv,
+        //        SUM(CASE WHEN `r_regionkey` IS NULL THEN 1 ELSE 0 END) AS null_count,
+ // MIN(`r_regionkey`) AS min,
+ // MAX(`r_regionkey`) AS max,
+ // 0 AS data_size,
+ // NOW() FROM `hive`.`tpch100`.`region`
+ if (isPartitionOnly) {
+ for (String partId : partitionNames) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(ANALYZE_SQL_TABLE_TEMPLATE);
+ sb.append(" where ");
+ String[] splits = partId.split("/");
+ for (int i = 0; i < splits.length; i++) {
+ String value = splits[i].split("=")[1];
+ splits[i] = splits[i].replace(value, "\'" + value + "\'");
+ }
+ sb.append(StringUtils.join(splits, " and "));
+ Map<String, String> params = buildTableStatsParams(partId);
+ params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
+                params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
+                params.put("colName", col.getName());
+                params.put("colId", info.colName);
+                params.put("dataSizeFunction", getDataSizeFunction(col));
+                StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
+                String sql = stringSubstitutor.replace(sb.toString());
+                try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
+                    r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
+                    this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
+                    this.stmtExecutor.execute();
+                }
+ }
+ } else {
+ StringBuilder sb = new StringBuilder();
+ sb.append(ANALYZE_SQL_TABLE_TEMPLATE);
+ if (isSamplingPartition) {
+ sb.append(" where 1=1 ");
+                String[] splitExample = partitionNames.stream().findFirst().get().split("/");
+ int parts = splitExample.length;
+ List<String> partNames = new ArrayList<>();
+ for (String split : splitExample) {
+ partNames.add(split.split("=")[0]);
+ }
+ List<List<String>> valueLists = new ArrayList<>();
+ for (int i = 0; i < parts; i++) {
+ valueLists.add(new ArrayList<>());
+ }
+ for (String partId : partitionNames) {
+ String[] partIds = partId.split("/");
+ for (int i = 0; i < partIds.length; i++) {
+                        valueLists.get(i).add("\'" + partIds[i].split("=")[1] + "\'");
+ }
+ }
+ for (int i = 0; i < parts; i++) {
+ sb.append(" and ");
+ sb.append(partNames.get(i));
+ sb.append(" in (");
+ sb.append(StringUtils.join(valueLists.get(i), ","));
+ sb.append(") ");
+ }
+ }
+ Map<String, String> params = buildTableStatsParams("NULL");
+ params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
+ params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
+ params.put("colName", col.getName());
+ params.put("colId", info.colName);
+ params.put("dataSizeFunction", getDataSizeFunction(col));
+            StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
+            String sql = stringSubstitutor.replace(sb.toString());
+            try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
+                r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
+                this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
+                this.stmtExecutor.execute();
+            }
+            Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(tbl.getId(), -1, col.getName());
+ }
}
- @Override
- public void execute() throws Exception {
- if (Config.collect_external_table_stats_by_sql) {
- getStatsBySql();
- } else {
- getStatsByMeta();
+ private Map<String, String> buildTableStatsParams(String partId) {
+ Map<String, String> commonParams = new HashMap<>();
+ String id = StatisticsUtil.constructId(tbl.getId(), -1);
+ if (!partId.equals("NULL")) {
+ id = StatisticsUtil.constructId(id, partId);
+ }
+ commonParams.put("id", id);
+ commonParams.put("catalogId", String.valueOf(catalog.getId()));
+ commonParams.put("dbId", String.valueOf(db.getId()));
+ commonParams.put("tblId", String.valueOf(tbl.getId()));
+ commonParams.put("indexId", "-1");
+ commonParams.put("idxId", "-1");
+ commonParams.put("partId", "\'" + partId + "\'");
+ commonParams.put("catalogName", catalog.getName());
+ commonParams.put("dbName", db.getFullName());
+ commonParams.put("tblName", tbl.getName());
+ if (col != null) {
+ commonParams.put("type", col.getType().toString());
+ }
+        commonParams.put("lastAnalyzeTimeInMs", String.valueOf(System.currentTimeMillis()));
+ return commonParams;
+ }
+
+    private void setParameterData(Map<String, String> parameters, Map<String, String> params) {
+ String numRows = "";
+ String timestamp = "";
+ if (parameters.containsKey(NUM_ROWS)) {
+ numRows = parameters.get(NUM_ROWS);
+ }
+ if (parameters.containsKey(TIMESTAMP)) {
+ timestamp = parameters.get(TIMESTAMP);
}
+ params.put("numRows", numRows);
+ params.put("rowCount", numRows);
+        params.put("update_time", TimeUtils.DATETIME_FORMAT.format(
+                LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(timestamp) * 1000),
+ ZoneId.systemDefault())));
}
}
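Two mechanisms in HMSAnalysisTask above are easy to verify in isolation: Hive partition names of the form k1=v1/k2=v2 become SQL predicates, and ${key} placeholders are filled with commons-text's StringSubstitutor. A runnable sketch with made-up catalog, database, table, and partition values:

    import org.apache.commons.lang3.StringUtils;
    import org.apache.commons.text.StringSubstitutor;
    import java.util.HashMap;
    import java.util.Map;

    public class HmsAnalyzeSqlSketch {
        public static void main(String[] args) {
            // (1) Build "where nation='US' and dt='2023-07-07'" from a partition name,
            //     mirroring the split/quote loop in the task above.
            String partId = "nation=US/dt=2023-07-07";
            String[] splits = partId.split("/");
            for (int i = 0; i < splits.length; i++) {
                String value = splits[i].split("=")[1];
                splits[i] = splits[i].replace(value, "'" + value + "'");
            }
            String predicate = " where " + StringUtils.join(splits, " and ");

            // (2) Fill the row-count template the task uses.
            Map<String, String> params = new HashMap<>();
            params.put("catalogName", "hive");
            params.put("dbName", "tpch100");
            params.put("tblName", "region");
            String template = "SELECT COUNT(1) as rowCount FROM `${catalogName}`.`${dbName}`.`${tblName}`";
            String sql = new StringSubstitutor(params).replace(template) + predicate;
            // -> SELECT COUNT(1) as rowCount FROM `hive`.`tpch100`.`region` where nation='US' and dt='2023-07-07'
            System.out.println(sql);
        }
    }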
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java
deleted file mode 100644
index 8ae74206de..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java
+++ /dev/null
@@ -1,370 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.statistics;
-
-import org.apache.doris.catalog.Env;
-import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.qe.AutoCloseConnectContext;
-import org.apache.doris.qe.StmtExecutor;
-import org.apache.doris.statistics.util.InternalQueryResult;
-import org.apache.doris.statistics.util.StatisticsUtil;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.text.StringSubstitutor;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
-import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.Decimal;
-import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class HiveAnalysisTask extends HMSAnalysisTask {
-    private static final Logger LOG = LogManager.getLogger(HiveAnalysisTask.class);
-
- public static final String TOTAL_SIZE = "totalSize";
- public static final String NUM_ROWS = "numRows";
- public static final String NUM_FILES = "numFiles";
- public static final String TIMESTAMP = "transient_lastDdlTime";
-
- private static final String ANALYZE_SQL_TABLE_TEMPLATE = "INSERT INTO "
- + "${internalDB}.${columnStatTbl}"
- + " SELECT "
- + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
- + "${catalogId} AS catalog_id, "
- + "${dbId} AS db_id, "
- + "${tblId} AS tbl_id, "
- + "${idxId} AS idx_id, "
- + "'${colId}' AS col_id, "
- + "${partId} AS part_id, "
- + "COUNT(1) AS row_count, "
- + "NDV(`${colName}`) AS ndv, "
-            + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, "
- + "MIN(`${colName}`) AS min, "
- + "MAX(`${colName}`) AS max, "
- + "${dataSizeFunction} AS data_size, "
- + "NOW() "
- + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
-
-    private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT COUNT(1) as rowCount "
- + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
-
- private final boolean isTableLevelTask;
- private final boolean isSamplingPartition;
- private final boolean isPartitionOnly;
- private final Set<String> partitionNames;
-
- public HiveAnalysisTask(AnalysisInfo info) {
- super(info);
- isTableLevelTask = info.externalTableLevelTask;
- isSamplingPartition = info.samplingPartition;
- isPartitionOnly = info.partitionOnly;
- partitionNames = info.partitionNames;
- }
-
-    private static final String ANALYZE_META_TABLE_COLUMN_TEMPLATE = "INSERT INTO "
- + "${internalDB}.${columnStatTbl}"
-            + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '${colId}', NULL, "
-            + "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, '${update_time}')";
-
-    private static final String ANALYZE_META_PARTITION_COLUMN_TEMPLATE = "INSERT INTO "
-            + "${internalDB}.${columnStatTbl}"
-            + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '${colId}', '${partId}', "
-            + "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, '${update_time}')";
-
- private static final String ANALYZE_META_TABLE_TEMPLATE = "INSERT INTO "
- + "${internalDB}.${columnStatTbl}"
-            + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '', NULL, "
- + "${numRows}, 0, 0, '', '', ${dataSize}, '${update_time}')";
-
- /**
- * Collect the stats for external table through sql.
- */
- @Override
- protected void getStatsBySql() throws Exception {
- if (isTableLevelTask) {
- getTableStatsBySql();
- } else {
- getTableColumnStatsBySql();
- }
- }
-
- /**
-     * Get table row count and insert the result to __internal_schema.table_statistics
- */
- private void getTableStatsBySql() throws Exception {
- // Get table level information. An example sql for table stats:
- // INSERT INTO __internal_schema.table_statistics VALUES
-        // ('13055', 13002, 13038, 13055, -1, 'NULL', 5, 1686111064658, NOW())
-        Map<String, String> parameters = table.getRemoteTable().getParameters();
- if (isPartitionOnly) {
- for (String partId : partitionNames) {
- StringBuilder sb = new StringBuilder();
- sb.append(ANALYZE_TABLE_COUNT_TEMPLATE);
- sb.append(" where ");
- String[] splits = partId.split("/");
- for (int i = 0; i < splits.length; i++) {
- String value = splits[i].split("=")[1];
- splits[i] = splits[i].replace(value, "\'" + value + "\'");
- }
- sb.append(StringUtils.join(splits, " and "));
- Map<String, String> params = buildTableStatsParams(partId);
- setParameterData(parameters, params);
- List<InternalQueryResult.ResultRow> columnResult =
-                        StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
-                                .replace(sb.toString()));
-                String rowCount = columnResult.get(0).getColumnValue("rowCount");
- params.put("rowCount", rowCount);
- StatisticsRepository.persistTableStats(params);
- }
- } else {
- Map<String, String> params = buildTableStatsParams("NULL");
- List<InternalQueryResult.ResultRow> columnResult =
-                    StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
- .replace(ANALYZE_TABLE_COUNT_TEMPLATE));
- String rowCount = columnResult.get(0).getColumnValue("rowCount");
- params.put("rowCount", rowCount);
- StatisticsRepository.persistTableStats(params);
- }
- }
-
- /**
-     * Get column statistics and insert the result to __internal_schema.column_statistics
- */
- private void getTableColumnStatsBySql() throws Exception {
- // An example sql for a column stats:
- // INSERT INTO __internal_schema.column_statistics
- // SELECT CONCAT(13055, '-', -1, '-', 'r_regionkey') AS id,
- // 13002 AS catalog_id,
- // 13038 AS db_id,
- // 13055 AS tbl_id,
- // -1 AS idx_id,
- // 'r_regionkey' AS col_id,
- // 'NULL' AS part_id,
- // COUNT(1) AS row_count,
- // NDV(`r_regionkey`) AS ndv,
-        //        SUM(CASE WHEN `r_regionkey` IS NULL THEN 1 ELSE 0 END) AS null_count,
- // MIN(`r_regionkey`) AS min,
- // MAX(`r_regionkey`) AS max,
- // 0 AS data_size,
- // NOW() FROM `hive`.`tpch100`.`region`
- if (isPartitionOnly) {
- for (String partId : partitionNames) {
- StringBuilder sb = new StringBuilder();
- sb.append(ANALYZE_SQL_TABLE_TEMPLATE);
- sb.append(" where ");
- String[] splits = partId.split("/");
- for (int i = 0; i < splits.length; i++) {
- String value = splits[i].split("=")[1];
- splits[i] = splits[i].replace(value, "\'" + value + "\'");
- }
- sb.append(StringUtils.join(splits, " and "));
- Map<String, String> params = buildTableStatsParams(partId);
- params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
-                params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
- params.put("colName", col.getName());
- params.put("colId", info.colName);
- params.put("dataSizeFunction", getDataSizeFunction(col));
-                StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
- String sql = stringSubstitutor.replace(sb.toString());
-                try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
-                    r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
-                    this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
- this.stmtExecutor.execute();
- }
- }
- } else {
- StringBuilder sb = new StringBuilder();
- sb.append(ANALYZE_SQL_TABLE_TEMPLATE);
- if (isSamplingPartition) {
- sb.append(" where 1=1 ");
-                String[] splitExample = partitionNames.stream().findFirst().get().split("/");
- int parts = splitExample.length;
- List<String> partNames = new ArrayList<>();
- for (String split : splitExample) {
- partNames.add(split.split("=")[0]);
- }
- List<List<String>> valueLists = new ArrayList<>();
- for (int i = 0; i < parts; i++) {
- valueLists.add(new ArrayList<>());
- }
- for (String partId : partitionNames) {
- String[] partIds = partId.split("/");
- for (int i = 0; i < partIds.length; i++) {
-                        valueLists.get(i).add("\'" + partIds[i].split("=")[1] + "\'");
- }
- }
- for (int i = 0; i < parts; i++) {
- sb.append(" and ");
- sb.append(partNames.get(i));
- sb.append(" in (");
- sb.append(StringUtils.join(valueLists.get(i), ","));
- sb.append(") ");
- }
- }
- Map<String, String> params = buildTableStatsParams("NULL");
- params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
- params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
- params.put("colName", col.getName());
- params.put("colId", info.colName);
- params.put("dataSizeFunction", getDataSizeFunction(col));
-            StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
-            String sql = stringSubstitutor.replace(sb.toString());
-            try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
-                r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
-                this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
-                this.stmtExecutor.execute();
-            }
-            Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(tbl.getId(), -1, col.getName());
- }
- }
-
- private Map<String, String> buildTableStatsParams(String partId) {
- Map<String, String> commonParams = new HashMap<>();
- String id = StatisticsUtil.constructId(tbl.getId(), -1);
- if (!partId.equals("NULL")) {
- id = StatisticsUtil.constructId(id, partId);
- }
- commonParams.put("id", id);
- commonParams.put("catalogId", String.valueOf(catalog.getId()));
- commonParams.put("dbId", String.valueOf(db.getId()));
- commonParams.put("tblId", String.valueOf(tbl.getId()));
- commonParams.put("indexId", "-1");
- commonParams.put("idxId", "-1");
- commonParams.put("partId", "\'" + partId + "\'");
- commonParams.put("catalogName", catalog.getName());
- commonParams.put("dbName", db.getFullName());
- commonParams.put("tblName", tbl.getName());
- if (col != null) {
- commonParams.put("type", col.getType().toString());
- }
-        commonParams.put("lastAnalyzeTimeInMs", String.valueOf(System.currentTimeMillis()));
- return commonParams;
- }
-
- @Override
- protected void getStatsByMeta() throws Exception {
- // To be removed.
- }
-
-    private void getStatData(ColumnStatisticsData data, Map<String, String> params, long rowCount) {
- long ndv = 0;
- long nulls = 0;
- String min = "";
- String max = "";
- long colSize = 0;
- if (!data.isSetStringStats()) {
- colSize = rowCount * col.getType().getSlotSize();
- }
- // Collect ndv, nulls, min and max for different data type.
- if (data.isSetLongStats()) {
- LongColumnStatsData longStats = data.getLongStats();
- ndv = longStats.getNumDVs();
- nulls = longStats.getNumNulls();
- min = String.valueOf(longStats.getLowValue());
- max = String.valueOf(longStats.getHighValue());
- } else if (data.isSetStringStats()) {
- StringColumnStatsData stringStats = data.getStringStats();
- ndv = stringStats.getNumDVs();
- nulls = stringStats.getNumNulls();
- double avgColLen = stringStats.getAvgColLen();
- colSize = Math.round(avgColLen * rowCount);
- } else if (data.isSetDecimalStats()) {
- DecimalColumnStatsData decimalStats = data.getDecimalStats();
- ndv = decimalStats.getNumDVs();
- nulls = decimalStats.getNumNulls();
- if (decimalStats.isSetLowValue()) {
- Decimal lowValue = decimalStats.getLowValue();
- if (lowValue != null) {
-                    BigDecimal lowDecimal = new BigDecimal(new BigInteger(lowValue.getUnscaled()), lowValue.getScale());
- min = lowDecimal.toString();
- }
- }
- if (decimalStats.isSetHighValue()) {
- Decimal highValue = decimalStats.getHighValue();
- if (highValue != null) {
- BigDecimal highDecimal = new BigDecimal(
-                            new BigInteger(highValue.getUnscaled()), highValue.getScale());
- max = highDecimal.toString();
- }
- }
- } else if (data.isSetDoubleStats()) {
- DoubleColumnStatsData doubleStats = data.getDoubleStats();
- ndv = doubleStats.getNumDVs();
- nulls = doubleStats.getNumNulls();
- min = String.valueOf(doubleStats.getLowValue());
- max = String.valueOf(doubleStats.getHighValue());
- } else if (data.isSetDateStats()) {
- DateColumnStatsData dateStats = data.getDateStats();
- ndv = dateStats.getNumDVs();
- nulls = dateStats.getNumNulls();
- if (dateStats.isSetLowValue()) {
-                org.apache.hadoop.hive.metastore.api.Date lowValue = dateStats.getLowValue();
- if (lowValue != null) {
-                    LocalDate lowDate = LocalDate.ofEpochDay(lowValue.getDaysSinceEpoch());
- min = lowDate.toString();
- }
- }
- if (dateStats.isSetHighValue()) {
-                org.apache.hadoop.hive.metastore.api.Date highValue = dateStats.getHighValue();
- if (highValue != null) {
-                    LocalDate highDate = LocalDate.ofEpochDay(highValue.getDaysSinceEpoch());
- max = highDate.toString();
- }
- }
- } else {
- throw new RuntimeException("Not supported data type.");
- }
- params.put("ndv", String.valueOf(ndv));
- params.put("nulls", String.valueOf(nulls));
- params.put("min", min);
- params.put("max", max);
- params.put("dataSize", String.valueOf(colSize));
- }
-
-    private void setParameterData(Map<String, String> parameters, Map<String, String> params) {
- String numRows = "";
- String timestamp = "";
- if (parameters.containsKey(NUM_ROWS)) {
- numRows = parameters.get(NUM_ROWS);
- }
- if (parameters.containsKey(TIMESTAMP)) {
- timestamp = parameters.get(TIMESTAMP);
- }
- params.put("numRows", numRows);
- params.put("rowCount", numRows);
-        params.put("update_time", TimeUtils.DATETIME_FORMAT.format(
-                LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(timestamp) * 1000),
- ZoneId.systemDefault())));
- }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java
deleted file mode 100644
index 105ef758f0..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java
+++ /dev/null
@@ -1,121 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.statistics;
-
-import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.datasource.property.constants.HMSProperties;
-import org.apache.doris.qe.AutoCloseConnectContext;
-import org.apache.doris.qe.StmtExecutor;
-import org.apache.doris.statistics.util.StatisticsUtil;
-
-import org.apache.commons.text.StringSubstitutor;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableScan;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.types.Types;
-
-import java.time.LocalDateTime;
-import java.util.HashMap;
-import java.util.Map;
-
-public class IcebergAnalysisTask extends HMSAnalysisTask {
-
- private long numRows = 0;
- private long dataSize = 0;
- private long numNulls = 0;
-
- public IcebergAnalysisTask(AnalysisInfo info) {
- super(info);
- }
-
- private static final String INSERT_TABLE_SQL_TEMPLATE = "INSERT INTO "
- + "${internalDB}.${columnStatTbl}"
-            + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '${colId}', NULL, "
-            + "${numRows}, 0, ${nulls}, '0', '0', ${dataSize}, '${update_time}')";
-
-
- @Override
- protected void getStatsByMeta() throws Exception {
- Table icebergTable = getIcebergTable();
- TableScan tableScan = icebergTable.newScan().includeColumnStats();
- for (FileScanTask task : tableScan.planFiles()) {
- processDataFile(task.file(), task.spec());
- }
- updateStats();
- }
-
- private Table getIcebergTable() {
-        org.apache.iceberg.hive.HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog();
- Configuration conf = new HdfsConfiguration();
-        for (Map.Entry<String, String> entry : table.getHadoopProperties().entrySet()) {
- conf.set(entry.getKey(), entry.getValue());
- }
- hiveCatalog.setConf(conf);
- Map<String, String> catalogProperties = new HashMap<>();
-        catalogProperties.put(HMSProperties.HIVE_METASTORE_URIS, table.getMetastoreUri());
- catalogProperties.put("uri", table.getMetastoreUri());
- hiveCatalog.initialize("hive", catalogProperties);
-        return hiveCatalog.loadTable(TableIdentifier.of(table.getDbName(), table.getName()));
- }
-
-    private void processDataFile(DataFile dataFile, PartitionSpec partitionSpec) {
- int colId = -1;
- for (Types.NestedField column : partitionSpec.schema().columns()) {
- if (column.name().equals(col.getName())) {
- colId = column.fieldId();
- break;
- }
- }
- if (colId == -1) {
-            throw new RuntimeException(String.format("Column %s not exist.", col.getName()));
- }
- dataSize += dataFile.columnSizes().get(colId);
- numRows += dataFile.recordCount();
- numNulls += dataFile.nullValueCounts().get(colId);
- }
-
- private void updateStats() throws Exception {
- Map<String, String> params = new HashMap<>();
- params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
- params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
- params.put("id", tbl.getId() + "-" + col.getName());
- params.put("catalogId", String.valueOf(catalog.getId()));
- params.put("dbId", String.valueOf(db.getId()));
- params.put("tblId", String.valueOf(tbl.getId()));
- params.put("colId", String.valueOf(col.getName()));
- params.put("numRows", String.valueOf(numRows));
- params.put("nulls", String.valueOf(numNulls));
- params.put("dataSize", String.valueOf(dataSize));
-        params.put("update_time", TimeUtils.DATETIME_FORMAT.format(LocalDateTime.now()));
-
- // Update table level stats info of this column.
- StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
- String sql = stringSubstitutor.replace(INSERT_TABLE_SQL_TEMPLATE);
-        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
- r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
- this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
- this.stmtExecutor.execute();
- }
- }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 0bdf736aab..6e773fc430 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -60,6 +60,7 @@ import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.ColumnStatistic;
+import org.apache.doris.statistics.ColumnStatisticBuilder;
import org.apache.doris.statistics.Histogram;
import org.apache.doris.statistics.StatisticConstants;
import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
@@ -71,9 +72,12 @@ import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringSubstitutor;
+import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
+import org.apache.iceberg.types.Types;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
@@ -594,4 +598,49 @@ public class StatisticsUtil {
}
return totalSize / estimatedRowSize;
}
+
+    /**
+     * Get Iceberg column statistics.
+     * @param colName name of the column to collect statistics for.
+     * @param table Iceberg table.
+     * @return Optional column statistic for the given column.
+     */
+    public static Optional<ColumnStatistic> getIcebergColumnStats(String colName, org.apache.iceberg.Table table) {
+ TableScan tableScan = table.newScan().includeColumnStats();
+        ColumnStatisticBuilder columnStatisticBuilder = new ColumnStatisticBuilder();
+ columnStatisticBuilder.setCount(0);
+ columnStatisticBuilder.setMaxValue(Double.MAX_VALUE);
+ columnStatisticBuilder.setMinValue(Double.MIN_VALUE);
+ columnStatisticBuilder.setDataSize(0);
+ columnStatisticBuilder.setAvgSizeByte(0);
+ columnStatisticBuilder.setNumNulls(0);
+ for (FileScanTask task : tableScan.planFiles()) {
+            processDataFile(task.file(), task.spec(), colName, columnStatisticBuilder);
+ }
+ if (columnStatisticBuilder.getCount() > 0) {
+            columnStatisticBuilder.setAvgSizeByte(columnStatisticBuilder.getDataSize()
+                    / columnStatisticBuilder.getCount());
+ }
+ return Optional.of(columnStatisticBuilder.build());
+ }
+
+    private static void processDataFile(DataFile dataFile, PartitionSpec partitionSpec,
+            String colName, ColumnStatisticBuilder columnStatisticBuilder) {
+ int colId = -1;
+ for (Types.NestedField column : partitionSpec.schema().columns()) {
+ if (column.name().equals(colName)) {
+ colId = column.fieldId();
+ break;
+ }
+ }
+ if (colId == -1) {
+            throw new RuntimeException(String.format("Column %s does not exist.", colName));
+        }
+        // Update the data size, count and number of nulls in columnStatisticBuilder.
+        // TODO: Get min and max values.
+        columnStatisticBuilder.setDataSize(columnStatisticBuilder.getDataSize() + dataFile.columnSizes().get(colId));
+        columnStatisticBuilder.setCount(columnStatisticBuilder.getCount() + dataFile.recordCount());
+        columnStatisticBuilder.setNumNulls(columnStatisticBuilder.getNumNulls()
+                + dataFile.nullValueCounts().get(colId));
+ }
}
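A hedged usage sketch for the new StatisticsUtil helper above: load an Iceberg table through any Catalog implementation (HadoopCatalog here; the warehouse path and identifiers are illustrative) and aggregate per-file statistics for one column. Count, null count, and data size come from DataFile metadata; min/max are still a TODO in the patch. The count and numNulls fields on ColumnStatistic are assumed public, per Doris FE convention.

    import org.apache.doris.statistics.ColumnStatistic;
    import org.apache.doris.statistics.util.StatisticsUtil;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.hadoop.HadoopCatalog;
    import java.util.Optional;

    public class IcebergColumnStatsDemo {
        public static void main(String[] args) {
            // Any Iceberg Catalog works; HadoopCatalog keeps the demo self-contained.
            HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "hdfs://nn:8020/warehouse");
            Table table = catalog.loadTable(TableIdentifier.of("tpch", "region"));
            Optional<ColumnStatistic> stats = StatisticsUtil.getIcebergColumnStats("r_regionkey", table);
            stats.ifPresent(s -> System.out.println("rows=" + s.count + ", nulls=" + s.numNulls));
        }
    }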
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]