This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 454a1d8740 [Feature][multistage] Thread-safe query planning (#9344)
454a1d8740 is described below

commit 454a1d87400b3033b72b5a6beb06432134ff3c5e
Author: Yao Liu <[email protected]>
AuthorDate: Fri Sep 9 21:04:55 2022 -0700

    [Feature][multistage] Thread-safe query planning (#9344)
    
    * multi-thread query planning
    
    * Use auto-close planner context and fix test
    
    * address style comments
    
    * address javadoc
    
    * multi-thread query planning
    
    * Use auto-close planner context and fix test
    
    * address style comments
    
    * address javadoc
---
 .../org/apache/pinot/query/QueryEnvironment.java   | 62 +++++++---------------
 .../apache/pinot/query/context/PlannerContext.java | 48 +++++++++++++++--
 .../apache/pinot/query/QueryCompilationTest.java   | 54 +++++++++++++++++++
 3 files changed, 118 insertions(+), 46 deletions(-)

diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index da13d03a35..c1797381ba 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -24,14 +24,12 @@ import java.util.Properties;
 import org.apache.calcite.config.CalciteConnectionConfigImpl;
 import org.apache.calcite.config.CalciteConnectionProperty;
 import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.hep.HepProgram;
 import org.apache.calcite.plan.hep.HepProgramBuilder;
 import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.prepare.PlannerImpl;
 import org.apache.calcite.prepare.Prepare;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
@@ -44,8 +42,6 @@ import org.apache.calcite.sql.SqlExplainFormat;
 import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.validate.SqlValidator;
 import org.apache.calcite.sql2rel.SqlToRelConverter;
 import org.apache.calcite.sql2rel.StandardConvertletTable;
 import org.apache.calcite.tools.FrameworkConfig;
@@ -53,11 +49,9 @@ import org.apache.calcite.tools.Frameworks;
 import org.apache.pinot.query.context.PlannerContext;
 import org.apache.pinot.query.planner.PlannerUtils;
 import org.apache.pinot.query.planner.QueryPlan;
-import org.apache.pinot.query.planner.logical.LogicalPlanner;
 import org.apache.pinot.query.planner.logical.StagePlanner;
 import org.apache.pinot.query.routing.WorkerManager;
 import org.apache.pinot.query.type.TypeFactory;
-import org.apache.pinot.query.validate.Validator;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
 import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
 
@@ -73,11 +67,10 @@ public class QueryEnvironment {
 
   // Calcite extension/plugins
   private final CalciteSchema _rootSchema;
-  private final PlannerImpl _planner;
   private final Prepare.CatalogReader _catalogReader;
   private final RelDataTypeFactory _typeFactory;
-  private final RelOptPlanner _relOptPlanner;
-  private final SqlValidator _validator;
+
+  private final HepProgram _hepProgram;
 
   // Pinot extensions
   private final Collection<RelOptRule> _logicalRuleSet;
@@ -89,15 +82,11 @@ public class QueryEnvironment {
     _workerManager = workerManager;
     _config = Frameworks.newConfigBuilder().traitDefs().build();
 
-    // Planner is not thread-safe. must be reset() after each use.
-    _planner = new PlannerImpl(_config);
-
     // catalog
     Properties catalogReaderConfigProperties = new Properties();
     
catalogReaderConfigProperties.setProperty(CalciteConnectionProperty.CASE_SENSITIVE.camelName(),
 "true");
     _catalogReader = new CalciteCatalogReader(_rootSchema, 
_rootSchema.path(null), _typeFactory,
         new CalciteConnectionConfigImpl(catalogReaderConfigProperties));
-    _validator = new Validator(SqlStdOperatorTable.instance(), _catalogReader, 
_typeFactory);
 
     // optimizer rules
     _logicalRuleSet = PinotQueryRuleSets.LOGICAL_OPT_RULES;
@@ -107,19 +96,13 @@ public class QueryEnvironment {
     for (RelOptRule relOptRule : _logicalRuleSet) {
       hepProgramBuilder.addRuleInstance(relOptRule);
     }
-    _relOptPlanner = new LogicalPlanner(hepProgramBuilder.build(), 
Contexts.EMPTY_CONTEXT);
+    _hepProgram = hepProgramBuilder.build();
   }
 
   /**
    * Plan a SQL query.
    *
-   * <p>Noted that since Calcite's {@link org.apache.calcite.tools.Planner} is 
not threadsafe.
-   * Only one query query can be planned at a time. Afterwards planner needs 
to be reset in order to clear the
-   * state for the next planning.
-   *
-   * <p>In order for faster planning, we pre-constructed all the planner 
objects and use the plan-then-reset
-   * model. Thusn when using {@code QueryEnvironment#planQuery(String)}, 
caller should ensure that no concurrent
-   * plan execution occurs.
+   * This function is thread safe since we construct a new PlannerContext 
every time.
    *
    * TODO: follow benchmark and profile to measure whether it make sense for 
the latency-concurrency trade-off
    * between reusing plannerImpl vs. create a new planner for each query.
@@ -129,8 +112,7 @@ public class QueryEnvironment {
    * @return a dispatchable query plan
    */
   public QueryPlan planQuery(String sqlQuery, SqlNodeAndOptions 
sqlNodeAndOptions) {
-    try {
-      PlannerContext plannerContext = new PlannerContext();
+    try (PlannerContext plannerContext = new PlannerContext(_config, 
_catalogReader, _typeFactory, _hepProgram)) {
       plannerContext.setOptions(sqlNodeAndOptions.getOptions());
       RelNode relRoot = compileQuery(sqlNodeAndOptions.getSqlNode(), 
plannerContext);
       return toDispatchablePlan(relRoot, plannerContext);
@@ -151,14 +133,13 @@ public class QueryEnvironment {
    * @return the explained query plan.
    */
   public String explainQuery(String sqlQuery, SqlNodeAndOptions 
sqlNodeAndOptions) {
-    try {
+    try (PlannerContext plannerContext = new PlannerContext(_config, 
_catalogReader, _typeFactory, _hepProgram)) {
       SqlExplain explain = (SqlExplain) sqlNodeAndOptions.getSqlNode();
-      PlannerContext plannerContext = new PlannerContext();
       plannerContext.setOptions(sqlNodeAndOptions.getOptions());
       RelNode relRoot = compileQuery(explain.getExplicandum(), plannerContext);
       SqlExplainFormat format = explain.getFormat() == null ? 
SqlExplainFormat.DOT : explain.getFormat();
-      SqlExplainLevel level = explain.getDetailLevel() == null ? 
SqlExplainLevel.DIGEST_ATTRIBUTES
-          : explain.getDetailLevel();
+      SqlExplainLevel level =
+          explain.getDetailLevel() == null ? SqlExplainLevel.DIGEST_ATTRIBUTES 
: explain.getDetailLevel();
       return PlannerUtils.explainPlan(relRoot, format, level);
     } catch (Exception e) {
       throw new RuntimeException("Error explain query plan for: " + sqlQuery, 
e);
@@ -182,20 +163,15 @@ public class QueryEnvironment {
   @VisibleForTesting
   protected RelNode compileQuery(SqlNode sqlNode, PlannerContext 
plannerContext)
       throws Exception {
-    try {
-      SqlNode validated = validate(sqlNode);
-      RelRoot relation = toRelation(validated, plannerContext);
-      return optimize(relation, plannerContext);
-    } finally {
-      _planner.close();
-      _planner.reset();
-    }
+    SqlNode validated = validate(sqlNode, plannerContext);
+    RelRoot relation = toRelation(validated, plannerContext);
+    return optimize(relation, plannerContext);
   }
 
-  private SqlNode validate(SqlNode parsed)
+  private SqlNode validate(SqlNode parsed, PlannerContext plannerContext)
       throws Exception {
     // 2. validator to validate.
-    SqlNode validated = _validator.validate(parsed);
+    SqlNode validated = plannerContext.getValidator().validate(parsed);
     if (null == validated || !validated.getKind().belongsTo(SqlKind.QUERY)) {
       throw new IllegalArgumentException(
           String.format("unsupported SQL query, cannot validate out a valid 
sql from:\n%s", parsed));
@@ -206,9 +182,10 @@ public class QueryEnvironment {
   private RelRoot toRelation(SqlNode parsed, PlannerContext plannerContext) {
     // 3. convert sqlNode to relNode.
     RexBuilder rexBuilder = new RexBuilder(_typeFactory);
-    RelOptCluster cluster = RelOptCluster.create(_relOptPlanner, rexBuilder);
+    RelOptCluster cluster = 
RelOptCluster.create(plannerContext.getRelOptPlanner(), rexBuilder);
     SqlToRelConverter sqlToRelConverter =
-        new SqlToRelConverter(_planner, _validator, _catalogReader, cluster, 
StandardConvertletTable.INSTANCE,
+        new SqlToRelConverter(plannerContext.getPlanner(), 
plannerContext.getValidator(), _catalogReader, cluster,
+            StandardConvertletTable.INSTANCE,
             
SqlToRelConverter.config().withHintStrategyTable(getHintStrategyTable(plannerContext)));
     return sqlToRelConverter.convertQuery(parsed, false, true);
   }
@@ -217,8 +194,8 @@ public class QueryEnvironment {
     // 4. optimize relNode
     // TODO: add support for traits, cost factory.
     try {
-      _relOptPlanner.setRoot(relRoot.rel);
-      return _relOptPlanner.findBestExp();
+      plannerContext.getRelOptPlanner().setRoot(relRoot.rel);
+      return plannerContext.getRelOptPlanner().findBestExp();
     } catch (Exception e) {
       throw new UnsupportedOperationException(
           "Cannot generate a valid execution plan for the given query: " + 
RelOptUtil.toString(relRoot.rel), e);
@@ -231,7 +208,6 @@ public class QueryEnvironment {
     return queryStagePlanner.makePlan(relRoot);
   }
 
-
   // --------------------------------------------------------------------------
   // utils
   // --------------------------------------------------------------------------
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java
index 1997d1f370..859b5e3d60 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java
@@ -19,17 +19,53 @@
 package org.apache.pinot.query.context;
 
 import java.util.Map;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.hep.HepProgram;
+import org.apache.calcite.prepare.PlannerImpl;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.pinot.query.planner.logical.LogicalPlanner;
+import org.apache.pinot.query.validate.Validator;
 
 
 /**
  * PlannerContext is an object that holds all contextual information during 
planning phase.
  *
- * TODO: currently the planner context is not used since we don't support 
option or query rewrite. This construct is
- * here as a placeholder for the parsed out options.
+ * TODO: currently we don't support option or query rewrite.
+ * It is used to hold per query context for query planning, which cannot be 
shared across queries.
  */
-public class PlannerContext {
+public class PlannerContext implements AutoCloseable {
+  private final PlannerImpl _planner;
+
+  private final SqlValidator _validator;
+
+  private final RelOptPlanner _relOptPlanner;
+
   private Map<String, String> _options;
 
+  public PlannerContext(FrameworkConfig config, Prepare.CatalogReader 
catalogReader, RelDataTypeFactory typeFactory,
+      HepProgram hepProgram) {
+    _planner = new PlannerImpl(config);
+    _validator = new Validator(SqlStdOperatorTable.instance(), catalogReader, 
typeFactory);
+    _relOptPlanner = new LogicalPlanner(hepProgram, Contexts.EMPTY_CONTEXT);
+  }
+
+  public PlannerImpl getPlanner() {
+    return _planner;
+  }
+
+  public SqlValidator getValidator() {
+    return _validator;
+  }
+
+  public RelOptPlanner getRelOptPlanner() {
+    return _relOptPlanner;
+  }
+
   public void setOptions(Map<String, String> options) {
     _options = options;
   }
@@ -37,4 +73,10 @@ public class PlannerContext {
   public Map<String, String> getOptions() {
     return _options;
   }
+
+  @Override
+  public void close()
+      throws Exception {
+    _planner.close();
+  }
 }
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index cb666bfeba..dcd7c5c11a 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -19,8 +19,12 @@
 package org.apache.pinot.query;
 
 import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.core.transport.ServerInstance;
@@ -176,6 +180,56 @@ public class QueryCompilationTest extends 
QueryEnvironmentTestBase {
     
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstances().get(0).toString(),
 "Server_localhost_1");
   }
 
+  // Test that planQuery can be called concurrently. Thread#join does not propagate a worker's exception, so instead
+  // assert that every thread recorded a plan under its own SQL string.
+  @Test
+  public void testPlanQueryMultiThread()
+      throws Exception {
+    Map<String, ArrayList<QueryPlan>> queryPlans = new HashMap<>();
+    Lock lock = new ReentrantLock();
+    Runnable joinQuery = () -> {
+      String query = "SELECT a.col1, a.ts, b.col2, b.col3 FROM a JOIN b ON a.col1 = b.col2";
+      QueryPlan queryPlan = _queryEnvironment.planQuery(query);
+      lock.lock();
+      try {
+        queryPlans.computeIfAbsent(query, k -> new ArrayList<>()).add(queryPlan);
+      } finally {
+        lock.unlock();
+      }
+    };
+    Runnable selectQuery = () -> {
+      String query = "SELECT * FROM a";
+      QueryPlan queryPlan = _queryEnvironment.planQuery(query);
+      lock.lock();
+      try {
+        queryPlans.computeIfAbsent(query, k -> new ArrayList<>()).add(queryPlan);
+      } finally {
+        lock.unlock();
+      }
+    };
+    ArrayList<Thread> threads = new ArrayList<>();
+    final int numThreads = 10;
+    for (int i = 0; i < numThreads; i++) {
+      // Alternate between the two queries so both are planned concurrently.
+      threads.add(new Thread(i % 2 == 0 ? joinQuery : selectQuery));
+    }
+    for (Thread t : threads) {
+      t.start();
+    }
+    for (Thread t : threads) {
+      t.join();
+    }
+    // Both queries must be present, each planned by exactly half of the threads.
+    // NOTE(review): plans are not compared via equals() since QueryPlan is not shown to override it -- confirm.
+    Assert.assertEquals(queryPlans.size(), 2);
+    for (ArrayList<QueryPlan> plans : queryPlans.values()) {
+      Assert.assertEquals(plans.size(), numThreads / 2);
+      for (QueryPlan plan : plans) {
+        Assert.assertNotNull(plan);
+      }
+    }
+  }
+
   // --------------------------------------------------------------------------
   // Test Utils.
   // --------------------------------------------------------------------------


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to