This is an automated email from the ASF dual-hosted git repository.
Wei-hao-Li pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 87239453bb4 Fix stale field cache handling in table last-row
optimization (#17677)
87239453bb4 is described below
commit 87239453bb40e3505ca6902530285482ac06f396
Author: Caideyipi <[email protected]>
AuthorDate: Fri May 22 09:51:50 2026 +0800
Fix stale field cache handling in table last-row optimization (#17677)
---
.../db/it/IoTDBMultiTAGsWithAttributesTableIT.java | 59 +++++++
.../protocol/thrift/impl/ClientRPCServiceImpl.java | 2 +-
.../last/AbstractUpdateLastCacheOperator.java | 4 +-
.../relational/LastQueryAggTableScanOperator.java | 41 ++---
.../planner/DataNodeTableOperatorGenerator.java | 8 +-
.../plan/planner/OperatorTreeGenerator.java | 2 +-
.../fetcher/cache/TableDeviceLastCache.java | 178 ++++++++++++++-------
.../fetcher/cache/TableDeviceSchemaCache.java | 24 +--
.../cache/TreeDeviceSchemaCacheManager.java | 3 +-
.../fetcher/cache/TableDeviceLastCacheTest.java | 90 +++++++++++
.../fetcher/cache/TableDeviceSchemaCacheTest.java | 57 +++++--
11 files changed, 365 insertions(+), 103 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java
index 5e18a8468c7..289ae7789a4 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.relational.it.db.it;
import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.TableClusterIT;
import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
@@ -32,14 +33,17 @@ import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import java.sql.Connection;
+import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Arrays;
+import java.util.List;
import static
org.apache.iotdb.commons.queryengine.plan.relational.planner.node.JoinNode.JoinType.FULL;
import static
org.apache.iotdb.commons.queryengine.plan.relational.planner.node.JoinNode.JoinType.INNER;
import static
org.apache.iotdb.commons.queryengine.plan.relational.planner.node.JoinNode.JoinType.LEFT;
import static
org.apache.iotdb.commons.queryengine.plan.relational.planner.node.JoinNode.JoinType.RIGHT;
import static org.apache.iotdb.db.it.utils.TestUtils.tableAssertTestFail;
+import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqual;
import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest;
import static org.junit.Assert.fail;
@@ -2466,6 +2470,8 @@ public class IoTDBMultiTAGsWithAttributesTableIT {
@Test
public void lastCacheTest() {
+ prepareStaleLastRowCacheOnSingleDataNode();
+
expectedHeader =
new String[] {
"level", "attr1", "device", "attr2", "_col4", "_col5", "_col6",
"_col7", "_col8", "_col9",
@@ -2925,6 +2931,59 @@ public class IoTDBMultiTAGsWithAttributesTableIT {
}
}
+ private static void prepareStaleLastRowCacheOnSingleDataNode() {
+ try {
+ final List<DataNodeWrapper> dataNodeWrappers =
EnvFactory.getEnv().getDataNodeWrapperList();
+ for (final DataNodeWrapper dataNodeWrapper : dataNodeWrappers) {
+ executeTableStatementOnSingleDataNode(dataNodeWrapper, "clear query
cache on local");
+ }
+
+ final DataNodeWrapper pollutedDataNode = dataNodeWrappers.get(0);
+
+ tableResultSetEqualOnSingleDataNode(
+ pollutedDataNode,
+ "select
last_by(num,time),last_by(bignum,time),last_by(floatnum,time) "
+ + "from table0 where device='d1' and level='l2' and
time<1971-04-26T17:46:40.000",
+ new String[] {"_col0", "_col1", "_col2"},
+ new String[] {"10,3147483648,231.55,"});
+ // This only refreshes the cached row time on one DataNode, keeping the
field caches stale.
+ tableResultSetEqualOnSingleDataNode(
+ pollutedDataNode,
+ "select last(time),last(device),last(level),last(attr1),last(attr2) "
+ + "from table0 where device='d1' and level='l2'",
+ new String[] {"_col0", "_col1", "_col2", "_col3", "_col4"},
+ new String[] {"1971-04-26T17:46:40.000Z,d1,l2,yy,zz,"});
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private static void executeTableStatementOnSingleDataNode(
+ final DataNodeWrapper dataNodeWrapper, final String sql) throws
Exception {
+ try (final Connection connection =
+ EnvFactory.getEnv().getConnection(dataNodeWrapper, "root", "root",
"table");
+ final Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ }
+ }
+
+ private static void tableResultSetEqualOnSingleDataNode(
+ final DataNodeWrapper dataNodeWrapper,
+ final String sql,
+ final String[] expectedHeader,
+ final String[] expectedRetArray)
+ throws Exception {
+ try (final Connection connection =
+ EnvFactory.getEnv().getConnection(dataNodeWrapper, "root", "root",
"table");
+ final Statement statement = connection.createStatement()) {
+ statement.execute("use " + DATABASE_NAME);
+ try (final ResultSet resultSet = statement.executeQuery(sql)) {
+ tableResultSetEqual(resultSet, expectedHeader, expectedRetArray);
+ }
+ }
+ }
+
public static String[] buildHeaders(int length) {
String[] expectedHeader = new String[length];
for (int i = 0; i < length; i++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 35ac06036b2..25b7362d8aa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -1230,7 +1230,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
for (final Map.Entry<String, Pair<TSDataType, TimeValuePair>>
measurementLastEntry :
device2MeasurementLastEntry.getValue().entrySet()) {
final TimeValuePair tvPair =
measurementLastEntry.getValue().getRight();
- if (tvPair != TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR) {
+ if (tvPair != TableDeviceLastCache.PLACEHOLDER_EMPTY_COLUMN) {
LastQueryUtil.appendLastValueRespectBlob(
builder,
tvPair.getTimestamp(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
index 77ab84b6a48..e1c3cb335b0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
@@ -127,7 +127,7 @@ public abstract class AbstractUpdateLastCacheOperator
implements ProcessOperator
new TimeValuePair[] {
Objects.nonNull(value)
? new TimeValuePair(time, value)
- : needUpdateNullEntry ?
TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR : null
+ : needUpdateNullEntry ?
TableDeviceLastCache.PLACEHOLDER_EMPTY_COLUMN : null
},
fullPath.isUnderAlignedEntity(),
new IMeasurementSchema[] {fullPath.getMeasurementSchema()});
@@ -139,7 +139,7 @@ public abstract class AbstractUpdateLastCacheOperator
implements ProcessOperator
seriesScanInfo.right = new TimeValuePair(time, value);
} else {
seriesScanInfo.right =
- needUpdateNullEntry ? TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR
: null;
+ needUpdateNullEntry ?
TableDeviceLastCache.PLACEHOLDER_EMPTY_COLUMN : null;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java
index e7d2292d515..7a2d2e3a806 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java
@@ -54,8 +54,8 @@ import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkState;
import static
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.Utils.serializeTimeValueWithNull;
import static
org.apache.iotdb.commons.queryengine.plan.relational.type.InternalTypeManager.getTSDataType;
-import static
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE;
-import static
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR;
+import static
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.PLACEHOLDER_EMPTY_COLUMN;
+import static
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.PLACEHOLDER_NO_VALUE;
/**
* This class is used to execute aggregation table scan when apply {@code
canUseLastCacheOptimize()}
@@ -280,7 +280,7 @@ public class LastQueryAggTableScanOperator extends
AbstractAggTableScanOperator
TsPrimitiveType tsPrimitiveType =
lastRowCacheResults.get(currentHitCacheIndex).getRight()[measurementIdx];
long lastByTime =
lastRowCacheResults.get(currentHitCacheIndex).getLeft().getAsLong();
- if (tsPrimitiveType == EMPTY_PRIMITIVE_TYPE) {
+ if (tsPrimitiveType == PLACEHOLDER_NO_VALUE) {
// there is no data for this time series
if (aggregator.getStep().isOutputPartial()) {
columnBuilder.writeBinary(
@@ -323,7 +323,7 @@ public class LastQueryAggTableScanOperator extends
AbstractAggTableScanOperator
TsPrimitiveType timeLastValue = currentHitResult[currentHitResult.length -
1].getValue();
// when there is no data, no need to append result if the query is GROUP
BY or output of
// aggregator is partial (final operator will produce NULL result)
- if (timeLastValue == EMPTY_PRIMITIVE_TYPE
+ if (timeLastValue == PLACEHOLDER_NO_VALUE
&& (groupingKeySize != 0 ||
tableAggregators.get(0).getStep().isOutputPartial())) {
outputDeviceIndex++;
currentHitCacheIndex++;
@@ -347,7 +347,7 @@ public class LastQueryAggTableScanOperator extends
AbstractAggTableScanOperator
getNthIdColumnValue(
cachedDeviceEntries.get(currentHitCacheIndex),
aggColumnsIndexArray[columnIdx]);
if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
- if (timeLastValue == EMPTY_PRIMITIVE_TYPE || id == null) {
+ if (timeLastValue == PLACEHOLDER_NO_VALUE || id == null) {
columnBuilder.appendNull();
} else {
if (aggregator.getStep().isOutputPartial()) {
@@ -369,7 +369,7 @@ public class LastQueryAggTableScanOperator extends
AbstractAggTableScanOperator
long lastTime =
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
- if (timeLastValue == EMPTY_PRIMITIVE_TYPE || id == null) {
+ if (timeLastValue == PLACEHOLDER_NO_VALUE || id == null) {
if (aggregator.getStep().isOutputPartial()) {
columnBuilder.writeBinary(
new Binary(
@@ -400,7 +400,7 @@ public class LastQueryAggTableScanOperator extends
AbstractAggTableScanOperator
cachedDeviceEntries.get(currentHitCacheIndex)
.getAttributeColumnValues()[aggColumnsIndexArray[columnIdx]];
if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
- if (timeLastValue == EMPTY_PRIMITIVE_TYPE || attribute == null) {
+ if (timeLastValue == PLACEHOLDER_NO_VALUE || attribute == null) {
columnBuilder.appendNull();
} else {
if (aggregator.getStep().isOutputPartial()) {
@@ -421,7 +421,7 @@ public class LastQueryAggTableScanOperator extends
AbstractAggTableScanOperator
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
// last_by
- if (timeLastValue == EMPTY_PRIMITIVE_TYPE || attribute == null) {
+ if (timeLastValue == PLACEHOLDER_NO_VALUE || attribute == null) {
if (aggregator.getStep().isOutputPartial()) {
columnBuilder.writeBinary(
new Binary(
@@ -449,7 +449,7 @@ public class LastQueryAggTableScanOperator extends
AbstractAggTableScanOperator
case TIME:
if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
// for last(time) aggregation
- if (timeLastValue == EMPTY_PRIMITIVE_TYPE) {
+ if (timeLastValue == PLACEHOLDER_NO_VALUE) {
columnBuilder.appendNull();
} else {
if (aggregator.getStep().isOutputPartial()) {
@@ -472,7 +472,7 @@ public class LastQueryAggTableScanOperator extends
AbstractAggTableScanOperator
TsPrimitiveType tsPrimitiveType =
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getValue();
- if (tsPrimitiveType == EMPTY_PRIMITIVE_TYPE) {
+ if (tsPrimitiveType == PLACEHOLDER_NO_VALUE) {
// there is no data
if (aggregator.getStep().isOutputPartial()) {
columnBuilder.writeBinary(
@@ -510,7 +510,7 @@ public class LastQueryAggTableScanOperator extends
AbstractAggTableScanOperator
TsPrimitiveType tsPrimitiveType =
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getValue();
- if (tsPrimitiveType == EMPTY_PRIMITIVE_TYPE) {
+ if (tsPrimitiveType == PLACEHOLDER_NO_VALUE) {
// there is no data for this time series
columnBuilder.appendNull();
} else {
@@ -560,7 +560,7 @@ public class LastQueryAggTableScanOperator extends
AbstractAggTableScanOperator
new
TsPrimitiveType.TsLong(lastAccumulator.getMaxTime())));
} else {
currentDeviceEntry = deviceEntries.get(currentDeviceIndex);
- updateTimeValuePairList.add(EMPTY_TIME_VALUE_PAIR);
+ updateTimeValuePairList.add(PLACEHOLDER_EMPTY_COLUMN);
}
} else {
LastByDescAccumulator lastByAccumulator =
@@ -577,8 +577,7 @@ public class LastQueryAggTableScanOperator extends
AbstractAggTableScanOperator
case FIELD:
LastByDescAccumulator lastByAccumulator =
(LastByDescAccumulator) tableAggregator.getAccumulator();
- updateMeasurementList.add(schema.getName());
- if (lastByAccumulator.hasInitResult() &&
!lastByAccumulator.isXNull()) {
+ if (lastByAccumulator.hasInitResult()) {
long lastByTime = lastByAccumulator.getLastTimeOfY();
if (!hasSetLastTime) {
@@ -588,11 +587,15 @@ public class LastQueryAggTableScanOperator extends
AbstractAggTableScanOperator
new TimeValuePair(lastByTime, new
TsPrimitiveType.TsLong(lastByTime)));
}
+ updateMeasurementList.add(schema.getName());
updateTimeValuePairList.add(
- new TimeValuePair(
- lastByTime,
cloneTsPrimitiveType(lastByAccumulator.getXResult())));
+ lastByAccumulator.isXNull()
+ ? new TimeValuePair(lastByTime, PLACEHOLDER_NO_VALUE)
+ : new TimeValuePair(
+ lastByTime,
cloneTsPrimitiveType(lastByAccumulator.getXResult())));
} else {
- updateTimeValuePairList.add(EMPTY_TIME_VALUE_PAIR);
+ updateMeasurementList.add(schema.getName());
+ updateTimeValuePairList.add(PLACEHOLDER_EMPTY_COLUMN);
}
break;
default:
@@ -629,7 +632,7 @@ public class LastQueryAggTableScanOperator extends
AbstractAggTableScanOperator
new
TsPrimitiveType.TsLong(lastAccumulator.getMaxTime())));
} else {
currentDeviceEntry = deviceEntries.get(currentDeviceIndex);
- updateTimeValuePairList.add(EMPTY_TIME_VALUE_PAIR);
+ updateTimeValuePairList.add(PLACEHOLDER_EMPTY_COLUMN);
}
}
break;
@@ -646,7 +649,7 @@ public class LastQueryAggTableScanOperator extends
AbstractAggTableScanOperator
lastAccumulator.getMaxTime(),
cloneTsPrimitiveType(lastAccumulator.getLastValue())));
} else {
- updateTimeValuePairList.add(EMPTY_TIME_VALUE_PAIR);
+ updateTimeValuePairList.add(PLACEHOLDER_EMPTY_COLUMN);
}
break;
default:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
index 7a27d54b8f4..b5de9a5561e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
@@ -200,7 +200,8 @@ import static
org.apache.iotdb.db.queryengine.execution.operator.source.relation
import static
org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils.convertPredicateToFilter;
import static
org.apache.iotdb.db.queryengine.plan.planner.OperatorTreeGenerator.isFilterGtOrGe;
import static
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions.updateFilterUsingTTL;
-import static
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE;
+import static
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.PLACEHOLDER_NO_VALUE;
+import static
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.PLACEHOLDER_STALE_VALUE;
import static org.apache.tsfile.read.common.type.TimestampType.TIMESTAMP;
public class DataNodeTableOperatorGenerator
@@ -1626,6 +1627,9 @@ public class DataNodeTableOperatorGenerator
for (int j = 0; j < lastByResult.get().getRight().length; j++) {
TsPrimitiveType tsPrimitiveType = lastByResult.get().getRight()[j];
if (tsPrimitiveType == null
+ // Known-null at the aligned row time can still hit cache.
Only miss or stale target
+ // values need to fall back to scan for correctness.
+ || tsPrimitiveType == PLACEHOLDER_STALE_VALUE
|| (updateTimeFilter != null
&& !LastQueryUtil.satisfyFilter(
updateTimeFilter,
@@ -1725,7 +1729,7 @@ public class DataNodeTableOperatorGenerator
parameter.getSeriesScanOptions().getGlobalTimeFilter(),
timeValuePair)) {
if (isFilterGtOrGe(updateTimeFilter)) {
// it means there is no data meets Filter
- timeValuePair.setValue(EMPTY_PRIMITIVE_TYPE);
+ timeValuePair.setValue(PLACEHOLDER_NO_VALUE);
} else {
allHitCache = false;
break;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 3ea9bd43ff2..9f1aa7fc291 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -2930,7 +2930,7 @@ public class OperatorTreeGenerator implements
PlanVisitor<Operator, LocalExecuti
if (timeValuePair == null) { // last value is not cached
unCachedMeasurementIndexes.add(i);
- } else if (timeValuePair.getValue() ==
TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE) {
+ } else if (timeValuePair.getValue() ==
TableDeviceLastCache.PLACEHOLDER_NO_VALUE) {
// there is no data for this time series, just ignore
} else if (!LastQueryUtil.satisfyFilter(filter, timeValuePair)) {
// cached last value is not satisfied
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java
index 5a2ac3d1e53..271051b72b8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java
@@ -21,7 +21,6 @@ package
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache;
import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
-import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.RamUsageEstimator;
@@ -46,53 +45,36 @@ public class TableDeviceLastCache {
static final int INSTANCE_SIZE =
(int) RamUsageEstimator.shallowSizeOfInstance(TableDeviceLastCache.class)
+ (int)
RamUsageEstimator.shallowSizeOfInstance(ConcurrentHashMap.class);
+ private static final int LONG_INSTANCE_SIZE =
+ (int) RamUsageEstimator.shallowSizeOfInstance(Long.class);
- public static final TsPrimitiveType EMPTY_PRIMITIVE_TYPE =
- new TsPrimitiveType() {
- @Override
- public void setObject(Object o) {
- // Do nothing
- }
-
- @Override
- public void reset() {
- // Do nothing
- }
-
- @Override
- public int getSize() {
- return 0;
- }
-
- @Override
- public Object getValue() {
- return null;
- }
-
- @Override
- public String getStringValue() {
- return null;
- }
+ /**
+ * Cache hit and the measurement is known to be null at the aligned last-row
time. For stored
+ * entries, it is only used as the value part of the time column's cached
{@link TimeValuePair}.
+ */
+ public static final TsPrimitiveType PLACEHOLDER_NO_VALUE = new
TsPrimitiveType.TsInt();
- @Override
- public TSDataType getDataType() {
- return null;
- }
- };
+ /**
+ * Cache hit but the target measurement is stale under a newer aligned
last-row time. This
+ * sentinel is only returned by {@link #getLastRow(String, List)} and is
never stored in cache.
+ */
+ public static final TsPrimitiveType PLACEHOLDER_STALE_VALUE = new
TsPrimitiveType.TsInt();
private static final Optional<Pair<OptionalLong, TsPrimitiveType[]>>
HIT_AND_ALL_NULL =
Optional.of(new Pair<>(OptionalLong.empty(), null));
- /** This means that the tv pair has been put, and the value is null */
- public static final TimeValuePair EMPTY_TIME_VALUE_PAIR =
- new TimeValuePair(Long.MIN_VALUE, EMPTY_PRIMITIVE_TYPE);
+ /** This means the measurement has been cached and is known to have no
values at all. */
+ public static final TimeValuePair PLACEHOLDER_EMPTY_COLUMN =
+ new TimeValuePair(Long.MIN_VALUE, PLACEHOLDER_NO_VALUE);
/** This means that the tv pair has been declared, and is ready for the next
put. */
- private static final TimeValuePair PLACEHOLDER_TIME_VALUE_PAIR =
- new TimeValuePair(Long.MIN_VALUE, EMPTY_PRIMITIVE_TYPE);
+ private static final TimeValuePair PLACEHOLDER_NO_CACHE =
+ new TimeValuePair(Long.MIN_VALUE, PLACEHOLDER_NO_VALUE);
// Time is seen as "" as a measurement
private final Map<String, TimeValuePair> measurement2CachedLastMap = new
ConcurrentHashMap<>();
+ private final Map<String, Long> measurement2CachedLastKnownNullTimeMap =
+ new ConcurrentHashMap<>();
private final boolean isTableModel;
TableDeviceLastCache(final boolean isTableModel) {
@@ -117,7 +99,10 @@ public class TableDeviceLastCache {
if (Objects.isNull(finalMeasurement)) {
continue;
}
- final TimeValuePair newPair = isInvalidate ? null :
PLACEHOLDER_TIME_VALUE_PAIR;
+ if (isInvalidate &&
measurement2CachedLastKnownNullTimeMap.remove(finalMeasurement) != null) {
+ diff.addAndGet(-getKnownNullTimeEntrySize());
+ }
+ final TimeValuePair newPair = isInvalidate ? null : PLACEHOLDER_NO_CACHE;
measurement2CachedLastMap.compute(
finalMeasurement,
@@ -155,6 +140,7 @@ public class TableDeviceLastCache {
for (int i = 0; i < measurements.length; ++i) {
if (Objects.isNull(timeValuePairs[i])) {
if (invalidateNull) {
+ diff.addAndGet(removeKnownNullTime(measurements[i]));
diff.addAndGet(
-((int) RamUsageEstimator.sizeOf(measurements[i])
+
getTvPairEntrySize(measurement2CachedLastMap.remove(measurements[i]))));
@@ -162,6 +148,14 @@ public class TableDeviceLastCache {
continue;
}
+ if (isKnownNullAtAlignedTime(measurements[i], timeValuePairs[i])) {
+ if (lastTime < timeValuePairs[i].getTimestamp()) {
+ lastTime = timeValuePairs[i].getTimestamp();
+ }
+ diff.addAndGet(tryUpdateKnownNullTime(measurements[i],
timeValuePairs[i].getTimestamp()));
+ continue;
+ }
+
final int finalI = i;
if (lastTime < timeValuePairs[i].getTimestamp()) {
lastTime = timeValuePairs[i].getTimestamp();
@@ -170,7 +164,10 @@ public class TableDeviceLastCache {
measurements[i],
(measurement, tvPair) -> {
if (tvPair.getTimestamp() <=
timeValuePairs[finalI].getTimestamp()) {
- diff.addAndGet(getDiffSize(tvPair, timeValuePairs[finalI]));
+ diff.addAndGet(
+ getDiffSize(tvPair, timeValuePairs[finalI])
+ + clearKnownNullTimeIfCovered(
+ measurement, timeValuePairs[finalI].getTimestamp()));
return timeValuePairs[finalI];
}
return tvPair;
@@ -181,7 +178,7 @@ public class TableDeviceLastCache {
"",
(time, tvPair) ->
tvPair.getTimestamp() < finalLastTime
- ? new TimeValuePair(finalLastTime, EMPTY_PRIMITIVE_TYPE)
+ ? new TimeValuePair(finalLastTime, PLACEHOLDER_NO_VALUE)
: tvPair);
return diff.get();
}
@@ -190,6 +187,7 @@ public class TableDeviceLastCache {
int invalidate(final String measurement) {
final AtomicInteger diff = new AtomicInteger();
final AtomicLong time = new AtomicLong();
+ final AtomicLong knownNullTime = new AtomicLong(Long.MIN_VALUE);
measurement2CachedLastMap.computeIfPresent(
measurement,
(s, timeValuePair) -> {
@@ -199,13 +197,18 @@ public class TableDeviceLastCache {
time.set(timeValuePair.getTimestamp());
return null;
});
+ final Long removedKnownNullTime =
measurement2CachedLastKnownNullTimeMap.remove(measurement);
+ if (removedKnownNullTime != null) {
+ diff.addAndGet(-getKnownNullTimeEntrySize());
+ knownNullTime.set(removedKnownNullTime);
+ }
if (diff.get() == 0) {
return 0;
}
measurement2CachedLastMap.computeIfPresent(
"",
(s, timeValuePair) -> {
- if (timeValuePair.getTimestamp() <= time.get()) {
+ if (timeValuePair.getTimestamp() <= Math.max(time.get(),
knownNullTime.get())) {
diff.addAndGet((int) RamUsageEstimator.sizeOf(s) +
getTvPairEntrySize(timeValuePair));
return null;
}
@@ -225,25 +228,32 @@ public class TableDeviceLastCache {
private static boolean isEmptyTvPair(final TimeValuePair tvPair) {
return Objects.isNull(tvPair)
- || tvPair == PLACEHOLDER_TIME_VALUE_PAIR
- || tvPair == EMPTY_TIME_VALUE_PAIR;
+ || tvPair == PLACEHOLDER_NO_CACHE
+ || tvPair == PLACEHOLDER_EMPTY_COLUMN;
+ }
+
+ private static boolean isKnownNullAtAlignedTime(
+ final @Nonnull String measurement, final @Nonnull TimeValuePair
timeValuePair) {
+ return !measurement.isEmpty()
+ && timeValuePair != PLACEHOLDER_EMPTY_COLUMN
+ && timeValuePair.getValue() == PLACEHOLDER_NO_VALUE;
}
@Nullable
TimeValuePair getTimeValuePair(final @Nonnull String measurement) {
final TimeValuePair result = measurement2CachedLastMap.get(measurement);
- return result != PLACEHOLDER_TIME_VALUE_PAIR ? result : null;
+ return result != PLACEHOLDER_NO_CACHE ? result : null;
}
// Shall pass in "" if last by time
Optional<Pair<OptionalLong, TsPrimitiveType[]>> getLastRow(
final @Nonnull String sourceMeasurement, final List<String>
targetMeasurements) {
final TimeValuePair pair =
measurement2CachedLastMap.get(sourceMeasurement);
- if (Objects.isNull(pair) || pair == PLACEHOLDER_TIME_VALUE_PAIR) {
+ if (Objects.isNull(pair) || pair == PLACEHOLDER_NO_CACHE) {
return Optional.empty();
}
- if (pair == EMPTY_TIME_VALUE_PAIR) {
+ if (pair == PLACEHOLDER_EMPTY_COLUMN) {
return HIT_AND_ALL_NULL;
}
final long alignTime = pair.getTimestamp();
@@ -255,14 +265,10 @@ public class TableDeviceLastCache {
.map(
targetMeasurement -> {
if (!targetMeasurement.isEmpty()) {
- final TimeValuePair tvPair =
- measurement2CachedLastMap.get(targetMeasurement);
- if (Objects.isNull(tvPair)) {
- return null;
- }
- return tvPair.getTimestamp() == alignTime
- ? tvPair.getValue()
- : EMPTY_PRIMITIVE_TYPE;
+ return getLastRowTargetValue(
+ alignTime,
+ measurement2CachedLastMap.get(targetMeasurement),
+
measurement2CachedLastKnownNullTimeMap.get(targetMeasurement));
} else {
return new TsPrimitiveType.TsLong(alignTime);
}
@@ -270,15 +276,77 @@ public class TableDeviceLastCache {
.toArray(TsPrimitiveType[]::new)));
}
+ @Nullable
+ private static TsPrimitiveType getLastRowTargetValue(
+ final long alignTime,
+ final @Nullable TimeValuePair tvPair,
+ final @Nullable Long knownNullTime) {
+ if (knownNullTime != null && knownNullTime == alignTime) {
+ return PLACEHOLDER_NO_VALUE;
+ }
+ if (Objects.isNull(tvPair) || tvPair == PLACEHOLDER_NO_CACHE) {
+ return null;
+ }
+ if (tvPair == PLACEHOLDER_EMPTY_COLUMN) {
+ return PLACEHOLDER_NO_VALUE;
+ }
+ return tvPair.getTimestamp() == alignTime ? tvPair.getValue() :
PLACEHOLDER_STALE_VALUE;
+ }
+
int estimateSize() {
return INSTANCE_SIZE
+ (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY *
measurement2CachedLastMap.size()
+ + (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY
+ * measurement2CachedLastKnownNullTimeMap.size()
+ measurement2CachedLastMap.entrySet().stream()
.mapToInt(
entry ->
(isTableModel ? 0 : (int)
RamUsageEstimator.sizeOf(entry.getKey()))
+ TableDeviceLastCache.getTvPairSize(entry.getValue()))
- .reduce(0, Integer::sum);
+ .reduce(0, Integer::sum)
+ + measurement2CachedLastKnownNullTimeMap.size() * LONG_INSTANCE_SIZE;
+ }
+
+ private int tryUpdateKnownNullTime(final @Nonnull String measurement, final
long knownNullTime) {
+ final AtomicInteger diff = new AtomicInteger(0);
+ measurement2CachedLastMap.computeIfPresent(
+ measurement,
+ (measurementName, tvPair) -> {
+ measurement2CachedLastKnownNullTimeMap.compute(
+ measurementName,
+ (ignored, oldTime) -> {
+ if (oldTime == null) {
+ diff.addAndGet(getKnownNullTimeEntrySize());
+ return knownNullTime;
+ }
+ return oldTime < knownNullTime ? knownNullTime : oldTime;
+ });
+ return tvPair;
+ });
+ return diff.get();
+ }
+
+ private int clearKnownNullTimeIfCovered(
+ final @Nonnull String measurement, final long coveredTime) {
+ if (measurement.isEmpty()) {
+ return 0;
+ }
+ final Long knownNullTime =
measurement2CachedLastKnownNullTimeMap.get(measurement);
+ if (knownNullTime != null && knownNullTime <= coveredTime) {
+ measurement2CachedLastKnownNullTimeMap.remove(measurement);
+ return -getKnownNullTimeEntrySize();
+ }
+ return 0;
+ }
+
+ private int removeKnownNullTime(final @Nonnull String measurement) {
+ return measurement2CachedLastKnownNullTimeMap.remove(measurement) == null
+ ? 0
+ : -getKnownNullTimeEntrySize();
+ }
+
+ private static int getKnownNullTimeEntrySize() {
+ return (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY +
LONG_INSTANCE_SIZE;
}
private static int getDiffSize(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
index eed26dadc29..f49d6707377 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
@@ -211,10 +211,12 @@ public class TableDeviceSchemaCache {
*
* <p>- Second time put the calculated {@link TimeValuePair}s, and use {@link
* #updateLastCacheIfExists(String, IDeviceID, String[], TimeValuePair[])}.
The input {@link
- * TimeValuePair}s shall never be or contain {@code null}, if a measurement
is with all {@code
- * null}s, its {@link TimeValuePair} shall be {@link
TableDeviceLastCache#EMPTY_TIME_VALUE_PAIR}.
- * For time column, the input measurement shall be "", and the value shall
be {@link
- * TableDeviceLastCache#EMPTY_PRIMITIVE_TYPE}. If the time column is not
explicitly specified, the
+ * TimeValuePair}s shall never be or contain {@code null}. If a measurement
is with all {@code
+ * null}s, its {@link TimeValuePair} shall be {@link
+ * TableDeviceLastCache#PLACEHOLDER_EMPTY_COLUMN}; if it is known to be
{@code null} at a concrete
+ * last-row time, preserve that time and use {@link
TableDeviceLastCache#PLACEHOLDER_NO_VALUE} as
+ * the value. For time column, the input measurement shall be "", and the
value shall be {@link
+ * TableDeviceLastCache#PLACEHOLDER_NO_VALUE}. If the time column is not
explicitly specified, the
* device's last time won't be updated because we cannot guarantee the
completeness of the
* existing measurements in cache.
*
@@ -304,8 +306,8 @@ public class TableDeviceSchemaCache {
* @param database the device's database, without "root", {@code null} for
tree model
* @param deviceId {@link IDeviceID}
* @param measurement the measurement to get
- * @return {@code null} iff cache miss, {@link
TableDeviceLastCache#EMPTY_TIME_VALUE_PAIR} iff
- * cache hit but result is {@code null}, and the result value otherwise.
+ * @return {@code null} iff cache miss, {@link
TableDeviceLastCache#PLACEHOLDER_EMPTY_COLUMN} iff
+ * cache hit but the measurement has no values at all, and the result
value otherwise.
*/
public TimeValuePair getLastEntry(
final @Nullable String database, final IDeviceID deviceId, final String
measurement) {
@@ -321,8 +323,8 @@ public class TableDeviceSchemaCache {
* @param database the device's database, without "root", {@code null} for
tree model
* @param deviceId {@link IDeviceID}
* @param measurements the measurements to get
- * @return {@code null} iff cache miss, {@link
TableDeviceLastCache#EMPTY_TIME_VALUE_PAIR} iff
- * cache hit but result is {@code null}, and the result value otherwise.
+ * @return {@code null} iff cache miss, {@link
TableDeviceLastCache#PLACEHOLDER_EMPTY_COLUMN} iff
+ * cache hit but the measurement has no values at all, and the result
value otherwise.
*/
public TimeValuePair[] getLastEntries(
final @Nullable String database, final IDeviceID deviceId, final
String[] measurements) {
@@ -345,8 +347,10 @@ public class TableDeviceSchemaCache {
* the {@link Pair#left} will be the source measurement's last time,
(OptionalLong.empty() iff
* the source measurement is all {@code null}); {@link Pair#right} will
be an {@link
* TsPrimitiveType} array, whose element will be {@code null} if cache
miss, {@link
- * TableDeviceLastCache#EMPTY_PRIMITIVE_TYPE} iff cache hit and the
measurement is without any
- * values when last by the source measurement's time, and the result
value otherwise.
+ * TableDeviceLastCache#PLACEHOLDER_NO_VALUE} iff cache hit and the
measurement is known to be
+ * {@code null} when last by the source measurement's time, {@link
+ * TableDeviceLastCache#PLACEHOLDER_STALE_VALUE} iff cache hit but the
target measurement is
+ * stale under a newer source time, and the result value otherwise.
*/
public Optional<Pair<OptionalLong, TsPrimitiveType[]>> getLastRow(
final String database,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceSchemaCacheManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceSchemaCacheManager.java
index cdb66fdc047..93d75aeff2d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceSchemaCacheManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceSchemaCacheManager.java
@@ -355,7 +355,8 @@ public class TreeDeviceSchemaCacheManager {
* #updateLastCacheIfExists(String, IDeviceID, String[], TimeValuePair[],
boolean,
* IMeasurementSchema[])}. The input {@link TimeValuePair} shall never be or
contain {@code null},
* if the measurement is with all {@code null}s, its {@link TimeValuePair}
shall be {@link
- * TableDeviceLastCache#EMPTY_TIME_VALUE_PAIR}. This method is not supposed
to update time column.
+ * TableDeviceLastCache#PLACEHOLDER_EMPTY_COLUMN}. This method is not
supposed to update time
+ * column.
*
* @param database the device's database, WITH "root"
* @param measurementPath the fetched {@link MeasurementPath}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCacheTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCacheTest.java
new file mode 100644
index 00000000000..eedbbfbb4ec
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCacheTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache;
+
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.TsPrimitiveType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+public class TableDeviceLastCacheTest {
+
+ @Test
+ public void testKnownNullTimePreservesHistoricalValueAndClearsOnNewerValue()
{
+ final TableDeviceLastCache cache = new TableDeviceLastCache(false);
+
+ cache.initOrInvalidate(null, null, new String[] {"", "s1"}, false);
+
+ final TimeValuePair historicalValue = new TimeValuePair(1L, new
TsPrimitiveType.TsInt(1));
+ cache.tryUpdate(
+ new String[] {"", "s1"},
+ new TimeValuePair[] {
+ new TimeValuePair(1L, TableDeviceLastCache.PLACEHOLDER_NO_VALUE),
historicalValue
+ });
+ cache.tryUpdate(
+ new String[] {"s1"},
+ new TimeValuePair[] {new TimeValuePair(2L,
TableDeviceLastCache.PLACEHOLDER_NO_VALUE)});
+
+ Assert.assertEquals(historicalValue, cache.getTimeValuePair("s1"));
+ Optional<Pair<OptionalLong, TsPrimitiveType[]>> result =
+ cache.getLastRow("", Collections.singletonList("s1"));
+ Assert.assertTrue(result.isPresent());
+ Assert.assertEquals(OptionalLong.of(2L), result.get().getLeft());
+ Assert.assertArrayEquals(
+ new TsPrimitiveType[] {TableDeviceLastCache.PLACEHOLDER_NO_VALUE},
result.get().getRight());
+
+ final TimeValuePair newerValue = new TimeValuePair(3L, new
TsPrimitiveType.TsInt(3));
+ cache.tryUpdate(new String[] {"s1"}, new TimeValuePair[] {newerValue});
+
+ Assert.assertEquals(newerValue, cache.getTimeValuePair("s1"));
+ result = cache.getLastRow("", Collections.singletonList("s1"));
+ Assert.assertTrue(result.isPresent());
+ Assert.assertEquals(OptionalLong.of(3L), result.get().getLeft());
+ Assert.assertArrayEquals(
+ new TsPrimitiveType[] {new TsPrimitiveType.TsInt(3)},
result.get().getRight());
+ }
+
+ @Test
+ public void testInvalidateMeasurementClearsKnownNullTimeAndAlignedTime() {
+ final TableDeviceLastCache cache = new TableDeviceLastCache(false);
+
+ cache.initOrInvalidate(null, null, new String[] {"", "s1"}, false);
+ cache.tryUpdate(
+ new String[] {"", "s1"},
+ new TimeValuePair[] {
+ new TimeValuePair(2L, TableDeviceLastCache.PLACEHOLDER_NO_VALUE),
+ new TimeValuePair(2L, TableDeviceLastCache.PLACEHOLDER_NO_VALUE)
+ });
+
+ Assert.assertTrue(cache.getLastRow("",
Collections.singletonList("s1")).isPresent());
+ Assert.assertNotNull(cache.getTimeValuePair(""));
+
+ cache.invalidate("s1");
+
+ Assert.assertFalse(cache.getLastRow("",
Collections.singletonList("s1")).isPresent());
+ Assert.assertNull(cache.getTimeValuePair(""));
+ Assert.assertNull(cache.getTimeValuePair("s1"));
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCacheTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCacheTest.java
index cde0ffbb13f..e808652a42a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCacheTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCacheTest.java
@@ -334,10 +334,10 @@ public class TableDeviceSchemaCacheTest {
database1,
convertTagValuesToDeviceID(table1, device0),
new String[] {"s4"},
- new TimeValuePair[] {TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR});
+ new TimeValuePair[] {TableDeviceLastCache.PLACEHOLDER_EMPTY_COLUMN});
Assert.assertSame(
- TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR,
+ TableDeviceLastCache.PLACEHOLDER_EMPTY_COLUMN,
cache.getLastEntry(database1, convertTagValuesToDeviceID(table1,
device0), "s4"));
// Test null miss measurements
@@ -358,19 +358,52 @@ public class TableDeviceSchemaCacheTest {
database1,
convertTagValuesToDeviceID(table1, device0),
new String[] {""},
- new TimeValuePair[] {new TimeValuePair(2L,
TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE)});
+ new TimeValuePair[] {new TimeValuePair(2L,
TableDeviceLastCache.PLACEHOLDER_NO_VALUE)});
+
+ updateLastCache4Query(
+ cache,
+ database1,
+ convertTagValuesToDeviceID(table1, device0),
+ new String[] {"s1"},
+ new TimeValuePair[] {
+ new TimeValuePair(2L, TableDeviceLastCache.PLACEHOLDER_NO_VALUE),
+ });
+
+ Assert.assertEquals(
+ tv3, cache.getLastEntry(database1, convertTagValuesToDeviceID(table1,
device0), "s1"));
+
+ result =
+ cache.getLastRow(
+ database1, convertTagValuesToDeviceID(table1, device0), "",
Arrays.asList("s2", "s1"));
+ Assert.assertTrue(result.isPresent());
+ Assert.assertTrue(result.get().getLeft().isPresent());
+ Assert.assertEquals(OptionalLong.of(2L), result.get().getLeft());
+ Assert.assertArrayEquals(
+ new TsPrimitiveType[] {
+ new TsPrimitiveType.TsInt(2),
TableDeviceLastCache.PLACEHOLDER_NO_VALUE,
+ },
+ result.get().getRight());
+
+ cache.initOrInvalidateLastCache(
+ database1, convertTagValuesToDeviceID(table1, device0), new String[]
{"s5"}, false);
result =
cache.getLastRow(
database1,
convertTagValuesToDeviceID(table1, device0),
- "",
- Collections.singletonList("s2"));
+ "s2",
+ Arrays.asList("s0", "s1", "s4", "s5"));
Assert.assertTrue(result.isPresent());
Assert.assertTrue(result.get().getLeft().isPresent());
Assert.assertEquals(OptionalLong.of(2L), result.get().getLeft());
Assert.assertArrayEquals(
- new TsPrimitiveType[] {new TsPrimitiveType.TsInt(2)},
result.get().getRight());
+ new TsPrimitiveType[] {
+ TableDeviceLastCache.PLACEHOLDER_STALE_VALUE,
+ TableDeviceLastCache.PLACEHOLDER_NO_VALUE,
+ TableDeviceLastCache.PLACEHOLDER_NO_VALUE,
+ null
+ },
+ result.get().getRight());
result =
cache.getLastRow(
@@ -386,7 +419,7 @@ public class TableDeviceSchemaCacheTest {
new TsPrimitiveType.TsInt(3),
new TsPrimitiveType.TsLong(1),
new TsPrimitiveType.TsInt(3),
- TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE,
+ TableDeviceLastCache.PLACEHOLDER_NO_VALUE,
null
},
result.get().getRight());
@@ -460,8 +493,8 @@ public class TableDeviceSchemaCacheTest {
convertTagValuesToDeviceID(table2, device0),
new String[] {"", "s2"},
new TimeValuePair[] {
- new TimeValuePair(Long.MIN_VALUE,
TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE),
- TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR
+ new TimeValuePair(Long.MIN_VALUE,
TableDeviceLastCache.PLACEHOLDER_NO_VALUE),
+ TableDeviceLastCache.PLACEHOLDER_EMPTY_COLUMN
});
result =
@@ -471,7 +504,7 @@ public class TableDeviceSchemaCacheTest {
Assert.assertTrue(result.get().getLeft().isPresent());
Assert.assertEquals(OptionalLong.of(Long.MIN_VALUE),
result.get().getLeft());
Assert.assertArrayEquals(
- new TsPrimitiveType[] {TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE,
null},
+ new TsPrimitiveType[] {TableDeviceLastCache.PLACEHOLDER_NO_VALUE,
null},
result.get().getRight());
updateLastCache4Query(
@@ -492,7 +525,7 @@ public class TableDeviceSchemaCacheTest {
Assert.assertEquals(OptionalLong.of(Long.MIN_VALUE),
result.get().getLeft());
Assert.assertArrayEquals(
new TsPrimitiveType[] {
- TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE, new
TsPrimitiveType.TsInt(3),
+ TableDeviceLastCache.PLACEHOLDER_NO_VALUE, new
TsPrimitiveType.TsInt(3),
},
result.get().getRight());
@@ -504,7 +537,7 @@ public class TableDeviceSchemaCacheTest {
Assert.assertEquals(OptionalLong.of(Long.MIN_VALUE),
result.get().getLeft());
Assert.assertArrayEquals(
new TsPrimitiveType[] {
- TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE, new
TsPrimitiveType.TsInt(3),
+ TableDeviceLastCache.PLACEHOLDER_NO_VALUE, new
TsPrimitiveType.TsInt(3),
},
result.get().getRight());