This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4b79487e049 Support insertion in logical view (#9937)
4b79487e049 is described below
commit 4b79487e049dca28bfe19c3a010c856c6106d205
Author: 橘子 <[email protected]>
AuthorDate: Sat May 27 10:07:39 2023 +0800
Support insertion in logical view (#9937)
---
.../commons/schema/view/LogicalViewSchema.java | 30 +++
.../db/metadata/cache/DataNodeSchemaCache.java | 22 ++-
.../cache/DeviceUsingTemplateSchemaCache.java | 7 +
.../iotdb/db/metadata/cache/SchemaCacheEntry.java | 9 +
.../db/metadata/cache/TimeSeriesSchemaCache.java | 70 ++++++-
.../db/metadata/mtree/MTreeBelowSGMemoryImpl.java | 6 -
.../schemaregion/SchemaRegionMemoryImpl.java | 4 +-
.../mpp/common/schematree/ClusterSchemaTree.java | 43 +++++
.../common/schematree/IMeasurementSchemaInfo.java | 3 +
.../common/schematree/MeasurementSchemaInfo.java | 9 +
.../schematree/node/SchemaMeasurementNode.java | 8 +
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 99 ++++++----
.../analyze/schema/ClusterSchemaFetchExecutor.java | 19 ++
.../plan/analyze/schema/ISchemaComputation.java | 39 ++++
.../mpp/plan/analyze/schema/ISchemaValidation.java | 9 +
.../plan/analyze/schema/NormalSchemaFetcher.java | 206 ++++++++++++++++++---
.../plan/statement/crud/InsertBaseStatement.java | 93 ++++++++++
.../crud/InsertMultiTabletsStatement.java | 17 ++
.../plan/statement/crud/InsertRowStatement.java | 128 ++++++++++++-
.../crud/InsertRowsOfOneDeviceStatement.java | 21 +++
.../plan/statement/crud/InsertRowsStatement.java | 17 ++
.../plan/statement/crud/InsertTabletStatement.java | 135 +++++++++++++-
22 files changed, 919 insertions(+), 75 deletions(-)
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/schema/view/LogicalViewSchema.java
b/node-commons/src/main/java/org/apache/iotdb/commons/schema/view/LogicalViewSchema.java
index 4ef891eeb09..03da5507d75 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/schema/view/LogicalViewSchema.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/schema/view/LogicalViewSchema.java
@@ -19,7 +19,10 @@
package org.apache.iotdb.commons.schema.view;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
+import
org.apache.iotdb.commons.schema.view.viewExpression.leaf.TimeSeriesViewOperand;
import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -33,6 +36,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
+import java.rmi.UnexpectedException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -221,4 +225,30 @@ public class LogicalViewSchema
public void setExpression(ViewExpression expression) {
this.expression = expression;
}
+
+ public boolean isWritable() {
+ return this.expression instanceof TimeSeriesViewOperand;
+ }
+
+ public String getSourcePathStringIfWritable() {
+ if (this.isWritable()) {
+ return ((TimeSeriesViewOperand) this.expression).getPathString();
+ }
+ return null;
+ }
+
+ public PartialPath getSourcePathIfWritable() {
+ if (this.isWritable()) {
+ try {
+ return new PartialPath(((TimeSeriesViewOperand)
this.expression).getPathString());
+ } catch (IllegalPathException e) {
+ throw new RuntimeException(
+ new UnexpectedException(
+ String.format(
+ "Logical view with measurementID [%s] is broken. It stores
illegal path [%s].",
+ this.measurementId,
this.getSourcePathStringIfWritable())));
+ }
+ }
+ return null;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
index 01ab162a2df..eed3f023897 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
@@ -127,7 +128,26 @@ public class DataNodeSchemaCache {
}
public List<Integer> computeWithoutTemplate(ISchemaComputation
schemaComputation) {
- return timeSeriesSchemaCache.compute(schemaComputation);
+ List<Integer> result =
timeSeriesSchemaCache.computeAndRecordLogicalView(schemaComputation);
+ schemaComputation.recordRangeOfLogicalViewSchemaListNow();
+ return result;
+ }
+
+ /**
+ * This function is used to process logical view schema list in statement. It will try to find the
+ * source paths of those views in cache. If it found sources, measurement schemas of sources will
+ * be recorded in measurement schema list; else the views will be recorded as missed. The indexes
+ * of missed views and full paths of their source paths will be returned.
+ *
+ * @param schemaComputation the statement you want to process
+ * @return The indexes of missed views and full paths of their source paths will be returned.
+ */
+ public Pair<List<Integer>, List<String>> computeSourceOfLogicalView(
+ ISchemaComputation schemaComputation) {
+ if (!schemaComputation.hasLogicalViewNeedProcess()) {
+ return new Pair<>(new ArrayList<>(), new ArrayList<>());
+ }
+ return timeSeriesSchemaCache.computeSourceOfLogicalView(schemaComputation);
}
public List<Integer> computeWithTemplate(ISchemaComputation
schemaComputation) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DeviceUsingTemplateSchemaCache.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DeviceUsingTemplateSchemaCache.java
index 4a391033733..77291adb47e 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DeviceUsingTemplateSchemaCache.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DeviceUsingTemplateSchemaCache.java
@@ -181,6 +181,13 @@ public class DeviceUsingTemplateSchemaCache {
schema.getCompressor());
}
+ @Override
+ public LogicalViewSchema getSchemaAsLogicalViewSchema() {
+ throw new RuntimeException(
+ new UnsupportedOperationException(
+ "Function getSchemaAsLogicalViewSchema is not supported
in DeviceUsingTemplateSchemaCache."));
+ }
+
@Override
public String getAlias() {
return null;
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
index efe44969ec6..9a377c23210 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.metadata.cache;
+import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import
org.apache.iotdb.db.metadata.cache.lastCache.container.ILastCacheContainer;
import
org.apache.iotdb.db.metadata.cache.lastCache.container.LastCacheContainer;
import org.apache.iotdb.db.mpp.common.schematree.IMeasurementSchemaInfo;
@@ -130,6 +131,14 @@ public class SchemaCacheEntry implements
IMeasurementSchemaInfo {
return null;
}
+ @Override
+ public LogicalViewSchema getSchemaAsLogicalViewSchema() {
+ if (this.iMeasurementSchema instanceof LogicalViewSchema) {
+ return (LogicalViewSchema) this.getSchema();
+ }
+ return null;
+ }
+
@Override
public String getAlias() {
return null;
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
index bf54ba0b04c..09fda330f3e 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.metadata.cache;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.metadata.cache.dualkeycache.IDualKeyCache;
@@ -30,6 +31,7 @@ import
org.apache.iotdb.db.metadata.cache.dualkeycache.impl.DualKeyCachePolicy;
import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaComputation;
import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
@@ -128,9 +130,9 @@ public class TimeSeriesSchemaCache {
return schemaTree;
}
- public List<Integer> compute(ISchemaComputation schemaComputation) {
+ public List<Integer> computeAndRecordLogicalView(ISchemaComputation
schemaComputation) {
List<Integer> indexOfMissingMeasurements = new ArrayList<>();
- final AtomicBoolean isFirstMeasurement = new AtomicBoolean(true);
+ final AtomicBoolean isFirstNonViewMeasurement = new AtomicBoolean(true);
dualKeyCache.compute(
new IDualKeyCacheComputation<PartialPath, String, SchemaCacheEntry>() {
@Override
@@ -148,9 +150,9 @@ public class TimeSeriesSchemaCache {
if (value == null) {
indexOfMissingMeasurements.add(index);
} else {
- if (isFirstMeasurement.get()) {
+ if (isFirstNonViewMeasurement.get() && (!value.isLogicalView()))
{
schemaComputation.computeDevice(value.isAligned());
- isFirstMeasurement.getAndSet(false);
+ isFirstNonViewMeasurement.getAndSet(false);
}
schemaComputation.computeMeasurement(index, value);
}
@@ -159,6 +161,66 @@ public class TimeSeriesSchemaCache {
return indexOfMissingMeasurements;
}
+ public Pair<List<Integer>, List<String>> computeSourceOfLogicalView(
+ ISchemaComputation schemaComputation) {
+ List<Integer> indexOfMissingMeasurements = new ArrayList<>();
+ List<String> missedPathStringList = new ArrayList<>();
+ final AtomicBoolean isFirstMeasurement = new AtomicBoolean(true);
+ Pair<Integer, Integer> beginToEnd =
schemaComputation.getRangeOfLogicalViewSchemaListRecorded();
+ List<LogicalViewSchema> logicalViewSchemaList =
schemaComputation.getLogicalViewSchemaList();
+ List<Integer> indexListOfLogicalViewPaths =
schemaComputation.getIndexListOfLogicalViewPaths();
+ for (int i = beginToEnd.left; i < beginToEnd.right; i++) {
+ LogicalViewSchema logicalViewSchema = logicalViewSchemaList.get(i);
+ final int realIndex = indexListOfLogicalViewPaths.get(i);
+ final int recordMissingIndex = i;
+ if (!logicalViewSchema.isWritable()) {
+ throw new RuntimeException(
+ new UnsupportedOperationException(
+ "You can not insert into a logical view which is not alias
series!"));
+ }
+ PartialPath fullPath = logicalViewSchema.getSourcePathIfWritable();
+ dualKeyCache.compute(
+ new IDualKeyCacheComputation<PartialPath, String,
SchemaCacheEntry>() {
+ @Override
+ public PartialPath getFirstKey() {
+ return fullPath.getDevicePath();
+ }
+
+ @Override
+ public String[] getSecondKeyList() {
+ return new String[] {fullPath.getMeasurement()};
+ }
+
+ @Override
+ public void computeValue(int index, SchemaCacheEntry value) {
+ index = realIndex;
+ if (value == null) {
+ indexOfMissingMeasurements.add(recordMissingIndex);
+ } else {
+ if (isFirstMeasurement.get()) {
+ schemaComputation.computeDevice(value.isAligned());
+ isFirstMeasurement.getAndSet(false);
+ }
+ if (value.isLogicalView()) {
+ // does not support views in views
+ throw new RuntimeException(
+ new UnsupportedOperationException(
+ String.format(
+ "The source of view [%s] is also a view! Nested
view is unsupported! "
+ + "Please check it.",
+ fullPath)));
+ }
+ schemaComputation.computeMeasurementOfView(index, value,
value.isAligned());
+ }
+ }
+ });
+ }
+ for (int index : indexOfMissingMeasurements) {
+
missedPathStringList.add(logicalViewSchemaList.get(index).getSourcePathStringIfWritable());
+ }
+ return new Pair<>(indexOfMissingMeasurements, missedPathStringList);
+ }
+
public void put(ClusterSchemaTree schemaTree) {
for (MeasurementPath measurementPath : schemaTree.getAllMeasurement()) {
putSingleMeasurementPath(schemaTree.getBelongedDatabase(measurementPath),
measurementPath);
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index 6eeadd9d61c..508825aca0d 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -1055,12 +1055,6 @@ public class MTreeBelowSGMemoryImpl {
}
}
- if (device.isDevice() && device.getAsDeviceMNode().isAligned()) {
- throw new AlignedTimeseriesException(
- "timeseries under this entity is aligned, can not create view
under this entity.",
- device.getFullPath());
- }
-
IDeviceMNode<IMemMNode> entityMNode;
if (device.isDevice()) {
entityMNode = device.getAsDeviceMNode();
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index 97297d962dc..19c34638bb7 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -810,8 +810,8 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
}
// create one logical view
- IMeasurementMNode<IMemMNode> leafMNode;
- leafMNode = mtree.createLogicalView(path,
viewPathToSourceMap.get(path));
+ IMeasurementMNode<IMemMNode> leafMNode =
+ mtree.createLogicalView(path, viewPathToSourceMap.get(path));
} catch (Throwable t) {
if (seriesNumberMonitor != null) {
seriesNumberMonitor.deleteTimeSeries(1);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
index 5fbb080c400..2c0f40db89f 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
@@ -21,8 +21,10 @@ package org.apache.iotdb.db.mpp.common.schematree;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.mpp.common.schematree.node.SchemaEntityNode;
import org.apache.iotdb.db.mpp.common.schematree.node.SchemaInternalNode;
import org.apache.iotdb.db.mpp.common.schematree.node.SchemaMeasurementNode;
@@ -38,6 +40,7 @@ import
org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.rmi.UnexpectedException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
@@ -189,6 +192,46 @@ public class ClusterSchemaTree implements ISchemaTree {
return indexOfMissingMeasurements;
}
+ /**
+ * This function compute logical view and fill source of these views. It returns nothing ! If some
+ * source paths are missed, throw errors.
+ *
+ * @param schemaComputation the statement
+ * @param indexOfTargetLogicalView the index list of logicalViewSchemaList that you want to check
+ */
+ public void computeSourceOfLogicalView(
+ ISchemaComputation schemaComputation, List<Integer>
indexOfTargetLogicalView) {
+ if (!schemaComputation.hasLogicalViewNeedProcess()) {
+ return;
+ }
+ List<LogicalViewSchema> logicalViewSchemaList =
schemaComputation.getLogicalViewSchemaList();
+ for (Integer index : indexOfTargetLogicalView) {
+ LogicalViewSchema logicalViewSchema = logicalViewSchemaList.get(index);
+ PartialPath fullPath = logicalViewSchema.getSourcePathIfWritable();
+ Pair<List<MeasurementPath>, Integer> searchResult =
this.searchMeasurementPaths(fullPath);
+ List<MeasurementPath> measurementPathList = searchResult.left;
+ if (measurementPathList.size() <= 0) {
+ throw new RuntimeException(
+ new PathNotExistException(
+ String.format(
+ "The source path of view [%s] does not exist.",
fullPath.getFullPath())));
+ } else if (measurementPathList.size() > 1) {
+ throw new RuntimeException(
+ new UnexpectedException(
+ String.format(
+ "The source paths of view [%s] are multiple.",
fullPath.getFullPath())));
+ } else {
+ Integer realIndex =
schemaComputation.getIndexListOfLogicalViewPaths().get(index);
+ MeasurementPath measurementPath = measurementPathList.get(0);
+ schemaComputation.computeMeasurementOfView(
+ realIndex,
+ new MeasurementSchemaInfo(
+ measurementPath.getMeasurement(),
measurementPath.getMeasurementSchema(), null),
+ measurementPath.isUnderAlignedEntity());
+ }
+ }
+ }
+
public void appendMeasurementPaths(List<MeasurementPath>
measurementPathList) {
for (MeasurementPath measurementPath : measurementPathList) {
appendSingleMeasurementPath(measurementPath);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/IMeasurementSchemaInfo.java
b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/IMeasurementSchemaInfo.java
index 5fe2684449d..b6b5728ebf6 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/IMeasurementSchemaInfo.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/IMeasurementSchemaInfo.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.common.schematree;
+import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -31,6 +32,8 @@ public interface IMeasurementSchemaInfo {
/** @return if the IMeasurementSchema is MeasurementSchema, return itself; else return null. */
MeasurementSchema getSchemaAsMeasurementSchema();
+ LogicalViewSchema getSchemaAsLogicalViewSchema();
+
String getAlias();
boolean isLogicalView();
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/MeasurementSchemaInfo.java
b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/MeasurementSchemaInfo.java
index 908a7ff51a2..c9cad7b8a86 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/MeasurementSchemaInfo.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/MeasurementSchemaInfo.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.common.schematree;
+import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -56,6 +57,14 @@ public class MeasurementSchemaInfo implements
IMeasurementSchemaInfo {
}
}
+ @Override
+ public LogicalViewSchema getSchemaAsLogicalViewSchema() {
+ if (this.isLogicalView()) {
+ return (LogicalViewSchema) this.schema;
+ }
+ return null;
+ }
+
public String getAlias() {
return alias;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaMeasurementNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaMeasurementNode.java
index 60011b1345e..5b44973dcdc 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaMeasurementNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaMeasurementNode.java
@@ -68,6 +68,14 @@ public class SchemaMeasurementNode extends SchemaNode
implements IMeasurementSch
return null;
}
+ @Override
+ public LogicalViewSchema getSchemaAsLogicalViewSchema() {
+ if (this.schema instanceof LogicalViewSchema) {
+ return (LogicalViewSchema) this.getSchema();
+ }
+ return null;
+ }
+
public Map<String, String> getTagMap() {
return tagMap;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 6e32b1d658d..a54e4157a28 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -2083,48 +2083,53 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
- analysis.setStatement(insertTabletStatement);
validateSchema(analysis, insertTabletStatement);
+ InsertBaseStatement realStatement =
insertTabletStatement.removeLogicalView();
+ analysis.setStatement(realStatement);
if (analysis.isFinishQueryAfterAnalyze()) {
return analysis;
}
- DataPartitionQueryParam dataPartitionQueryParam = new
DataPartitionQueryParam();
-
dataPartitionQueryParam.setDevicePath(insertTabletStatement.getDevicePath().getFullPath());
-
dataPartitionQueryParam.setTimePartitionSlotList(insertTabletStatement.getTimePartitionSlots());
+ if (realStatement instanceof InsertTabletStatement) {
+ InsertTabletStatement realInsertTabletStatement =
(InsertTabletStatement) realStatement;
+ DataPartitionQueryParam dataPartitionQueryParam = new
DataPartitionQueryParam();
+ dataPartitionQueryParam.setDevicePath(
+ realInsertTabletStatement.getDevicePath().getFullPath());
+ dataPartitionQueryParam.setTimePartitionSlotList(
+ realInsertTabletStatement.getTimePartitionSlots());
- return getAnalysisForWriting(analysis,
Collections.singletonList(dataPartitionQueryParam));
+ return getAnalysisForWriting(analysis,
Collections.singletonList(dataPartitionQueryParam));
+ } else {
+ return computeAnalysisForMultiTablets(analysis,
(InsertMultiTabletsStatement) realStatement);
+ }
}
@Override
public Analysis visitInsertRow(InsertRowStatement insertRowStatement,
MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
- analysis.setStatement(insertRowStatement);
validateSchema(analysis, insertRowStatement);
+ InsertBaseStatement realInsertStatement =
insertRowStatement.removeLogicalView();
+ analysis.setStatement(realInsertStatement);
if (analysis.isFinishQueryAfterAnalyze()) {
return analysis;
}
- DataPartitionQueryParam dataPartitionQueryParam = new
DataPartitionQueryParam();
-
dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
- dataPartitionQueryParam.setTimePartitionSlotList(
- Collections.singletonList(insertRowStatement.getTimePartitionSlot()));
-
- return getAnalysisForWriting(analysis,
Collections.singletonList(dataPartitionQueryParam));
- }
+ if (realInsertStatement instanceof InsertRowStatement) {
+ InsertRowStatement realInsertRowStatement = (InsertRowStatement)
realInsertStatement;
+ DataPartitionQueryParam dataPartitionQueryParam = new
DataPartitionQueryParam();
+
dataPartitionQueryParam.setDevicePath(realInsertRowStatement.getDevicePath().getFullPath());
+ dataPartitionQueryParam.setTimePartitionSlotList(
+
Collections.singletonList(realInsertRowStatement.getTimePartitionSlot()));
- @Override
- public Analysis visitInsertRows(
- InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
- context.setQueryType(QueryType.WRITE);
- Analysis analysis = new Analysis();
- analysis.setStatement(insertRowsStatement);
- validateSchema(analysis, insertRowsStatement);
- if (analysis.isFinishQueryAfterAnalyze()) {
- return analysis;
+ return getAnalysisForWriting(analysis,
Collections.singletonList(dataPartitionQueryParam));
+ } else {
+ return computeAnalysisForInsertRows(analysis, (InsertRowsStatement)
realInsertStatement);
}
+ }
+ private Analysis computeAnalysisForInsertRows(
+ Analysis analysis, InsertRowsStatement insertRowsStatement) {
Map<String, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new
HashMap<>();
for (InsertRowStatement insertRowStatement :
insertRowsStatement.getInsertRowStatementList()) {
Set<TTimePartitionSlot> timePartitionSlotSet =
@@ -2145,16 +2150,23 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
@Override
- public Analysis visitInsertMultiTablets(
- InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext
context) {
+ public Analysis visitInsertRows(
+ InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
- analysis.setStatement(insertMultiTabletsStatement);
- validateSchema(analysis, insertMultiTabletsStatement);
+ validateSchema(analysis, insertRowsStatement);
+ InsertRowsStatement realInsertRowsStatement =
+ (InsertRowsStatement) insertRowsStatement.removeLogicalView();
+ analysis.setStatement(realInsertRowsStatement);
if (analysis.isFinishQueryAfterAnalyze()) {
return analysis;
}
+ return computeAnalysisForInsertRows(analysis, realInsertRowsStatement);
+ }
+
+ private Analysis computeAnalysisForMultiTablets(
+ Analysis analysis, InsertMultiTabletsStatement
insertMultiTabletsStatement) {
Map<String, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new
HashMap<>();
for (InsertTabletStatement insertTabletStatement :
insertMultiTabletsStatement.getInsertTabletStatementList()) {
@@ -2175,24 +2187,45 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
return getAnalysisForWriting(analysis, dataPartitionQueryParams);
}
+ @Override
+ public Analysis visitInsertMultiTablets(
+ InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext
context) {
+ context.setQueryType(QueryType.WRITE);
+ Analysis analysis = new Analysis();
+ validateSchema(analysis, insertMultiTabletsStatement);
+ InsertMultiTabletsStatement realStatement =
+ (InsertMultiTabletsStatement)
insertMultiTabletsStatement.removeLogicalView();
+ analysis.setStatement(realStatement);
+ if (analysis.isFinishQueryAfterAnalyze()) {
+ return analysis;
+ }
+
+ return computeAnalysisForMultiTablets(analysis, realStatement);
+ }
+
@Override
public Analysis visitInsertRowsOfOneDevice(
InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement,
MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
- analysis.setStatement(insertRowsOfOneDeviceStatement);
validateSchema(analysis, insertRowsOfOneDeviceStatement);
+ InsertBaseStatement realInsertStatement =
insertRowsOfOneDeviceStatement.removeLogicalView();
+ analysis.setStatement(realInsertStatement);
if (analysis.isFinishQueryAfterAnalyze()) {
return analysis;
}
- DataPartitionQueryParam dataPartitionQueryParam = new
DataPartitionQueryParam();
- dataPartitionQueryParam.setDevicePath(
- insertRowsOfOneDeviceStatement.getDevicePath().getFullPath());
- dataPartitionQueryParam.setTimePartitionSlotList(
- insertRowsOfOneDeviceStatement.getTimePartitionSlots());
+ if (realInsertStatement instanceof InsertRowsOfOneDeviceStatement) {
+ InsertRowsOfOneDeviceStatement realStatement =
+ (InsertRowsOfOneDeviceStatement) realInsertStatement;
+ DataPartitionQueryParam dataPartitionQueryParam = new
DataPartitionQueryParam();
+
dataPartitionQueryParam.setDevicePath(realStatement.getDevicePath().getFullPath());
+
dataPartitionQueryParam.setTimePartitionSlotList(realStatement.getTimePartitionSlots());
- return getAnalysisForWriting(analysis,
Collections.singletonList(dataPartitionQueryParam));
+ return getAnalysisForWriting(analysis,
Collections.singletonList(dataPartitionQueryParam));
+ } else {
+ return computeAnalysisForInsertRows(analysis, (InsertRowsStatement)
realInsertStatement);
+ }
}
private void validateSchema(Analysis analysis, InsertBaseStatement
insertStatement) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 38d69b7ab42..be366d9cd2e 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.plan.analyze.schema;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
@@ -130,6 +131,24 @@ class ClusterSchemaFetchExecutor {
return fetchSchemaAndCacheResult(patternTree);
}
+ ClusterSchemaTree fetchSchemaWithFullPaths(List<String> fullPathList) {
+ PathPatternTree patternTree = new PathPatternTree();
+ for (String fullPath : fullPathList) {
+ try {
+ patternTree.appendFullPath(new PartialPath(fullPath));
+ } catch (IllegalPathException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ patternTree.constructTree();
+ return fetchSchemaAndCacheResult(patternTree);
+ }
+
+ ClusterSchemaTree fetchSchemaWithPatternTreeAndCache(PathPatternTree
patternTree) {
+ patternTree.constructTree();
+ return fetchSchemaAndCacheResult(patternTree);
+ }
+
private ClusterSchemaTree fetchSchemaAndCacheResult(PathPatternTree
patternTree) {
ClusterSchemaTree schemaTree =
executeSchemaFetchQuery(
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaComputation.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaComputation.java
index 8fdc9a9fccc..69376da2602 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaComputation.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaComputation.java
@@ -20,7 +20,11 @@
package org.apache.iotdb.db.mpp.plan.analyze.schema;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.db.mpp.common.schematree.IMeasurementSchemaInfo;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.List;
/**
* This interface defines the required behaviour invoked during schema fetch/computation, which is
@@ -40,4 +44,39 @@ public interface ISchemaComputation {
* @param measurementSchemaInfo the measurement schema of fetched measurement
*/
void computeMeasurement(int index, IMeasurementSchemaInfo
measurementSchemaInfo);
+
+ // region used by logical view
+ boolean hasLogicalViewNeedProcess();
+
+ /**
+ * @return the logical view schema list recorded by this statement. It may be NULL if it is not
+ * used before.
+ */
+ List<LogicalViewSchema> getLogicalViewSchemaList();
+
+ /**
+ * @return the index list of logical view paths, where source of views should be placed. For
+ * example, IndexListOfLogicalViewPaths[alpha] = beta, then you should use
+ * LogicalViewSchemaList[alpha] to fill measurementSchema[beta].
+ */
+ List<Integer> getIndexListOfLogicalViewPaths();
+
+ /**
+ * Record the beginning and ending of logical schema list. After calling this interface, the range
+ * should be record. For example, the range is [0,4) which means 4 schemas exist. Later, more 3
+ * schemas are added, this function is called, then it records [4,7).
+ */
+ void recordRangeOfLogicalViewSchemaListNow();
+
+ /** @return the recorded range of logical view schema list. */
+ Pair<Integer, Integer> getRangeOfLogicalViewSchemaListRecorded();
+
+ /**
+ * @param index the index of fetched measurement in array returned by getMeasurements
+ * @param measurementSchemaInfo the measurement schema of source of the logical view
+ * @param isAligned whether the source of this view is aligned.
+ */
+ void computeMeasurementOfView(
+ int index, IMeasurementSchemaInfo measurementSchemaInfo, boolean
isAligned);
+ // endregion
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaValidation.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaValidation.java
index b0141f89b64..1d0252e92f6 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaValidation.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaValidation.java
@@ -34,7 +34,16 @@ public interface ISchemaValidation extends
ISchemaComputationWithAutoCreation {
validateMeasurementSchema(index, measurementSchemaInfo);
}
+ @Override
+ default void computeMeasurementOfView(
+ int index, IMeasurementSchemaInfo measurementSchemaInfo, boolean
isAligned) {
+ validateMeasurementSchema(index, measurementSchemaInfo, isAligned);
+ }
+
void validateDeviceSchema(boolean isAligned);
void validateMeasurementSchema(int index, IMeasurementSchemaInfo
measurementSchemaInfo);
+
+ void validateMeasurementSchema(
+ int index, IMeasurementSchemaInfo measurementSchemaInfo, boolean
isAligned);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/NormalSchemaFetcher.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/NormalSchemaFetcher.java
index f8f0d37f1a3..09da201f259 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/NormalSchemaFetcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/NormalSchemaFetcher.java
@@ -19,11 +19,15 @@
package org.apache.iotdb.db.mpp.plan.analyze.schema;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Pair;
import java.util.ArrayList;
import java.util.List;
@@ -47,31 +51,114 @@ class NormalSchemaFetcher {
this.clusterSchemaFetchExecutor = clusterSchemaFetchExecutor;
}
+ /** Appends each of the given full paths to the given pattern tree. */
+ private void computePatternTreeNeededReFetch(
+ PathPatternTree patternTree, List<String> fullPaths) {
+ for (String fullPath : fullPaths) {
+ try {
+ patternTree.appendFullPath(new PartialPath(fullPath));
+ } catch (IllegalPathException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private PathPatternTree computePatternTreeNeededReFetch(
+ PartialPath devicePath,
+ String[] measurementList,
+ List<Integer> indexOfMissingMeasurements,
+ List<String> fullPaths) {
+ PathPatternTree patternTree = new PathPatternTree();
+ for (int index : indexOfMissingMeasurements) {
+ patternTree.appendFullPath(devicePath, measurementList[index]);
+ }
+ for (String fullPath : fullPaths) {
+ try {
+ patternTree.appendFullPath(new PartialPath(fullPath));
+ } catch (IllegalPathException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return patternTree;
+ }
+
+ private PathPatternTree computePatternTreeNeededReFetch(
+ List<PartialPath> devicePathList,
+ List<String[]> measurementsList,
+ List<Integer> indexOfTargetDevices,
+ List<List<Integer>> indexOfTargetMeasurementsList) {
+ PathPatternTree patternTree = new PathPatternTree();
+ int deviceIndex;
+ for (int i = 0, size = indexOfTargetDevices.size(); i < size; i++) {
+ deviceIndex = indexOfTargetDevices.get(i);
+ for (int measurementIndex : indexOfTargetMeasurementsList.get(i)) {
+ patternTree.appendFullPath(
+ devicePathList.get(deviceIndex),
measurementsList.get(deviceIndex)[measurementIndex]);
+ }
+ }
+ return patternTree;
+ }
+
List<Integer> processNormalTimeSeries(
ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation) {
+ // [Step 1] Cache 1. compute measurements and record logical views.
List<Integer> indexOfMissingMeasurements =
schemaCache.computeWithoutTemplate(schemaComputationWithAutoCreation);
+ // [Step 2] Cache 2. process recorded logical views. If there are no views
now, it returns empty
+ // lists.
+ Pair<List<Integer>, List<String>> missedIndexAndPathString =
+
schemaCache.computeSourceOfLogicalView(schemaComputationWithAutoCreation);
+ List<Integer> indexOfMissingLogicalView = missedIndexAndPathString.left;
+ List<String> missedPathStringOfLogicalView =
missedIndexAndPathString.right;
// all schema can be taken from cache
- if (indexOfMissingMeasurements.isEmpty()) {
+ if (indexOfMissingMeasurements.isEmpty() &&
indexOfMissingLogicalView.isEmpty()) {
return indexOfMissingMeasurements;
}
-
- // try fetch the missing schema from remote and cache fetched schema
- ClusterSchemaTree remoteSchemaTree =
- clusterSchemaFetchExecutor.fetchSchemaOfOneDevice(
- schemaComputationWithAutoCreation.getDevicePath(),
- schemaComputationWithAutoCreation.getMeasurements(),
- indexOfMissingMeasurements);
+ // [Step 3] Fetch 1. fetch schema from remote. Process logical view first;
then process
+ // measurements.
+ // try to fetch the missing raw schema from remote and cache the fetched schema
+ ClusterSchemaTree remoteSchemaTree;
+ if (missedPathStringOfLogicalView.isEmpty()) {
+ remoteSchemaTree =
+ clusterSchemaFetchExecutor.fetchSchemaOfOneDevice(
+ schemaComputationWithAutoCreation.getDevicePath(),
+ schemaComputationWithAutoCreation.getMeasurements(),
+ indexOfMissingMeasurements);
+ } else {
+ PathPatternTree patternTree =
+ computePatternTreeNeededReFetch(
+ schemaComputationWithAutoCreation.getDevicePath(),
+ schemaComputationWithAutoCreation.getMeasurements(),
+ indexOfMissingMeasurements,
+ missedPathStringOfLogicalView);
+ remoteSchemaTree =
clusterSchemaFetchExecutor.fetchSchemaWithPatternTreeAndCache(patternTree);
+ }
+ // make sure all missed views are computed.
+ remoteSchemaTree.computeSourceOfLogicalView(
+ schemaComputationWithAutoCreation, indexOfMissingLogicalView);
// check and compute the fetched schema
indexOfMissingMeasurements =
remoteSchemaTree.compute(schemaComputationWithAutoCreation,
indexOfMissingMeasurements);
+ schemaComputationWithAutoCreation.recordRangeOfLogicalViewSchemaListNow();
+
+ // [Step 4] Fetch 2. Some fetched measurements in [Step 3] are views.
Process them.
+ missedIndexAndPathString =
+
schemaCache.computeSourceOfLogicalView(schemaComputationWithAutoCreation);
+ indexOfMissingLogicalView = missedIndexAndPathString.left;
+ missedPathStringOfLogicalView = missedIndexAndPathString.right;
+ if (!missedPathStringOfLogicalView.isEmpty()) {
+ ClusterSchemaTree viewSchemaTree =
+
clusterSchemaFetchExecutor.fetchSchemaWithFullPaths(missedPathStringOfLogicalView);
+ viewSchemaTree.computeSourceOfLogicalView(
+ schemaComputationWithAutoCreation, indexOfMissingLogicalView);
+ }
// all schema has been taken and processed
if (indexOfMissingMeasurements.isEmpty()) {
return indexOfMissingMeasurements;
}
- // auto create and process the missing schema
+ // [Step 5] Auto Create and process the missing schema
if (config.isAutoCreateSchemaEnabled()) {
ClusterSchemaTree schemaTree = new ClusterSchemaTree();
autoCreateSchemaExecutor.autoCreateTimeSeries(
@@ -90,38 +177,77 @@ class NormalSchemaFetcher {
void processNormalTimeSeries(
List<? extends ISchemaComputationWithAutoCreation>
schemaComputationWithAutoCreationList) {
+
+ // [Step 1] Cache 1. compute measurements and record logical views.
List<Integer> indexOfDevicesWithMissingMeasurements = new ArrayList<>();
List<List<Integer>> indexOfMissingMeasurementsList =
new ArrayList<>(schemaComputationWithAutoCreationList.size());
-
- ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation;
List<Integer> indexOfMissingMeasurements;
for (int i = 0, size = schemaComputationWithAutoCreationList.size(); i <
size; i++) {
- schemaComputationWithAutoCreation =
schemaComputationWithAutoCreationList.get(i);
indexOfMissingMeasurements =
-
schemaCache.computeWithoutTemplate(schemaComputationWithAutoCreation);
+
schemaCache.computeWithoutTemplate(schemaComputationWithAutoCreationList.get(i));
if (!indexOfMissingMeasurements.isEmpty()) {
indexOfDevicesWithMissingMeasurements.add(i);
indexOfMissingMeasurementsList.add(indexOfMissingMeasurements);
}
}
-
+ // [Step 2] Cache 2. process recorded logical views.
+ boolean hasUnFetchedLogicalView = false;
+ List<Pair<List<Integer>, List<String>>> missedIndexAndPathStringOfViewList
=
+ new ArrayList<>(schemaComputationWithAutoCreationList.size());
+ for (ISchemaComputationWithAutoCreation iSchemaComputationWithAutoCreation
:
+ schemaComputationWithAutoCreationList) {
+ Pair<List<Integer>, List<String>> missedIndexAndPathString =
+
schemaCache.computeSourceOfLogicalView(iSchemaComputationWithAutoCreation);
+ if (!missedIndexAndPathString.left.isEmpty()) {
+ hasUnFetchedLogicalView = true;
+ }
+ missedIndexAndPathStringOfViewList.add(missedIndexAndPathString);
+ }
// all schema can be taken from cache
- if (indexOfDevicesWithMissingMeasurements.isEmpty()) {
+ if (indexOfDevicesWithMissingMeasurements.isEmpty() &&
(!hasUnFetchedLogicalView)) {
return;
}
-
+ // [Step 3] Fetch 1. fetch schema from remote. Process logical view first;
then process
+ // measurements.
// try fetch the missing schema from remote
- ClusterSchemaTree remoteSchemaTree =
- clusterSchemaFetchExecutor.fetchSchemaOfMultiDevices(
- schemaComputationWithAutoCreationList.stream()
- .map(ISchemaComputationWithAutoCreation::getDevicePath)
- .collect(Collectors.toList()),
- schemaComputationWithAutoCreationList.stream()
- .map(ISchemaComputationWithAutoCreation::getMeasurements)
- .collect(Collectors.toList()),
- indexOfDevicesWithMissingMeasurements,
- indexOfMissingMeasurementsList);
+ ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation;
+ ClusterSchemaTree remoteSchemaTree;
+ if (!hasUnFetchedLogicalView) {
+ remoteSchemaTree =
+ clusterSchemaFetchExecutor.fetchSchemaOfMultiDevices(
+ schemaComputationWithAutoCreationList.stream()
+ .map(ISchemaComputationWithAutoCreation::getDevicePath)
+ .collect(Collectors.toList()),
+ schemaComputationWithAutoCreationList.stream()
+ .map(ISchemaComputationWithAutoCreation::getMeasurements)
+ .collect(Collectors.toList()),
+ indexOfDevicesWithMissingMeasurements,
+ indexOfMissingMeasurementsList);
+ } else {
+ PathPatternTree patternTree =
+ computePatternTreeNeededReFetch(
+ schemaComputationWithAutoCreationList.stream()
+ .map(ISchemaComputationWithAutoCreation::getDevicePath)
+ .collect(Collectors.toList()),
+ schemaComputationWithAutoCreationList.stream()
+ .map(ISchemaComputationWithAutoCreation::getMeasurements)
+ .collect(Collectors.toList()),
+ indexOfDevicesWithMissingMeasurements,
+ indexOfMissingMeasurementsList);
+ List<String> fullPathsNeedReFetch = new ArrayList<>();
+ for (Pair<List<Integer>, List<String>> pair :
missedIndexAndPathStringOfViewList) {
+ fullPathsNeedReFetch.addAll(pair.right);
+ }
+ computePatternTreeNeededReFetch(patternTree, fullPathsNeedReFetch);
+ remoteSchemaTree =
clusterSchemaFetchExecutor.fetchSchemaWithPatternTreeAndCache(patternTree);
+ }
+ // make sure all missed views are computed.
+ for (int i = 0; i < schemaComputationWithAutoCreationList.size(); i++) {
+ schemaComputationWithAutoCreation =
schemaComputationWithAutoCreationList.get(i);
+ remoteSchemaTree.computeSourceOfLogicalView(
+ schemaComputationWithAutoCreation,
missedIndexAndPathStringOfViewList.get(i).left);
+ }
// check and compute the fetched schema
List<Integer> indexOfDevicesNeedAutoCreateSchema = new ArrayList<>();
List<List<Integer>> indexOfMeasurementsNeedAutoCreate = new ArrayList<>();
@@ -131,18 +257,44 @@ class NormalSchemaFetcher {
indexOfMissingMeasurements =
remoteSchemaTree.compute(
schemaComputationWithAutoCreation,
indexOfMissingMeasurementsList.get(i));
+
schemaComputationWithAutoCreation.recordRangeOfLogicalViewSchemaListNow();
if (!indexOfMissingMeasurements.isEmpty()) {
indexOfDevicesNeedAutoCreateSchema.add(indexOfDevicesWithMissingMeasurements.get(i));
indexOfMeasurementsNeedAutoCreate.add(indexOfMissingMeasurements);
}
}
+ // [Step 4] Fetch 2. Some fetched measurements in [Step 3] are views.
Process them.
+ hasUnFetchedLogicalView = false;
+ for (int i = 0, size = schemaComputationWithAutoCreationList.size(); i <
size; i++) {
+ Pair<List<Integer>, List<String>> missedIndexAndPathString =
+
schemaCache.computeSourceOfLogicalView(schemaComputationWithAutoCreationList.get(i));
+ if (missedIndexAndPathString.left.size() > 0) {
+ hasUnFetchedLogicalView = true;
+ }
+ missedIndexAndPathStringOfViewList.get(i).left =
missedIndexAndPathString.left;
+ missedIndexAndPathStringOfViewList.get(i).right =
missedIndexAndPathString.right;
+ }
+ if (hasUnFetchedLogicalView) {
+ List<String> fullPathsNeedRefetch = new ArrayList<>();
+ for (Pair<List<Integer>, List<String>> pair :
missedIndexAndPathStringOfViewList) {
+ fullPathsNeedRefetch.addAll(pair.right);
+ }
+ ClusterSchemaTree viewSchemaTree =
+
clusterSchemaFetchExecutor.fetchSchemaWithFullPaths(fullPathsNeedRefetch);
+ for (int i = 0, size = schemaComputationWithAutoCreationList.size(); i <
size; i++) {
+ schemaComputationWithAutoCreation =
schemaComputationWithAutoCreationList.get(i);
+ viewSchemaTree.computeSourceOfLogicalView(
+ schemaComputationWithAutoCreation,
missedIndexAndPathStringOfViewList.get(i).left);
+ }
+ }
+
// all schema has been taken and processed
if (indexOfDevicesNeedAutoCreateSchema.isEmpty()) {
return;
}
- // auto create and process the missing schema
+ // [Step 5] Auto Create and process the missing schema
if (config.isAutoCreateSchemaEnabled()) {
ClusterSchemaTree schemaTree = new ClusterSchemaTree();
autoCreateSchemaExecutor.autoCreateTimeSeries(
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
index d6c9941c580..708b2be09e3 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.plan.statement.crud;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
@@ -26,9 +27,13 @@ import
org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -52,6 +57,27 @@ public abstract class InsertBaseStatement extends Statement {
/** index of failed measurements -> info including measurement, data type
and value */
protected Map<Integer, FailedMeasurementInfo> failedMeasurementIndex2Info;
+ // region params used by analyzing logical views.
+
+ /** Records the logical view schemas that appear in this statement. */
+ List<LogicalViewSchema> logicalViewSchemaList;
+
+ /**
+ * Records the indexes of the locations where the sources of the logical
views should be placed.
+ *
+ * <p>For example, indexOfSourcePathsOfLogicalViews[alpha] = beta means the source of
+ * logicalViewSchemaList[alpha] should be filled into
measurementSchemas[beta].
+ */
+ List<Integer> indexOfSourcePathsOfLogicalViews;
+
+ /** The end of the last recorded range, which is the beginning of the current range. */
+ int recordedBeginOfLogicalViewSchemaList = 0;
+
+ /** The end of the current range. */
+ int recordedEndOfLogicalViewSchemaList = 0;
+
+ // endregion
+
public PartialPath getDevicePath() {
return devicePath;
}
@@ -227,4 +253,71 @@ public abstract class InsertBaseStatement extends
Statement {
}
}
// endregion
+
+ // region functions used by analyzing logical views
+ /**
+ * Removes the logical views in this statement according to the validated schemas, so
this function should
+ * be called after validating schemas.
+ */
+ public abstract InsertBaseStatement removeLogicalView();
+
+ public void setFailedMeasurementIndex2Info(
+ Map<Integer, FailedMeasurementInfo> failedMeasurementIndex2Info) {
+ this.failedMeasurementIndex2Info = failedMeasurementIndex2Info;
+ }
+
+ /**
+ * This function is used in splitting. It traverses two lists,
logicalViewSchemaList and measurements,
+ * to find all devices involved in this statement. Each device is mapped to
its measurements,
+ * each recorded as a pair of the measurement name and its index in the measurement
schemas
+ * (this.measurementSchemas).
+ *
+ * @return map from device path to its measurements.
+ */
+ protected final Map<PartialPath, List<Pair<String, Integer>>>
+ getMapFromDeviceToMeasurementAndIndex() {
+ boolean[] isLogicalView = new boolean[this.measurements.length];
+ int[] indexMapToLogicalViewList = new int[this.measurements.length];
+ Arrays.fill(isLogicalView, false);
+ if (this.indexOfSourcePathsOfLogicalViews != null) {
+ for (int i = 0; i < this.indexOfSourcePathsOfLogicalViews.size(); i++) {
+ int realIndex = this.indexOfSourcePathsOfLogicalViews.get(i);
+ isLogicalView[realIndex] = true;
+ indexMapToLogicalViewList[realIndex] = i;
+ }
+ }
+ // construct map from device to measurements and record the index of its
measurement schema
+ Map<PartialPath, List<Pair<String, Integer>>>
mapFromDeviceToMeasurementAndIndex =
+ new HashMap<>();
+ for (int i = 0; i < this.measurements.length; i++) {
+ PartialPath devicePath;
+ String measurementName;
+ if (isLogicalView[i]) {
+ int viewIndex = indexMapToLogicalViewList[i];
+ devicePath =
+
this.logicalViewSchemaList.get(viewIndex).getSourcePathIfWritable().getDevicePath();
+ measurementName =
+
this.logicalViewSchemaList.get(viewIndex).getSourcePathIfWritable().getMeasurement();
+ } else {
+ devicePath = this.devicePath;
+ measurementName = this.measurements[i];
+ }
+ int index = i;
+ final String finalMeasurementName = measurementName;
+ mapFromDeviceToMeasurementAndIndex.compute(
+ devicePath,
+ (k, v) -> {
+ if (v == null) {
+ List<Pair<String, Integer>> valueList = new ArrayList<>();
+ valueList.add(new Pair<>(finalMeasurementName, index));
+ return valueList;
+ } else {
+ v.add(new Pair<>(finalMeasurementName, index));
+ return v;
+ }
+ });
+ }
+ return mapFromDeviceToMeasurementAndIndex;
+ }
+ // endregion
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java
index 2ee89dc69c4..a43bfc7afef 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java
@@ -125,4 +125,21 @@ public class InsertMultiTabletsStatement extends
InsertBaseStatement {
public Object getFirstValueOfIndex(int index) {
throw new NotImplementedException();
}
+
+ @Override
+ public InsertBaseStatement removeLogicalView() {
+ List<InsertTabletStatement> mergedList = new ArrayList<>();
+ boolean needSplit = false;
+ for (InsertTabletStatement child : this.insertTabletStatementList) {
+ List<InsertTabletStatement> childSplitResult = child.getSplitList();
+ needSplit = needSplit || child.isNeedSplit();
+ mergedList.addAll(childSplitResult);
+ }
+ if (!needSplit) {
+ return this;
+ }
+ InsertMultiTabletsStatement splitResult = new
InsertMultiTabletsStatement();
+ splitResult.setInsertTabletStatementList(mergedList);
+ return splitResult;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
index 094cfe07398..dbd2e23dcec 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.plan.statement.crud;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
@@ -38,6 +39,7 @@ import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -46,8 +48,11 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class InsertRowStatement extends InsertBaseStatement implements
ISchemaValidation {
@@ -60,9 +65,17 @@ public class InsertRowStatement extends InsertBaseStatement
implements ISchemaVa
private Object[] values;
private boolean isNeedInferType = false;
+ /**
+ * Records, for each measurement, whether the source of the logical view is aligned. Only
used when there are
+ * views.
+ */
+ private boolean[] measurementIsAligned;
+
public InsertRowStatement() {
super();
statementType = StatementType.INSERT;
+ this.recordedBeginOfLogicalViewSchemaList = 0;
+ this.recordedEndOfLogicalViewSchemaList = 0;
}
@Override
@@ -243,6 +256,66 @@ public class InsertRowStatement extends
InsertBaseStatement implements ISchemaVa
values[index] = null;
}
+ public boolean isNeedSplit() {
+ if (this.indexOfSourcePathsOfLogicalViews == null) {
+ return false;
+ }
+ return !this.indexOfSourcePathsOfLogicalViews.isEmpty();
+ }
+
+ public List<InsertRowStatement> getSplitList() {
+ if (!isNeedSplit()) {
+ return Collections.singletonList(this);
+ }
+ Map<PartialPath, List<Pair<String, Integer>>>
mapFromDeviceToMeasurementAndIndex =
+ this.getMapFromDeviceToMeasurementAndIndex();
+ // Reconstruct statements
+ List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
+ for (Map.Entry<PartialPath, List<Pair<String, Integer>>> entry :
+ mapFromDeviceToMeasurementAndIndex.entrySet()) {
+ List<Pair<String, Integer>> pairList = entry.getValue();
+ InsertRowStatement statement = new InsertRowStatement();
+ statement.setTime(this.time);
+ statement.setNeedInferType(this.isNeedInferType);
+ statement.setDevicePath(entry.getKey());
+ Object[] values = new Object[pairList.size()];
+ String[] measurements = new String[pairList.size()];
+ MeasurementSchema[] measurementSchemas = new
MeasurementSchema[pairList.size()];
+ TSDataType[] dataTypes = new TSDataType[pairList.size()];
+ for (int i = 0; i < pairList.size(); i++) {
+ int realIndex = pairList.get(i).right;
+ values[i] = this.values[realIndex];
+ measurements[i] = pairList.get(i).left;
+ measurementSchemas[i] = this.measurementSchemas[realIndex];
+ dataTypes[i] = this.dataTypes[realIndex];
+ if (this.measurementIsAligned != null) {
+ statement.setAligned(this.measurementIsAligned[realIndex]);
+ }
+ }
+ statement.setValues(values);
+ statement.setMeasurements(measurements);
+ statement.setMeasurementSchemas(measurementSchemas);
+ statement.setDataTypes(dataTypes);
+ statement.setFailedMeasurementIndex2Info(failedMeasurementIndex2Info);
+ insertRowStatementList.add(statement);
+ }
+ return insertRowStatementList;
+ }
+
+ @Override
+ public InsertBaseStatement removeLogicalView() {
+ if (!isNeedSplit()) {
+ return this;
+ }
+ List<InsertRowStatement> insertRowStatementList = this.getSplitList();
+ if (insertRowStatementList.size() == 1) {
+ return insertRowStatementList.get(0);
+ }
+ InsertRowsStatement insertRowsStatement = new InsertRowsStatement();
+ insertRowsStatement.setInsertRowStatementList(insertRowStatementList);
+ return insertRowsStatement;
+ }
+
@Override
public ISchemaValidation getSchemaValidation() {
return this;
@@ -299,7 +372,17 @@ public class InsertRowStatement extends
InsertBaseStatement implements ISchemaVa
if (measurementSchemaInfo == null) {
measurementSchemas[index] = null;
} else {
- measurementSchemas[index] =
measurementSchemaInfo.getSchemaAsMeasurementSchema();
+ if (measurementSchemaInfo.isLogicalView()) {
+ if (logicalViewSchemaList == null || indexOfSourcePathsOfLogicalViews
== null) {
+ logicalViewSchemaList = new ArrayList<>();
+ indexOfSourcePathsOfLogicalViews = new ArrayList<>();
+ }
+
logicalViewSchemaList.add(measurementSchemaInfo.getSchemaAsLogicalViewSchema());
+ indexOfSourcePathsOfLogicalViews.add(index);
+ return;
+ } else {
+ measurementSchemas[index] =
measurementSchemaInfo.getSchemaAsMeasurementSchema();
+ }
}
if (isNeedInferType) {
return;
@@ -311,4 +394,47 @@ public class InsertRowStatement extends
InsertBaseStatement implements ISchemaVa
throw new SemanticException(e);
}
}
+
+ @Override
+ public void validateMeasurementSchema(
+ int index, IMeasurementSchemaInfo measurementSchemaInfo, boolean
isAligned) {
+ this.validateMeasurementSchema(index, measurementSchemaInfo);
+ if (this.measurementIsAligned == null) {
+ this.measurementIsAligned = new boolean[this.measurements.length];
+ Arrays.fill(this.measurementIsAligned, this.isAligned);
+ }
+ this.measurementIsAligned[index] = isAligned;
+ }
+
+ @Override
+ public boolean hasLogicalViewNeedProcess() {
+ if (this.indexOfSourcePathsOfLogicalViews == null) {
+ return false;
+ }
+ return !this.indexOfSourcePathsOfLogicalViews.isEmpty();
+ }
+
+ @Override
+ public List<LogicalViewSchema> getLogicalViewSchemaList() {
+ return this.logicalViewSchemaList;
+ }
+
+ @Override
+ public List<Integer> getIndexListOfLogicalViewPaths() {
+ return this.indexOfSourcePathsOfLogicalViews;
+ }
+
+ @Override
+ public void recordRangeOfLogicalViewSchemaListNow() {
+ if (this.logicalViewSchemaList != null) {
+ this.recordedBeginOfLogicalViewSchemaList =
this.recordedEndOfLogicalViewSchemaList;
+ this.recordedEndOfLogicalViewSchemaList =
this.logicalViewSchemaList.size();
+ }
+ }
+
+ @Override
+ public Pair<Integer, Integer> getRangeOfLogicalViewSchemaListRecorded() {
+ return new Pair<>(
+ this.recordedBeginOfLogicalViewSchemaList,
this.recordedEndOfLogicalViewSchemaList);
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
index 65c1f106a90..a65c68eb331 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
@@ -136,4 +136,25 @@ public class InsertRowsOfOneDeviceStatement extends
InsertBaseStatement {
public Object getFirstValueOfIndex(int index) {
throw new NotImplementedException();
}
+
+ @Override
+ public InsertBaseStatement removeLogicalView() {
+ boolean needSplit = false;
+ for (InsertRowStatement child : this.insertRowStatementList) {
+ if (child.isNeedSplit()) {
+ needSplit = true;
+ }
+ }
+ if (needSplit) {
+ List<InsertRowStatement> mergedList = new ArrayList<>();
+ for (InsertRowStatement child : this.insertRowStatementList) {
+ List<InsertRowStatement> childSplitResult = child.getSplitList();
+ mergedList.addAll(childSplitResult);
+ }
+ InsertRowsStatement splitResult = new InsertRowsStatement();
+ splitResult.setInsertRowStatementList(mergedList);
+ return splitResult;
+ }
+ return this;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java
index 0f0f973b947..8528c64c307 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java
@@ -136,4 +136,21 @@ public class InsertRowsStatement extends
InsertBaseStatement {
public Object getFirstValueOfIndex(int index) {
throw new NotImplementedException();
}
+
+ @Override
+ public InsertBaseStatement removeLogicalView() {
+ List<InsertRowStatement> mergedList = new ArrayList<>();
+ boolean needSplit = false;
+ for (InsertRowStatement child : this.insertRowStatementList) {
+ List<InsertRowStatement> childSplitResult = child.getSplitList();
+ needSplit = needSplit || child.isNeedSplit();
+ mergedList.addAll(childSplitResult);
+ }
+ if (!needSplit) {
+ return this;
+ }
+ InsertRowsStatement splitResult = new InsertRowsStatement();
+ splitResult.setInsertRowStatementList(mergedList);
+ return splitResult;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java
index cfa864fbbd0..d515b5168ca 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.plan.statement.crud;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
@@ -36,14 +37,18 @@ import
org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class InsertTabletStatement extends InsertBaseStatement implements
ISchemaValidation {
@@ -57,9 +62,17 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
private int rowCount = 0;
+ /**
+ * Records, for each measurement, whether the source of the logical view is aligned. Only
used when there are
+ * views.
+ */
+ private boolean[] measurementIsAligned;
+
public InsertTabletStatement() {
super();
statementType = StatementType.BATCH_INSERT;
+ this.recordedBeginOfLogicalViewSchemaList = 0;
+ this.recordedEndOfLogicalViewSchemaList = 0;
}
public int getRowCount() {
@@ -186,6 +199,73 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
columns[index] = null;
}
+  /**
+   * Tells whether this statement has to be split before execution.
+   *
+   * @return {@code true} when the list of logical-view indexes exists and holds at least one
+   *     entry, {@code false} otherwise
+   */
+  public boolean isNeedSplit() {
+    return this.indexOfSourcePathsOfLogicalViews != null
+        && !this.indexOfSourcePathsOfLogicalViews.isEmpty();
+  }
+
+  /**
+   * Splits this statement into per-device statements when some measurements are logical views.
+   *
+   * <p>When {@link #isNeedSplit()} is false, the result is a singleton list holding this statement
+   * itself. Otherwise one new statement is built for every entry of {@code
+   * getMapFromDeviceToMeasurementAndIndex()}: the entry key becomes the device path, and each pair
+   * supplies the measurement name (left) and the index (right) used to pick the column, schema,
+   * data type and bitmap out of this statement. The times array, the row count and the
+   * failed-measurement map are handed to every new statement unchanged.
+   *
+   * @return a singleton list of this statement, or the list of rebuilt statements
+   */
+  public List<InsertTabletStatement> getSplitList() {
+    if (!isNeedSplit()) {
+      return Collections.singletonList(this);
+    }
+    Map<PartialPath, List<Pair<String, Integer>>> deviceToMeasurementAndIndex =
+        this.getMapFromDeviceToMeasurementAndIndex();
+    List<InsertTabletStatement> splitStatements = new ArrayList<>();
+    for (Map.Entry<PartialPath, List<Pair<String, Integer>>> deviceEntry :
+        deviceToMeasurementAndIndex.entrySet()) {
+      List<Pair<String, Integer>> measurementAndIndexList = deviceEntry.getValue();
+      int size = measurementAndIndexList.size();
+
+      InsertTabletStatement split = new InsertTabletStatement();
+      split.setTimes(this.times);
+      split.setDevicePath(deviceEntry.getKey());
+      split.setRowCount(this.rowCount);
+
+      Object[] splitColumns = new Object[size];
+      String[] splitMeasurements = new String[size];
+      MeasurementSchema[] splitSchemas = new MeasurementSchema[size];
+      TSDataType[] splitDataTypes = new TSDataType[size];
+      // A bitmap array is only needed when this statement carries bitmaps itself.
+      BitMap[] splitBitMaps = this.bitMaps == null ? null : new BitMap[size];
+
+      int target = 0;
+      for (Pair<String, Integer> measurementAndIndex : measurementAndIndexList) {
+        // right = position of the measurement inside this statement's arrays
+        int sourceIndex = measurementAndIndex.right;
+        splitColumns[target] = this.columns[sourceIndex];
+        splitMeasurements[target] = measurementAndIndex.left;
+        splitSchemas[target] = this.measurementSchemas[sourceIndex];
+        splitDataTypes[target] = this.dataTypes[sourceIndex];
+        if (splitBitMaps != null) {
+          splitBitMaps[target] = this.bitMaps[sourceIndex];
+        }
+        if (this.measurementIsAligned != null) {
+          // Called once per measurement, so the flag of the last measurement is the one kept.
+          split.setAligned(this.measurementIsAligned[sourceIndex]);
+        }
+        target++;
+      }
+
+      split.setColumns(splitColumns);
+      split.setMeasurements(splitMeasurements);
+      split.setMeasurementSchemas(splitSchemas);
+      split.setDataTypes(splitDataTypes);
+      if (splitBitMaps != null) {
+        split.setBitMaps(splitBitMaps);
+      }
+      split.setFailedMeasurementIndex2Info(failedMeasurementIndex2Info);
+      splitStatements.add(split);
+    }
+    return splitStatements;
+  }
+
+  /**
+   * Produces the statement to use once logical views have been split out.
+   *
+   * @return this statement when {@link #isNeedSplit()} is false; the only split statement when
+   *     {@link #getSplitList()} yields exactly one; otherwise an {@code
+   *     InsertMultiTabletsStatement} holding every split statement
+   */
+  @Override
+  public InsertBaseStatement removeLogicalView() {
+    if (!isNeedSplit()) {
+      return this;
+    }
+    List<InsertTabletStatement> splitStatements = this.getSplitList();
+    if (splitStatements.size() != 1) {
+      InsertMultiTabletsStatement multiTabletsStatement = new InsertMultiTabletsStatement();
+      multiTabletsStatement.setInsertTabletStatementList(splitStatements);
+      return multiTabletsStatement;
+    }
+    return splitStatements.get(0);
+  }
+
@Override
public long getMinTime() {
return times[0];
@@ -261,7 +341,17 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
if (measurementSchemaInfo == null) {
measurementSchemas[index] = null;
} else {
- measurementSchemas[index] =
measurementSchemaInfo.getSchemaAsMeasurementSchema();
+ if (measurementSchemaInfo.isLogicalView()) {
+ if (logicalViewSchemaList == null || indexOfSourcePathsOfLogicalViews
== null) {
+ logicalViewSchemaList = new ArrayList<>();
+ indexOfSourcePathsOfLogicalViews = new ArrayList<>();
+ }
+
logicalViewSchemaList.add(measurementSchemaInfo.getSchemaAsLogicalViewSchema());
+ indexOfSourcePathsOfLogicalViews.add(index);
+ return;
+ } else {
+ measurementSchemas[index] =
measurementSchemaInfo.getSchemaAsMeasurementSchema();
+ }
}
try {
@@ -270,4 +360,47 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
throw new SemanticException(e);
}
}
+
+  /**
+   * Validates the measurement through the two-argument overload, then stores the aligned flag for
+   * it.
+   *
+   * <p>The per-measurement flag array is created on first use, with one slot per measurement and
+   * every slot preset to this statement's own {@code isAligned} value.
+   *
+   * @param index position of the measurement in this statement
+   * @param measurementSchemaInfo schema information passed on to the two-argument overload
+   * @param isAligned flag stored for the measurement at {@code index}
+   */
+  @Override
+  public void validateMeasurementSchema(
+      int index, IMeasurementSchemaInfo measurementSchemaInfo, boolean isAligned) {
+    validateMeasurementSchema(index, measurementSchemaInfo);
+    if (measurementIsAligned == null) {
+      boolean[] alignedFlags = new boolean[measurements.length];
+      // this.isAligned is the statement-level field, not the parameter of the same name.
+      Arrays.fill(alignedFlags, this.isAligned);
+      measurementIsAligned = alignedFlags;
+    }
+    measurementIsAligned[index] = isAligned;
+  }
+
+  /**
+   * Tells whether any logical-view index has been collected for this statement.
+   *
+   * @return {@code true} when the index list exists and is not empty
+   */
+  @Override
+  public boolean hasLogicalViewNeedProcess() {
+    List<Integer> viewIndexes = this.indexOfSourcePathsOfLogicalViews;
+    return viewIndexes != null && !viewIndexes.isEmpty();
+  }
+
+  /**
+   * Returns the logical-view schemas collected so far.
+   *
+   * @return the internal list itself (not a copy); may be {@code null} when none was collected
+   */
+  @Override
+  public List<LogicalViewSchema> getLogicalViewSchemaList() {
+    return logicalViewSchemaList;
+  }
+
+  /**
+   * Returns the measurement indexes that were found to be logical views.
+   *
+   * @return the internal list itself (not a copy); may be {@code null} when none was collected
+   */
+  @Override
+  public List<Integer> getIndexListOfLogicalViewPaths() {
+    return indexOfSourcePathsOfLogicalViews;
+  }
+
+  /**
+   * Advances the recorded range: the previous end becomes the new begin, and the current size of
+   * the logical-view schema list becomes the new end. Does nothing while that list is {@code
+   * null}.
+   */
+  @Override
+  public void recordRangeOfLogicalViewSchemaListNow() {
+    if (logicalViewSchemaList == null) {
+      return;
+    }
+    recordedBeginOfLogicalViewSchemaList = recordedEndOfLogicalViewSchemaList;
+    recordedEndOfLogicalViewSchemaList = logicalViewSchemaList.size();
+  }
+
+  /**
+   * Returns the range stored by the last call to {@link #recordRangeOfLogicalViewSchemaListNow()}.
+   *
+   * @return a new pair whose left is the recorded begin and whose right is the recorded end
+   */
+  @Override
+  public Pair<Integer, Integer> getRangeOfLogicalViewSchemaListRecorded() {
+    Pair<Integer, Integer> recordedRange =
+        new Pair<>(recordedBeginOfLogicalViewSchemaList, recordedEndOfLogicalViewSchemaList);
+    return recordedRange;
+  }
}