This is an automated email from the ASF dual-hosted git repository.
lijibing pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 6e021dcef9e [improvement](statistics)External table getRowCount return -1 when row count is not available or row count is 0. (#43009) (#43771)
6e021dcef9e is described below
commit 6e021dcef9e335f040cb6f428efb606c5801134a
Author: James <[email protected]>
AuthorDate: Tue Nov 12 21:39:30 2024 +0800
[improvement](statistics)External table getRowCount return -1 when row count is not available or row count is 0. (#43009) (#43771)
backport: https://github.com/apache/doris/pull/43009
---
.../java/org/apache/doris/catalog/OlapTable.java | 8 +-
.../main/java/org/apache/doris/catalog/Table.java | 2 +-
.../java/org/apache/doris/catalog/TableIf.java | 2 +
.../doris/datasource/ExternalRowCountCache.java | 6 +-
.../org/apache/doris/datasource/ExternalTable.java | 4 +-
.../doris/datasource/hive/HMSExternalTable.java | 18 ++--
.../datasource/iceberg/IcebergExternalTable.java | 3 +-
.../doris/datasource/iceberg/IcebergUtils.java | 5 +-
.../datasource/paimon/PaimonExternalTable.java | 6 +-
.../doris/statistics/StatisticsAutoCollector.java | 2 +-
.../doris/statistics/util/StatisticsUtil.java | 8 +-
.../datasource/ExternalRowCountCacheTest.java | 102 +++++++++++++++++++++
12 files changed, 135 insertions(+), 31 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 37e5f265bd6..7ddc51224b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -131,8 +131,6 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf {
WAITING_STABLE
}
- public static long ROW_COUNT_BEFORE_REPORT = -1;
-
@SerializedName(value = "state")
private volatile OlapTableState state;
@@ -1519,12 +1517,12 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf {
if (index == null) {
LOG.warn("Index {} not exist in partition {}, table {}, {}",
indexId, entry.getValue().getName(), id, name);
- return ROW_COUNT_BEFORE_REPORT;
+ return UNKNOWN_ROW_COUNT;
}
if (strict && !index.getRowCountReported()) {
- return ROW_COUNT_BEFORE_REPORT;
+ return UNKNOWN_ROW_COUNT;
}
- rowCount += index.getRowCount() == -1 ? 0 : index.getRowCount();
+ rowCount += index.getRowCount() == UNKNOWN_ROW_COUNT ? 0 :
index.getRowCount();
}
return rowCount;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
index 862d6c1878e..8d648df3356 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
@@ -586,7 +586,7 @@ public abstract class Table extends MetaObject implements
Writable, TableIf {
@Override
public long fetchRowCount() {
- return 0;
+ return UNKNOWN_ROW_COUNT;
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index d42a32ef8d2..8f9594e82c5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -56,6 +56,8 @@ import java.util.stream.Collectors;
public interface TableIf {
Logger LOG = LogManager.getLogger(TableIf.class);
+ long UNKNOWN_ROW_COUNT = -1;
+
default void readLock() {
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java
index faf01a49384..0826187317a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java
@@ -94,7 +94,7 @@ public class ExternalRowCountCache {
}
/**
- * Get cached row count for the given table. Return 0 if cached not loaded or table not exists.
+ * Get cached row count for the given table. Return -1 if cached not loaded or table not exists.
* Cached will be loaded async.
* @param catalogId
* @param dbId
@@ -106,13 +106,13 @@ public class ExternalRowCountCache {
try {
CompletableFuture<Optional<Long>> f = rowCountCache.get(key);
if (f.isDone()) {
- return f.get().orElse(0L);
+ return f.get().orElse(TableIf.UNKNOWN_ROW_COUNT);
}
LOG.info("Row count for table {}.{}.{} is still processing.",
catalogId, dbId, tableId);
} catch (Exception e) {
LOG.warn("Unexpected exception while returning row count", e);
}
- return 0;
+ return TableIf.UNKNOWN_ROW_COUNT;
}
/**
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
index 71ac00e48e6..590a4cbe046 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
@@ -200,7 +200,7 @@ public class ExternalTable implements TableIf, Writable,
GsonPostProcessable {
makeSureInitialized();
} catch (Exception e) {
LOG.warn("Failed to initialize table {}.{}.{}", catalog.getName(),
dbName, name, e);
- return 0;
+ return TableIf.UNKNOWN_ROW_COUNT;
}
// All external table should get external row count from cache.
return
Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache().getCachedRowCount(catalog.getId(),
dbId, id);
@@ -226,7 +226,7 @@ public class ExternalTable implements TableIf, Writable,
GsonPostProcessable {
* This is called by ExternalRowCountCache to load row count cache.
*/
public long fetchRowCount() {
- return 0;
+ return UNKNOWN_ROW_COUNT;
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index 5f2c8cbddf3..5df44fda476 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -344,7 +344,7 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
}
private long getRowCountFromExternalSource() {
- long rowCount;
+ long rowCount = UNKNOWN_ROW_COUNT;
switch (dlaType) {
case HIVE:
rowCount = StatisticsUtil.getHiveRowCount(this);
@@ -358,7 +358,7 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
}
rowCount = -1;
}
- return rowCount;
+ return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT;
}
@Override
@@ -532,7 +532,7 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
// Get row count from hive metastore property.
long rowCount = getRowCountFromExternalSource();
// Only hive table supports estimate row count by listing file.
- if (rowCount == -1 && dlaType.equals(DLAType.HIVE)) {
+ if (rowCount == UNKNOWN_ROW_COUNT && dlaType.equals(DLAType.HIVE)) {
LOG.info("Will estimate row count for table {} from file list.",
name);
rowCount = getRowCountFromFileList();
}
@@ -838,11 +838,11 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
*/
private long getRowCountFromFileList() {
if (!GlobalVariable.enable_get_row_count_from_file_list) {
- return -1;
+ return UNKNOWN_ROW_COUNT;
}
if (isView()) {
- LOG.info("Table {} is view, return 0.", name);
- return 0;
+ LOG.info("Table {} is view, return -1.", name);
+ return UNKNOWN_ROW_COUNT;
}
HiveMetaStoreCache.HivePartitionValues partitionValues =
getAllPartitionValues();
@@ -869,8 +869,8 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
estimatedRowSize += column.getDataType().getSlotSize();
}
if (estimatedRowSize == 0) {
- LOG.warn("Table {} estimated size is 0, return 0.", name);
- return 0;
+ LOG.warn("Table {} estimated size is 0, return -1.", name);
+ return UNKNOWN_ROW_COUNT;
}
int totalPartitionSize = partitionValues == null ? 1 :
partitionValues.getIdToPartitionItem().size();
@@ -882,7 +882,7 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
long rows = totalSize / estimatedRowSize;
LOG.info("Table {} rows {}, total size is {}, estimatedRowSize is {}",
name, rows, totalSize, estimatedRowSize);
- return rows;
+ return rows > 0 ? rows : UNKNOWN_ROW_COUNT;
}
// Get all partition values from cache.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
index d4361a47797..feded88ea32 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
@@ -83,7 +83,8 @@ public class IcebergExternalTable extends ExternalTable {
@Override
public long fetchRowCount() {
makeSureInitialized();
- return IcebergUtils.getIcebergRowCount(getCatalog(), getDbName(),
getName());
+ long rowCount = IcebergUtils.getIcebergRowCount(getCatalog(),
getDbName(), getName());
+ return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT;
}
public Table getIcebergTable() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index 62d260dacaf..58519d92636 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -41,6 +41,7 @@ import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
+import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.UserException;
import org.apache.doris.common.info.SimpleTableInfo;
@@ -604,7 +605,7 @@ public class IcebergUtils {
if (snapshot == null) {
LOG.info("Iceberg table {}.{}.{} is empty, return row count
0.", catalog.getName(), dbName, tbName);
// empty table
- return 0;
+ return TableIf.UNKNOWN_ROW_COUNT;
}
Map<String, String> summary = snapshot.summary();
long rows = Long.parseLong(summary.get(TOTAL_RECORDS))
@@ -614,7 +615,7 @@ public class IcebergUtils {
} catch (Exception e) {
LOG.warn("Fail to collect row count for db {} table {}", dbName,
tbName, e);
}
- return -1;
+ return TableIf.UNKNOWN_ROW_COUNT;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
index 618c51caea1..196b01efe2c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
@@ -194,16 +194,16 @@ public class PaimonExternalTable extends ExternalTable {
Table paimonTable = schemaCacheValue.map(value ->
((PaimonSchemaCacheValue) value).getPaimonTable())
.orElse(null);
if (paimonTable == null) {
- return -1;
+ return UNKNOWN_ROW_COUNT;
}
List<Split> splits =
paimonTable.newReadBuilder().newScan().plan().splits();
for (Split split : splits) {
rowCount += split.rowCount();
}
- return rowCount;
+ return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT;
} catch (Exception e) {
LOG.warn("Fail to collect row count for db {} table {}", dbName,
name, e);
}
- return -1;
+ return UNKNOWN_ROW_COUNT;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index a7c2fc6365b..62d3a5b2946 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -181,7 +181,7 @@ public class StatisticsAutoCollector extends
StatisticsCollector {
? AnalysisMethod.SAMPLE : AnalysisMethod.FULL;
if (table instanceof OlapTable &&
analysisMethod.equals(AnalysisMethod.SAMPLE)) {
OlapTable ot = (OlapTable) table;
- if (ot.getRowCountForIndex(ot.getBaseIndexId(), true) ==
OlapTable.ROW_COUNT_BEFORE_REPORT) {
+ if (ot.getRowCountForIndex(ot.getBaseIndexId(), true) ==
TableIf.UNKNOWN_ROW_COUNT) {
LOG.info("Table {} row count is not fully reported, skip auto
analyzing this time.", ot.getName());
return;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 775138480d9..288eb88e95f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -560,19 +560,19 @@ public class StatisticsUtil {
public static long getHiveRowCount(HMSExternalTable table) {
Map<String, String> parameters =
table.getRemoteTable().getParameters();
if (parameters == null) {
- return -1;
+ return TableIf.UNKNOWN_ROW_COUNT;
}
// Table parameters contains row count, simply get and return it.
if (parameters.containsKey(NUM_ROWS)) {
long rows = Long.parseLong(parameters.get(NUM_ROWS));
// Sometimes, the NUM_ROWS in hms is 0 but actually is not. Need
to check TOTAL_SIZE if NUM_ROWS is 0.
- if (rows != 0) {
+ if (rows > 0) {
LOG.info("Get row count {} for hive table {} in table
parameters.", rows, table.getName());
return rows;
}
}
if (!parameters.containsKey(TOTAL_SIZE)) {
- return -1;
+ return TableIf.UNKNOWN_ROW_COUNT;
}
// Table parameters doesn't contain row count but contain total size.
Estimate row count : totalSize/rowSize
long totalSize = Long.parseLong(parameters.get(TOTAL_SIZE));
@@ -582,7 +582,7 @@ public class StatisticsUtil {
}
if (estimatedRowSize == 0) {
LOG.warn("Hive table {} estimated row size is invalid {}",
table.getName(), estimatedRowSize);
- return -1;
+ return TableIf.UNKNOWN_ROW_COUNT;
}
long rows = totalSize / estimatedRowSize;
LOG.info("Get row count {} for hive table {} by total size {} and row
size {}",
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java
new file mode 100644
index 00000000000..81605f93dcd
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.ThreadPoolManager;
+
+import mockit.Mock;
+import mockit.MockUp;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ExternalRowCountCacheTest {
+ @Test
+ public void testLoadWithException() throws Exception {
+ ThreadPoolExecutor executor =
ThreadPoolManager.newDaemonFixedThreadPool(
+ 1, Integer.MAX_VALUE, "TEST", true);
+ AtomicInteger counter = new AtomicInteger(0);
+
+ new MockUp<ExternalRowCountCache.RowCountCacheLoader>() {
+ @Mock
+ protected Optional<Long> doLoad(ExternalRowCountCache.RowCountKey
rowCountKey) {
+ counter.incrementAndGet();
+ return null;
+ }
+ };
+ ExternalRowCountCache cache = new ExternalRowCountCache(executor);
+ long cachedRowCount = cache.getCachedRowCount(1, 1, 1);
+ Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount);
+ for (int i = 0; i < 60; i++) {
+ if (counter.get() == 1) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ Assertions.assertEquals(1, counter.get());
+
+ new MockUp<ExternalRowCountCache.RowCountCacheLoader>() {
+ @Mock
+ protected Optional<Long> doLoad(ExternalRowCountCache.RowCountKey
rowCountKey) {
+ counter.incrementAndGet();
+ return Optional.of(100L);
+ }
+ };
+ cache.getCachedRowCount(1, 1, 1);
+ for (int i = 0; i < 60; i++) {
+ cachedRowCount = cache.getCachedRowCount(1, 1, 1);
+ if (cachedRowCount != TableIf.UNKNOWN_ROW_COUNT) {
+ Assertions.assertEquals(100, cachedRowCount);
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ cachedRowCount = cache.getCachedRowCount(1, 1, 1);
+ Assertions.assertEquals(100, cachedRowCount);
+ Assertions.assertEquals(2, counter.get());
+
+ new MockUp<ExternalRowCountCache.RowCountCacheLoader>() {
+ @Mock
+ protected Optional<Long> doLoad(ExternalRowCountCache.RowCountKey
rowCountKey) {
+ counter.incrementAndGet();
+ try {
+ Thread.sleep(1000000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ return Optional.of(100L);
+ }
+ };
+ cachedRowCount = cache.getCachedRowCount(2, 2, 2);
+ Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount);
+ Thread.sleep(1000);
+ cachedRowCount = cache.getCachedRowCount(2, 2, 2);
+ Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount);
+ for (int i = 0; i < 60; i++) {
+ if (counter.get() == 3) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ Assertions.assertEquals(3, counter.get());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org