This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 205e373a013 Refactored the schema cache (#16446)
205e373a013 is described below
commit 205e373a013f77192b858f4f5ef311c13e8d5178
Author: Caideyipi <[email protected]>
AuthorDate: Mon Sep 22 10:03:40 2025 +0800
Refactored the schema cache (#16446)
* partial
* Update pom.xml
---
.../common/schematree/DeviceSchemaInfo.java | 12 +-
.../schema/dualkeycache/impl/DualKeyCacheImpl.java | 2 +
.../plan/analyze/schema/ClusterSchemaFetcher.java | 4 +-
.../plan/planner/OperatorTreeGenerator.java | 10 +-
.../fetcher/cache/TableDeviceLastCache.java | 4 +
.../cache/TreeDeviceSchemaCacheManager.java | 155 ++++++++++++---------
.../cache/TreeDeviceSchemaCacheManagerTest.java | 19 ++-
7 files changed, 112 insertions(+), 94 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/DeviceSchemaInfo.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/DeviceSchemaInfo.java
index 2c52b6b6770..ead892f3e62 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/DeviceSchemaInfo.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/DeviceSchemaInfo.java
@@ -30,16 +30,12 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
-import static org.apache.iotdb.commons.schema.SchemaConstant.NON_TEMPLATE;
-
public class DeviceSchemaInfo {
- private PartialPath devicePath;
- private boolean isAligned;
- private List<IMeasurementSchemaInfo> measurementSchemaInfoList;
- private int templateId = NON_TEMPLATE;
-
- private DeviceSchemaInfo() {}
+ private final PartialPath devicePath;
+ private final boolean isAligned;
+ private final List<IMeasurementSchemaInfo> measurementSchemaInfoList;
+ private final int templateId;
public DeviceSchemaInfo(
PartialPath devicePath,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
index 841444f4279..5e614fa5fd3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
@@ -224,6 +224,8 @@ class DualKeyCacheImpl<FK, SK, V, T extends ICacheEntry<SK,
V>>
final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
firstKeyMap.get(belongedGroup.getFirstKey());
if (Objects.nonNull(cacheEntryGroup) && cacheEntryGroup.isEmpty()) {
+ // The removal is non-atomic, but it's ok because it's just a cache and
does not affect the
+ // consistency if you evicts some entries being added
if (Objects.nonNull(firstKeyMap.remove(belongedGroup.getFirstKey()))) {
memory +=
sizeComputer.computeFirstKeySize(belongedGroup.getFirstKey())
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java
index 5dca594eaaf..49e2995b3a8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -125,7 +125,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher
{
Set<String> storageGroupSet = new HashSet<>();
if (!explicitDevicePatternList.isEmpty()) {
for (PartialPath explicitDevicePattern : explicitDevicePatternList) {
- cachedSchema =
schemaCache.getMatchedSchemaWithTemplate(explicitDevicePattern);
+ cachedSchema =
schemaCache.getMatchedTemplateSchema(explicitDevicePattern);
if (cachedSchema.isEmpty()) {
isAllCached = false;
break;
@@ -144,7 +144,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher
{
continue;
}
cachedSchema =
- schemaCache.getMatchedSchemaWithoutTemplate(new
MeasurementPath(fullPath.getNodes()));
+ schemaCache.getMatchedNormalSchema(new
MeasurementPath(fullPath.getNodes()));
if (cachedSchema.isEmpty()) {
isAllCached = false;
break;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 2bc2c419412..0c36104055f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -2800,12 +2800,11 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
final boolean isNeedUpdateLastCache = context.isNeedUpdateLastCache();
if (isNeedUpdateLastCache) {
TreeDeviceSchemaCacheManager.getInstance()
- .updateLastCache(
+ .declareLastCache(
((DataDriverContext) operatorContext.getDriverContext())
.getDataRegion()
.getDatabaseName(),
- fullPath,
- false);
+ fullPath);
}
return Objects.isNull(node.getOutputViewPath())
@@ -2857,13 +2856,12 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
for (int i = 0; i < size; ++i) {
TreeDeviceSchemaCacheManager.getInstance()
- .updateLastCache(
+ .declareLastCache(
databaseName,
new MeasurementPath(
devicePath.concatNode(unCachedPath.getMeasurementList().get(i)),
unCachedPath.getSchemaList().get(i),
- true),
- false);
+ true));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java
index 16f5e208f9d..62dbfdccdeb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java
@@ -82,8 +82,12 @@ public class TableDeviceLastCache {
private static final Optional<Pair<OptionalLong, TsPrimitiveType[]>>
HIT_AND_ALL_NULL =
Optional.of(new Pair<>(OptionalLong.empty(), null));
+
+ /** This means that the tv pair has been put, and the value is null */
public static final TimeValuePair EMPTY_TIME_VALUE_PAIR =
new TimeValuePair(Long.MIN_VALUE, EMPTY_PRIMITIVE_TYPE);
+
+ /** This means that the tv pair has been declared, and is ready for the next
put. */
private static final TimeValuePair PLACEHOLDER_TIME_VALUE_PAIR =
new TimeValuePair(Long.MIN_VALUE, EMPTY_PRIMITIVE_TYPE);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceSchemaCacheManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceSchemaCacheManager.java
index f38fbefa378..0a38501c0f2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceSchemaCacheManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceSchemaCacheManager.java
@@ -128,14 +128,14 @@ public class TreeDeviceSchemaCacheManager {
* @param devicePath full path of the device
* @return empty if cache miss or the device path is not a template
activated path
*/
- public ClusterSchemaTree getMatchedSchemaWithTemplate(final PartialPath
devicePath) {
+ public ClusterSchemaTree getMatchedTemplateSchema(final PartialPath
devicePath) {
final ClusterSchemaTree tree = new ClusterSchemaTree();
final IDeviceSchema schema =
tableDeviceSchemaCache.getDeviceSchema(devicePath.getNodes());
if (!(schema instanceof TreeDeviceTemplateSchema)) {
return tree;
}
final TreeDeviceTemplateSchema treeSchema = (TreeDeviceTemplateSchema)
schema;
- Template template =
templateManager.getTemplate(treeSchema.getTemplateId());
+ final Template template =
templateManager.getTemplate(treeSchema.getTemplateId());
tree.appendTemplateDevice(devicePath, template.isDirectAligned(),
template.getId(), template);
tree.setDatabases(Collections.singleton(treeSchema.getDatabase()));
return tree;
@@ -147,7 +147,7 @@ public class TreeDeviceSchemaCacheManager {
* @param fullPath full path
* @return empty if cache miss
*/
- public ClusterSchemaTree getMatchedSchemaWithoutTemplate(final PartialPath
fullPath) {
+ public ClusterSchemaTree getMatchedNormalSchema(final PartialPath fullPath) {
final ClusterSchemaTree tree = new ClusterSchemaTree();
final IDeviceSchema schema =
tableDeviceSchemaCache.getDeviceSchema(
@@ -283,60 +283,7 @@ public class TreeDeviceSchemaCacheManager {
continue;
}
final IMeasurementSchema schema = templateSchema.get(measurements[i]);
- computation.computeMeasurement(
- i,
- new IMeasurementSchemaInfo() {
- @Override
- public String getName() {
- return schema.getMeasurementName();
- }
-
- @Override
- public IMeasurementSchema getSchema() {
- if (isLogicalView()) {
- return new LogicalViewSchema(
- schema.getMeasurementName(), ((LogicalViewSchema)
schema).getExpression());
- } else {
- return this.getSchemaAsMeasurementSchema();
- }
- }
-
- @Override
- public MeasurementSchema getSchemaAsMeasurementSchema() {
- return new MeasurementSchema(
- schema.getMeasurementName(),
- schema.getType(),
- schema.getEncodingType(),
- schema.getCompressor());
- }
-
- @Override
- public LogicalViewSchema getSchemaAsLogicalViewSchema() {
- throw new RuntimeException(
- new UnsupportedOperationException(
- "Function getSchemaAsLogicalViewSchema is not supported
in DeviceUsingTemplateSchemaCache."));
- }
-
- @Override
- public Map<String, String> getTagMap() {
- return null;
- }
-
- @Override
- public Map<String, String> getAttributeMap() {
- return null;
- }
-
- @Override
- public String getAlias() {
- return null;
- }
-
- @Override
- public boolean isLogicalView() {
- return schema.isLogicalView();
- }
- });
+ computation.computeMeasurement(i, new WrappedSchemaInfo(schema));
}
return indexOfMissingMeasurements;
}
@@ -399,10 +346,10 @@ public class TreeDeviceSchemaCacheManager {
*
* <p>Note: The query shall put the {@link TableDeviceLastCache} twice:
*
- * <p>- First time set the "isCommit" to {@code false} before the query
accesses data. It is just
- * to allow the writing to update the cache, then avoid that the query put a
stale value to cache
- * and break the consistency. WARNING: The writing may temporarily put a
stale value in cache if a
- * stale value is written, but it won't affect the eventual consistency.
+ * <p>- First time call this before the query accesses data. It is just to
allow the writing to
+ * update the cache, then avoid that the query put a stale value to cache
and break the
+ * consistency. WARNING: The writing may temporarily put a stale value in
cache if a stale value
+ * is written, but it won't affect the eventual consistency.
*
* <p>- Second time put the calculated {@link TimeValuePair}, and use {@link
* #updateLastCacheIfExists(String, IDeviceID, String[], TimeValuePair[],
boolean,
@@ -410,22 +357,35 @@ public class TreeDeviceSchemaCacheManager {
* if the measurement is with all {@code null}s, its {@link TimeValuePair}
shall be {@link
* TableDeviceLastCache#EMPTY_TIME_VALUE_PAIR}. This method is not supposed
to update time column.
*
+ * @param database the device's database, WITH "root"
+ * @param measurementPath the fetched {@link MeasurementPath}
+ */
+ public void declareLastCache(final String database, final MeasurementPath
measurementPath) {
+ tableDeviceSchemaCache.updateLastCache(
+ database,
+ measurementPath.getIDeviceID(),
+ new String[] {measurementPath.getMeasurement()},
+ null,
+ measurementPath.isUnderAlignedEntity(),
+ new IMeasurementSchema[] {measurementPath.getMeasurementSchema()},
+ true);
+ }
+
+ /**
+ * Update the {@link TableDeviceLastCache} on query in tree model.
+ *
* <p>If the query has ended abnormally, it shall call this to invalidate
the entry it has pushed
- * in the first time, to avoid the stale writing damaging the eventual
consistency. In this case
- * and the "isInvalidate" shall be {@code true}.
+ * in the first time, to avoid the stale writing damaging the eventual
consistency.
*
* @param database the device's database, WITH "root"
* @param measurementPath the fetched {@link MeasurementPath}
- * @param isInvalidate {@code true} if invalidate the first pushed cache, or
{@code null} for the
- * first fetch.
*/
- public void updateLastCache(
- final String database, final MeasurementPath measurementPath, final
boolean isInvalidate) {
+ public void invalidateLastCache(final String database, final MeasurementPath
measurementPath) {
tableDeviceSchemaCache.updateLastCache(
database,
measurementPath.getIDeviceID(),
new String[] {measurementPath.getMeasurement()},
- isInvalidate ? new TimeValuePair[] {null} : null,
+ new TimeValuePair[] {null},
measurementPath.isUnderAlignedEntity(),
new IMeasurementSchema[] {measurementPath.getMeasurementSchema()},
true);
@@ -446,4 +406,63 @@ public class TreeDeviceSchemaCacheManager {
public void cleanUp() {
tableDeviceSchemaCache.invalidateAll();
}
+
+ private static class WrappedSchemaInfo implements IMeasurementSchemaInfo {
+ private final IMeasurementSchema schema;
+
+ public WrappedSchemaInfo(final IMeasurementSchema schema) {
+ this.schema = schema;
+ }
+
+ @Override
+ public String getName() {
+ return schema.getMeasurementName();
+ }
+
+ @Override
+ public IMeasurementSchema getSchema() {
+ if (isLogicalView()) {
+ return new LogicalViewSchema(
+ schema.getMeasurementName(), ((LogicalViewSchema)
schema).getExpression());
+ } else {
+ return this.getSchemaAsMeasurementSchema();
+ }
+ }
+
+ @Override
+ public MeasurementSchema getSchemaAsMeasurementSchema() {
+ return new MeasurementSchema(
+ schema.getMeasurementName(),
+ schema.getType(),
+ schema.getEncodingType(),
+ schema.getCompressor());
+ }
+
+ @Override
+ public LogicalViewSchema getSchemaAsLogicalViewSchema() {
+ throw new RuntimeException(
+ new UnsupportedOperationException(
+ "Function getSchemaAsLogicalViewSchema is not supported in
DeviceUsingTemplateSchemaCache."));
+ }
+
+ @Override
+ public Map<String, String> getTagMap() {
+ return null;
+ }
+
+ @Override
+ public Map<String, String> getAttributeMap() {
+ return null;
+ }
+
+ @Override
+ public String getAlias() {
+ return null;
+ }
+
+ @Override
+ public boolean isLogicalView() {
+ return schema.isLogicalView();
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java
index 446d0cd6587..b166e3d3772 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java
@@ -197,10 +197,10 @@ public class TreeDeviceSchemaCacheManagerTest {
final TimeValuePair tv1 = new TimeValuePair(1, new
TsPrimitiveType.TsInt(1));
- treeDeviceSchemaCacheManager.updateLastCache(
- database, new MeasurementPath(device.concatNode("s1"), s1), false);
- treeDeviceSchemaCacheManager.updateLastCache(
- database, new MeasurementPath(device.concatNode("s3"), s3), false);
+ treeDeviceSchemaCacheManager.declareLastCache(
+ database, new MeasurementPath(device.concatNode("s1"), s1));
+ treeDeviceSchemaCacheManager.declareLastCache(
+ database, new MeasurementPath(device.concatNode("s3"), s3));
// Simulate "s1" revert when the query has failed in calculation
treeDeviceSchemaCacheManager.updateLastCacheIfExists(
@@ -213,8 +213,8 @@ public class TreeDeviceSchemaCacheManagerTest {
},
false,
new MeasurementSchema[] {s1});
- treeDeviceSchemaCacheManager.updateLastCache(
- database, new MeasurementPath(device.concatNode("s1"), s1), true);
+ treeDeviceSchemaCacheManager.invalidateLastCache(
+ database, new MeasurementPath(device.concatNode("s1"), s1));
// "s2" shall be null since the "null" timeValuePair has not been put
treeDeviceSchemaCacheManager.updateLastCacheIfExists(
@@ -294,12 +294,11 @@ public class TreeDeviceSchemaCacheManagerTest {
new MeasurementPath("root.sg1.d3.s1", TSDataType.FLOAT));
treeDeviceSchemaCacheManager.put(clusterSchemaTree);
final ClusterSchemaTree d1Tree =
- treeDeviceSchemaCacheManager.getMatchedSchemaWithTemplate(new
PartialPath("root.sg1.d1"));
+ treeDeviceSchemaCacheManager.getMatchedTemplateSchema(new
PartialPath("root.sg1.d1"));
final ClusterSchemaTree d2Tree =
- treeDeviceSchemaCacheManager.getMatchedSchemaWithTemplate(new
PartialPath("root.sg1.d2"));
+ treeDeviceSchemaCacheManager.getMatchedTemplateSchema(new
PartialPath("root.sg1.d2"));
final ClusterSchemaTree d3Tree =
- treeDeviceSchemaCacheManager.getMatchedSchemaWithoutTemplate(
- new MeasurementPath("root.sg1.d3.s1"));
+ treeDeviceSchemaCacheManager.getMatchedNormalSchema(new
MeasurementPath("root.sg1.d3.s1"));
List<MeasurementPath> measurementPaths =
d1Tree.searchMeasurementPaths(ALL_MATCH_PATTERN).left;
Assert.assertEquals(2, measurementPaths.size());
for (final MeasurementPath measurementPath : measurementPaths) {