This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TableModelIngestion by this
push:
new dd66c7a5fc8 finish first IT & support flush
dd66c7a5fc8 is described below
commit dd66c7a5fc81c5702d2b0166bf14dce8cd6bcc0f
Author: Tian Jiang <[email protected]>
AuthorDate: Mon Jul 15 12:10:45 2024 +0800
finish first IT & support flush
---
.../iotdb/session/it/IoTDBSessionRelationalIT.java | 27 +++++++++++++++----
.../operator/process/CollectOperator.java | 3 +++
.../iotdb/db/queryengine/plan/Coordinator.java | 4 ++-
.../queryengine/plan/analyze/SelectIntoUtils.java | 5 ++--
.../execution/config/TableConfigTaskVisitor.java | 9 +++++++
.../plan/planner/TableOperatorGenerator.java | 1 +
.../relational/analyzer/StatementAnalyzer.java | 1 +
.../plan/relational/planner/RelationPlanner.java | 1 +
.../distribute/DistributedPlanGenerator.java | 2 ++
.../plan/relational/planner/node/CollectNode.java | 30 +++++++++++++++++++---
.../plan/relational/sql/ast/AstVisitor.java | 4 +++
.../queryengine/plan/relational/sql/ast/Flush.java | 17 ++++++++++++
.../plan/relational/sql/parser/AstBuilder.java | 28 +++++++++++++++++++-
.../apache/iotdb/commons/path/AlignedFullPath.java | 19 ++++++++++++++
14 files changed, 139 insertions(+), 12 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
index 84d5211f024..a53489923dd 100644
---
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
@@ -94,10 +94,9 @@ public class IoTDBSessionRelationalIT {
final List<ColumnType> columnTypes =
Arrays.asList(ColumnType.ID, ColumnType.ATTRIBUTE,
ColumnType.MEASUREMENT);
+ long timestamp = 0;
Tablet tablet = new Tablet("table1", schemaList, columnTypes, 15);
- long timestamp = System.currentTimeMillis();
-
for (long row = 0; row < 15; row++) {
int rowIndex = tablet.rowSize++;
tablet.addTimestamp(rowIndex, timestamp + row);
@@ -108,7 +107,6 @@ public class IoTDBSessionRelationalIT {
session.insertRelationalTablet(tablet, true);
tablet.reset();
}
- timestamp++;
}
if (tablet.rowSize != 0) {
@@ -116,10 +114,29 @@ public class IoTDBSessionRelationalIT {
tablet.reset();
}
- SessionDataSet dataSet = session.executeQueryStatement("select * from
table1");
+ session.executeNonQueryStatement("FLush");
+
+ for (long row = 15; row < 30; row++) {
+ int rowIndex = tablet.rowSize++;
+ tablet.addTimestamp(rowIndex, timestamp + row);
+ tablet.addValue("id1", rowIndex, "id:" + row);
+ tablet.addValue("attr1", rowIndex, "attr:" + row);
+ tablet.addValue("m1", rowIndex, row * 1.0);
+ if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ session.insertRelationalTablet(tablet, true);
+ tablet.reset();
+ }
+ }
+
+ if (tablet.rowSize != 0) {
+ session.insertRelationalTablet(tablet);
+ tablet.reset();
+ }
+
+ SessionDataSet dataSet = session.executeQueryStatement("select * from
table1 order by time");
while (dataSet.hasNext()) {
RowRecord rowRecord = dataSet.next();
- assertEquals(15L, rowRecord.getFields().get(0).getLongV());
+// assertEquals(0L, rowRecord.getFields().get(0).getLongV());
System.out.println(rowRecord);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
index 2f55743854e..4b0ecf27c37 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
@@ -68,6 +68,9 @@ public class CollectOperator implements ProcessOperator {
@Override
public ListenableFuture<?> isBlocked() {
+ if (currentIndex >= children.size()) {
+ return NOT_BLOCKED;
+ }
return children.get(currentIndex).isBlocked();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 170eedb0e70..ef515cff01b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -53,6 +53,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTable;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DescribeTable;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropDB;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTable;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Flush;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDB;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTables;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Use;
@@ -322,7 +323,8 @@ public class Coordinator {
|| statement instanceof CreateTable
|| statement instanceof DescribeTable
|| statement instanceof ShowTables
- || statement instanceof DropTable) {
+ || statement instanceof DropTable
+ || statement instanceof Flush) {
return new ConfigExecution(
queryContext,
null,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java
index d0c3617d530..9d2e1f7fb82 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
-import org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.util.AstUtil;
import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.tsfile.enums.TSDataType;
@@ -41,6 +41,7 @@ import java.util.regex.Matcher;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.iotdb.commons.conf.IoTDBConstant.DOUBLE_COLONS;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.LEVELED_PATH_TEMPLATE_PATTERN;
+import static
org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor.parseNodeString;
public class SelectIntoUtils {
@@ -108,7 +109,7 @@ public class SelectIntoUtils {
resNode = matcher.replaceFirst(sourceNodes[index]);
matcher = LEVELED_PATH_TEMPLATE_PATTERN.matcher(resNode);
}
- return ASTVisitor.parseNodeString(resNode);
+ return parseNodeString(resNode);
}
public static boolean checkIsAllRawSeriesQuery(List<Expression> expressions)
{
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
index e05a2d740ba..612e26d92ac 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
@@ -32,6 +32,7 @@ import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.ShowDBTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.ShowTablesTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.UseDBTask;
+import org.apache.iotdb.db.queryengine.plan.execution.config.sys.FlushTask;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.TableHeaderSchemaValidator;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor;
@@ -44,6 +45,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DescribeTable;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropDB;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTable;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Flush;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LongLiteral;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Property;
@@ -52,6 +54,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTables;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Use;
import
org.apache.iotdb.db.queryengine.plan.relational.type.TypeNotFoundException;
+import org.apache.iotdb.db.queryengine.plan.statement.sys.FlushStatement;
import org.apache.tsfile.enums.TSDataType;
import java.util.HashMap;
@@ -186,4 +189,10 @@ public class TableConfigTaskVisitor extends
AstVisitor<IConfigTask, MPPQueryCont
protected IConfigTask visitCurrentDatabase(CurrentDatabase node,
MPPQueryContext context) {
return super.visitCurrentDatabase(node, context);
}
+
+ @Override
+ protected IConfigTask visitFlush(Flush node, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
+ return new FlushTask(((FlushStatement) node.getInnerTreeStatement()));
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index b1ba6ded7f0..e32085bbc23 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.queryengine.plan.planner;
+import java.util.stream.Stream;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.path.AlignedFullPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
index 4e22b1016d4..44a45b43c39 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
@@ -56,6 +56,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExplainAnalyze;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FetchDevice;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FieldReference;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Flush;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupBy;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupingElement;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index 5065f822eeb..9fab1165a9d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNod
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AliasedRelation;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Except;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Flush;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertRow;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertTablet;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Intersect;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
index 40166d9413d..5337c68bcb8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
@@ -83,6 +83,7 @@ public class DistributedPlanGenerator
return res;
} else if (res.size() > 1) {
CollectNode collectNode = new CollectNode(queryId.genPlanNodeId());
+ collectNode.setOutputSymbols(res.get(0).getOutputSymbols());
res.forEach(collectNode::addChild);
return Collections.singletonList(collectNode);
} else {
@@ -327,6 +328,7 @@ public class DistributedPlanGenerator
// children has no sort property, use CollectNode to merge children
CollectNode collectNode = new CollectNode(queryId.genPlanNodeId());
+ collectNode.setOutputSymbols(firstChild.getOutputSymbols());
childrenNodes.forEach(collectNode::addChild);
return collectNode;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CollectNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CollectNode.java
index 7920216428d..9b2a4cd2812 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CollectNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CollectNode.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
+import java.util.ArrayList;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
@@ -32,6 +33,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
/**
* CollectNode output the content of children. Normally it will output the
child one by one, but in
@@ -39,6 +41,8 @@ import java.util.List;
*/
public class CollectNode extends MultiChildProcessNode {
+ private List<Symbol> outputSymbols;
+
public CollectNode(PlanNodeId id) {
super(id);
}
@@ -50,12 +54,19 @@ public class CollectNode extends MultiChildProcessNode {
@Override
public PlanNode clone() {
- return new CollectNode(id);
+ CollectNode collectNode = new CollectNode(id);
+ collectNode.outputSymbols = outputSymbols;
+ return collectNode;
+ }
+
+ public void setOutputSymbols(
+ List<Symbol> outputSymbols) {
+ this.outputSymbols = outputSymbols;
}
@Override
public List<Symbol> getOutputSymbols() {
- return children.get(0).getOutputSymbols();
+ return outputSymbols;
}
@Override
@@ -66,16 +77,29 @@ public class CollectNode extends MultiChildProcessNode {
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.TABLE_COLLECT_NODE.serialize(byteBuffer);
+ ReadWriteIOUtils.write(outputSymbols.size(), byteBuffer);
+ outputSymbols.forEach(symbol -> Symbol.serialize(symbol, byteBuffer));
}
@Override
protected void serializeAttributes(DataOutputStream stream) throws
IOException {
PlanNodeType.TABLE_COLLECT_NODE.serialize(stream);
+ ReadWriteIOUtils.write(outputSymbols.size(), stream);
+ for (Symbol symbol : outputSymbols) {
+ Symbol.serialize(symbol, stream);
+ }
}
public static CollectNode deserialize(ByteBuffer byteBuffer) {
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new CollectNode(planNodeId);
+ int size = ReadWriteIOUtils.readInt(byteBuffer);
+ List<Symbol> outputs = new ArrayList<>(size);
+ while (size-- > 0) {
+ outputs.add(Symbol.deserialize(byteBuffer));
+ }
+ CollectNode collectNode = new CollectNode(planNodeId);
+ collectNode.setOutputSymbols(outputs);
+ return collectNode;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
index 55c074545bb..1d72a8c7b65 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
@@ -375,6 +375,10 @@ public abstract class AstVisitor<R, C> {
return visitStatement(node, context);
}
+ protected R visitFlush(Flush node, C context) {
+ return visitStatement(node, context);
+ }
+
protected R visitInsertRow(InsertRow node, C context) {
return visitStatement(node, context);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Flush.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Flush.java
new file mode 100644
index 00000000000..11016572eb3
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Flush.java
@@ -0,0 +1,17 @@
+package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
+
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+
+public class Flush extends WrappedStatement{
+
+ public Flush(Statement innerTreeStatement,
+ MPPQueryContext context) {
+ super(innerTreeStatement, context);
+ }
+
+ @Override
+ public <R, C> R accept(AstVisitor<R, C> visitor, C context) {
+ return visitor.visitFlush(this, context);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
index ef2a3dfe5e8..53aa6d22f41 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
@@ -19,9 +19,13 @@
package org.apache.iotdb.db.queryengine.plan.relational.sql.parser;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.qp.sql.IoTDBSqlParser;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AddColumn;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AliasedRelation;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AllColumns;
@@ -55,6 +59,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Except;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExistsPredicate;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Explain;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Flush;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GenericDataType;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupBy;
@@ -124,9 +129,12 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Values;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WhenClause;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.With;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WithQuery;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
+import org.apache.iotdb.db.queryengine.plan.statement.sys.FlushStatement;
import org.apache.iotdb.db.relational.grammar.sql.RelationalSqlBaseVisitor;
import org.apache.iotdb.db.relational.grammar.sql.RelationalSqlLexer;
import org.apache.iotdb.db.relational.grammar.sql.RelationalSqlParser;
+import
org.apache.iotdb.db.relational.grammar.sql.RelationalSqlParser.IdentifierContext;
import org.apache.iotdb.db.utils.DateTimeUtils;
import com.google.common.collect.ImmutableList;
@@ -468,7 +476,25 @@ public class AstBuilder extends
RelationalSqlBaseVisitor<Node> {
@Override
public Node visitFlushStatement(RelationalSqlParser.FlushStatementContext
ctx) {
- return super.visitFlushStatement(ctx);
+ FlushStatement flushStatement = new FlushStatement(StatementType.FLUSH);
+ List<PartialPath> storageGroups = null;
+ if (ctx.booleanValue() != null) {
+
flushStatement.setSeq(Boolean.parseBoolean(ctx.booleanValue().getText()));
+ }
+ flushStatement.setOnCluster(ctx.localOrClusterMode() == null ||
ctx.localOrClusterMode().LOCAL() == null);
+ if (ctx.identifier() != null) {
+ storageGroups = new ArrayList<>();
+ List<Identifier> identifiers = getIdentifiers(ctx.identifier());
+ for (Identifier identifier : identifiers) {
+ try {
+ storageGroups.add(new
PartialPath(PathUtils.qualifyDatabaseName(identifier.getValue())));
+ } catch (IllegalPathException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ flushStatement.setStorageGroups(storageGroups);
+ return new Flush(flushStatement, null);
}
@Override
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedFullPath.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedFullPath.java
index 4f94d699388..fefa80f95b2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedFullPath.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedFullPath.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.commons.path;
+import java.util.Objects;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.utils.RamUsageEstimator;
@@ -73,4 +74,22 @@ public class AlignedFullPath implements IFullPath {
+ deviceID.ramBytesUsed()
+ measurementList.stream().mapToLong(RamUsageEstimator::sizeOf).sum()
* 2;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ AlignedFullPath that = (AlignedFullPath) o;
+ return Objects.equals(deviceID, that.deviceID) && Objects.equals(
+ measurementList, that.measurementList);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(deviceID, measurementList);
+ }
}