This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 89730b14397 Optimized the overall performance of IoTDB & Fixed the NPE in LimitOperatorTest (#17664)
89730b14397 is described below
commit 89730b14397d675f2255dc2a8e8069168d18c7c3
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 2 14:27:06 2026 +0800
Optimized the overall performance of IoTDB & Fixed the NPE in LimitOperatorTest (#17664)
* Opt
* Update UnclosedFileScanHandleImpl.java
* Update StorageEngine.java
* Update ClosedFileScanHandleImpl.java
* column index
* spt
* Address performance review comments
* fix
---
.../client-cpp/src/main/SessionDataSet.cpp | 25 +--
.../org/apache/iotdb/isession/SessionDataSet.java | 27 ++-
.../org/apache/iotdb/rpc/IoTDBJDBCDataSet.java | 47 ++++--
.../iotdb/rpc/stmt/PreparedParameterSerde.java | 13 +-
.../java/org/apache/iotdb/session/Session.java | 186 +++++++++++----------
.../fetcher/cache/TableDeviceSchemaCache.java | 6 +-
.../apache/iotdb/db/schemaengine/SchemaEngine.java | 9 +-
.../iotdb/db/storageengine/StorageEngine.java | 16 +-
.../filescan/impl/ClosedFileScanHandleImpl.java | 35 ++--
.../filescan/impl/UnclosedFileScanHandleImpl.java | 83 ++++++---
.../dataregion/tsfile/TsFileManager.java | 21 +--
.../tsfile/timeindex/ArrayDeviceTimeIndex.java | 2 +-
.../apache/iotdb/db/utils/ModificationUtils.java | 57 ++++---
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 171 ++++---------------
.../cache/TreeDeviceSchemaCacheManagerTest.java | 49 ++++++
.../execution/operator/LimitOperatorTest.java | 3 +
.../org/apache/iotdb/commons/path/PartialPath.java | 28 +++-
17 files changed, 422 insertions(+), 356 deletions(-)
diff --git a/iotdb-client/client-cpp/src/main/SessionDataSet.cpp b/iotdb-client/client-cpp/src/main/SessionDataSet.cpp
index 11c2e8f528b..87488c2f7e7 100644
--- a/iotdb-client/client-cpp/src/main/SessionDataSet.cpp
+++ b/iotdb-client/client-cpp/src/main/SessionDataSet.cpp
@@ -246,38 +246,39 @@ const std::vector<std::string>& SessionDataSet::DataIterator::getColumnTypeList(
shared_ptr<RowRecord> SessionDataSet::constructRowRecordFromValueArray() {
std::vector<Field> outFields;
- for (int i = iotdbRpcDataSet_->getValueColumnStartIndex(); i <
iotdbRpcDataSet_->getColumnSize();
- i++) {
+ const int32_t valueColumnStartIndex =
iotdbRpcDataSet_->getValueColumnStartIndex();
+ const int32_t columnSize = iotdbRpcDataSet_->getColumnSize();
+ outFields.reserve(columnSize - valueColumnStartIndex);
+ for (int32_t columnIndex = valueColumnStartIndex + 1; columnIndex <=
columnSize; ++columnIndex) {
Field field;
- std::string columnName = iotdbRpcDataSet_->getColumnNameList().at(i);
- if (!iotdbRpcDataSet_->isNullByColumnName(columnName)) {
- TSDataType::TSDataType dataType =
iotdbRpcDataSet_->getDataType(columnName);
+ if (!iotdbRpcDataSet_->isNullByIndex(columnIndex)) {
+ TSDataType::TSDataType dataType =
iotdbRpcDataSet_->getDataTypeByIndex(columnIndex);
field.dataType = dataType;
switch (dataType) {
case TSDataType::BOOLEAN:
- field.boolV = iotdbRpcDataSet_->getBoolean(columnName);
+ field.boolV = iotdbRpcDataSet_->getBooleanByIndex(columnIndex).value();
break;
case TSDataType::INT32:
- field.intV = iotdbRpcDataSet_->getInt(columnName);
+ field.intV = iotdbRpcDataSet_->getIntByIndex(columnIndex).value();
break;
case TSDataType::DATE:
- field.dateV = iotdbRpcDataSet_->getDate(columnName);
+ field.dateV = iotdbRpcDataSet_->getDateByIndex(columnIndex).value();
break;
case TSDataType::INT64:
case TSDataType::TIMESTAMP:
- field.longV = iotdbRpcDataSet_->getLong(columnName);
+ field.longV = iotdbRpcDataSet_->getLongByIndex(columnIndex).value();
break;
case TSDataType::FLOAT:
- field.floatV = iotdbRpcDataSet_->getFloat(columnName);
+ field.floatV = iotdbRpcDataSet_->getFloatByIndex(columnIndex).value();
break;
case TSDataType::DOUBLE:
- field.doubleV = iotdbRpcDataSet_->getDouble(columnName);
+ field.doubleV =
iotdbRpcDataSet_->getDoubleByIndex(columnIndex).value();
break;
case TSDataType::TEXT:
case TSDataType::BLOB:
case TSDataType::STRING:
case TSDataType::OBJECT:
- field.stringV =
iotdbRpcDataSet_->getBinary(columnName)->getStringValue();
+ field.stringV =
iotdbRpcDataSet_->getBinaryByIndex(columnIndex)->getStringValue();
break;
default:
throw UnSupportedDataTypeException("Data type %s is not supported." +
dataType);
diff --git
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionDataSet.java
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionDataSet.java
index 885f5894411..46881ce5893 100644
---
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionDataSet.java
+++
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionDataSet.java
@@ -153,45 +153,42 @@ public class SessionDataSet implements ISessionDataSet {
}
private RowRecord constructRowRecordFromValueArray() throws
StatementExecutionException {
- List<Field> outFields = new ArrayList<>();
- for (int i = ioTDBRpcDataSet.getValueColumnStartIndex();
- i < ioTDBRpcDataSet.getColumnSize();
- i++) {
+ int valueColumnStartIndex = ioTDBRpcDataSet.getValueColumnStartIndex();
+ int columnSize = ioTDBRpcDataSet.getColumnSize();
+ List<Field> outFields = new ArrayList<>(columnSize -
valueColumnStartIndex);
+ for (int columnIndex = valueColumnStartIndex + 1; columnIndex <=
columnSize; columnIndex++) {
Field field;
-
- String columnName = ioTDBRpcDataSet.getColumnNameList().get(i);
-
- if (!ioTDBRpcDataSet.isNull(columnName)) {
- TSDataType dataType = ioTDBRpcDataSet.getDataType(columnName);
+ if (!ioTDBRpcDataSet.isNull(columnIndex)) {
+ TSDataType dataType = ioTDBRpcDataSet.getDataType(columnIndex);
field = new Field(dataType);
switch (dataType) {
case BOOLEAN:
- boolean booleanValue = ioTDBRpcDataSet.getBoolean(columnName);
+ boolean booleanValue = ioTDBRpcDataSet.getBoolean(columnIndex);
field.setBoolV(booleanValue);
break;
case INT32:
case DATE:
- int intValue = ioTDBRpcDataSet.getInt(columnName);
+ int intValue = ioTDBRpcDataSet.getInt(columnIndex);
field.setIntV(intValue);
break;
case INT64:
case TIMESTAMP:
- long longValue = ioTDBRpcDataSet.getLong(columnName);
+ long longValue = ioTDBRpcDataSet.getLong(columnIndex);
field.setLongV(longValue);
break;
case FLOAT:
- float floatValue = ioTDBRpcDataSet.getFloat(columnName);
+ float floatValue = ioTDBRpcDataSet.getFloat(columnIndex);
field.setFloatV(floatValue);
break;
case DOUBLE:
- double doubleValue = ioTDBRpcDataSet.getDouble(columnName);
+ double doubleValue = ioTDBRpcDataSet.getDouble(columnIndex);
field.setDoubleV(doubleValue);
break;
case TEXT:
case BLOB:
case STRING:
case OBJECT:
- field.setBinaryV(ioTDBRpcDataSet.getBinary(columnName));
+ field.setBinaryV(ioTDBRpcDataSet.getBinary(columnIndex));
break;
default:
throw new UnSupportedDataTypeException(
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBJDBCDataSet.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBJDBCDataSet.java
index 02b5cc12fb5..000ab5a2799 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBJDBCDataSet.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBJDBCDataSet.java
@@ -128,20 +128,15 @@ public class IoTDBJDBCDataSet {
// deduplicate and map
if (columnNameIndex != null) {
- int deduplicatedColumnSize = (int)
columnNameIndex.values().stream().distinct().count();
- this.columnTypeDeduplicatedList = new
ArrayList<>(deduplicatedColumnSize);
- for (int i = 0; i < deduplicatedColumnSize; i++) {
- columnTypeDeduplicatedList.add(null);
- }
+ this.columnTypeDeduplicatedList =
+
initDeduplicatedColumnTypes(getDeduplicatedColumnSize(columnNameIndex));
for (int i = 0; i < columnNameList.size(); i++) {
String name = columnNameList.get(i);
this.columnNameList.add(name);
this.columnTypeList.add(columnTypeList.get(i));
if (!columnOrdinalMap.containsKey(name)) {
int index = columnNameIndex.get(name);
- if (!columnOrdinalMap.containsValue(index + START_INDEX)) {
- columnTypeDeduplicatedList.set(index,
TSDataType.valueOf(columnTypeList.get(i)));
- }
+ setColumnTypeIfAbsent(columnTypeDeduplicatedList, index,
columnTypeList.get(i));
columnOrdinalMap.put(name, index + START_INDEX);
}
}
@@ -243,11 +238,8 @@ public class IoTDBJDBCDataSet {
// deduplicate and map
if (columnNameIndex != null) {
- int deduplicatedColumnSize = (int)
columnNameIndex.values().stream().distinct().count();
- this.columnTypeDeduplicatedList = new
ArrayList<>(deduplicatedColumnSize);
- for (int i = 0; i < deduplicatedColumnSize; i++) {
- columnTypeDeduplicatedList.add(null);
- }
+ this.columnTypeDeduplicatedList =
+
initDeduplicatedColumnTypes(getDeduplicatedColumnSize(columnNameIndex));
for (int i = 0; i < columnNameList.size(); i++) {
String name = "";
if (sgList != null
@@ -263,9 +255,7 @@ public class IoTDBJDBCDataSet {
// "Time".equals(name) -> to allow the Time column appear in value
columns
if (!columnOrdinalMap.containsKey(name) || "Time".equals(name)) {
int index = columnNameIndex.get(name);
- if (!columnOrdinalMap.containsValue(index + START_INDEX)) {
- columnTypeDeduplicatedList.set(index,
TSDataType.valueOf(columnTypeList.get(i)));
- }
+ setColumnTypeIfAbsent(columnTypeDeduplicatedList, index,
columnTypeList.get(i));
columnOrdinalMap.put(name, index + START_INDEX);
}
}
@@ -321,6 +311,31 @@ public class IoTDBJDBCDataSet {
this.emptyResultSet = (queryDataSet == null ||
!queryDataSet.time.hasRemaining());
}
+ private static int getDeduplicatedColumnSize(Map<String, Integer>
columnNameIndex) {
+ int deduplicatedColumnSize = 0;
+ for (Integer index : columnNameIndex.values()) {
+ if (index != null && index + 1 > deduplicatedColumnSize) {
+ deduplicatedColumnSize = index + 1;
+ }
+ }
+ return deduplicatedColumnSize;
+ }
+
+ private static List<TSDataType> initDeduplicatedColumnTypes(int
deduplicatedColumnSize) {
+ List<TSDataType> columnTypes = new ArrayList<>(deduplicatedColumnSize);
+ for (int i = 0; i < deduplicatedColumnSize; i++) {
+ columnTypes.add(null);
+ }
+ return columnTypes;
+ }
+
+ private static void setColumnTypeIfAbsent(
+ List<TSDataType> columnTypeDeduplicatedList, int index, String
columnType) {
+ if (columnTypeDeduplicatedList.get(index) == null) {
+ columnTypeDeduplicatedList.set(index, TSDataType.valueOf(columnType));
+ }
+ }
+
public void close() throws StatementExecutionException, TException {
if (isClosed) {
return;
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/stmt/PreparedParameterSerde.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/stmt/PreparedParameterSerde.java
index 86698e45573..e42710d23e1 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/stmt/PreparedParameterSerde.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/stmt/PreparedParameterSerde.java
@@ -36,6 +36,8 @@ import java.util.List;
/** Serializer for PreparedStatement parameters. */
public class PreparedParameterSerde {
+ private static final char[] HEX_DIGITS = "0123456789ABCDEF".toCharArray();
+
public static class DeserializedParam {
public final TSDataType type;
public final Object value;
@@ -168,10 +170,13 @@ public class PreparedParameterSerde {
/** Convert byte array to hexadecimal string representation. */
public static String bytesToHex(byte[] bytes) {
- StringBuilder sb = new StringBuilder(bytes.length * 2);
- for (byte b : bytes) {
- sb.append(String.format("%02X", b));
+ char[] chars = new char[bytes.length * 2];
+ for (int i = 0; i < bytes.length; i++) {
+ int value = bytes[i] & 0xFF;
+ int index = i * 2;
+ chars[index] = HEX_DIGITS[value >>> 4];
+ chars[index + 1] = HEX_DIGITS[value & 0x0F];
}
- return sb.toString();
+ return new String(chars);
}
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index ffc897760be..b92df62f5ad 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -849,31 +849,12 @@ public class Session implements ISession {
TSCreateAlignedTimeseriesReq request = new TSCreateAlignedTimeseriesReq();
request.setPrefixPath(prefixPath);
request.setMeasurements(measurements);
-
request.setDataTypes(dataTypes.stream().map(TSDataType::ordinal).collect(Collectors.toList()));
-
request.setEncodings(encodings.stream().map(TSEncoding::ordinal).collect(Collectors.toList()));
- request.setCompressors(
- compressors.stream().map(i -> (int)
i.serialize()).collect(Collectors.toList()));
- if (measurementAliasList != null) {
- measurementAliasList =
- measurementAliasList.stream()
- .map(value -> value != null ? value : "")
- .collect(Collectors.toList());
- }
- request.setMeasurementAlias(measurementAliasList);
- if (tagsList != null) {
- tagsList =
- tagsList.stream()
- .map(value -> value != null ? value : new HashMap<String,
String>())
- .collect(Collectors.toList());
- }
- request.setTagsList(tagsList);
- if (attributesList != null) {
- attributesList =
- attributesList.stream()
- .map(value -> value != null ? value : new HashMap<String,
String>())
- .collect(Collectors.toList());
- }
- request.setAttributesList(attributesList);
+ request.setDataTypes(toDataTypeOrdinals(dataTypes));
+ request.setEncodings(toEncodingOrdinals(encodings));
+ request.setCompressors(toCompressionOrdinals(compressors));
+ request.setMeasurementAlias(replaceNullStrings(measurementAliasList));
+ request.setTagsList(replaceNullMaps(tagsList));
+ request.setAttributesList(replaceNullMaps(attributesList));
return request;
}
@@ -913,47 +894,14 @@ public class Session implements ISession {
TSCreateMultiTimeseriesReq request = new TSCreateMultiTimeseriesReq();
request.setPaths(paths);
-
- List<Integer> dataTypeOrdinals = new ArrayList<>(dataTypes.size());
- for (TSDataType dataType : dataTypes) {
- dataTypeOrdinals.add(dataType.ordinal());
- }
- request.setDataTypes(dataTypeOrdinals);
-
- List<Integer> encodingOrdinals = new ArrayList<>(dataTypes.size());
- for (TSEncoding encoding : encodings) {
- encodingOrdinals.add(encoding.ordinal());
- }
- request.setEncodings(encodingOrdinals);
-
- List<Integer> compressionOrdinals = new ArrayList<>(paths.size());
- for (CompressionType compression : compressors) {
- compressionOrdinals.add((int) compression.serialize());
- }
- request.setCompressors(compressionOrdinals);
+ request.setDataTypes(toDataTypeOrdinals(dataTypes));
+ request.setEncodings(toEncodingOrdinals(encodings));
+ request.setCompressors(toCompressionOrdinals(compressors));
request.setPropsList(propsList);
- if (tagsList != null) {
- tagsList =
- tagsList.stream()
- .map(value -> value != null ? value : new HashMap<String,
String>())
- .collect(Collectors.toList());
- }
- request.setTagsList(tagsList);
- if (attributesList != null) {
- attributesList =
- attributesList.stream()
- .map(value -> value != null ? value : new HashMap<String,
String>())
- .collect(Collectors.toList());
- }
- request.setAttributesList(attributesList);
- if (measurementAliasList != null) {
- measurementAliasList =
- measurementAliasList.stream()
- .map(value -> value != null ? value : "")
- .collect(Collectors.toList());
- }
- request.setMeasurementAliasList(measurementAliasList);
+ request.setTagsList(replaceNullMaps(tagsList));
+ request.setAttributesList(replaceNullMaps(attributesList));
+ request.setMeasurementAliasList(replaceNullStrings(measurementAliasList));
return request;
}
@@ -1817,19 +1765,23 @@ public class Session implements ISession {
List<String> measurementsList,
List<TSDataType> types,
List<Object> valuesList) {
- Map<String, Object> nullMap = new HashMap<>();
+ Map<String, Object> nullMap = logger.isInfoEnabled() ? new HashMap<>() :
null;
for (int i = valuesList.size() - 1; i >= 0; i--) {
if (valuesList.get(i) == null) {
- nullMap.put(measurementsList.get(i), valuesList.get(i));
+ if (nullMap != null) {
+ nullMap.put(measurementsList.get(i), valuesList.get(i));
+ }
valuesList.remove(i);
measurementsList.remove(i);
types.remove(i);
}
}
if (valuesList.isEmpty()) {
- logger.info(SessionMessages.ALL_VALUES_NULL, deviceId, nullMap);
+ if (nullMap != null) {
+ logger.info(SessionMessages.ALL_VALUES_NULL, deviceId, nullMap);
+ }
return true;
- } else {
+ } else if (nullMap != null) {
logger.info(SessionMessages.SOME_VALUES_NULL, deviceId, nullMap);
}
return false;
@@ -1865,18 +1817,22 @@ public class Session implements ISession {
*/
private boolean filterNullValueAndMeasurementWithStringType(
List<String> valuesList, String deviceId, List<String> measurementsList)
{
- Map<String, Object> nullMap = new HashMap<>();
+ Map<String, Object> nullMap = logger.isInfoEnabled() ? new HashMap<>() :
null;
for (int i = valuesList.size() - 1; i >= 0; i--) {
if (valuesList.get(i) == null) {
- nullMap.put(measurementsList.get(i), valuesList.get(i));
+ if (nullMap != null) {
+ nullMap.put(measurementsList.get(i), valuesList.get(i));
+ }
valuesList.remove(i);
measurementsList.remove(i);
}
}
if (valuesList.isEmpty()) {
- logger.info(SessionMessages.ALL_VALUES_NULL, deviceId, nullMap);
+ if (nullMap != null) {
+ logger.info(SessionMessages.ALL_VALUES_NULL, deviceId, nullMap);
+ }
return true;
- } else {
+ } else if (nullMap != null) {
logger.info(SessionMessages.SOME_VALUES_NULL, deviceId, nullMap);
}
return false;
@@ -2620,7 +2576,65 @@ public class Session implements ISession {
* @return ordered list
*/
private static <T> List<T> sortList(List<T> source, Integer[] index) {
- return Arrays.stream(index).map(source::get).collect(Collectors.toList());
+ List<T> sortedList = new ArrayList<>(index.length);
+ for (int position : index) {
+ sortedList.add(source.get(position));
+ }
+ return sortedList;
+ }
+
+ private static List<Integer> toDataTypeOrdinals(List<TSDataType> dataTypes) {
+ List<Integer> ordinals = new ArrayList<>(dataTypes.size());
+ for (TSDataType dataType : dataTypes) {
+ ordinals.add(dataType.ordinal());
+ }
+ return ordinals;
+ }
+
+ private static List<Integer> toEncodingOrdinals(List<TSEncoding> encodings) {
+ List<Integer> ordinals = new ArrayList<>(encodings.size());
+ for (TSEncoding encoding : encodings) {
+ ordinals.add(encoding.ordinal());
+ }
+ return ordinals;
+ }
+
+ private static List<Integer> toCompressionOrdinals(List<CompressionType>
compressors) {
+ List<Integer> ordinals = new ArrayList<>(compressors.size());
+ for (CompressionType compression : compressors) {
+ ordinals.add((int) compression.serialize());
+ }
+ return ordinals;
+ }
+
+ private static List<Byte> toEnumOrdinalsAsBytes(List<? extends Enum<?>>
enumValues) {
+ List<Byte> ordinals = new ArrayList<>(enumValues.size());
+ for (Enum<?> enumValue : enumValues) {
+ ordinals.add((byte) enumValue.ordinal());
+ }
+ return ordinals;
+ }
+
+ private static List<String> replaceNullStrings(List<String> values) {
+ if (values == null) {
+ return null;
+ }
+ List<String> replacedValues = new ArrayList<>(values.size());
+ for (String value : values) {
+ replacedValues.add(value != null ? value : "");
+ }
+ return replacedValues;
+ }
+
+ private static List<Map<String, String>> replaceNullMaps(List<Map<String,
String>> values) {
+ if (values == null) {
+ return null;
+ }
+ List<Map<String, String>> replacedValues = new ArrayList<>(values.size());
+ for (Map<String, String> value : values) {
+ replacedValues.add(value != null ? value : new HashMap<>());
+ }
+ return replacedValues;
}
private List<ByteBuffer> objectValuesListToByteBufferList(
@@ -2818,10 +2832,7 @@ public class Session implements ISession {
} else {
TSInsertTabletReq request = genTSInsertTabletReq(tablet, false, false);
request.setWriteToTable(true);
- request.setColumnCategories(
- tablet.getColumnTypes().stream()
- .map(t -> (byte) t.ordinal())
- .collect(Collectors.toList()));
+
request.setColumnCategories(toEnumOrdinalsAsBytes(tablet.getColumnTypes()));
try {
getDefaultSessionConnection().insertTablet(request);
} catch (RedirectException ignored) {
@@ -2887,8 +2898,7 @@ public class Session implements ISession {
Tablet tablet = entry.getValue();
TSInsertTabletReq request = genTSInsertTabletReq(tablet, false, false);
request.setWriteToTable(true);
- request.setColumnCategories(
- tablet.getColumnTypes().stream().map(t -> (byte)
t.ordinal()).collect(Collectors.toList()));
+
request.setColumnCategories(toEnumOrdinalsAsBytes(tablet.getColumnTypes()));
try {
connection.insertTablet(request);
} catch (RedirectException e) {
@@ -2930,9 +2940,7 @@ public class Session implements ISession {
TSInsertTabletReq request =
genTSInsertTabletReq(subTablet, false, false);
request.setWriteToTable(true);
request.setColumnCategories(
- subTablet.getColumnTypes().stream()
- .map(t -> (byte) t.ordinal())
- .collect(Collectors.toList()));
+ toEnumOrdinalsAsBytes(subTablet.getColumnTypes()));
InsertConsumer<TSInsertTabletReq> insertConsumer =
SessionConnection::insertTablet;
try {
@@ -3902,10 +3910,9 @@ public class Session implements ISession {
TSAppendSchemaTemplateReq req = new TSAppendSchemaTemplateReq();
req.setName(templateName);
req.setMeasurements(measurementsPath);
-
req.setDataTypes(dataTypes.stream().map(TSDataType::ordinal).collect(Collectors.toList()));
-
req.setEncodings(encodings.stream().map(TSEncoding::ordinal).collect(Collectors.toList()));
- req.setCompressors(
- compressors.stream().map(i -> (int)
i.serialize()).collect(Collectors.toList()));
+ req.setDataTypes(toDataTypeOrdinals(dataTypes));
+ req.setEncodings(toEncodingOrdinals(encodings));
+ req.setCompressors(toCompressionOrdinals(compressors));
req.setIsAligned(true);
getDefaultSessionConnection().appendSchemaTemplate(req);
}
@@ -3948,10 +3955,9 @@ public class Session implements ISession {
TSAppendSchemaTemplateReq req = new TSAppendSchemaTemplateReq();
req.setName(templateName);
req.setMeasurements(measurementsPath);
-
req.setDataTypes(dataTypes.stream().map(TSDataType::ordinal).collect(Collectors.toList()));
-
req.setEncodings(encodings.stream().map(TSEncoding::ordinal).collect(Collectors.toList()));
- req.setCompressors(
- compressors.stream().map(i -> (int)
i.serialize()).collect(Collectors.toList()));
+ req.setDataTypes(toDataTypeOrdinals(dataTypes));
+ req.setEncodings(toEncodingOrdinals(encodings));
+ req.setCompressors(toCompressionOrdinals(compressors));
req.setIsAligned(false);
getDefaultSessionConnection().appendSchemaTemplate(req);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
index f49d6707377..8e40298e115 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
@@ -486,7 +486,7 @@ public class TableDeviceSchemaCache {
},
cachedDeviceID -> {
try {
- return new PartialPath(cachedDeviceID).matchFullPath(devicePath);
+ return devicePath.matchFullPath(cachedDeviceID);
} catch (final IllegalPathException e) {
logger.warn(
"Illegal deviceID {} found in cache when invalidating by
path {}, invalidate it anyway",
@@ -525,8 +525,8 @@ public class TableDeviceSchemaCache {
cachedDeviceID -> {
try {
return isMultiLevelWildcardMeasurement
- ? devicePath.matchPrefixPath(new PartialPath(cachedDeviceID))
- : devicePath.matchFullPath(new PartialPath(cachedDeviceID));
+ ? devicePath.matchPrefixPath(cachedDeviceID)
+ : devicePath.matchFullPath(cachedDeviceID);
} catch (final IllegalPathException e) {
logger.warn(
"Illegal deviceID {} found in cache when invalidating by
path {}, invalidate it anyway",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
index b80333fddf5..a52330073d8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
@@ -59,6 +59,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -371,11 +372,13 @@ public class SchemaEngine {
public Map<Integer, Long> countDeviceNumBySchemaRegion(final List<Integer>
schemaIds) {
final Map<Integer, Long> deviceNum = new HashMap<>();
+ final Collection<Integer> targetSchemaIds =
+ schemaIds.size() > 1 ? new HashSet<>(schemaIds) : schemaIds;
schemaRegionMap.entrySet().stream()
.filter(
entry ->
- schemaIds.contains(entry.getKey().getId())
+ targetSchemaIds.contains(entry.getKey().getId())
&&
SchemaRegionConsensusImpl.getInstance().isLeader(entry.getKey()))
.forEach(
entry ->
@@ -387,10 +390,12 @@ public class SchemaEngine {
public Map<Integer, Long> countTimeSeriesNumBySchemaRegion(final
List<Integer> schemaIds) {
final Map<Integer, Long> timeSeriesNum = new HashMap<>();
+ final Collection<Integer> targetSchemaIds =
+ schemaIds.size() > 1 ? new HashSet<>(schemaIds) : schemaIds;
schemaRegionMap.entrySet().stream()
.filter(
entry ->
- schemaIds.contains(entry.getKey().getId())
+ targetSchemaIds.contains(entry.getKey().getId())
&&
SchemaRegionConsensusImpl.getInstance().isLeader(entry.getKey())
&& !entry
.getValue()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 07308a95f2c..db51775281d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -99,7 +99,9 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -1116,12 +1118,14 @@ public class StorageEngine implements IService {
public void getDiskSizeByDataRegion(
Map<Integer, Long> dataRegionDisk, List<Integer> dataRegionIds) {
- dataRegionMap.forEach(
- (dataRegionId, dataRegion) -> {
- if (dataRegionIds.contains(dataRegionId.getId())) {
- dataRegionDisk.put(dataRegionId.getId(),
dataRegion.countRegionDiskSize());
- }
- });
+ final Collection<Integer> targetDataRegionIds =
+ dataRegionIds.size() > 1 ? new HashSet<>(dataRegionIds) :
dataRegionIds;
+ for (Integer dataRegionId : targetDataRegionIds) {
+ final DataRegion dataRegion = dataRegionMap.get(new
DataRegionId(dataRegionId));
+ if (dataRegion != null) {
+ dataRegionDisk.put(dataRegionId, dataRegion.countRegionDiskSize());
+ }
+ }
}
public static File getDataRegionSystemDir(String dataBaseName, String
dataRegionId) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java
index 6ebb6f7073b..8dd2fb100aa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java
@@ -51,11 +51,11 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
public class ClosedFileScanHandleImpl implements IFileScanHandle {
@@ -63,13 +63,16 @@ public class ClosedFileScanHandleImpl implements
IFileScanHandle {
private final TsFileResource tsFileResource;
private final QueryContext queryContext;
private PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
curFileModEntries = null;
+ // Used to cache the device-level modifications
+ private final Map<IDeviceID, List<TimeRange>> deviceToDeletionRanges;
// Used to cache the modifications of each timeseries
private final Map<IDeviceID, Map<String, List<TimeRange>>>
deviceToModifications;
public ClosedFileScanHandleImpl(TsFileResource tsFileResource, QueryContext
context) {
this.tsFileResource = tsFileResource;
this.queryContext = context;
- this.deviceToModifications = new HashMap<>();
+ this.deviceToDeletionRanges = new ConcurrentHashMap<>();
+ this.deviceToModifications = new ConcurrentHashMap<>();
}
@Override
@@ -87,10 +90,24 @@ public class ClosedFileScanHandleImpl implements
IFileScanHandle {
curFileModEntries != null
? curFileModEntries
: queryContext.loadAllModificationsFromDisk(tsFileResource);
- List<ModEntry> modifications =
queryContext.getPathModifications(curFileModEntries, deviceID);
- List<TimeRange> timeRangeList =
-
modifications.stream().map(ModEntry::getTimeRange).collect(Collectors.toList());
- return ModificationUtils.isPointDeletedWithoutOrderedRange(timestamp,
timeRangeList);
+ List<TimeRange> timeRangeList = deviceToDeletionRanges.get(deviceID);
+ if (timeRangeList == null) {
+ final List<TimeRange> computedTimeRangeList =
+
getMergedTimeRanges(queryContext.getPathModifications(curFileModEntries,
deviceID));
+ final List<TimeRange> existingTimeRangeList =
+ deviceToDeletionRanges.putIfAbsent(deviceID, computedTimeRangeList);
+ timeRangeList = existingTimeRangeList == null ? computedTimeRangeList :
existingTimeRangeList;
+ }
+ return ModificationUtils.isPointDeleted(timestamp, timeRangeList);
+ }
+
+ private static List<TimeRange> getMergedTimeRanges(List<ModEntry>
modifications) {
+ List<TimeRange> timeRangeList = new ArrayList<>(modifications.size());
+ for (ModEntry modification : modifications) {
+ timeRangeList.add(modification.getTimeRange());
+ }
+ TimeRange.sortAndMerge(timeRangeList);
+ return timeRangeList;
}
@Override
@@ -107,11 +124,9 @@ public class ClosedFileScanHandleImpl implements
IFileScanHandle {
List<ModEntry> modifications =
queryContext.getPathModifications(curFileModEntries, deviceID,
timeSeriesName);
- List<TimeRange> timeRangeList =
-
modifications.stream().map(ModEntry::getTimeRange).collect(Collectors.toList());
- TimeRange.sortAndMerge(timeRangeList);
+ List<TimeRange> timeRangeList = getMergedTimeRanges(modifications);
deviceToModifications
- .computeIfAbsent(deviceID, k -> new HashMap<>())
+ .computeIfAbsent(deviceID, k -> new ConcurrentHashMap<>())
.put(timeSeriesName, timeRangeList);
return ModificationUtils.isPointDeleted(timestamp, timeRangeList);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/UnclosedFileScanHandleImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/UnclosedFileScanHandleImpl.java
index ba03f69a11a..4c97073e945 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/UnclosedFileScanHandleImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/UnclosedFileScanHandleImpl.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
import
org.apache.iotdb.db.storageengine.dataregion.utils.TsFileDeviceStartEndTimeIterator;
+import org.apache.iotdb.db.utils.ModificationUtils;
import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
@@ -39,15 +40,19 @@ import org.apache.tsfile.read.common.TimeRange;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class UnclosedFileScanHandleImpl implements IFileScanHandle {
private final TsFileResource tsFileResource;
private final Map<IDeviceID, Map<String, List<IChunkMetadata>>>
deviceToChunkMetadataMap;
private final Map<IDeviceID, Map<String, List<IChunkHandle>>>
deviceToMemChunkHandleMap;
+ private final Map<IDeviceID, List<TimeRange>> deviceToDeletionRanges;
+ private final Map<IDeviceID, Map<String, List<TimeRange>>>
deviceToTimeSeriesDeletionRanges;
public UnclosedFileScanHandleImpl(
Map<IDeviceID, Map<String, List<IChunkMetadata>>>
deviceToChunkMetadataMap,
@@ -56,6 +61,8 @@ public class UnclosedFileScanHandleImpl implements IFileScanHandle {
this.deviceToChunkMetadataMap = deviceToChunkMetadataMap;
this.deviceToMemChunkHandleMap = deviceToMemChunkHandleMap;
this.tsFileResource = tsFileResource;
+ this.deviceToDeletionRanges = new ConcurrentHashMap<>();
+ this.deviceToTimeSeriesDeletionRanges = new ConcurrentHashMap<>();
}
@Override
@@ -68,19 +75,9 @@ public class UnclosedFileScanHandleImpl implements IFileScanHandle {
@Override
public boolean isDeviceTimeDeleted(IDeviceID deviceID, long timeArray) {
- Map<String, List<IChunkMetadata>> chunkMetadataMap =
deviceToChunkMetadataMap.get(deviceID);
- for (List<IChunkMetadata> chunkMetadataList : chunkMetadataMap.values()) {
- for (IChunkMetadata chunkMetadata : chunkMetadataList) {
- if (chunkMetadata.getDeleteIntervalList() != null) {
- for (TimeRange deleteInterval :
chunkMetadata.getDeleteIntervalList()) {
- if (deleteInterval.contains(timeArray)) {
- return true;
- }
- }
- }
- }
- }
- return false;
+ List<TimeRange> deletionRanges =
+ deviceToDeletionRanges.computeIfAbsent(deviceID,
this::collectDeviceDeletionRanges);
+ return ModificationUtils.isPointDeleted(timeArray, deletionRanges);
}
@Override
@@ -121,19 +118,13 @@ public class UnclosedFileScanHandleImpl implements IFileScanHandle {
@Override
public boolean isTimeSeriesTimeDeleted(
IDeviceID deviceID, String timeSeriesName, long timestamp) {
- List<IChunkMetadata> chunkMetadataList =
- deviceToChunkMetadataMap.get(deviceID).get(timeSeriesName);
- // check if timestamp is deleted by deleteInterval
- for (IChunkMetadata chunkMetadata : chunkMetadataList) {
- if (chunkMetadata.getDeleteIntervalList() != null) {
- for (TimeRange deleteInterval : chunkMetadata.getDeleteIntervalList())
{
- if (deleteInterval.contains(timestamp)) {
- return true;
- }
- }
- }
- }
- return false;
+ Map<String, List<TimeRange>> timeSeriesDeletionRanges =
+ deviceToTimeSeriesDeletionRanges.computeIfAbsent(
+ deviceID, key -> new ConcurrentHashMap<>());
+ List<TimeRange> deletionRanges =
+ timeSeriesDeletionRanges.computeIfAbsent(
+ timeSeriesName, key -> collectTimeSeriesDeletionRanges(deviceID,
key));
+ return ModificationUtils.isPointDeleted(timestamp, deletionRanges);
}
@Override
@@ -167,4 +158,44 @@ public class UnclosedFileScanHandleImpl implements IFileScanHandle {
public TsFileResource getTsResource() {
return tsFileResource;
}
+
+ private List<TimeRange> collectDeviceDeletionRanges(IDeviceID deviceID) {
+ Map<String, List<IChunkMetadata>> chunkMetadataMap =
deviceToChunkMetadataMap.get(deviceID);
+ if (chunkMetadataMap == null || chunkMetadataMap.isEmpty()) {
+ return Collections.emptyList();
+ }
+ List<TimeRange> deletionRanges = new ArrayList<>();
+ for (List<IChunkMetadata> chunkMetadataList : chunkMetadataMap.values()) {
+ appendDeletionRanges(deletionRanges, chunkMetadataList);
+ }
+ TimeRange.sortAndMerge(deletionRanges);
+ return deletionRanges;
+ }
+
+ private List<TimeRange> collectTimeSeriesDeletionRanges(
+ IDeviceID deviceID, String timeSeriesName) {
+ Map<String, List<IChunkMetadata>> chunkMetadataMap =
deviceToChunkMetadataMap.get(deviceID);
+ if (chunkMetadataMap == null) {
+ return Collections.emptyList();
+ }
+ List<IChunkMetadata> chunkMetadataList =
chunkMetadataMap.get(timeSeriesName);
+ if (chunkMetadataList == null || chunkMetadataList.isEmpty()) {
+ return Collections.emptyList();
+ }
+ List<TimeRange> deletionRanges = new ArrayList<>();
+ appendDeletionRanges(deletionRanges, chunkMetadataList);
+ TimeRange.sortAndMerge(deletionRanges);
+ return deletionRanges;
+ }
+
+ private void appendDeletionRanges(
+ List<TimeRange> deletionRanges, List<IChunkMetadata> chunkMetadataList) {
+ for (IChunkMetadata chunkMetadata : chunkMetadataList) {
+ if (chunkMetadata.getDeleteIntervalList() != null) {
+ for (TimeRange deletionRange : chunkMetadata.getDeleteIntervalList()) {
+ deletionRanges.add(new TimeRange(deletionRange.getMin(),
deletionRange.getMax()));
+ }
+ }
+ }
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
index 86890370c31..b2cb7886010 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
@@ -206,14 +206,7 @@ public class TsFileManager {
public void remove(TsFileResource tsFileResource, boolean sequence) {
writeLock("remove");
try {
- Map<Long, TsFileResourceList> selectedMap = sequence ? sequenceFiles :
unsequenceFiles;
- for (Map.Entry<Long, TsFileResourceList> entry : selectedMap.entrySet())
{
- if (entry.getValue().contains(tsFileResource)) {
- entry.getValue().remove(tsFileResource);
- TsFileResourceManager.getInstance().removeTsFileResource(tsFileResource);
- break;
- }
- }
+ removeFromPartitionFileList(tsFileResource, sequence);
} finally {
writeUnlock();
}
@@ -223,10 +216,18 @@ public class TsFileManager {
writeLock("removeAll");
try {
for (TsFileResource resource : tsFileResourceList) {
- remove(resource, sequence);
+ removeFromPartitionFileList(resource, sequence);
}
} finally {
- writeLock("removeAll");
+ writeUnlock();
+ }
+ }
+
+ private void removeFromPartitionFileList(TsFileResource tsFileResource,
boolean sequence) {
+ Map<Long, TsFileResourceList> selectedMap = sequence ? sequenceFiles :
unsequenceFiles;
+ TsFileResourceList tsFileResources =
selectedMap.get(tsFileResource.getTimePartition());
+ if (tsFileResources != null && tsFileResources.remove(tsFileResource)) {
+ TsFileResourceManager.getInstance().removeTsFileResource(tsFileResource);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java
index 972c4c9ad0c..a0ceb57de9a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java
@@ -469,7 +469,7 @@ public class ArrayDeviceTimeIndex implements ITimeIndex {
endTime = endTimes[entry.getValue()];
}
} else {
- if (devicePattern.matchFullPath(new PartialPath(entry.getKey()))) {
+ if (devicePattern.matchFullPath(entry.getKey())) {
deviceMatchInfo.add(entry.getKey());
hasMatchedDevice = true;
if (startTimes[entry.getValue()] < startTime) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
index 0642bbcd839..0ea1abee866 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
@@ -82,7 +82,11 @@ public class ModificationUtils {
if (range.contains(metaData.getStartTime(),
metaData.getEndTime())) {
return true;
} else {
- if (range.overlaps(new TimeRange(metaData.getStartTime(),
metaData.getEndTime()))) {
+ if (overlap(
+ metaData.getStartTime(),
+ metaData.getEndTime(),
+ range.getMin(),
+ range.getMax())) {
metaData.setModified(true);
}
}
@@ -144,9 +148,11 @@ public class ModificationUtils {
currentRemoved = true;
break;
} else {
- if (range.overlaps(
- new TimeRange(
- valueChunkMetadata.getStartTime(),
valueChunkMetadata.getEndTime()))) {
+ if (overlap(
+ valueChunkMetadata.getStartTime(),
+ valueChunkMetadata.getEndTime(),
+ range.getMin(),
+ range.getMax())) {
valueChunkMetadata.setModified(true);
modified = true;
}
@@ -194,10 +200,11 @@ public class ModificationUtils {
// all rows are deleted
return true;
} else {
- if (range.overlaps(
- new TimeRange(
- timeColumnChunkMetadata.getStartTime(),
- timeColumnChunkMetadata.getEndTime()))) {
+ if (overlap(
+ timeColumnChunkMetadata.getStartTime(),
+ timeColumnChunkMetadata.getEndTime(),
+ range.getMin(),
+ range.getMax())) {
timeColumnChunkMetadata.setModified(true);
modified = true;
}
@@ -329,16 +336,17 @@ public class ModificationUtils {
if (measurementList.isEmpty()) {
return Collections.emptyList();
}
- List<ModEntry> modifications =
- ModificationUtils.getModificationsForMemtable(memTable,
modsToMemtable);
- List<List<TimeRange>> deletionList = new ArrayList<>();
+ List<ModEntry> deviceModifications =
+ filterDeviceModifications(
+ deviceID,
+ ModificationUtils.getModificationsForMemtable(memTable,
modsToMemtable),
+ timeLowerBound);
+ List<List<TimeRange>> deletionList = new
ArrayList<>(measurementList.size());
for (String measurement : measurementList) {
List<TimeRange> columnDeletionList = new ArrayList<>();
columnDeletionList.add(new TimeRange(Long.MIN_VALUE, timeLowerBound));
- for (ModEntry modification : modifications) {
- if (modification.affects(deviceID)
- && modification.affects(measurement)
- && modification.getEndTime() > timeLowerBound) {
+ for (ModEntry modification : deviceModifications) {
+ if (modification.affects(measurement)) {
long lowerBound = Math.max(modification.getStartTime(),
timeLowerBound);
columnDeletionList.add(new TimeRange(lowerBound,
modification.getEndTime()));
}
@@ -362,10 +370,10 @@ public class ModificationUtils {
long timeLowerBound) {
List<TimeRange> deletionList = new ArrayList<>();
deletionList.add(new TimeRange(Long.MIN_VALUE, timeLowerBound));
- for (ModEntry modification : getModificationsForMemtable(memTable,
modsToMemtable)) {
- if (modification.affects(deviceID)
- && modification.affects(measurement)
- && modification.getEndTime() > timeLowerBound) {
+ for (ModEntry modification :
+ filterDeviceModifications(
+ deviceID, getModificationsForMemtable(memTable, modsToMemtable),
timeLowerBound)) {
+ if (modification.affects(measurement)) {
long lowerBound = Math.max(modification.getStartTime(),
timeLowerBound);
deletionList.add(new TimeRange(lowerBound, modification.getEndTime()));
}
@@ -386,6 +394,17 @@ public class ModificationUtils {
return modifications;
}
+ private static List<ModEntry> filterDeviceModifications(
+ IDeviceID deviceID, List<ModEntry> modifications, long timeLowerBound) {
+ List<ModEntry> deviceModifications = new ArrayList<>();
+ for (ModEntry modification : modifications) {
+ if (modification.affects(deviceID) && modification.getEndTime() >
timeLowerBound) {
+ deviceModifications.add(modification);
+ }
+ }
+ return deviceModifications;
+ }
+
public static boolean canMerge(TimeRange left, TimeRange right) {
// [1,3] can merge with [4, 5]
// [1,3] cannot merge with [5,6]
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 6220623dd3b..816be10afb6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -32,15 +32,14 @@ import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.BytesUtils;
import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.write.UnSupportedDataTypeException;
-import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
@@ -65,14 +64,13 @@ public class QueryDataSetUtils {
// indicate whether it is a null
int columnNumWithTime = columnNum * 2 + 1;
DataOutputStream[] dataOutputStreams = new
DataOutputStream[columnNumWithTime];
- ByteArrayOutputStream[] byteArrayOutputStreams = new
ByteArrayOutputStream[columnNumWithTime];
+ PublicBAOS[] byteArrayOutputStreams = new PublicBAOS[columnNumWithTime];
for (int i = 0; i < columnNumWithTime; i++) {
- byteArrayOutputStreams[i] = new ByteArrayOutputStream();
+ byteArrayOutputStreams[i] = new PublicBAOS();
dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
}
int rowCount = 0;
- int[] valueOccupation = new int[columnNum];
// used to record a bitmap for every 8 points
int[] bitmaps = new int[columnNum];
@@ -85,14 +83,7 @@ public class QueryDataSetUtils {
TsBlock tsBlock = optionalTsBlock.get();
if (!tsBlock.isEmpty()) {
int currentCount = tsBlock.getPositionCount();
- serializeTsBlock(
- rowCount,
- currentCount,
- tsBlock,
- columnNum,
- dataOutputStreams,
- valueOccupation,
- bitmaps);
+ serializeTsBlock(rowCount, currentCount, tsBlock, columnNum,
dataOutputStreams, bitmaps);
rowCount += currentCount;
}
}
@@ -101,9 +92,9 @@ public class QueryDataSetUtils {
TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
- fillTimeColumn(rowCount, byteArrayOutputStreams, tsQueryDataSet);
+ fillTimeColumn(byteArrayOutputStreams, tsQueryDataSet);
- fillValueColumnsAndBitMaps(rowCount, byteArrayOutputStreams,
valueOccupation, tsQueryDataSet);
+ fillValueColumnsAndBitMaps(byteArrayOutputStreams, tsQueryDataSet);
return new Pair<>(tsQueryDataSet, finished);
}
@@ -117,14 +108,13 @@ public class QueryDataSetUtils {
int columnNum = 1;
int columnNumWithTime = columnNum * 2 + 1;
DataOutputStream[] dataOutputStreams = new
DataOutputStream[columnNumWithTime];
- ByteArrayOutputStream[] byteArrayOutputStreams = new
ByteArrayOutputStream[columnNumWithTime];
+ PublicBAOS[] byteArrayOutputStreams = new PublicBAOS[columnNumWithTime];
for (int i = 0; i < columnNumWithTime; i++) {
- byteArrayOutputStreams[i] = new ByteArrayOutputStream();
+ byteArrayOutputStreams[i] = new PublicBAOS();
dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
}
int rowCount = 0;
- int[] valueOccupation = new int[columnNum];
// used to record a bitmap for every 8 points
int[] bitmaps = new int[columnNum];
@@ -158,7 +148,6 @@ public class QueryDataSetUtils {
} else {
bitmaps[k] = (bitmaps[k] << 1) | FLAG;
dataOutputStream.writeInt(column.getInt(i));
- valueOccupation[k] += 4;
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[k]);
@@ -176,7 +165,6 @@ public class QueryDataSetUtils {
} else {
bitmaps[k] = (bitmaps[k] << 1) | FLAG;
dataOutputStream.writeLong(column.getLong(i));
- valueOccupation[k] += 8;
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[k]);
@@ -193,7 +181,6 @@ public class QueryDataSetUtils {
} else {
bitmaps[k] = (bitmaps[k] << 1) | FLAG;
dataOutputStream.writeFloat(column.getFloat(i));
- valueOccupation[k] += 4;
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[k]);
@@ -210,7 +197,6 @@ public class QueryDataSetUtils {
} else {
bitmaps[k] = (bitmaps[k] << 1) | FLAG;
dataOutputStream.writeDouble(column.getDouble(i));
- valueOccupation[k] += 8;
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[k]);
@@ -227,7 +213,6 @@ public class QueryDataSetUtils {
} else {
bitmaps[k] = (bitmaps[k] << 1) | FLAG;
dataOutputStream.writeBoolean(column.getBoolean(i));
- valueOccupation[k] += 1;
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[k]);
@@ -249,7 +234,6 @@ public class QueryDataSetUtils {
Binary binary = column.getBinary(i);
dataOutputStream.writeInt(binary.getLength());
dataOutputStream.write(binary.getValues());
- valueOccupation[k] = valueOccupation[k] + 4 +
binary.getLength();
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[k]);
@@ -277,27 +261,14 @@ public class QueryDataSetUtils {
}
// calculate the time buffer size
- int timeOccupation = rowCount * 8;
- ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
- timeBuffer.put(byteArrayOutputStreams[0].toByteArray());
- timeBuffer.flip();
- tsQueryDataSet.setTime(timeBuffer);
+ tsQueryDataSet.setTime(wrapBuffer(byteArrayOutputStreams[0]));
// calculate the bitmap buffer size
- int bitmapOccupation = (rowCount + 7) / 8;
-
- List<ByteBuffer> bitmapList = new LinkedList<>();
- List<ByteBuffer> valueList = new LinkedList<>();
+ List<ByteBuffer> bitmapList = new ArrayList<>(columnNum);
+ List<ByteBuffer> valueList = new ArrayList<>(columnNum);
for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
- ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) /
2]);
- valueBuffer.put(byteArrayOutputStreams[i].toByteArray());
- valueBuffer.flip();
- valueList.add(valueBuffer);
-
- ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
- bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray());
- bitmapBuffer.flip();
- bitmapList.add(bitmapBuffer);
+ valueList.add(wrapBuffer(byteArrayOutputStreams[i]));
+ bitmapList.add(wrapBuffer(byteArrayOutputStreams[i + 1]));
}
tsQueryDataSet.setBitmapList(bitmapList);
tsQueryDataSet.setValueList(valueList);
@@ -310,7 +281,6 @@ public class QueryDataSetUtils {
TsBlock tsBlock,
int columnNum,
DataOutputStream[] dataOutputStreams,
- int[] valueOccupation,
int[] bitmaps)
throws IOException {
// serialize time column
@@ -330,68 +300,28 @@ public class QueryDataSetUtils {
switch (type) {
case INT32:
case DATE:
- doWithInt32Column(
- rowCount,
- column,
- bitmaps,
- k,
- dataOutputStream,
- valueOccupation,
- dataBitmapOutputStream);
+ doWithInt32Column(rowCount, column, bitmaps, k, dataOutputStream,
dataBitmapOutputStream);
break;
case INT64:
case TIMESTAMP:
- doWithInt64Column(
- rowCount,
- column,
- bitmaps,
- k,
- dataOutputStream,
- valueOccupation,
- dataBitmapOutputStream);
+ doWithInt64Column(rowCount, column, bitmaps, k, dataOutputStream,
dataBitmapOutputStream);
break;
case FLOAT:
- doWithFloatColumn(
- rowCount,
- column,
- bitmaps,
- k,
- dataOutputStream,
- valueOccupation,
- dataBitmapOutputStream);
+ doWithFloatColumn(rowCount, column, bitmaps, k, dataOutputStream,
dataBitmapOutputStream);
break;
case DOUBLE:
doWithDoubleColumn(
- rowCount,
- column,
- bitmaps,
- k,
- dataOutputStream,
- valueOccupation,
- dataBitmapOutputStream);
+ rowCount, column, bitmaps, k, dataOutputStream,
dataBitmapOutputStream);
break;
case BOOLEAN:
doWithBooleanColumn(
- rowCount,
- column,
- bitmaps,
- k,
- dataOutputStream,
- valueOccupation,
- dataBitmapOutputStream);
+ rowCount, column, bitmaps, k, dataOutputStream,
dataBitmapOutputStream);
break;
case TEXT:
case BLOB:
case STRING:
case OBJECT:
- doWithTextColumn(
- rowCount,
- column,
- bitmaps,
- k,
- dataOutputStream,
- valueOccupation,
- dataBitmapOutputStream);
+ doWithTextColumn(rowCount, column, bitmaps, k, dataOutputStream,
dataBitmapOutputStream);
break;
default:
throw new UnSupportedDataTypeException(
@@ -406,7 +336,6 @@ public class QueryDataSetUtils {
int[] bitmaps,
int columnIndex,
DataOutputStream dataOutputStream,
- int[] valueOccupation,
DataOutputStream dataBitmapOutputStream)
throws IOException {
for (int i = 0, size = column.getPositionCount(); i < size; i++) {
@@ -416,7 +345,6 @@ public class QueryDataSetUtils {
} else {
bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG;
dataOutputStream.writeInt(column.getInt(i));
- valueOccupation[columnIndex] += 4;
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[columnIndex]);
@@ -432,7 +360,6 @@ public class QueryDataSetUtils {
int[] bitmaps,
int columnIndex,
DataOutputStream dataOutputStream,
- int[] valueOccupation,
DataOutputStream dataBitmapOutputStream)
throws IOException {
for (int i = 0, size = column.getPositionCount(); i < size; i++) {
@@ -442,7 +369,6 @@ public class QueryDataSetUtils {
} else {
bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG;
dataOutputStream.writeLong(column.getLong(i));
- valueOccupation[columnIndex] += 8;
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[columnIndex]);
@@ -458,7 +384,6 @@ public class QueryDataSetUtils {
int[] bitmaps,
int columnIndex,
DataOutputStream dataOutputStream,
- int[] valueOccupation,
DataOutputStream dataBitmapOutputStream)
throws IOException {
for (int i = 0, size = column.getPositionCount(); i < size; i++) {
@@ -468,7 +393,6 @@ public class QueryDataSetUtils {
} else {
bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG;
dataOutputStream.writeFloat(column.getFloat(i));
- valueOccupation[columnIndex] += 4;
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[columnIndex]);
@@ -484,7 +408,6 @@ public class QueryDataSetUtils {
int[] bitmaps,
int columnIndex,
DataOutputStream dataOutputStream,
- int[] valueOccupation,
DataOutputStream dataBitmapOutputStream)
throws IOException {
for (int i = 0, size = column.getPositionCount(); i < size; i++) {
@@ -494,7 +417,6 @@ public class QueryDataSetUtils {
} else {
bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG;
dataOutputStream.writeDouble(column.getDouble(i));
- valueOccupation[columnIndex] += 8;
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[columnIndex]);
@@ -510,7 +432,6 @@ public class QueryDataSetUtils {
int[] bitmaps,
int columnIndex,
DataOutputStream dataOutputStream,
- int[] valueOccupation,
DataOutputStream dataBitmapOutputStream)
throws IOException {
for (int i = 0, size = column.getPositionCount(); i < size; i++) {
@@ -520,7 +441,6 @@ public class QueryDataSetUtils {
} else {
bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG;
dataOutputStream.writeBoolean(column.getBoolean(i));
- valueOccupation[columnIndex] += 1;
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[columnIndex]);
@@ -536,7 +456,6 @@ public class QueryDataSetUtils {
int[] bitmaps,
int columnIndex,
DataOutputStream dataOutputStream,
- int[] valueOccupation,
DataOutputStream dataBitmapOutputStream)
throws IOException {
for (int i = 0, size = column.getPositionCount(); i < size; i++) {
@@ -548,7 +467,6 @@ public class QueryDataSetUtils {
Binary binary = column.getBinary(i);
dataOutputStream.writeInt(binary.getLength());
dataOutputStream.write(binary.getValues());
- valueOccupation[columnIndex] = valueOccupation[columnIndex] + 4 +
binary.getLength();
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[columnIndex]);
@@ -572,40 +490,27 @@ public class QueryDataSetUtils {
}
private static void fillTimeColumn(
- int rowCount, ByteArrayOutputStream[] byteArrayOutputStreams,
TSQueryDataSet tsQueryDataSet) {
- // calculate the time buffer size
- int timeOccupation = rowCount * 8;
- ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
- timeBuffer.put(byteArrayOutputStreams[0].toByteArray());
- timeBuffer.flip();
- tsQueryDataSet.setTime(timeBuffer);
+ PublicBAOS[] byteArrayOutputStreams, TSQueryDataSet tsQueryDataSet) {
+ tsQueryDataSet.setTime(wrapBuffer(byteArrayOutputStreams[0]));
}
private static void fillValueColumnsAndBitMaps(
- int rowCount,
- ByteArrayOutputStream[] byteArrayOutputStreams,
- int[] valueOccupation,
- TSQueryDataSet tsQueryDataSet) {
- // calculate the bitmap buffer size
- int bitmapOccupation = (rowCount + 7) / 8;
-
- List<ByteBuffer> bitmapList = new LinkedList<>();
- List<ByteBuffer> valueList = new LinkedList<>();
+ PublicBAOS[] byteArrayOutputStreams, TSQueryDataSet tsQueryDataSet) {
+ int columnNum = byteArrayOutputStreams.length / 2;
+ List<ByteBuffer> bitmapList = new ArrayList<>(columnNum);
+ List<ByteBuffer> valueList = new ArrayList<>(columnNum);
for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
- ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) /
2]);
- valueBuffer.put(byteArrayOutputStreams[i].toByteArray());
- valueBuffer.flip();
- valueList.add(valueBuffer);
-
- ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
- bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray());
- bitmapBuffer.flip();
- bitmapList.add(bitmapBuffer);
+ valueList.add(wrapBuffer(byteArrayOutputStreams[i]));
+ bitmapList.add(wrapBuffer(byteArrayOutputStreams[i + 1]));
}
tsQueryDataSet.setBitmapList(bitmapList);
tsQueryDataSet.setValueList(valueList);
}
+ private static ByteBuffer wrapBuffer(PublicBAOS outputStream) {
+ return ByteBuffer.wrap(outputStream.getBuf(), 0, outputStream.size());
+ }
+
/**
* To fetch required amounts of data and combine them through List
*
@@ -668,9 +573,7 @@ public class QueryDataSetUtils {
boolean hasBitMap = BytesUtils.byteToBool(buffer.get());
if (hasBitMap) {
byte[] bytes = new byte[BitMap.getSizeOfBytes(size)];
- for (int j = 0; j < bytes.length; j++) {
- bytes[j] = buffer.get();
- }
+ buffer.get(bytes);
bitMaps[i] = new BitMap(size, bytes);
}
}
@@ -687,9 +590,7 @@ public class QueryDataSetUtils {
boolean hasBitMap = BytesUtils.byteToBool(stream.readByte());
if (hasBitMap) {
byte[] bytes = new byte[BitMap.getSizeOfBytes(size)];
- for (int j = 0; j < bytes.length; j++) {
- bytes[j] = stream.readByte();
- }
+ stream.readFully(bytes);
bitMaps[i] = new BitMap(size, bytes);
}
}
@@ -865,11 +766,7 @@ public class QueryDataSetUtils {
for (int index = 0; index < size; index++) {
int binarySize = stream.readInt();
byte[] binaryValue = new byte[binarySize];
- int actualReadSize = stream.read(binaryValue);
- if (actualReadSize != binarySize) {
- throw new IllegalStateException(
- "Expect to read " + binarySize + " bytes, actually read " +
actualReadSize + "bytes.");
- }
+ stream.readFully(binaryValue);
binaryValues[index] = new Binary(binaryValue);
}
values[columnIndex] = binaryValues;
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java
index be33498eb63..cd5ca74661e 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java
@@ -272,6 +272,55 @@ public class TreeDeviceSchemaCacheManagerTest {
Assert.assertEquals(0,
TableDeviceSchemaCache.getInstance().getMemoryUsage());
}
+ @Test
+ public void testInvalidateLastCacheByWildcardDevicePath() throws
IllegalPathException {
+ final MeasurementSchema s0 = new MeasurementSchema("s0", TSDataType.INT32);
+ final PartialPath device0 = new PartialPath("root.sg1.d1");
+ final PartialPath device1 = new PartialPath("root.sg2.d1");
+ final MeasurementPath path0 = new
MeasurementPath(device0.concatNode("s0"), s0);
+ final MeasurementPath path1 = new
MeasurementPath(device1.concatNode("s0"), s0);
+ final TimeValuePair tv0 = new TimeValuePair(0L, new
TsPrimitiveType.TsInt(0));
+
+ updateLastCache("root.sg1", device0, path0, s0, tv0);
+ updateLastCache("root.sg2", device1, path1, s0, tv0);
+
+ Assert.assertEquals(
+ tv0,
+ treeDeviceSchemaCacheManager.getLastCache(
+ new MeasurementPath(device0.getIDeviceID(), "s0")));
+ Assert.assertEquals(
+ tv0,
+ treeDeviceSchemaCacheManager.getLastCache(
+ new MeasurementPath(device1.getIDeviceID(), "s0")));
+
+ treeDeviceSchemaCacheManager.invalidateLastCache(new
MeasurementPath("root.sg1.*.s0"));
+
+ Assert.assertNull(
+ treeDeviceSchemaCacheManager.getLastCache(
+ new MeasurementPath(device0.getIDeviceID(), "s0")));
+ Assert.assertEquals(
+ tv0,
+ treeDeviceSchemaCacheManager.getLastCache(
+ new MeasurementPath(device1.getIDeviceID(), "s0")));
+ }
+
+ private void updateLastCache(
+ final String database,
+ final PartialPath devicePath,
+ final MeasurementPath path,
+ final MeasurementSchema schema,
+ final TimeValuePair timeValuePair) {
+ treeDeviceSchemaCacheManager.declareLastCache(database, path);
+ treeDeviceSchemaCacheManager.updateLastCacheIfExists(
+ database,
+ IDeviceID.Factory.DEFAULT_FACTORY.create(
+ StringArrayDeviceID.splitDeviceIdString(devicePath.getNodes())),
+ new String[] {path.getMeasurement()},
+ new TimeValuePair[] {timeValuePair},
+ false,
+ new MeasurementSchema[] {schema});
+ }
+
@Test
public void testPut() throws Exception {
final ClusterSchemaTree clusterSchemaTree = new ClusterSchemaTree();
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LimitOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LimitOperatorTest.java
index 3f79efa0e53..a78933d7cd0 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LimitOperatorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LimitOperatorTest.java
@@ -164,6 +164,9 @@ public class LimitOperatorTest {
int count = 0;
while (limitOperator.isBlocked().isDone() && limitOperator.hasNext()) {
TsBlock tsBlock = limitOperator.next();
+ if (tsBlock == null) {
+ continue;
+ }
assertEquals(2, tsBlock.getValueColumnCount());
assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
index bbeeabced19..1c10c3e02c4 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
@@ -426,16 +426,19 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable {
return matchPath(rPath.getNodes(), 0, 0, false, false);
}
+ public boolean matchFullPath(IDeviceID deviceID) throws IllegalPathException
{
+ return matchPath(getDeviceNodes(deviceID), 0, 0, false, false);
+ }
+
public boolean matchFullPath(IDeviceID deviceID, String measurement) {
- // TODO change this way
- PartialPath devicePath;
try {
- devicePath = new PartialPath(deviceID.toString());
+ String[] deviceNodes = getDeviceNodes(deviceID);
+ String[] fullPathNodes = Arrays.copyOf(deviceNodes, deviceNodes.length +
1);
+ fullPathNodes[deviceNodes.length] = measurement;
+ return matchPath(fullPathNodes, 0, 0, false, false);
} catch (IllegalPathException e) {
throw new RuntimeException(e);
}
- return matchPath(
- devicePath.concatAsMeasurementPath(measurement).getNodes(), 0, 0,
false, false);
}
/**
@@ -472,6 +475,21 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable {
return matchPath(prefixPath.getNodes(), 0, 0, false, true);
}
+ public boolean matchPrefixPath(IDeviceID deviceID) throws
IllegalPathException {
+ return matchPath(getDeviceNodes(deviceID), 0, 0, false, true);
+ }
+
+ private static String[] getDeviceNodes(IDeviceID deviceID) throws
IllegalPathException {
+ String[] tableNameSegments =
PathUtils.splitPathToDetachedNodes(deviceID.getTableName());
+ String[] deviceNodes = new String[deviceID.segmentNum() - 1 +
tableNameSegments.length];
+ System.arraycopy(tableNameSegments, 0, deviceNodes, 0,
tableNameSegments.length);
+ for (int i = 0; i < deviceID.segmentNum() - 1; i++) {
+ deviceNodes[i + tableNameSegments.length] =
+ deviceID.segment(i + 1) != null ? deviceID.segment(i + 1).toString()
: null;
+ }
+ return deviceNodes;
+ }
+
private boolean matchPath(
String[] pathNodes,
int pathIndex,