This is an automated email from the ASF dual-hosted git repository.

rohangarg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new eb902375a2 Light refactor of the heavily refactored statement classes (#12909)
eb902375a2 is described below

commit eb902375a233266229c6955cbe16833992579605
Author: Paul Rogers <[email protected]>
AuthorDate: Thu Aug 18 14:01:06 2022 -0700

    Light refactor of the heavily refactored statement classes (#12909)
    
    Reflects lessons learned from working with consumers of
    the new code.
---
 .../org/apache/druid/sql/AbstractStatement.java    |  30 ++--
 .../java/org/apache/druid/sql/DirectStatement.java | 193 +++++++++++++++++----
 .../java/org/apache/druid/sql/HttpStatement.java   |   5 -
 .../org/apache/druid/sql/SqlPlanningException.java |   6 +
 .../sql/avatica/DruidJdbcPreparedStatement.java    |   2 +-
 .../druid/sql/avatica/DruidJdbcResultSet.java      |   2 +-
 .../druid/sql/calcite/planner/PlannerResult.java   |   8 +-
 .../org/apache/druid/sql/http/SqlResource.java     |   7 +-
 .../org/apache/druid/sql/SqlStatementTest.java     |  63 ++++++-
 .../calcite/schema/SegmentMetadataCacheTest.java   |  27 +--
 .../org/apache/druid/sql/http/SqlResourceTest.java |  63 ++++---
 11 files changed, 303 insertions(+), 103 deletions(-)

diff --git a/sql/src/main/java/org/apache/druid/sql/AbstractStatement.java 
b/sql/src/main/java/org/apache/druid/sql/AbstractStatement.java
index 2a001b7e50..99605918e2 100644
--- a/sql/src/main/java/org/apache/druid/sql/AbstractStatement.java
+++ b/sql/src/main/java/org/apache/druid/sql/AbstractStatement.java
@@ -30,7 +30,6 @@ import org.apache.druid.server.security.ForbiddenException;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.sql.calcite.planner.DruidPlanner;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
-import org.apache.druid.sql.calcite.planner.PlannerResult;
 
 import java.io.Closeable;
 import java.util.Set;
@@ -160,20 +159,6 @@ public abstract class AbstractStatement implements 
Closeable
     );
   }
 
-  /**
-   * Plan the query, which also produces the sequence that runs
-   * the query.
-   */
-  protected PlannerResult plan(DruidPlanner planner)
-  {
-    try {
-      return planner.plan();
-    }
-    catch (ValidationException e) {
-      throw new SqlPlanningException(e);
-    }
-  }
-
   /**
    * Return the datasource and table resources for this
    * statement.
@@ -188,7 +173,7 @@ public abstract class AbstractStatement implements Closeable
     return fullResourceActions;
   }
 
-  public SqlQueryPlus sqlRequest()
+  public SqlQueryPlus query()
   {
     return queryPlus;
   }
@@ -220,4 +205,17 @@ public abstract class AbstractStatement implements 
Closeable
   public void closeQuietly()
   {
   }
+
+  /**
+   * Convenience method to close the statement and report an error
+   * associated with the statement. Same as calling:{@code
+   * stmt.reporter().failed(e);
+   * stmt.close();
+   * }
+   */
+  public void closeWithError(Throwable e)
+  {
+    reporter.failed(e);
+    close();
+  }
 }
diff --git a/sql/src/main/java/org/apache/druid/sql/DirectStatement.java 
b/sql/src/main/java/org/apache/druid/sql/DirectStatement.java
index a8931bf73d..03318ed802 100644
--- a/sql/src/main/java/org/apache/druid/sql/DirectStatement.java
+++ b/sql/src/main/java/org/apache/druid/sql/DirectStatement.java
@@ -19,15 +19,20 @@
 
 package org.apache.druid.sql;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.QueryInterruptedException;
+import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.sql.SqlLifecycleManager.Cancelable;
 import org.apache.druid.sql.calcite.planner.DruidPlanner;
 import org.apache.druid.sql.calcite.planner.PlannerResult;
 import org.apache.druid.sql.calcite.planner.PrepareResult;
 
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
@@ -62,9 +67,91 @@ public class DirectStatement extends AbstractStatement 
implements Cancelable
 {
   private static final Logger log = new Logger(DirectStatement.class);
 
+  /**
+   * Represents the execution plan for a query with the ability to run
+   * that plan (once).
+   */
+  public class ResultSet implements Cancelable
+  {
+    private final PlannerResult plannerResult;
+
+    public ResultSet(PlannerResult plannerResult)
+    {
+      this.plannerResult = plannerResult;
+    }
+
+    public SqlQueryPlus query()
+    {
+      return queryPlus;
+    }
+
+    /**
+     * Convenience method for the split plan/run case to ensure that the 
statement
+     * can, in fact, be run.
+     */
+    public boolean runnable()
+    {
+      return plannerResult != null && plannerResult.runnable();
+    }
+
+    /**
+     * Do the actual execute step which allows subclasses to wrap the sequence,
+     * as is sometimes needed for testing.
+     */
+    public Sequence<Object[]> run()
+    {
+      try {
+        // Check cancellation. Required for SqlResourceTest to work.
+        transition(State.RAN);
+        return plannerResult.run();
+      }
+      catch (RuntimeException e) {
+        reporter.failed(e);
+        throw e;
+      }
+    }
+
+    public SqlRowTransformer createRowTransformer()
+    {
+      return new SqlRowTransformer(plannerContext.getTimeZone(), 
plannerResult.rowType());
+    }
+
+    public SqlExecutionReporter reporter()
+    {
+      return reporter;
+    }
+
+    @Override
+    public Set<ResourceAction> resources()
+    {
+      return DirectStatement.this.resources();
+    }
+
+    @Override
+    public void cancel()
+    {
+      DirectStatement.this.cancel();
+    }
+
+    public void close()
+    {
+      DirectStatement.this.close();
+    }
+  }
+
+  private enum State
+  {
+    START,
+    PREPARED,
+    RAN,
+    CANCELLED,
+    FAILED,
+    CLOSED
+  }
+
   protected PrepareResult prepareResult;
-  protected PlannerResult plannerResult;
-  private volatile boolean canceled;
+  protected ResultSet resultSet;
+  private volatile State state = State.START;
 
   public DirectStatement(
       final SqlToolbox lifecycleToolbox,
@@ -84,7 +171,21 @@ public class DirectStatement extends AbstractStatement 
implements Cancelable
   }
 
   /**
-   * Direct execution of a query, including:
+   * Convenience method to perform Direct execution of a query. Does both
+   * the {@link #plan()} step and the {@link ResultSet#run()} step.
+   *
+   * @return sequence which delivers query results
+   */
+  public Sequence<Object[]> execute()
+  {
+    return plan().run();
+  }
+
+  /**
+   * Prepares and plans a query for execution, returning a result set to
+   * execute the query. In Druid, prepare and plan are different: prepare 
provides
+   * information about the query, but plan does the "real" preparation to 
create
+   * an actual executable plan.
    * <ul>
    * <li>Create the planner.</li>
    * <li>Parse the statement.</li>
@@ -93,16 +194,14 @@ public class DirectStatement extends AbstractStatement 
implements Cancelable
    * <li>Validate the query against the Druid catalog.</li>
    * <li>Authorize access to the resources which the query needs.</li>
    * <li>Plan the query.</li>
-   * <li>Return a {@link Sequence} which executes the query and returns 
results.</li>
    * </ul>
-   *
-   * This method is called from the request thread; results are read in the
-   * response thread.
-   *
-   * @return sequence which delivers query results
+   * Call {@link ResultSet#run()} to run the resulting plan.
    */
-  public Sequence<Object[]> execute()
+  public ResultSet plan()
   {
+    if (state != State.START) {
+      throw new ISE("Can plan a query only once.");
+    }
     try (DruidPlanner planner = sqlToolbox.plannerFactory.createPlanner(
         sqlToolbox.engine,
         queryPlus.sql(),
@@ -114,49 +213,59 @@ public class DirectStatement extends AbstractStatement 
implements Cancelable
       // Tests cancel during this call; real clients might do so if the plan
       // or execution prep stages take too long for some unexpected reason.
       sqlToolbox.sqlLifecycleManager.add(sqlQueryId(), this);
-      checkCanceled();
-      plannerResult = plan(planner);
+      transition(State.PREPARED);
+      resultSet = createResultSet(createPlan(planner));
       prepareResult = planner.prepareResult();
-      return doExecute();
+      // Double check needed by SqlResourceTest
+      transition(State.PREPARED);
+      return resultSet;
     }
     catch (RuntimeException e) {
+      state = State.FAILED;
       reporter.failed(e);
       throw e;
     }
   }
 
-  public PrepareResult prepareResult()
-  {
-    return prepareResult;
-  }
-
   /**
-   * Do the actual execute step which allows subclasses to wrap the sequence,
-   * as is sometimes needed for testing.
+   * Plan the query, which also produces the sequence that runs
+   * the query.
    */
-  protected Sequence<Object[]> doExecute()
+  @VisibleForTesting
+  protected PlannerResult createPlan(DruidPlanner planner)
   {
-    // Check cancellation here and not in execute() above:
-    // required for SqlResourceTest to work.
-    checkCanceled();
     try {
-      return plannerResult.run();
+      return planner.plan();
     }
-    catch (RuntimeException e) {
-      reporter.failed(e);
-      throw e;
+    catch (ValidationException e) {
+      throw new SqlPlanningException(e);
     }
   }
 
+  /**
+   * Wrapper around result set creation for the sole purpose of tests which
+   * inject failures.
+   */
+  @VisibleForTesting
+  protected ResultSet createResultSet(PlannerResult plannerResult)
+  {
+    return new ResultSet(plannerResult);
+  }
+
+  public PrepareResult prepareResult()
+  {
+    return prepareResult;
+  }
+
   /**
    * Checks for cancellation. As it turns out, this is really just a test-time
    * check: an actual client can't cancel the query until the query reports
    * a query ID, which won't happen until after the {@link #execute())}
    * call.
    */
-  private void checkCanceled()
+  private void transition(State newState)
   {
-    if (canceled) {
+    if (state == State.CANCELLED) {
       throw new QueryInterruptedException(
           QueryInterruptedException.QUERY_CANCELED,
           StringUtils.format("Query is canceled [%s]", sqlQueryId()),
@@ -164,12 +273,16 @@ public class DirectStatement extends AbstractStatement 
implements Cancelable
           null
       );
     }
+    state = newState;
   }
 
   @Override
   public void cancel()
   {
-    canceled = true;
+    if (state == State.CLOSED) {
+      return;
+    }
+    state = State.CANCELLED;
     final CopyOnWriteArrayList<String> nativeQueryIds = 
plannerContext.getNativeQueryIds();
 
     for (String nativeQueryId : nativeQueryIds) {
@@ -177,4 +290,22 @@ public class DirectStatement extends AbstractStatement 
implements Cancelable
       sqlToolbox.queryScheduler.cancelQuery(nativeQueryId);
     }
   }
+
+  @Override
+  public void close()
+  {
+    if (state != State.START && state != State.CLOSED) {
+      super.close();
+      state = State.CLOSED;
+    }
+  }
+
+  @Override
+  public void closeWithError(Throwable e)
+  {
+    if (state != State.START && state != State.CLOSED) {
+      super.closeWithError(e);
+      state = State.CLOSED;
+    }
+  }
 }
diff --git a/sql/src/main/java/org/apache/druid/sql/HttpStatement.java 
b/sql/src/main/java/org/apache/druid/sql/HttpStatement.java
index 31abd15a42..52bef0a04f 100644
--- a/sql/src/main/java/org/apache/druid/sql/HttpStatement.java
+++ b/sql/src/main/java/org/apache/druid/sql/HttpStatement.java
@@ -68,9 +68,4 @@ public class HttpStatement extends DirectStatement
           sqlToolbox.plannerFactory.getAuthorizerMapper()
     );
   }
-
-  public SqlRowTransformer createRowTransformer()
-  {
-    return new SqlRowTransformer(plannerContext.getTimeZone(), 
plannerResult.rowType());
-  }
 }
diff --git a/sql/src/main/java/org/apache/druid/sql/SqlPlanningException.java 
b/sql/src/main/java/org/apache/druid/sql/SqlPlanningException.java
index a08ad53674..fb4e4f439b 100644
--- a/sql/src/main/java/org/apache/druid/sql/SqlPlanningException.java
+++ b/sql/src/main/java/org/apache/druid/sql/SqlPlanningException.java
@@ -22,6 +22,7 @@ package org.apache.druid.sql;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.runtime.CalciteContextException;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.tools.ValidationException;
 import org.apache.druid.query.BadQueryException;
@@ -67,6 +68,11 @@ public class SqlPlanningException extends BadQueryException
     this(PlanningError.VALIDATION_ERROR, e.getMessage());
   }
 
+  public SqlPlanningException(CalciteContextException e)
+  {
+    this(PlanningError.VALIDATION_ERROR, e.getMessage());
+  }
+
   public SqlPlanningException(PlanningError planningError, String errorMessage)
   {
     this(planningError.errorCode, errorMessage, planningError.errorClass);
diff --git 
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java
 
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java
index 428dcbf6ef..3cd608addb 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java
@@ -65,7 +65,7 @@ public class DruidJdbcPreparedStatement extends 
AbstractDruidJdbcStatement
       PrepareResult prepareResult = sqlStatement.prepare();
       signature = createSignature(
           prepareResult,
-          sqlStatement.sqlRequest().sql()
+          sqlStatement.query().sql()
       );
       state = State.PREPARED;
     }
diff --git 
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java 
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java
index 15a5e36770..36a69dd815 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java
@@ -114,7 +114,7 @@ public class DruidJdbcResultSet implements Closeable
       yielder = Yielders.each(retSequence);
       signature = AbstractDruidJdbcStatement.createSignature(
           stmt.prepareResult(),
-          stmt.sqlRequest().sql()
+          stmt.query().sql()
       );
     }
     catch (ExecutionException e) {
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java
index c571c42ab0..619f6c5509 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java
@@ -21,6 +21,7 @@ package org.apache.druid.sql.calcite.planner;
 
 import com.google.common.base.Supplier;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.guava.Sequence;
 
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -44,6 +45,11 @@ public class PlannerResult
     this.rowType = rowType;
   }
 
+  public boolean runnable()
+  {
+    return !didRun.get();
+  }
+
   /**
    * Run the query
    */
@@ -51,7 +57,7 @@ public class PlannerResult
   {
     if (!didRun.compareAndSet(false, true)) {
       // Safety check.
-      throw new IllegalStateException("Cannot run more than once");
+      throw new ISE("Cannot run more than once");
     }
     return resultsSupplier.get();
   }
diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java 
b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
index 6a3787b7ea..28623a622e 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
@@ -44,6 +44,7 @@ import org.apache.druid.server.security.AuthorizationUtils;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.server.security.ForbiddenException;
 import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.sql.DirectStatement.ResultSet;
 import org.apache.druid.sql.HttpStatement;
 import org.apache.druid.sql.SqlExecutionReporter;
 import org.apache.druid.sql.SqlLifecycleManager;
@@ -67,6 +68,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.Set;
@@ -135,8 +137,9 @@ public class SqlResource
 
     try {
       Thread.currentThread().setName(StringUtils.format("sql[%s]", 
sqlQueryId));
-      final Sequence<Object[]> sequence = stmt.execute();
-      final SqlRowTransformer rowTransformer = stmt.createRowTransformer();
+      ResultSet resultSet = stmt.plan();
+      final Sequence<Object[]> sequence = resultSet.run();
+      final SqlRowTransformer rowTransformer = 
resultSet.createRowTransformer();
       final Yielder<Object[]> yielder0 = Yielders.each(sequence);
 
       try {
diff --git a/sql/src/test/java/org/apache/druid/sql/SqlStatementTest.java 
b/sql/src/test/java/org/apache/druid/sql/SqlStatementTest.java
index 41f9129032..2004fe9988 100644
--- a/sql/src/test/java/org/apache/druid/sql/SqlStatementTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/SqlStatementTest.java
@@ -26,6 +26,7 @@ import 
com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.guava.LazySequence;
 import org.apache.druid.java.util.common.guava.Sequence;
@@ -45,6 +46,7 @@ import 
org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
 import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.sql.DirectStatement.ResultSet;
 import org.apache.druid.sql.SqlPlanningException.PlanningError;
 import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
 import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
@@ -68,6 +70,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import javax.servlet.http.HttpServletRequest;
+
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
@@ -75,6 +78,9 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class SqlStatementTest
@@ -213,10 +219,53 @@ public class SqlStatementTest
         "SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo",
         CalciteTests.REGULAR_USER_AUTH_RESULT);
     DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq);
-    List<Object[]> results = stmt.execute().toList();
+    ResultSet resultSet = stmt.plan();
+    assertTrue(resultSet.runnable());
+    List<Object[]> results = resultSet.run().toList();
     assertEquals(1, results.size());
     assertEquals(6L, results.get(0)[0]);
     assertEquals("foo", results.get(0)[1]);
+    assertSame(stmt.reporter(), resultSet.reporter());
+    assertSame(stmt.resources(), resultSet.resources());
+    assertSame(stmt.query(), resultSet.query());
+    assertFalse(resultSet.runnable());
+    resultSet.close();
+    stmt.close();
+  }
+
+  @Test
+  public void testDirectPlanTwice()
+  {
+    SqlQueryPlus sqlReq = queryPlus(
+        "SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo",
+        CalciteTests.REGULAR_USER_AUTH_RESULT);
+    DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq);
+    stmt.plan();
+    try {
+      stmt.plan();
+      fail();
+    }
+    catch (ISE e) {
+      stmt.closeWithError(e);
+    }
+  }
+
+  @Test
+  public void testDirectExecTwice()
+  {
+    SqlQueryPlus sqlReq = queryPlus(
+        "SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo",
+        CalciteTests.REGULAR_USER_AUTH_RESULT);
+    DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq);
+    ResultSet resultSet = stmt.plan();
+    resultSet.run();
+    try {
+      resultSet.run();
+      fail();
+    }
+    catch (ISE e) {
+      stmt.closeWithError(e);
+    }
   }
 
   @Test
@@ -352,7 +401,7 @@ public class SqlStatementTest
   // Prepared statements: using a prepare/execute model.
 
   @Test
-  public void testJdbcHappyPath()
+  public void testPreparedHappyPath()
   {
     SqlQueryPlus sqlReq = queryPlus(
         "SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo",
@@ -381,7 +430,7 @@ public class SqlStatementTest
   }
 
   @Test
-  public void testJdbcSyntaxError()
+  public void testPrepareSyntaxError()
   {
     SqlQueryPlus sqlReq = queryPlus(
         "SELECT COUNT(*) AS cnt, 'foo' AS",
@@ -398,7 +447,7 @@ public class SqlStatementTest
   }
 
   @Test
-  public void testJdbcValidationError()
+  public void testPrepareValidationError()
   {
     SqlQueryPlus sqlReq = queryPlus(
         "SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.bogus",
@@ -415,7 +464,7 @@ public class SqlStatementTest
   }
 
   @Test
-  public void testJdbcPermissionError()
+  public void testPreparePermissionError()
   {
     SqlQueryPlus sqlReq = queryPlus(
         "select count(*) from forbiddenDatasource",
@@ -442,7 +491,7 @@ public class SqlStatementTest
         .auth(CalciteTests.REGULAR_USER_AUTH_RESULT)
         .build();
     DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq);
-    Map<String, Object> context = 
stmt.sqlRequest().context().getMergedParams();
+    Map<String, Object> context = stmt.query().context().getMergedParams();
     Assert.assertEquals(2, context.size());
     // should contain only query id, not bySegment since it is not valid for 
SQL
     Assert.assertTrue(context.containsKey(PlannerContext.CTX_SQL_QUERY_ID));
@@ -457,7 +506,7 @@ public class SqlStatementTest
         .auth(CalciteTests.REGULAR_USER_AUTH_RESULT)
         .build();
     DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq);
-    Map<String, Object> context = 
stmt.sqlRequest().context().getMergedParams();
+    Map<String, Object> context = stmt.query().context().getMergedParams();
     Assert.assertEquals(2, context.size());
     // Statement should contain default query context values
     for (String defaultContextKey : defaultQueryConfig.getContext().keySet()) {
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java
 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java
index 39029fa9a4..99caee8125 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java
@@ -88,6 +88,9 @@ import java.util.stream.Collectors;
 
 public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
 {
+  // Timeout to allow (rapid) debugging, while not blocking tests with errors.
+  private static final int WAIT_TIMEOUT_SECS = 60;
+
   private SpecificSegmentsQuerySegmentWalker walker;
   private TestServerInventoryView serverView;
   private List<ImmutableDruidServer> druidServers;
@@ -931,11 +934,11 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
     segmentDataSourceNames.add("foo");
     joinableDataSourceNames.add("foo");
     serverView.addSegment(someNewBrokerSegment, ServerType.BROKER);
-    Assert.assertTrue(markDataSourceLatch.await(2, TimeUnit.SECONDS));
+    Assert.assertTrue(markDataSourceLatch.await(WAIT_TIMEOUT_SECS, 
TimeUnit.SECONDS));
     // wait for build twice
-    Assert.assertTrue(buildTableLatch.await(2, TimeUnit.SECONDS));
+    Assert.assertTrue(buildTableLatch.await(WAIT_TIMEOUT_SECS, 
TimeUnit.SECONDS));
     // wait for get again, just to make sure table has been updated (latch 
counts down just before tables are updated)
-    Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS));
+    Assert.assertTrue(getDatasourcesLatch.await(WAIT_TIMEOUT_SECS, 
TimeUnit.SECONDS));
 
     fooTable = schema.getDatasource("foo");
     Assert.assertNotNull(fooTable);
@@ -952,11 +955,11 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
     segmentDataSourceNames.remove("foo");
     serverView.removeSegment(someNewBrokerSegment, ServerType.BROKER);
 
-    Assert.assertTrue(markDataSourceLatch.await(2, TimeUnit.SECONDS));
+    Assert.assertTrue(markDataSourceLatch.await(WAIT_TIMEOUT_SECS, 
TimeUnit.SECONDS));
     // wait for build
-    Assert.assertTrue(buildTableLatch.await(2, TimeUnit.SECONDS));
+    Assert.assertTrue(buildTableLatch.await(WAIT_TIMEOUT_SECS, 
TimeUnit.SECONDS));
     // wait for get again, just to make sure table has been updated (latch 
counts down just before tables are updated)
-    Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS));
+    Assert.assertTrue(getDatasourcesLatch.await(WAIT_TIMEOUT_SECS, 
TimeUnit.SECONDS));
 
     fooTable = schema.getDatasource("foo");
     Assert.assertNotNull(fooTable);
@@ -996,10 +999,10 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
     segmentDataSourceNames.add("foo");
     serverView.addSegment(someNewBrokerSegment, ServerType.BROKER);
 
-    Assert.assertTrue(markDataSourceLatch.await(2, TimeUnit.SECONDS));
-    Assert.assertTrue(buildTableLatch.await(2, TimeUnit.SECONDS));
+    Assert.assertTrue(markDataSourceLatch.await(WAIT_TIMEOUT_SECS, 
TimeUnit.SECONDS));
+    Assert.assertTrue(buildTableLatch.await(WAIT_TIMEOUT_SECS, 
TimeUnit.SECONDS));
     // wait for get again, just to make sure table has been updated (latch 
counts down just before tables are updated)
-    Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS));
+    Assert.assertTrue(getDatasourcesLatch.await(WAIT_TIMEOUT_SECS, 
TimeUnit.SECONDS));
 
     fooTable = schema.getDatasource("foo");
     Assert.assertNotNull(fooTable);
@@ -1017,11 +1020,11 @@ public class SegmentMetadataCacheTest extends 
SegmentMetadataCacheCommon
     segmentDataSourceNames.remove("foo");
     serverView.removeSegment(someNewBrokerSegment, ServerType.BROKER);
 
-    Assert.assertTrue(markDataSourceLatch.await(2, TimeUnit.SECONDS));
+    Assert.assertTrue(markDataSourceLatch.await(WAIT_TIMEOUT_SECS, 
TimeUnit.SECONDS));
     // wait for build
-    Assert.assertTrue(buildTableLatch.await(2, TimeUnit.SECONDS));
+    Assert.assertTrue(buildTableLatch.await(WAIT_TIMEOUT_SECS, 
TimeUnit.SECONDS));
     // wait for get again, just to make sure table has been updated (latch 
counts down just before tables are updated)
-    Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS));
+    Assert.assertTrue(getDatasourcesLatch.await(WAIT_TIMEOUT_SECS, 
TimeUnit.SECONDS));
 
     fooTable = schema.getDatasource("foo");
     Assert.assertNotNull(fooTable);
diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java 
b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
index 6560562da2..7fe468a2e7 100644
--- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
@@ -110,6 +110,7 @@ import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
+
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -133,7 +134,8 @@ public class SqlResourceTest extends CalciteTestBase
 {
   private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
   private static final String DUMMY_SQL_QUERY_ID = "dummy";
-  private static final int WAIT_TIMEOUT_SECS = 3;
+  // Timeout to allow (rapid) debugging, while not blocking tests with errors.
+  private static final int WAIT_TIMEOUT_SECS = 60;
   private static final Consumer<DirectStatement> NULL_ACTION = s -> {};
 
   private static final List<String> EXPECTED_COLUMNS_FOR_RESULT_FORMAT_TESTS =
@@ -1917,11 +1919,11 @@ public class SqlResourceTest extends CalciteTestBase
     }
 
     @Override
-    public PlannerResult plan(DruidPlanner planner)
+    public PlannerResult createPlan(DruidPlanner planner)
     {
       if (planLatchSupplier.get() != null) {
         if (planLatchSupplier.get().rhs) {
-          PlannerResult result = super.plan(planner);
+          PlannerResult result = super.createPlan(planner);
           planLatchSupplier.get().lhs.countDown();
           return result;
         } else {
@@ -1933,45 +1935,52 @@ public class SqlResourceTest extends CalciteTestBase
           catch (InterruptedException e) {
             throw new RuntimeException(e);
           }
-          return super.plan(planner);
+          return super.createPlan(planner);
         }
       } else {
-        return super.plan(planner);
+        return super.createPlan(planner);
       }
     }
 
     @Override
-    public Sequence<Object[]> execute()
+    public ResultSet plan()
     {
       onExecute.accept(this);
-      return super.execute();
+      return super.plan();
     }
 
     @Override
-    public Sequence<Object[]> doExecute()
+    public ResultSet createResultSet(PlannerResult plannerResult)
     {
-      final Function<Sequence<Object[]>, Sequence<Object[]>> sequenceMapFn =
-          
Optional.ofNullable(sequenceMapFnSupplier.get()).orElse(Function.identity());
-
-      if (executeLatchSupplier.get() != null) {
-        if (executeLatchSupplier.get().rhs) {
-          Sequence<Object[]> sequence = sequenceMapFn.apply(super.doExecute());
-          executeLatchSupplier.get().lhs.countDown();
-          return sequence;
-        } else {
-          try {
-            if (!executeLatchSupplier.get().lhs.await(WAIT_TIMEOUT_SECS, 
TimeUnit.SECONDS)) {
-              throw new RuntimeException("Latch timed out");
+      return new ResultSet(plannerResult)
+      {
+        @Override
+        public Sequence<Object[]> run()
+        {
+          final Function<Sequence<Object[]>, Sequence<Object[]>> sequenceMapFn 
=
+              
Optional.ofNullable(sequenceMapFnSupplier.get()).orElse(Function.identity());
+
+          if (executeLatchSupplier.get() != null) {
+            if (executeLatchSupplier.get().rhs) {
+              Sequence<Object[]> sequence = sequenceMapFn.apply(super.run());
+              executeLatchSupplier.get().lhs.countDown();
+              return sequence;
+            } else {
+              try {
+                if (!executeLatchSupplier.get().lhs.await(WAIT_TIMEOUT_SECS, 
TimeUnit.SECONDS)) {
+                  throw new RuntimeException("Latch timed out");
+                }
+              }
+              catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              return sequenceMapFn.apply(super.run());
             }
+          } else {
+            return sequenceMapFn.apply(super.run());
           }
-          catch (InterruptedException e) {
-            throw new RuntimeException(e);
-          }
-          return sequenceMapFn.apply(super.doExecute());
         }
-      } else {
-        return sequenceMapFn.apply(super.doExecute());
-      }
+      };
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to