This is an automated email from the ASF dual-hosted git repository. zyk pushed a commit to branch table-model-debug in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1c407ba84a6940922391b312157ec8b64dc69f50 Author: MarcosZyk <[email protected]> AuthorDate: Sun Apr 28 12:02:03 2024 +0800 implement ser deser --- .../db/queryengine/common/header/ColumnHeader.java | 12 ++++ .../plan/planner/plan/node/PlanNodeType.java | 4 ++ .../node/metedata/read/TableDeviceScanNode.java | 79 +++++++++++++++++++++- .../utils/filter/DeviceFilterVisitor.java | 9 ++- .../iotdb/commons/schema/filter/SchemaFilter.java | 9 +++ .../schema/filter/impl/DeviceAttributeFilter.java | 17 ++++- .../commons/schema/filter/impl/DeviceIdFilter.java | 17 ++++- 7 files changed, 139 insertions(+), 8 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeader.java index a2589c6bb19..2ae8d8ca234 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeader.java @@ -22,6 +22,8 @@ package org.apache.iotdb.db.queryengine.common.header; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.ReadWriteIOUtils; +import java.io.DataOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Objects; @@ -72,6 +74,16 @@ public class ColumnHeader { dataType.serializeTo(byteBuffer); } + public void serialize(DataOutputStream stream) throws IOException { + ReadWriteIOUtils.write(columnName, stream); + ReadWriteIOUtils.write(dataType.ordinal(), stream); + ReadWriteIOUtils.write(hasAlias(), stream); + if (hasAlias()) { + ReadWriteIOUtils.write(alias, stream); + } + dataType.serializeTo(stream); + } + public static ColumnHeader deserialize(ByteBuffer byteBuffer) { String columnName = ReadWriteIOUtils.readString(byteBuffer); TSDataType dataType = TSDataType.values()[ReadWriteIOUtils.readInt(byteBuffer)]; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index 009e27c6a42..24a45af766d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -34,6 +34,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.Sche import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaFetchScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryOrderByHeatNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesCountNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.ActivateTemplateNode; @@ -213,6 +214,7 @@ public enum PlanNodeType { PIPE_OPERATE_SCHEMA_QUEUE_REFERENCE((short) 91), CREATE_TABLE_DEVICE((short) 92), + TABLE_DEVICE_SCAN((short) 93), ; public static final int BYTES = Short.BYTES; @@ -446,6 +448,8 @@ public enum PlanNodeType { return PipeOperateSchemaQueueNode.deserialize(buffer); case 92: return CreateTableDeviceNode.deserialize(buffer); + case 93: + return TableDeviceScanNode.deserialize(buffer); default: throw new IllegalArgumentException("Invalid node type: " + nodeType); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceScanNode.java index 3605963c6bc..15570dfe9fd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceScanNode.java @@ -5,11 +5,15 @@ import org.apache.iotdb.commons.schema.filter.SchemaFilter; import org.apache.iotdb.db.queryengine.common.header.ColumnHeader; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.tsfile.utils.ReadWriteIOUtils; + import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; @@ -103,10 +107,81 @@ public class TableDeviceScanNode extends SchemaQueryScanNode { } @Override - protected void serializeAttributes(ByteBuffer byteBuffer) {} + protected void serializeAttributes(ByteBuffer byteBuffer) { + PlanNodeType.TABLE_DEVICE_SCAN.serialize(byteBuffer); + ReadWriteIOUtils.write(database, byteBuffer); + ReadWriteIOUtils.write(tableName, byteBuffer); + + ReadWriteIOUtils.write(idDeterminedFilterList.size(), byteBuffer); + for (SchemaFilter schemaFilter : idDeterminedFilterList) { + SchemaFilter.serialize(schemaFilter, byteBuffer); + } + + ReadWriteIOUtils.write(idFuzzyFilterList.size(), byteBuffer); + for (SchemaFilter schemaFilter : idFuzzyFilterList) { + SchemaFilter.serialize(schemaFilter, byteBuffer); + } + + ReadWriteIOUtils.write(columnHeaderList.size(), byteBuffer); + for (ColumnHeader columnHeader : columnHeaderList) { + columnHeader.serialize(byteBuffer); + } + } @Override - protected void serializeAttributes(DataOutputStream stream) throws IOException {} + protected void serializeAttributes(DataOutputStream stream) throws IOException { + PlanNodeType.TABLE_DEVICE_SCAN.serialize(stream); + ReadWriteIOUtils.write(database, stream); + ReadWriteIOUtils.write(tableName, stream); + + ReadWriteIOUtils.write(idDeterminedFilterList.size(), stream); + for (SchemaFilter schemaFilter : idDeterminedFilterList) { + SchemaFilter.serialize(schemaFilter, stream); + } + + ReadWriteIOUtils.write(idFuzzyFilterList.size(), stream); + for (SchemaFilter schemaFilter : idFuzzyFilterList) { + SchemaFilter.serialize(schemaFilter, stream); + } + + ReadWriteIOUtils.write(columnHeaderList.size(), stream); + for (ColumnHeader columnHeader : columnHeaderList) { + columnHeader.serialize(stream); + } + } + + public static TableDeviceScanNode deserialize(ByteBuffer buffer) { + String database = ReadWriteIOUtils.readString(buffer); + String tableName = ReadWriteIOUtils.readString(buffer); + + int size = ReadWriteIOUtils.readInt(buffer); + List<SchemaFilter> idDeterminedFilterList = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + idDeterminedFilterList.add(SchemaFilter.deserialize(buffer)); + } + + size = ReadWriteIOUtils.readInt(buffer); + List<SchemaFilter> idFuzzyFilterList = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + idFuzzyFilterList.add(SchemaFilter.deserialize(buffer)); + } + + size = ReadWriteIOUtils.readInt(buffer); + List<ColumnHeader> columnHeaderList = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + columnHeaderList.add(ColumnHeader.deserialize(buffer)); + } + + PlanNodeId planNodeId = PlanNodeId.deserialize(buffer); + return new TableDeviceScanNode( + planNodeId, + database, + tableName, + idDeterminedFilterList, + idFuzzyFilterList, + columnHeaderList, + null); + } @Override public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/filter/DeviceFilterVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/filter/DeviceFilterVisitor.java index 0555aecf5ea..070b2b40c17 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/filter/DeviceFilterVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/filter/DeviceFilterVisitor.java @@ -64,11 +64,16 @@ public class DeviceFilterVisitor extends SchemaFilterVisitor<IDeviceSchemaInfo> @Override public boolean visitDeviceIdFilter(DeviceIdFilter filter, IDeviceSchemaInfo info) { - return info.getPartialPath().getNodes()[filter.getIndex() + 3].equals(filter.getValue()); + String[] nodes = info.getPartialPath().getNodes(); + if (nodes.length < filter.getIndex() + 3) { + return false; + } else { + return nodes[filter.getIndex() + 3].equals(filter.getValue()); + } } @Override public boolean visitDeviceAttributeFilter(DeviceAttributeFilter filter, IDeviceSchemaInfo info) { - return info.getAttributeValue(filter.getKey()).equals(filter.getValue()); + return filter.getValue().equals(info.getAttributeValue(filter.getKey())); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/SchemaFilter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/SchemaFilter.java index a9e5fea0b73..ad3c4679a39 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/SchemaFilter.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/SchemaFilter.java @@ -21,6 +21,9 @@ package org.apache.iotdb.commons.schema.filter; import org.apache.iotdb.commons.schema.filter.impl.AndFilter; import org.apache.iotdb.commons.schema.filter.impl.DataTypeFilter; +import org.apache.iotdb.commons.schema.filter.impl.DeviceAttributeFilter; +import org.apache.iotdb.commons.schema.filter.impl.DeviceIdFilter; +import org.apache.iotdb.commons.schema.filter.impl.OrFilter; import org.apache.iotdb.commons.schema.filter.impl.PathContainsFilter; import org.apache.iotdb.commons.schema.filter.impl.TagFilter; import org.apache.iotdb.commons.schema.filter.impl.TemplateFilter; @@ -73,6 +76,12 @@ public abstract class SchemaFilter { return new AndFilter(byteBuffer); case TEMPLATE_FILTER: return new TemplateFilter(byteBuffer); + case OR: + return new OrFilter(byteBuffer); + case DEVICE_ID: + return new DeviceIdFilter(byteBuffer); + case DEVICE_ATTRIBUTE: + return new DeviceAttributeFilter(byteBuffer); default: throw new IllegalArgumentException("Unsupported schema filter type: " + type); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceAttributeFilter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceAttributeFilter.java index 09736bf5b19..d85b437e9c4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceAttributeFilter.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceAttributeFilter.java @@ -22,6 +22,8 @@ import org.apache.iotdb.commons.schema.filter.SchemaFilter; import org.apache.iotdb.commons.schema.filter.SchemaFilterType; import org.apache.iotdb.commons.schema.filter.SchemaFilterVisitor; +import org.apache.tsfile.utils.ReadWriteIOUtils; + import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -37,6 +39,11 @@ public class DeviceAttributeFilter extends SchemaFilter { this.value = value; } + public DeviceAttributeFilter(ByteBuffer byteBuffer) { + this.key = ReadWriteIOUtils.readString(byteBuffer); + this.value = ReadWriteIOUtils.readString(byteBuffer); + } + public String getKey() { return key; } @@ -56,8 +63,14 @@ public class DeviceAttributeFilter extends SchemaFilter { } @Override - public void serialize(ByteBuffer byteBuffer) {} + public void serialize(ByteBuffer byteBuffer) { + ReadWriteIOUtils.write(key, byteBuffer); + ReadWriteIOUtils.write(value, byteBuffer); + } @Override - public void serialize(DataOutputStream stream) throws IOException {} + public void serialize(DataOutputStream stream) throws IOException { + ReadWriteIOUtils.write(key, stream); + ReadWriteIOUtils.write(value, stream); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceIdFilter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceIdFilter.java index eda64d90a8d..149766bdb95 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceIdFilter.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceIdFilter.java @@ -22,6 +22,8 @@ import org.apache.iotdb.commons.schema.filter.SchemaFilter; import org.apache.iotdb.commons.schema.filter.SchemaFilterType; import org.apache.iotdb.commons.schema.filter.SchemaFilterVisitor; +import org.apache.tsfile.utils.ReadWriteIOUtils; + import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -37,6 +39,11 @@ public class DeviceIdFilter extends SchemaFilter { this.value = value; } + public DeviceIdFilter(ByteBuffer byteBuffer) { + this.index = ReadWriteIOUtils.readInt(byteBuffer); + this.value = ReadWriteIOUtils.readString(byteBuffer); + } + public int getIndex() { return index; } @@ -56,8 +63,14 @@ public class DeviceIdFilter extends SchemaFilter { } @Override - public void serialize(ByteBuffer byteBuffer) {} + public void serialize(ByteBuffer byteBuffer) { + ReadWriteIOUtils.write(index, byteBuffer); + ReadWriteIOUtils.write(value, byteBuffer); + } @Override - public void serialize(DataOutputStream stream) throws IOException {} + public void serialize(DataOutputStream stream) throws IOException { + ReadWriteIOUtils.write(index, stream); + ReadWriteIOUtils.write(value, stream); + } }
