This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fbddcfd62a2 Pipe Meta: Completed the logical view related logics &
some bug fixes in IT and dirs (#12159)
fbddcfd62a2 is described below
commit fbddcfd62a26b7af5ea116b6394d0bd630cac442
Author: Caideyipi <[email protected]>
AuthorDate: Tue Mar 12 23:27:23 2024 +0800
Pipe Meta: Completed the logical view related logics & some bug fixes in IT
and dirs (#12159)
---
.../pipe/it/autocreate/IoTDBPipeExtractorIT.java | 5 +
.../iotdb/confignode/conf/ConfigNodeConfig.java | 5 +-
.../iotdb/confignode/conf/ConfigNodeConstant.java | 13 -
.../PipeConfigNodeSnapshotResourceManager.java | 4 +-
.../PipeConfigPhysicalPlanTSStatusVisitor.java | 11 +
.../iotdb/confignode/service/ConfigNode.java | 4 +-
.../pipe/receiver/PipePlanToStatementVisitor.java | 16 +-
.../receiver/PipeStatementTSStatusVisitor.java | 10 +
.../receiver/thrift/IoTDBDataNodeReceiver.java | 3 +-
.../queryengine/plan/analyze/AnalyzeVisitor.java | 261 +++++++++++----------
.../metadata/view/CreateLogicalViewStatement.java | 12 +-
.../apache/iotdb/commons/conf/IoTDBConstant.java | 1 +
.../iotdb/commons/pipe/task/meta/PipeMeta.java | 9 +-
13 files changed, 189 insertions(+), 165 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
index a3d0e0383a1..c94650817a9 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
@@ -411,6 +411,7 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
Map<String, String> connectorAttributes = new HashMap<>();
extractorAttributes.put("extractor.pattern", null);
+ extractorAttributes.put("extractor.inclusion", "data");
connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
@@ -482,6 +483,7 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
Map<String, String> connectorAttributes = new HashMap<>();
extractorAttributes.put("extractor.pattern", "root.db1");
+ extractorAttributes.put("extractor.inclusion", "data");
connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
@@ -579,6 +581,7 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+ extractorAttributes.put("extractor.inclusion", "data");
extractorAttributes.put("extractor.pattern", "root.db.d2");
extractorAttributes.put("extractor.history.enable", "false");
extractorAttributes.put("extractor.realtime.enable", "true");
@@ -664,6 +667,7 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
Map<String, String> connectorAttributes = new HashMap<>();
extractorAttributes.put("extractor.pattern", "root.db.d1");
+ extractorAttributes.put("extractor.inclusion", "data");
extractorAttributes.put("extractor.history.enable", "true");
// 1970-01-01T08:00:02+08:00
extractorAttributes.put("extractor.history.start-time", "2000");
@@ -813,6 +817,7 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
Map<String, String> connectorAttributes = new HashMap<>();
extractorAttributes.put("source.pattern", "root.db.d1");
+ extractorAttributes.put("source.inclusion", "data");
extractorAttributes.put("source.start-time",
"1970-01-01T08:00:02+08:00");
// 1970-01-01T08:00:04+08:00
extractorAttributes.put("source.end-time", "4000");
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 286f8c99e96..6ed563ec137 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -135,11 +135,11 @@ public class ConfigNodeConfig {
/** System directory, including version file for each database and metadata.
*/
private String systemDir =
- ConfigNodeConstant.DATA_DIR + File.separator +
IoTDBConstant.SYSTEM_FOLDER_NAME;
+ IoTDBConstant.CN_DEFAULT_DATA_DIR + File.separator +
IoTDBConstant.SYSTEM_FOLDER_NAME;
/** Consensus directory, storage consensus protocol logs. */
private String consensusDir =
- ConfigNodeConstant.DATA_DIR + File.separator +
ConfigNodeConstant.CONSENSUS_FOLDER;
+ IoTDBConstant.CN_DEFAULT_DATA_DIR + File.separator +
IoTDBConstant.CONSENSUS_FOLDER_NAME;
/** External lib directory, stores user-uploaded JAR files. */
private String extLibDir = IoTDBConstant.EXT_FOLDER_NAME;
@@ -318,6 +318,7 @@ public class ConfigNodeConfig {
triggerTemporaryLibDir = addHomeDir(triggerTemporaryLibDir);
pipeDir = addHomeDir(pipeDir);
pipeTemporaryLibDir = addHomeDir(pipeTemporaryLibDir);
+ pipeReceiverFileDir = addHomeDir(pipeReceiverFileDir);
}
private String addHomeDir(String dir) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java
index 214db2c5956..ca521e3a73c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.confignode.conf;
-import java.io.File;
-
public class ConfigNodeConstant {
public static final String GLOBAL_NAME = "IoTDB-ConfigNode";
@@ -31,25 +29,14 @@ public class ConfigNodeConstant {
public static final String SYSTEM_FILE_NAME = "confignode-system.properties";
public static final String CONFIGNODE_PACKAGE =
"org.apache.iotdb.confignode.service";
- public static final String JMX_TYPE = "type";
public static final String CONFIGNODE_JMX_PORT = "confignode.jmx.port";
- public static final String DATA_DIR = "data" + File.separator + "confignode";
- public static final String CONF_DIR = "conf";
- public static final String CONSENSUS_FOLDER = "consensus";
- public static final String UDF_FOLDER = "udf";
-
- public static final int MIN_SUPPORTED_JDK_VERSION = 8;
-
public static final String REMOVE_CONFIGNODE_USAGE =
"Executed failed, check usage:
<Node-id>/<internal_address>:<internal_port>";
public static final String REMOVE_DATANODE_PROCESS =
"[REMOVE_DATANODE_PROCESS]";
public static final String REGION_MIGRATE_PROCESS =
"[REGION_MIGRATE_PROCESS]";
- public static final String IOTDB_FOREGROUND = "iotdb-foreground";
- public static final String IOTDB_PIDFILE = "iotdb-pidfile";
-
private ConfigNodeConstant() {
// empty constructor
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/snapshot/PipeConfigNodeSnapshotResourceManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/snapshot/PipeConfigNodeSnapshotResourceManager.java
index a439c0ea68b..4ab5a540940 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/snapshot/PipeConfigNodeSnapshotResourceManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/snapshot/PipeConfigNodeSnapshotResourceManager.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.confignode.manager.pipe.resource.snapshot;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.pipe.resource.PipeSnapshotResourceManager;
-import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
import java.util.Collections;
import java.util.HashSet;
@@ -28,7 +28,7 @@ import java.util.HashSet;
public class PipeConfigNodeSnapshotResourceManager extends
PipeSnapshotResourceManager {
private PipeConfigNodeSnapshotResourceManager() {
- super(new
HashSet<>(Collections.singletonList(ConfigNodeConstant.CONSENSUS_FOLDER)));
+ super(new
HashSet<>(Collections.singletonList(IoTDBConstant.CONSENSUS_FOLDER_NAME)));
}
private static class PipeConfigNodeSnapshotResourceManagerHolder {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/receiver/PipeConfigPhysicalPlanTSStatusVisitor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/receiver/PipeConfigPhysicalPlanTSStatusVisitor.java
index efae2511b43..498ef992f3a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/receiver/PipeConfigPhysicalPlanTSStatusVisitor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/receiver/PipeConfigPhysicalPlanTSStatusVisitor.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSche
import
org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeactivateTemplatePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteLogicalViewPlan;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteTimeSeriesPlan;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeUnsetSchemaTemplatePlan;
import
org.apache.iotdb.confignode.consensus.request.write.template.CommitSetSchemaTemplatePlan;
@@ -166,6 +167,16 @@ public class PipeConfigPhysicalPlanTSStatusVisitor
return super.visitPipeDeleteTimeSeries(pipeDeleteTimeSeriesPlan, context);
}
+ @Override
+ public TSStatus visitPipeDeleteLogicalView(
+ PipeDeleteLogicalViewPlan pipeDeleteLogicalViewPlan, TSStatus context) {
+ if (context.getCode() == TSStatusCode.PATH_NOT_EXIST.getStatusCode()) {
+ return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+ .setMessage(context.getMessage());
+ }
+ return super.visitPipeDeleteLogicalView(pipeDeleteLogicalViewPlan,
context);
+ }
+
@Override
public TSStatus visitPipeDeactivateTemplate(
PipeDeactivateTemplatePlan pipeDeactivateTemplatePlan, TSStatus context)
{
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index dfa10dc45bb..3c756f1e32f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -93,7 +93,7 @@ public class ConfigNode implements ConfigNodeMBean {
String.format(
"%s:%s=%s",
IoTDBConstant.IOTDB_SERVICE_JMX_NAME,
- ConfigNodeConstant.JMX_TYPE,
+ IoTDBConstant.JMX_TYPE,
ServiceType.CONFIG_NODE.getJmxName());
private final RegisterManager registerManager = new RegisterManager();
@@ -238,7 +238,7 @@ public class ConfigNode implements ConfigNodeMBean {
}
void processPid() {
- String pidFile = System.getProperty(ConfigNodeConstant.IOTDB_PIDFILE);
+ String pidFile = System.getProperty(IoTDBConstant.IOTDB_PIDFILE);
if (pidFile != null) {
new File(pidFile).deleteOnExit();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipePlanToStatementVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipePlanToStatementVisitor.java
index 1616dd02917..3d4f8a6b6b6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipePlanToStatementVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipePlanToStatementVisitor.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.pipe.receiver;
-import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
@@ -46,7 +45,6 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateTimeSeriesS
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ActivateTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchActivateTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogicalViewStatement;
-import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -133,18 +131,10 @@ public class PipePlanToStatementVisitor extends
PlanVisitor<Statement, Void> {
Objects.nonNull(group.getAttributesList())
? group.getAttributesList()
: new ArrayList<>());
- try {
- if (Objects.nonNull(group.getMeasurements())) {
- for (int i = 0; i < group.getMeasurements().size(); ++i) {
- paths.add(
- new PartialPath(path2Group.getKey().getFullPath(),
group.getMeasurements().get(i)));
- }
+ if (Objects.nonNull(group.getMeasurements())) {
+ for (int i = 0; i < group.getMeasurements().size(); ++i) {
+
paths.add(path2Group.getKey().concatNode(group.getMeasurements().get(i)));
}
- } catch (IllegalPathException e) {
- LOGGER.error(
- "failed to create multi timeseries statement because of {}",
e.getMessage(), e);
- throw new PipeException(
- "failed to create multi timeseries statement because of " +
e.getMessage(), e);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTSStatusVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTSStatusVisitor.java
index 814ec1e1a83..4d112b841da 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTSStatusVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTSStatusVisitor.java
@@ -35,6 +35,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateAlignedTime
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateTimeSeriesStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ActivateTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchActivateTemplateStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogicalViewStatement;
import org.apache.iotdb.rpc.TSStatusCode;
/**
@@ -155,6 +156,15 @@ public class PipeStatementTSStatusVisitor extends
StatementVisitor<TSStatus, TSS
return visitStatement(alterTimeSeriesStatement, context);
}
+ @Override
+ public TSStatus visitCreateLogicalView(CreateLogicalViewStatement statement,
TSStatus context) {
+ if (context.getCode() ==
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+ return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+ .setMessage(context.getMessage());
+ }
+ return super.visitCreateLogicalView(statement, context);
+ }
+
@Override
public TSStatus visitActivateTemplate(
ActivateTemplateStatement activateTemplateStatement, TSStatus context) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiver.java
index 0b99a5859ed..946a456645c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiver.java
@@ -281,7 +281,8 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
}
private TPipeTransferResp handleTransferSchemaPlan(PipeTransferPlanNodeReq
req) {
- // TODO: parse exception and status for alter logical view node
+ // We may be able to skip the alter logical view's exception parsing
because
+ // the "AlterLogicalViewNode" is itself idempotent
return req.getPlanNode() instanceof AlterLogicalViewNode
? new TPipeTransferResp(
ClusterConfigTaskExecutor.getInstance()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index d6b16b15087..198ef3f6b58 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -3318,56 +3318,71 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
analysis.setWhereExpression(whereExpression);
}
- // region view
+ // Region view
- /**
- * Compute how many paths exist, get the schema tree and the number of
existed paths.
- *
- * @return a pair of ISchemaTree, and the number of exist paths.
- */
- private Pair<ISchemaTree, Integer> fetchSchemaOfPathsAndCount(
- List<PartialPath> pathList, Analysis analysis, MPPQueryContext context) {
- ISchemaTree schemaTree = analysis.getSchemaTree();
- if (schemaTree == null) {
- // source is not represented by query, thus has not done fetch schema.
- PathPatternTree pathPatternTree = new PathPatternTree();
- for (PartialPath path : pathList) {
- pathPatternTree.appendPathPattern(path);
+ // Create Logical View
+ @Override
+ public Analysis visitCreateLogicalView(
+ CreateLogicalViewStatement createLogicalViewStatement, MPPQueryContext
context) {
+ Analysis analysis = new Analysis();
+ context.setQueryType(QueryType.WRITE);
+ analysis.setStatement(createLogicalViewStatement);
+
+ if (createLogicalViewStatement.getViewExpressions() == null) {
+ // Analyze query in statement
+ QueryStatement queryStatement =
createLogicalViewStatement.getQueryStatement();
+ if (queryStatement != null) {
+ Pair<List<Expression>, Analysis> queryAnalysisPair =
+ this.analyzeQueryInLogicalViewStatement(analysis, queryStatement,
context);
+ if (queryAnalysisPair.right.isFinishQueryAfterAnalyze()) {
+ return analysis;
+ } else if (queryAnalysisPair.left != null) {
+ try {
+
createLogicalViewStatement.setSourceExpressions(queryAnalysisPair.left);
+ } catch (UnsupportedViewException e) {
+ analysis.setFinishQueryAfterAnalyze(true);
+ analysis.setFailStatus(RpcUtils.getStatus(e.getErrorCode(),
e.getMessage()));
+ return analysis;
+ }
+ }
}
- schemaTree = this.schemaFetcher.fetchSchema(pathPatternTree, true,
context);
+ // Only check and use source paths when view expressions are not set
because there
+ // is no need to check source when renaming views and the check may not
be satisfied
+ // when the statement is generated by pipe
+ checkSourcePathsInCreateLogicalView(analysis,
createLogicalViewStatement);
+ if (analysis.isFinishQueryAfterAnalyze()) {
+ return analysis;
+ }
+
+ // Make sure there is no view in source
+ List<Expression> sourceExpressionList =
createLogicalViewStatement.getSourceExpressionList();
+ checkViewsInSource(analysis, sourceExpressionList, context);
+ if (analysis.isFinishQueryAfterAnalyze()) {
+ return analysis;
+ }
+
+ // Use source and into item to generate target views
+ // If expressions are filled the target paths must be filled likewise
+ createLogicalViewStatement.parseIntoItemIfNecessary();
}
- // search each path, make sure they all exist.
- int numOfExistPaths = 0;
- for (PartialPath path : pathList) {
- Pair<List<MeasurementPath>, Integer> pathPair =
schemaTree.searchMeasurementPaths(path);
- numOfExistPaths += !pathPair.left.isEmpty() ? 1 : 0;
+ // Check target paths.
+ checkTargetPathsInCreateLogicalView(analysis, createLogicalViewStatement);
+ if (analysis.isFinishQueryAfterAnalyze()) {
+ return analysis;
}
- return new Pair<>(schemaTree, numOfExistPaths);
- }
- /**
- * @param pathList the paths you want to check
- * @param schemaTree the given schema tree
- * @return if all paths you give can be found in schema tree, return a pair
of view paths and
- * null; else return view paths and the non-exist path.
- */
- private Pair<List<PartialPath>, PartialPath> findAllViewsInPaths(
- List<PartialPath> pathList, ISchemaTree schemaTree) {
- List<PartialPath> result = new ArrayList<>();
- for (PartialPath path : pathList) {
- Pair<List<MeasurementPath>, Integer> measurementPathList =
- schemaTree.searchMeasurementPaths(path);
- if (measurementPathList.left.isEmpty()) {
- return new Pair<>(result, path);
- }
- for (MeasurementPath measurementPath : measurementPathList.left) {
- if (measurementPath.getMeasurementSchema().isLogicalView()) {
- result.add(measurementPath);
- }
- }
+ // Set schema partition info, this info will be used to split logical plan
node.
+ PathPatternTree patternTree = new PathPatternTree();
+ for (PartialPath thisFullPath :
createLogicalViewStatement.getTargetPathList()) {
+ patternTree.appendFullPath(thisFullPath);
}
- return new Pair<>(result, null);
+ SchemaPartition schemaPartitionInfo =
+ partitionFetcher.getOrCreateSchemaPartition(
+ patternTree, context.getSession().getUserName());
+ analysis.setSchemaPartitionInfo(schemaPartitionInfo);
+
+ return analysis;
}
private Pair<List<Expression>, Analysis> analyzeQueryInLogicalViewStatement(
@@ -3404,6 +3419,32 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
return new Pair<>(expressionList, analysis);
}
+ private void checkSourcePathsInCreateLogicalView(
+ Analysis analysis, CreateLogicalViewStatement
createLogicalViewStatement) {
+ Pair<Boolean, String> checkResult =
+ createLogicalViewStatement.checkSourcePathsIfNotUsingQueryStatement();
+ if (Boolean.FALSE.equals(checkResult.left)) {
+ analysis.setFinishQueryAfterAnalyze(true);
+ analysis.setFailStatus(
+ RpcUtils.getStatus(
+ TSStatusCode.ILLEGAL_PATH.getStatusCode(),
+ "The path " + checkResult.right + " is illegal."));
+ return;
+ }
+
+ List<PartialPath> targetPathList =
createLogicalViewStatement.getTargetPathList();
+ if (createLogicalViewStatement.getSourceExpressionList().size() !=
targetPathList.size()) {
+ analysis.setFinishQueryAfterAnalyze(true);
+ analysis.setFailStatus(
+ RpcUtils.getStatus(
+ TSStatusCode.UNSUPPORTED_OPERATION.getStatusCode(),
+ String.format(
+ "The number of target paths (%d) and sources (%d) are miss
matched! Please check your SQL.",
+ createLogicalViewStatement.getTargetPathList().size(),
+
createLogicalViewStatement.getSourceExpressionList().size())));
+ }
+ }
+
private void checkViewsInSource(
Analysis analysis, List<Expression> sourceExpressionList,
MPPQueryContext context) {
List<PartialPath> pathsNeedCheck = new ArrayList<>();
@@ -3415,7 +3456,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
Pair<ISchemaTree, Integer> schemaOfNeedToCheck =
fetchSchemaOfPathsAndCount(pathsNeedCheck, analysis, context);
if (schemaOfNeedToCheck.right != pathsNeedCheck.size()) {
- // some source paths is not exist, and could not fetch schema.
+ // Some source paths do not exist, so their schema could not be fetched.
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(
RpcUtils.getStatus(
@@ -3426,7 +3467,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
Pair<List<PartialPath>, PartialPath> viewInSourceCheckResult =
findAllViewsInPaths(pathsNeedCheck, schemaOfNeedToCheck.left);
if (viewInSourceCheckResult.right != null) {
- // some source paths is not exist
+ // Some source paths do not exist
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(
RpcUtils.getStatus(
@@ -3437,7 +3478,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
return;
}
if (!viewInSourceCheckResult.left.isEmpty()) {
- // some source paths is logical view
+ // Some source paths are logical views
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(
RpcUtils.getStatus(
@@ -3446,9 +3487,59 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
}
- private void checkPathsInCreateLogicalView(
+ /**
+ * Compute how many paths exist, get the schema tree and the number of
existed paths.
+ *
+ * @return a pair of ISchemaTree, and the number of exist paths.
+ */
+ private Pair<ISchemaTree, Integer> fetchSchemaOfPathsAndCount(
+ List<PartialPath> pathList, Analysis analysis, MPPQueryContext context) {
+ ISchemaTree schemaTree = analysis.getSchemaTree();
+ if (schemaTree == null) {
+ // source is not represented by query, thus has not done fetch schema.
+ PathPatternTree pathPatternTree = new PathPatternTree();
+ for (PartialPath path : pathList) {
+ pathPatternTree.appendPathPattern(path);
+ }
+ schemaTree = this.schemaFetcher.fetchSchema(pathPatternTree, true,
context);
+ }
+
+ // search each path, make sure they all exist.
+ int numOfExistPaths = 0;
+ for (PartialPath path : pathList) {
+ Pair<List<MeasurementPath>, Integer> pathPair =
schemaTree.searchMeasurementPaths(path);
+ numOfExistPaths += !pathPair.left.isEmpty() ? 1 : 0;
+ }
+ return new Pair<>(schemaTree, numOfExistPaths);
+ }
+
+ /**
+ * @param pathList the paths you want to check
+ * @param schemaTree the given schema tree
+ * @return if all paths you give can be found in schema tree, return a pair
of view paths and
+ * null; else return view paths and the non-exist path.
+ */
+ private Pair<List<PartialPath>, PartialPath> findAllViewsInPaths(
+ List<PartialPath> pathList, ISchemaTree schemaTree) {
+ List<PartialPath> result = new ArrayList<>();
+ for (PartialPath path : pathList) {
+ Pair<List<MeasurementPath>, Integer> measurementPathList =
+ schemaTree.searchMeasurementPaths(path);
+ if (measurementPathList.left.isEmpty()) {
+ return new Pair<>(result, path);
+ }
+ for (MeasurementPath measurementPath : measurementPathList.left) {
+ if (measurementPath.getMeasurementSchema().isLogicalView()) {
+ result.add(measurementPath);
+ }
+ }
+ }
+ return new Pair<>(result, null);
+ }
+
+ private void checkTargetPathsInCreateLogicalView(
Analysis analysis, CreateLogicalViewStatement
createLogicalViewStatement) {
- Pair<Boolean, String> checkResult =
createLogicalViewStatement.checkAllPaths();
+ Pair<Boolean, String> checkResult =
createLogicalViewStatement.checkTargetPaths();
if (Boolean.FALSE.equals(checkResult.left)) {
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(
@@ -3457,8 +3548,8 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
"The path " + checkResult.right + " is illegal."));
return;
}
- // make sure there are no redundant paths in targets. Please note that
redundant paths in source
- // are legal!
+ // Make sure there are no redundant paths in targets. Note that redundant
paths in source
+ // are legal.
List<PartialPath> targetPathList =
createLogicalViewStatement.getTargetPathList();
Set<String> targetStringSet = new HashSet<>();
for (PartialPath path : targetPathList) {
@@ -3472,18 +3563,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
return;
}
}
- if (createLogicalViewStatement.getSourceExpressionList().size() !=
targetPathList.size()) {
- analysis.setFinishQueryAfterAnalyze(true);
- analysis.setFailStatus(
- RpcUtils.getStatus(
- TSStatusCode.UNSUPPORTED_OPERATION.getStatusCode(),
- String.format(
- "The number of target paths (%d) and sources (%d) are miss
matched! Please check your SQL.",
- createLogicalViewStatement.getTargetPathList().size(),
-
createLogicalViewStatement.getSourceExpressionList().size())));
- return;
- }
- // make sure all paths are NOt under any template
+ // Make sure all paths are not under any templates
try {
for (PartialPath path : createLogicalViewStatement.getTargetPathList()) {
checkIsTemplateCompatible(path, null);
@@ -3497,63 +3577,6 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
}
- // create Logical View
- @Override
- public Analysis visitCreateLogicalView(
- CreateLogicalViewStatement createLogicalViewStatement, MPPQueryContext
context) {
- Analysis analysis = new Analysis();
- context.setQueryType(QueryType.WRITE);
- analysis.setStatement(createLogicalViewStatement);
-
- if (createLogicalViewStatement.getViewExpressions() == null) {
- // analyze query in statement
- QueryStatement queryStatement =
createLogicalViewStatement.getQueryStatement();
- if (queryStatement != null) {
- Pair<List<Expression>, Analysis> queryAnalysisPair =
- this.analyzeQueryInLogicalViewStatement(analysis, queryStatement,
context);
- if (queryAnalysisPair.right.isFinishQueryAfterAnalyze()) {
- return analysis;
- } else if (queryAnalysisPair.left != null) {
- try {
-
createLogicalViewStatement.setSourceExpressions(queryAnalysisPair.left);
- } catch (UnsupportedViewException e) {
- analysis.setFinishQueryAfterAnalyze(true);
- analysis.setFailStatus(RpcUtils.getStatus(e.getErrorCode(),
e.getMessage()));
- return analysis;
- }
- }
- }
- }
-
- // use source and into item to generate target views
- createLogicalViewStatement.parseIntoItemIfNecessary();
-
- // check target paths; check source expressions.
- checkPathsInCreateLogicalView(analysis, createLogicalViewStatement);
- if (analysis.isFinishQueryAfterAnalyze()) {
- return analysis;
- }
-
- // make sure there is no view in source
- List<Expression> sourceExpressionList =
createLogicalViewStatement.getSourceExpressionList();
- checkViewsInSource(analysis, sourceExpressionList, context);
- if (analysis.isFinishQueryAfterAnalyze()) {
- return analysis;
- }
-
- // set schema partition info, this info will be used to split logical plan
node.
- PathPatternTree patternTree = new PathPatternTree();
- for (PartialPath thisFullPath :
createLogicalViewStatement.getTargetPathList()) {
- patternTree.appendFullPath(thisFullPath);
- }
- SchemaPartition schemaPartitionInfo =
- partitionFetcher.getOrCreateSchemaPartition(
- patternTree, context.getSession().getUserName());
- analysis.setSchemaPartitionInfo(schemaPartitionInfo);
-
- return analysis;
- }
-
@Override
public Analysis visitShowLogicalView(
ShowLogicalViewStatement showLogicalViewStatement, MPPQueryContext
context) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/view/CreateLogicalViewStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/view/CreateLogicalViewStatement.java
index f6cbb097260..29dc6fab471 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/view/CreateLogicalViewStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/view/CreateLogicalViewStatement.java
@@ -235,8 +235,8 @@ public class CreateLogicalViewStatement extends Statement {
/**
* Check errors in targetPaths.
*
- * @return Pair<Boolean, String>. True: checks passed; False: checks failed.
if check failed,
- * return the string of illegal path.
+ * @return Pair {@literal <}Boolean, String{@literal >}. True: checks
passed; False: checks
+ * failed. If check failed, return the string of illegal path.
*/
public Pair<Boolean, String> checkTargetPaths() {
for (PartialPath thisPath : this.getTargetPathList()) {
@@ -251,8 +251,8 @@ public class CreateLogicalViewStatement extends Statement {
* Check errors in sourcePaths. Only usable when not using read statement.
If this statement is
* generated with a read statement, check always pass; if not, check each
full paths.
*
- * @return Pair<Boolean, String>. True: checks passed; False: checks failed.
if check failed,
- * return the string of illegal path.
+ * @return Pair {@literal <}Boolean, String{@literal >}. True: checks
passed; False: checks
+ * failed. If check failed, return the string of illegal path.
*/
public Pair<Boolean, String> checkSourcePathsIfNotUsingQueryStatement() {
if (this.sourcePaths.viewPathType == ViewPathType.PATHS_GROUP
@@ -267,8 +267,8 @@ public class CreateLogicalViewStatement extends Statement {
}
/**
- * @return return true if checks passed; else return false. if check failed,
return the string of
- * illegal path.
+ * @return return {@code true} if checks passed; else return {@code false}.
If check failed,
+ * return the string of illegal path.
*/
public Pair<Boolean, String> checkAllPaths() {
Pair<Boolean, String> result = this.checkTargetPaths();
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index 801825e29ce..35581b06705 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.commons.conf;
import java.io.File;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
index b0ce404cd92..737313ec378 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
@@ -23,8 +23,8 @@ import org.apache.iotdb.tsfile.utils.PublicBAOS;
import java.io.DataOutputStream;
import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Objects;
@@ -53,12 +53,7 @@ public class PipeMeta {
return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
}
- public void serialize(DataOutputStream outputStream) throws IOException {
- staticMeta.serialize(outputStream);
- runtimeMeta.serialize(outputStream);
- }
-
- public void serialize(FileOutputStream outputStream) throws IOException {
+ public void serialize(OutputStream outputStream) throws IOException {
staticMeta.serialize(outputStream);
runtimeMeta.serialize(outputStream);
}