This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fa4fe21ea1 [IoTDB-3170] Add MemoryTable framework for MPP (#5890)
fa4fe21ea1 is described below

commit fa4fe21ea1ebf7a7f0048997cdbd5cb43431342e
Author: Zhang.Jinrui <[email protected]>
AuthorDate: Fri May 13 11:50:13 2022 +0800

    [IoTDB-3170] Add MemoryTable framework for MPP (#5890)
---
 .../apache/iotdb/db/mpp/plan/analyze/Analysis.java | 14 +++-
 .../apache/iotdb/db/mpp/plan/analyze/Analyzer.java |  2 +-
 .../db/mpp/plan/execution/QueryExecution.java      | 21 ++++--
 .../plan/execution/memory/MemorySourceHandle.java  | 80 ++++++++++++++++++++++
 .../execution/memory/StatementMemorySource.java    | 41 +++++++++++
 .../memory/StatementMemorySourceContext.java       | 41 +++++++++++
 .../memory/StatementMemorySourceVisitor.java       | 36 ++++++++++
 .../execution/memory/MemorySourceHandleTest.java   | 42 ++++++++++++
 8 files changed, 271 insertions(+), 6 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
index 892d24cf77..ee515b4bcf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
@@ -93,7 +93,11 @@ public class Analysis {
   // not 0 because device is the first column
   private Map<String, List<Integer>> deviceToMeasurementIndexesMap;
 
-  public Analysis() {}
+  private boolean finishQueryAfterAnalyze;
+
+  public Analysis() {
+    this.finishQueryAfterAnalyze = false;
+  }
 
   public List<TRegionReplicaSet> getPartitionInfo(PartialPath seriesPath, 
Filter timefilter) {
     // TODO: (xingtanzjr) implement the calculation of timePartitionIdList
@@ -241,6 +245,14 @@ public class Analysis {
     this.groupByTimeParameter = groupByTimeParameter;
   }
 
+  public boolean isFinishQueryAfterAnalyze() {
+    return finishQueryAfterAnalyze;
+  }
+
+  public void setFinishQueryAfterAnalyze(boolean finishQueryAfterAnalyze) {
+    this.finishQueryAfterAnalyze = finishQueryAfterAnalyze;
+  }
+
   public void setDeviceToMeasurementIndexesMap(
       Map<String, List<Integer>> deviceToMeasurementIndexesMap) {
     this.deviceToMeasurementIndexesMap = deviceToMeasurementIndexesMap;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index f52ba6fd77..9522387207 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -140,7 +140,7 @@ public class Analyzer {
         logger.info("{} fetch schema done", getLogHeader());
         // If there is no leaf node in the schema tree, the query should be completed immediately
         if (schemaTree.isEmpty()) {
-          analysis.setRespDatasetHeader(new DatasetHeader(new ArrayList<>(), 
false));
+          analysis.setFinishQueryAfterAnalyze(true);
           return analysis;
         }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 257559527c..e7371272b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -34,6 +34,10 @@ import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.execution.memory.MemorySourceHandle;
+import org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemorySource;
+import 
org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemorySourceContext;
+import 
org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemorySourceVisitor;
 import org.apache.iotdb.db.mpp.plan.optimization.PlanOptimizer;
 import org.apache.iotdb.db.mpp.plan.planner.DistributionPlanner;
 import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanner;
@@ -141,9 +145,9 @@ public class QueryExecution implements IQueryExecution {
   public void start() {
     if (skipExecute()) {
       logger.info(
-          "{} execution of query will be skipped. Transit to FINISHED 
immediately.",
-          getLogHeader());
-      stateMachine.transitionToFinished();
+          "{} execution of query will be skipped. Transit to RUNNING 
immediately.", getLogHeader());
+      constructResultForMemorySource();
+      stateMachine.transitionToRunning();
       return;
     }
     doLogicalPlan();
@@ -155,7 +159,16 @@ public class QueryExecution implements IQueryExecution {
   }
 
   private boolean skipExecute() {
-    return context.getQueryType() == QueryType.READ && 
!analysis.hasDataSource();
+    return analysis.isFinishQueryAfterAnalyze()
+        || (context.getQueryType() == QueryType.READ && 
!analysis.hasDataSource());
+  }
+
+  private void constructResultForMemorySource() {
+    StatementMemorySource memorySource =
+        new StatementMemorySourceVisitor()
+            .process(analysis.getStatement(), new 
StatementMemorySourceContext(context, analysis));
+    this.resultHandle = new MemorySourceHandle(memorySource.getTsBlock());
+    this.analysis.setRespDatasetHeader(memorySource.getDatasetHeader());
   }
 
   // Analyze the statement in QueryContext. Generate the analysis this query need
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
new file mode 100644
index 0000000000..fcd57059c5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.memory;
+
+import org.apache.iotdb.db.mpp.execution.datatransfer.ISourceHandle;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+
+public class MemorySourceHandle implements ISourceHandle {
+
+  private final TsBlock result;
+  private boolean hasNext;
+
+  public MemorySourceHandle(TsBlock result) {
+    Validate.notNull(result, "the TsBlock should not be null when constructing 
MemorySourceHandle");
+    this.result = result;
+    this.hasNext = true;
+  }
+
+  @Override
+  public TFragmentInstanceId getLocalFragmentInstanceId() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String getLocalPlanNodeId() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getBufferRetainedSizeInBytes() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TsBlock receive() {
+    hasNext = false;
+    return result;
+  }
+
+  @Override
+  public boolean isFinished() {
+    return !hasNext;
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return immediateFuture(null);
+  }
+
+  @Override
+  public boolean isAborted() {
+    return false;
+  }
+
+  @Override
+  public void abort() {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemorySource.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemorySource.java
new file mode 100644
index 0000000000..b26dc86a78
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemorySource.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.memory;
+
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+public class StatementMemorySource {
+  private final TsBlock tsBlock;
+  private final DatasetHeader datasetHeader;
+
+  public StatementMemorySource(TsBlock tsBlock, DatasetHeader datasetHeader) {
+    this.tsBlock = tsBlock;
+    this.datasetHeader = datasetHeader;
+  }
+
+  public TsBlock getTsBlock() {
+    return tsBlock;
+  }
+
+  public DatasetHeader getDatasetHeader() {
+    return datasetHeader;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemorySourceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemorySourceContext.java
new file mode 100644
index 0000000000..bc4934049d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemorySourceContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.memory;
+
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+
+public class StatementMemorySourceContext {
+  private final MPPQueryContext queryContext;
+  private final Analysis analysis;
+
+  public StatementMemorySourceContext(MPPQueryContext queryContext, Analysis 
analysis) {
+    this.queryContext = queryContext;
+    this.analysis = analysis;
+  }
+
+  public MPPQueryContext getQueryContext() {
+    return queryContext;
+  }
+
+  public Analysis getAnalysis() {
+    return analysis;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemorySourceVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemorySourceVisitor.java
new file mode 100644
index 0000000000..aa7ebeff61
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemorySourceVisitor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.memory;
+
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import java.util.ArrayList;
+
+public class StatementMemorySourceVisitor
+    extends StatementVisitor<StatementMemorySource, 
StatementMemorySourceContext> {
+
+  @Override
+  public StatementMemorySource visitNode(StatementNode node, 
StatementMemorySourceContext context) {
+    return new StatementMemorySource(new TsBlock(0), new DatasetHeader(new 
ArrayList<>(), false));
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/memory/MemorySourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/memory/MemorySourceHandleTest.java
new file mode 100644
index 0000000000..fd91eb8e6b
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/memory/MemorySourceHandleTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.memory;
+
+import org.apache.iotdb.db.mpp.plan.execution.memory.MemorySourceHandle;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MemorySourceHandleTest {
+
+  @Test
+  public void testNormalFinished() {
+    TsBlock rawResult = new TsBlock(0);
+    MemorySourceHandle sourceHandle = new MemorySourceHandle(rawResult);
+    Assert.assertFalse(sourceHandle.isFinished());
+    ListenableFuture<Void> blocked = sourceHandle.isBlocked();
+    Assert.assertTrue(blocked.isDone());
+    TsBlock result = sourceHandle.receive();
+    Assert.assertEquals(rawResult, result);
+    Assert.assertTrue(sourceHandle.isFinished());
+  }
+}

Reply via email to