This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 580fd6afc86 Fix tag index memory accounting (#18029)
580fd6afc86 is described below
commit 580fd6afc86f89c6a98161f2b47182c2229f557d
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jun 26 14:21:09 2026 +0800
Fix tag index memory accounting (#18029)
* Fix tag index memory accounting
* Add concurrent tag index memory test
* Clarify tag index size accounting
* Clarify tag index overhead estimate
* Fix pipe source time range IT flakiness
---
.../treemodel/auto/basic/IoTDBPipeSourceIT.java | 32 ++-
.../schemaengine/schemaregion/tag/TagManager.java | 125 ++++++------
.../schemaregion/tag/TagManagerTest.java | 216 +++++++++++++++++++++
3 files changed, 309 insertions(+), 64 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
index 8f1004eefbc..fce4361163f 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
@@ -55,6 +55,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
import static org.junit.Assert.fail;
@@ -778,6 +779,11 @@ public class IoTDBPipeSourceIT extends AbstractPipeDualTreeModelAutoIT {
final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();
+ final Consumer<String> handleFailure =
+ o -> {
+ TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+ TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+ };
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
@@ -814,10 +820,12 @@ public class IoTDBPipeSourceIT extends AbstractPipeDualTreeModelAutoIT {
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
- final Map<String, String> expectedCountResult = new HashMap<>();
- expectedCountResult.put("count(root.db.d1.at1)", "3");
TestUtils.assertDataEventuallyOnEnv(
- receiverEnv, "select count(*) from root.db.**", expectedCountResult);
+ receiverEnv,
+ "select count(at1) from root.db.d1",
+ "count(root.db.d1.at1),",
+ Collections.singleton("3,"),
+ handleFailure);
// Insert realtime data that overlapped with time range
TestUtils.executeNonQueries(
@@ -828,9 +836,12 @@ public class IoTDBPipeSourceIT extends AbstractPipeDualTreeModelAutoIT {
"flush"),
null);
- expectedCountResult.put("count(root.db.d3.at1)", "3");
TestUtils.assertDataEventuallyOnEnv(
- receiverEnv, "select count(*) from root.db.**", expectedCountResult);
+ receiverEnv,
+ "select count(at1) from root.db.d1, root.db.d3",
+ "count(root.db.d1.at1),count(root.db.d3.at1),",
+ Collections.singleton("3,3,"),
+ handleFailure);
// Session Tablet can have unused timestamp slots when rowSize is smaller than maxRowNumber.
// The pipe source time range filter should ignore the unused zero tail.
@@ -848,9 +859,12 @@ public class IoTDBPipeSourceIT extends AbstractPipeDualTreeModelAutoIT {
session.insertTablet(tabletWithUnusedTail);
}
- expectedCountResult.put("count(root.db.d5.at1)", "3");
TestUtils.assertDataEventuallyOnEnv(
- receiverEnv, "select count(*) from root.db.**", expectedCountResult);
+ receiverEnv,
+ "select count(at1) from root.db.d1, root.db.d3, root.db.d5",
+ "count(root.db.d1.at1),count(root.db.d3.at1),count(root.db.d5.at1),",
+ Collections.singleton("3,3,3,"),
+ handleFailure);
// Insert realtime data that does not overlap with time range
TestUtils.executeNonQueries(
@@ -865,7 +879,9 @@ public class IoTDBPipeSourceIT extends AbstractPipeDualTreeModelAutoIT {
receiverEnv,
"select count(at1) from root.db.d1, root.db.d3, root.db.d5",
"count(root.db.d1.at1),count(root.db.d3.at1),count(root.db.d5.at1),",
- Collections.singleton("3,3,3,"));
+ Collections.singleton("3,3,3,"),
+ 600,
+ handleFailure);
TestUtils.assertDataAlwaysOnEnv(
receiverEnv,
"show timeseries root.db.d2.**",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java
index 6be09486b43..d8a92e15246 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java
@@ -52,7 +52,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -71,6 +70,10 @@ public class TagManager {
private static final String PREVIOUS_CONDITION =
"before deleting it, tag key is %s, tag value is %s, tlog offset is %d,
contains key %b";
+ // The tag index memory model adds one int-sized estimated overhead for each indexed key, value,
+ // and measurement reference. This is an accounting estimate rather than a specific
+ // ConcurrentHashMap or Set field.
+ private static final long INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES =
Integer.BYTES;
private static final Logger logger =
LoggerFactory.getLogger(TagManager.class);
private static final CommonConfig COMMON_CONFIG =
CommonDescriptor.getInstance().getConfig();
@@ -166,34 +169,31 @@ public class TagManager {
return;
}
- int tagIndexOldSize = tagIndex.size();
- Map<String, Set<IMeasurementMNode<?>>> tagValueMap =
- tagIndex.computeIfAbsent(tagKey, k -> new ConcurrentHashMap<>());
- int tagIndexNewSize = tagIndex.size();
-
- int tagValueMapOldSize = tagValueMap.size();
- Set<IMeasurementMNode<?>> measurementsSet =
- tagValueMap.computeIfAbsent(tagValue, v ->
Collections.synchronizedSet(new HashSet<>()));
- int tagValueMapNewSize = tagValueMap.size();
+ tagIndex.compute(
+ tagKey,
+ (key, tagValueMap) -> {
+ long memorySize = 0;
+ if (tagValueMap == null) {
+ tagValueMap = new ConcurrentHashMap<>();
+ memorySize += RamUsageEstimator.sizeOf(tagKey) +
INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES;
+ }
- int measurementsSetOldSize = measurementsSet.size();
- measurementsSet.add(measurementMNode);
- int measurementsSetNewSize = measurementsSet.size();
+ Set<IMeasurementMNode<?>> measurementsSet =
tagValueMap.get(tagValue);
+ if (measurementsSet == null) {
+ measurementsSet = ConcurrentHashMap.newKeySet();
+ tagValueMap.put(tagValue, measurementsSet);
+ memorySize += RamUsageEstimator.sizeOf(tagValue) +
INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES;
+ }
- long memorySize = 0;
- if (tagIndexNewSize - tagIndexOldSize == 1) {
- // the last 4 is the memory occupied by the size of tagvaluemap
- memorySize += RamUsageEstimator.sizeOf(tagKey) + 4;
- }
- if (tagValueMapNewSize - tagValueMapOldSize == 1) {
- // the last 4 is the memory occupied by the size of measurementsSet
- memorySize += RamUsageEstimator.sizeOf(tagValue) + 4;
- }
- if (measurementsSetNewSize - measurementsSetOldSize == 1) {
- // 8 is the memory occupied by the length of the IMeasurementMNode
- memorySize += RamUsageEstimator.NUM_BYTES_OBJECT_REF + 4;
- }
- requestMemory(memorySize);
+ if (measurementsSet.add(measurementMNode)) {
+ memorySize +=
+ RamUsageEstimator.NUM_BYTES_OBJECT_REF +
INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES;
+ }
+ if (memorySize > 0) {
+ requestMemory(memorySize);
+ }
+ return tagValueMap;
+ });
}
public void addIndex(Map<String, String> tagsMap, IMeasurementMNode<?>
measurementMNode) {
@@ -208,32 +208,47 @@ public class TagManager {
if (tagKey == null || tagValue == null || measurementMNode == null) {
return;
}
- // init memory size
- long memorySize = 0;
- if (tagIndex.get(tagKey).get(tagValue).remove(measurementMNode)) {
- memorySize += RamUsageEstimator.NUM_BYTES_OBJECT_REF + 4;
- }
- if (tagIndex.get(tagKey).get(tagValue).isEmpty()) {
- if (tagIndex.get(tagKey).remove(tagValue) != null) {
- // the last 4 is the memory occupied by the size of
IMeasurementMNodeSet
- memorySize += RamUsageEstimator.sizeOf(tagValue) + 4;
- }
- }
- if (tagIndex.get(tagKey).isEmpty()) {
- if (tagIndex.remove(tagKey) != null) {
- // the last 4 is the memory occupied by the size of tagValueMap
- memorySize += RamUsageEstimator.sizeOf(tagKey) + 4;
- }
- }
- releaseMemory(memorySize);
+ tagIndex.computeIfPresent(
+ tagKey,
+ (key, tagValueMap) -> {
+ long memorySize = 0;
+ Set<IMeasurementMNode<?>> measurementsSet =
tagValueMap.get(tagValue);
+ if (measurementsSet == null) {
+ return tagValueMap;
+ }
+
+ if (measurementsSet.remove(measurementMNode)) {
+ memorySize +=
+ RamUsageEstimator.NUM_BYTES_OBJECT_REF +
INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES;
+ }
+ if (measurementsSet.isEmpty()) {
+ if (tagValueMap.remove(tagValue, measurementsSet)) {
+ memorySize +=
+ RamUsageEstimator.sizeOf(tagValue) +
INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES;
+ }
+ }
+ if (tagValueMap.isEmpty()) {
+ memorySize += RamUsageEstimator.sizeOf(tagKey) +
INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES;
+ if (memorySize > 0) {
+ releaseMemory(memorySize);
+ }
+ return null;
+ }
+ if (memorySize > 0) {
+ releaseMemory(memorySize);
+ }
+ return tagValueMap;
+ });
+ }
+
+ private boolean containsIndex(String tagKey, String tagValue) {
+ Map<String, Set<IMeasurementMNode<?>>> tagValueMap = tagIndex.get(tagKey);
+ return tagValueMap != null && tagValueMap.containsKey(tagValue);
}
private List<IMeasurementMNode<?>> getMatchedTimeseriesInIndex(TagFilter
tagFilter) {
- if (!tagIndex.containsKey(tagFilter.getKey())) {
- return Collections.emptyList();
- }
Map<String, Set<IMeasurementMNode<?>>> value2Node =
tagIndex.get(tagFilter.getKey());
- if (value2Node.isEmpty()) {
+ if (value2Node == null || value2Node.isEmpty()) {
return Collections.emptyList();
}
@@ -364,8 +379,7 @@ public class TagManager {
Map<String, String> tagMap = tagLogFile.readTag(node.getOffset());
if (tagMap != null) {
for (Map.Entry<String, String> entry : tagMap.entrySet()) {
- if (tagIndex.containsKey(entry.getKey())
- && tagIndex.get(entry.getKey()).containsKey(entry.getValue())) {
+ if (containsIndex(entry.getKey(), entry.getValue())) {
if (logger.isDebugEnabled()) {
logger.debug(
String.format(
@@ -418,7 +432,7 @@ public class TagManager {
// we should remove before key-value from inverted index map
if (beforeValue != null && !beforeValue.equals(value)) {
- if (tagIndex.containsKey(key) &&
tagIndex.get(key).containsKey(beforeValue)) {
+ if (containsIndex(key, beforeValue)) {
if (logger.isDebugEnabled()) {
logger.debug(
String.format(
@@ -551,8 +565,7 @@ public class TagManager {
if (!deleteTag.isEmpty()) {
for (Map.Entry<String, String> entry : deleteTag.entrySet()) {
- if (tagIndex.containsKey((entry.getKey()))
- && tagIndex.get(entry.getKey()).containsKey(entry.getValue())) {
+ if (containsIndex(entry.getKey(), entry.getValue())) {
if (logger.isDebugEnabled()) {
logger.debug(
String.format(
@@ -622,7 +635,7 @@ public class TagManager {
String beforeValue = entry.getValue();
String currentValue = newTagValue.get(key);
// change the tag inverted index map
- if (tagIndex.containsKey(key) &&
tagIndex.get(key).containsKey(beforeValue)) {
+ if (containsIndex(key, beforeValue)) {
if (logger.isDebugEnabled()) {
logger.debug(
@@ -680,7 +693,7 @@ public class TagManager {
// persist the change to disk
tagLogFile.write(pair.left, pair.right, leafMNode.getOffset());
// change the tag inverted index map
- if (tagIndex.containsKey(oldKey) &&
tagIndex.get(oldKey).containsKey(value)) {
+ if (containsIndex(oldKey, value)) {
if (logger.isDebugEnabled()) {
logger.debug(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManagerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManagerTest.java
new file mode 100644
index 00000000000..c96495c635c
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManagerTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.schemaengine.schemaregion.tag;
+
+import org.apache.iotdb.commons.schema.node.role.IMeasurementMNode;
+import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
+import org.apache.iotdb.db.schemaengine.rescon.MemSchemaEngineStatistics;
+import org.apache.iotdb.db.schemaengine.rescon.MemSchemaRegionStatistics;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.IMemMNode;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.loader.MNodeFactoryLoader;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.external.commons.io.FileUtils;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+public class TagManagerTest {
+
+ private File tempDir;
+ private MemSchemaRegionStatistics regionStatistics;
+ private TagManager tagManager;
+
+ @After
+ public void tearDown() throws Exception {
+ if (tagManager != null) {
+ tagManager.clear();
+ }
+ if (regionStatistics != null) {
+ regionStatistics.clear();
+ }
+ if (tempDir != null) {
+ FileUtils.deleteDirectory(tempDir);
+ }
+ }
+
+ @Test
+ public void removeIndexIgnoresMissingEntriesAndReleasesOnlyExistingMemory()
throws Exception {
+ initTagManager();
+ final IMeasurementMNode<?> node = newMeasurementMNode("s0");
+
+ tagManager.removeIndex("missingKey", "missingValue", node);
+ Assert.assertEquals(0, regionStatistics.getRegionMemoryUsage());
+
+ tagManager.addIndex("key", "value", node);
+ final long expectedMemory = indexMemory("key", "value", 1);
+ Assert.assertEquals(expectedMemory,
regionStatistics.getRegionMemoryUsage());
+
+ tagManager.removeIndex("key", "missingValue", node);
+ Assert.assertEquals(expectedMemory,
regionStatistics.getRegionMemoryUsage());
+
+ tagManager.removeIndex("key", "value", newMeasurementMNode("other"));
+ Assert.assertEquals(expectedMemory,
regionStatistics.getRegionMemoryUsage());
+
+ tagManager.removeIndex("key", "value", node);
+ Assert.assertEquals(0, regionStatistics.getRegionMemoryUsage());
+
+ tagManager.removeIndex("key", "value", node);
+ Assert.assertEquals(0, regionStatistics.getRegionMemoryUsage());
+ }
+
+ @Test
+ public void concurrentAddIndexRequestsMemoryForActualInsertionsOnly() throws
Exception {
+ initTagManager();
+ final String tagKey = "key";
+ final String tagValue = "value";
+ final int measurementCount = 128;
+ final List<IMeasurementMNode<?>> nodes = new ArrayList<>();
+ for (int i = 0; i < measurementCount; i++) {
+ nodes.add(newMeasurementMNode("s" + i));
+ }
+
+ final int workerCount = 16;
+ final ExecutorService executorService =
Executors.newFixedThreadPool(workerCount);
+ final CountDownLatch readyLatch = new CountDownLatch(workerCount);
+ final CountDownLatch startLatch = new CountDownLatch(1);
+ final List<Future<?>> futures = new ArrayList<>();
+ for (final IMeasurementMNode<?> node : nodes) {
+ futures.add(
+ executorService.submit(
+ () -> {
+ readyLatch.countDown();
+ startLatch.await();
+ tagManager.addIndex(tagKey, tagValue, node);
+ return null;
+ }));
+ }
+
+ try {
+ Assert.assertTrue(readyLatch.await(10, TimeUnit.SECONDS));
+ startLatch.countDown();
+ for (final Future<?> future : futures) {
+ future.get(10, TimeUnit.SECONDS);
+ }
+ } finally {
+ executorService.shutdownNow();
+ }
+ Assert.assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS));
+
+ final long expectedMemory = indexMemory(tagKey, tagValue,
measurementCount);
+ Assert.assertEquals(expectedMemory,
regionStatistics.getRegionMemoryUsage());
+
+ for (final IMeasurementMNode<?> node : nodes) {
+ tagManager.addIndex(tagKey, tagValue, node);
+ }
+ Assert.assertEquals(expectedMemory,
regionStatistics.getRegionMemoryUsage());
+
+ for (final IMeasurementMNode<?> node : nodes) {
+ tagManager.removeIndex(tagKey, tagValue, node);
+ }
+ Assert.assertEquals(0, regionStatistics.getRegionMemoryUsage());
+ }
+
+ @Test
+ public void concurrentAddAndRemoveIndexEventuallyReleasesAllMemory() throws
Exception {
+ initTagManager();
+ final String tagKey = "key";
+ final String tagValue = "value";
+ final IMeasurementMNode<?> node = newMeasurementMNode("s0");
+
+ final int workerCount = 16;
+ final int roundCount = 1000;
+ final ExecutorService executorService =
Executors.newFixedThreadPool(workerCount);
+ final CountDownLatch readyLatch = new CountDownLatch(workerCount);
+ final CountDownLatch startLatch = new CountDownLatch(1);
+ final List<Future<?>> futures = new ArrayList<>();
+ for (int i = 0; i < workerCount; i++) {
+ futures.add(
+ executorService.submit(
+ () -> {
+ readyLatch.countDown();
+ startLatch.await();
+ for (int round = 0; round < roundCount; round++) {
+ tagManager.addIndex(tagKey, tagValue, node);
+ tagManager.removeIndex(tagKey, tagValue, node);
+ }
+ return null;
+ }));
+ }
+
+ try {
+ Assert.assertTrue(readyLatch.await(10, TimeUnit.SECONDS));
+ startLatch.countDown();
+ for (final Future<?> future : futures) {
+ future.get(10, TimeUnit.SECONDS);
+ }
+ } finally {
+ executorService.shutdownNow();
+ }
+ Assert.assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS));
+
+ Assert.assertEquals(0, regionStatistics.getRegionMemoryUsage());
+
+ tagManager.addIndex(tagKey, tagValue, node);
+ Assert.assertEquals(indexMemory(tagKey, tagValue, 1),
regionStatistics.getRegionMemoryUsage());
+
+ tagManager.removeIndex(tagKey, tagValue, node);
+ Assert.assertEquals(0, regionStatistics.getRegionMemoryUsage());
+ }
+
+ private void initTagManager() throws Exception {
+ tempDir = Files.createTempDirectory("tag-manager").toFile();
+ regionStatistics = new MemSchemaRegionStatistics(0, new
MemSchemaEngineStatistics());
+ tagManager = new TagManager(tempDir.getAbsolutePath(), regionStatistics);
+ }
+
+ private static IMeasurementMNode<IMemMNode> newMeasurementMNode(final String
measurement) {
+ final IMNodeFactory<IMemMNode> nodeFactory =
+ MNodeFactoryLoader.getInstance().getMemMNodeIMNodeFactory();
+ return nodeFactory.createMeasurementMNode(
+ null,
+ measurement,
+ new MeasurementSchema(
+ measurement, TSDataType.INT64, TSEncoding.PLAIN,
CompressionType.SNAPPY),
+ null);
+ }
+
+ private static long indexMemory(
+ final String tagKey, final String tagValue, final int measurementCount) {
+ return RamUsageEstimator.sizeOf(tagKey)
+ + 4
+ + RamUsageEstimator.sizeOf(tagValue)
+ + 4
+ + (RamUsageEstimator.NUM_BYTES_OBJECT_REF + 4) * measurementCount;
+ }
+}