This is an automated email from the ASF dual-hosted git repository.
rohangarg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new eb902375a2 Light refactor of the heavily refactored statement classes
(#12909)
eb902375a2 is described below
commit eb902375a233266229c6955cbe16833992579605
Author: Paul Rogers <[email protected]>
AuthorDate: Thu Aug 18 14:01:06 2022 -0700
Light refactor of the heavily refactored statement classes (#12909)
Reflects lessons learned from working with consumers of
the new code.
---
.../org/apache/druid/sql/AbstractStatement.java | 30 ++--
.../java/org/apache/druid/sql/DirectStatement.java | 193 +++++++++++++++++----
.../java/org/apache/druid/sql/HttpStatement.java | 5 -
.../org/apache/druid/sql/SqlPlanningException.java | 6 +
.../sql/avatica/DruidJdbcPreparedStatement.java | 2 +-
.../druid/sql/avatica/DruidJdbcResultSet.java | 2 +-
.../druid/sql/calcite/planner/PlannerResult.java | 8 +-
.../org/apache/druid/sql/http/SqlResource.java | 7 +-
.../org/apache/druid/sql/SqlStatementTest.java | 63 ++++++-
.../calcite/schema/SegmentMetadataCacheTest.java | 27 +--
.../org/apache/druid/sql/http/SqlResourceTest.java | 63 ++++---
11 files changed, 303 insertions(+), 103 deletions(-)
diff --git a/sql/src/main/java/org/apache/druid/sql/AbstractStatement.java
b/sql/src/main/java/org/apache/druid/sql/AbstractStatement.java
index 2a001b7e50..99605918e2 100644
--- a/sql/src/main/java/org/apache/druid/sql/AbstractStatement.java
+++ b/sql/src/main/java/org/apache/druid/sql/AbstractStatement.java
@@ -30,7 +30,6 @@ import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.calcite.planner.DruidPlanner;
import org.apache.druid.sql.calcite.planner.PlannerContext;
-import org.apache.druid.sql.calcite.planner.PlannerResult;
import java.io.Closeable;
import java.util.Set;
@@ -160,20 +159,6 @@ public abstract class AbstractStatement implements
Closeable
);
}
- /**
- * Plan the query, which also produces the sequence that runs
- * the query.
- */
- protected PlannerResult plan(DruidPlanner planner)
- {
- try {
- return planner.plan();
- }
- catch (ValidationException e) {
- throw new SqlPlanningException(e);
- }
- }
-
/**
* Return the datasource and table resources for this
* statement.
@@ -188,7 +173,7 @@ public abstract class AbstractStatement implements Closeable
return fullResourceActions;
}
- public SqlQueryPlus sqlRequest()
+ public SqlQueryPlus query()
{
return queryPlus;
}
@@ -220,4 +205,17 @@ public abstract class AbstractStatement implements
Closeable
public void closeQuietly()
{
}
+
+ /**
+ * Convenience method to close the statement and report an error
+ * associated with the statement. Same as calling:{@code
+ * stmt.reporter().failed(e);
+ * stmt.close();
+ * }
+ */
+ public void closeWithError(Throwable e)
+ {
+ reporter.failed(e);
+ close();
+ }
}
diff --git a/sql/src/main/java/org/apache/druid/sql/DirectStatement.java
b/sql/src/main/java/org/apache/druid/sql/DirectStatement.java
index a8931bf73d..03318ed802 100644
--- a/sql/src/main/java/org/apache/druid/sql/DirectStatement.java
+++ b/sql/src/main/java/org/apache/druid/sql/DirectStatement.java
@@ -19,15 +19,20 @@
package org.apache.druid.sql;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryInterruptedException;
+import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.SqlLifecycleManager.Cancelable;
import org.apache.druid.sql.calcite.planner.DruidPlanner;
import org.apache.druid.sql.calcite.planner.PlannerResult;
import org.apache.druid.sql.calcite.planner.PrepareResult;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
/**
@@ -62,9 +67,91 @@ public class DirectStatement extends AbstractStatement
implements Cancelable
{
private static final Logger log = new Logger(DirectStatement.class);
+ /**
+ * Represents the execution plan for a query with the ability to run
+ * that plan (once).
+ */
+ public class ResultSet implements Cancelable
+ {
+ private final PlannerResult plannerResult;
+
+ public ResultSet(PlannerResult plannerResult)
+ {
+ this.plannerResult = plannerResult;
+ }
+
+ public SqlQueryPlus query()
+ {
+ return queryPlus;
+ }
+
+ /**
+ * Convenience method for the split plan/run case to ensure that the
statement
+ * can, in fact, be run.
+ */
+ public boolean runnable()
+ {
+ return plannerResult != null && plannerResult.runnable();
+ }
+
+ /**
+ * Do the actual execute step which allows subclasses to wrap the sequence,
+ * as is sometimes needed for testing.
+ */
+ public Sequence<Object[]> run()
+ {
+ try {
+ // Check cancellation. Required for SqlResourceTest to work.
+ transition(State.RAN);
+ return plannerResult.run();
+ }
+ catch (RuntimeException e) {
+ reporter.failed(e);
+ throw e;
+ }
+ }
+
+ public SqlRowTransformer createRowTransformer()
+ {
+ return new SqlRowTransformer(plannerContext.getTimeZone(),
plannerResult.rowType());
+ }
+
+ public SqlExecutionReporter reporter()
+ {
+ return reporter;
+ }
+
+ @Override
+ public Set<ResourceAction> resources()
+ {
+ return DirectStatement.this.resources();
+ }
+
+ @Override
+ public void cancel()
+ {
+ DirectStatement.this.cancel();
+ }
+
+ public void close()
+ {
+ DirectStatement.this.close();
+ }
+ }
+
+ private enum State
+ {
+ START,
+ PREPARED,
+ RAN,
+ CANCELLED,
+ FAILED,
+ CLOSED
+ }
+
protected PrepareResult prepareResult;
- protected PlannerResult plannerResult;
- private volatile boolean canceled;
+ protected ResultSet resultSet;
+ private volatile State state = State.START;
public DirectStatement(
final SqlToolbox lifecycleToolbox,
@@ -84,7 +171,21 @@ public class DirectStatement extends AbstractStatement
implements Cancelable
}
/**
- * Direct execution of a query, including:
+ * Convenience method to perform Direct execution of a query. Does both
+ * the {@link #plan()} step and the {@link ResultSet#run()} step.
+ *
+ * @return sequence which delivers query results
+ */
+ public Sequence<Object[]> execute()
+ {
+ return plan().run();
+ }
+
+ /**
+ * Prepares and plans a query for execution, returning a result set to
+ * execute the query. In Druid, prepare and plan are different: prepare
provides
+ * information about the query, but plan does the "real" preparation to
create
+ * an actual executable plan.
* <ul>
* <li>Create the planner.</li>
* <li>Parse the statement.</li>
@@ -93,16 +194,14 @@ public class DirectStatement extends AbstractStatement
implements Cancelable
* <li>Validate the query against the Druid catalog.</li>
* <li>Authorize access to the resources which the query needs.</li>
* <li>Plan the query.</li>
- * <li>Return a {@link Sequence} which executes the query and returns
results.</li>
* </ul>
- *
- * This method is called from the request thread; results are read in the
- * response thread.
- *
- * @return sequence which delivers query results
+ * Call {@link ResultSet#run()} to run the resulting plan.
*/
- public Sequence<Object[]> execute()
+ public ResultSet plan()
{
+ if (state != State.START) {
+ throw new ISE("Can plan a query only once.");
+ }
try (DruidPlanner planner = sqlToolbox.plannerFactory.createPlanner(
sqlToolbox.engine,
queryPlus.sql(),
@@ -114,49 +213,59 @@ public class DirectStatement extends AbstractStatement
implements Cancelable
// Tests cancel during this call; real clients might do so if the plan
// or execution prep stages take too long for some unexpected reason.
sqlToolbox.sqlLifecycleManager.add(sqlQueryId(), this);
- checkCanceled();
- plannerResult = plan(planner);
+ transition(State.PREPARED);
+ resultSet = createResultSet(createPlan(planner));
prepareResult = planner.prepareResult();
- return doExecute();
+ // Double check needed by SqlResourceTest
+ transition(State.PREPARED);
+ return resultSet;
}
catch (RuntimeException e) {
+ state = State.FAILED;
reporter.failed(e);
throw e;
}
}
- public PrepareResult prepareResult()
- {
- return prepareResult;
- }
-
/**
- * Do the actual execute step which allows subclasses to wrap the sequence,
- * as is sometimes needed for testing.
+ * Plan the query, which also produces the sequence that runs
+ * the query.
*/
- protected Sequence<Object[]> doExecute()
+ @VisibleForTesting
+ protected PlannerResult createPlan(DruidPlanner planner)
{
- // Check cancellation here and not in execute() above:
- // required for SqlResourceTest to work.
- checkCanceled();
try {
- return plannerResult.run();
+ return planner.plan();
}
- catch (RuntimeException e) {
- reporter.failed(e);
- throw e;
+ catch (ValidationException e) {
+ throw new SqlPlanningException(e);
}
}
+ /**
+ * Wrapper around result set creation for the sole purpose of tests which
+ * inject failures.
+ */
+ @VisibleForTesting
+ protected ResultSet createResultSet(PlannerResult plannerResult)
+ {
+ return new ResultSet(plannerResult);
+ }
+
+ public PrepareResult prepareResult()
+ {
+ return prepareResult;
+ }
+
/**
* Checks for cancellation. As it turns out, this is really just a test-time
* check: an actual client can't cancel the query until the query reports
* a query ID, which won't happen until after the {@link #execute())}
* call.
*/
- private void checkCanceled()
+ private void transition(State newState)
{
- if (canceled) {
+ if (state == State.CANCELLED) {
throw new QueryInterruptedException(
QueryInterruptedException.QUERY_CANCELED,
StringUtils.format("Query is canceled [%s]", sqlQueryId()),
@@ -164,12 +273,16 @@ public class DirectStatement extends AbstractStatement
implements Cancelable
null
);
}
+ state = newState;
}
@Override
public void cancel()
{
- canceled = true;
+ if (state == State.CLOSED) {
+ return;
+ }
+ state = State.CANCELLED;
final CopyOnWriteArrayList<String> nativeQueryIds =
plannerContext.getNativeQueryIds();
for (String nativeQueryId : nativeQueryIds) {
@@ -177,4 +290,22 @@ public class DirectStatement extends AbstractStatement
implements Cancelable
sqlToolbox.queryScheduler.cancelQuery(nativeQueryId);
}
}
+
+ @Override
+ public void close()
+ {
+ if (state != State.START && state != State.CLOSED) {
+ super.close();
+ state = State.CLOSED;
+ }
+ }
+
+ @Override
+ public void closeWithError(Throwable e)
+ {
+ if (state != State.START && state != State.CLOSED) {
+ super.closeWithError(e);
+ state = State.CLOSED;
+ }
+ }
}
diff --git a/sql/src/main/java/org/apache/druid/sql/HttpStatement.java
b/sql/src/main/java/org/apache/druid/sql/HttpStatement.java
index 31abd15a42..52bef0a04f 100644
--- a/sql/src/main/java/org/apache/druid/sql/HttpStatement.java
+++ b/sql/src/main/java/org/apache/druid/sql/HttpStatement.java
@@ -68,9 +68,4 @@ public class HttpStatement extends DirectStatement
sqlToolbox.plannerFactory.getAuthorizerMapper()
);
}
-
- public SqlRowTransformer createRowTransformer()
- {
- return new SqlRowTransformer(plannerContext.getTimeZone(),
plannerResult.rowType());
- }
}
diff --git a/sql/src/main/java/org/apache/druid/sql/SqlPlanningException.java
b/sql/src/main/java/org/apache/druid/sql/SqlPlanningException.java
index a08ad53674..fb4e4f439b 100644
--- a/sql/src/main/java/org/apache/druid/sql/SqlPlanningException.java
+++ b/sql/src/main/java/org/apache/druid/sql/SqlPlanningException.java
@@ -22,6 +22,7 @@ package org.apache.druid.sql;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.runtime.CalciteContextException;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.tools.ValidationException;
import org.apache.druid.query.BadQueryException;
@@ -67,6 +68,11 @@ public class SqlPlanningException extends BadQueryException
this(PlanningError.VALIDATION_ERROR, e.getMessage());
}
+ public SqlPlanningException(CalciteContextException e)
+ {
+ this(PlanningError.VALIDATION_ERROR, e.getMessage());
+ }
+
public SqlPlanningException(PlanningError planningError, String errorMessage)
{
this(planningError.errorCode, errorMessage, planningError.errorClass);
diff --git
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java
index 428dcbf6ef..3cd608addb 100644
---
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java
+++
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java
@@ -65,7 +65,7 @@ public class DruidJdbcPreparedStatement extends
AbstractDruidJdbcStatement
PrepareResult prepareResult = sqlStatement.prepare();
signature = createSignature(
prepareResult,
- sqlStatement.sqlRequest().sql()
+ sqlStatement.query().sql()
);
state = State.PREPARED;
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java
index 15a5e36770..36a69dd815 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java
@@ -114,7 +114,7 @@ public class DruidJdbcResultSet implements Closeable
yielder = Yielders.each(retSequence);
signature = AbstractDruidJdbcStatement.createSignature(
stmt.prepareResult(),
- stmt.sqlRequest().sql()
+ stmt.query().sql()
);
}
catch (ExecutionException e) {
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java
index c571c42ab0..619f6c5509 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java
@@ -21,6 +21,7 @@ package org.apache.druid.sql.calcite.planner;
import com.google.common.base.Supplier;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Sequence;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -44,6 +45,11 @@ public class PlannerResult
this.rowType = rowType;
}
+ public boolean runnable()
+ {
+ return !didRun.get();
+ }
+
/**
* Run the query
*/
@@ -51,7 +57,7 @@ public class PlannerResult
{
if (!didRun.compareAndSet(false, true)) {
// Safety check.
- throw new IllegalStateException("Cannot run more than once");
+ throw new ISE("Cannot run more than once");
}
return resultsSupplier.get();
}
diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
index 6a3787b7ea..28623a622e 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
@@ -44,6 +44,7 @@ import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.sql.DirectStatement.ResultSet;
import org.apache.druid.sql.HttpStatement;
import org.apache.druid.sql.SqlExecutionReporter;
import org.apache.druid.sql.SqlLifecycleManager;
@@ -67,6 +68,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
+
import java.io.IOException;
import java.util.List;
import java.util.Set;
@@ -135,8 +137,9 @@ public class SqlResource
try {
Thread.currentThread().setName(StringUtils.format("sql[%s]",
sqlQueryId));
- final Sequence<Object[]> sequence = stmt.execute();
- final SqlRowTransformer rowTransformer = stmt.createRowTransformer();
+ ResultSet resultSet = stmt.plan();
+ final Sequence<Object[]> sequence = resultSet.run();
+ final SqlRowTransformer rowTransformer =
resultSet.createRowTransformer();
final Yielder<Object[]> yielder0 = Yielders.each(sequence);
try {
diff --git a/sql/src/test/java/org/apache/druid/sql/SqlStatementTest.java
b/sql/src/test/java/org/apache/druid/sql/SqlStatementTest.java
index 41f9129032..2004fe9988 100644
--- a/sql/src/test/java/org/apache/druid/sql/SqlStatementTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/SqlStatementTest.java
@@ -26,6 +26,7 @@ import
com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.java.util.common.guava.Sequence;
@@ -45,6 +46,7 @@ import
org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.sql.DirectStatement.ResultSet;
import org.apache.druid.sql.SqlPlanningException.PlanningError;
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
@@ -68,6 +70,7 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import javax.servlet.http.HttpServletRequest;
+
import java.io.IOException;
import java.util.Collections;
import java.util.List;
@@ -75,6 +78,9 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class SqlStatementTest
@@ -213,10 +219,53 @@ public class SqlStatementTest
"SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo",
CalciteTests.REGULAR_USER_AUTH_RESULT);
DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq);
- List<Object[]> results = stmt.execute().toList();
+ ResultSet resultSet = stmt.plan();
+ assertTrue(resultSet.runnable());
+ List<Object[]> results = resultSet.run().toList();
assertEquals(1, results.size());
assertEquals(6L, results.get(0)[0]);
assertEquals("foo", results.get(0)[1]);
+ assertSame(stmt.reporter(), resultSet.reporter());
+ assertSame(stmt.resources(), resultSet.resources());
+ assertSame(stmt.query(), resultSet.query());
+ assertFalse(resultSet.runnable());
+ resultSet.close();
+ stmt.close();
+ }
+
+ @Test
+ public void testDirectPlanTwice()
+ {
+ SqlQueryPlus sqlReq = queryPlus(
+ "SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo",
+ CalciteTests.REGULAR_USER_AUTH_RESULT);
+ DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq);
+ stmt.plan();
+ try {
+ stmt.plan();
+ fail();
+ }
+ catch (ISE e) {
+ stmt.closeWithError(e);
+ }
+ }
+
+ @Test
+ public void testDirectExecTwice()
+ {
+ SqlQueryPlus sqlReq = queryPlus(
+ "SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo",
+ CalciteTests.REGULAR_USER_AUTH_RESULT);
+ DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq);
+ ResultSet resultSet = stmt.plan();
+ resultSet.run();
+ try {
+ resultSet.run();
+ fail();
+ }
+ catch (ISE e) {
+ stmt.closeWithError(e);
+ }
}
@Test
@@ -352,7 +401,7 @@ public class SqlStatementTest
// Prepared statements: using a prepare/execute model.
@Test
- public void testJdbcHappyPath()
+ public void testPreparedHappyPath()
{
SqlQueryPlus sqlReq = queryPlus(
"SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo",
@@ -381,7 +430,7 @@ public class SqlStatementTest
}
@Test
- public void testJdbcSyntaxError()
+ public void testPrepareSyntaxError()
{
SqlQueryPlus sqlReq = queryPlus(
"SELECT COUNT(*) AS cnt, 'foo' AS",
@@ -398,7 +447,7 @@ public class SqlStatementTest
}
@Test
- public void testJdbcValidationError()
+ public void testPrepareValidationError()
{
SqlQueryPlus sqlReq = queryPlus(
"SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.bogus",
@@ -415,7 +464,7 @@ public class SqlStatementTest
}
@Test
- public void testJdbcPermissionError()
+ public void testPreparePermissionError()
{
SqlQueryPlus sqlReq = queryPlus(
"select count(*) from forbiddenDatasource",
@@ -442,7 +491,7 @@ public class SqlStatementTest
.auth(CalciteTests.REGULAR_USER_AUTH_RESULT)
.build();
DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq);
- Map<String, Object> context =
stmt.sqlRequest().context().getMergedParams();
+ Map<String, Object> context = stmt.query().context().getMergedParams();
Assert.assertEquals(2, context.size());
// should contain only query id, not bySegment since it is not valid for
SQL
Assert.assertTrue(context.containsKey(PlannerContext.CTX_SQL_QUERY_ID));
@@ -457,7 +506,7 @@ public class SqlStatementTest
.auth(CalciteTests.REGULAR_USER_AUTH_RESULT)
.build();
DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq);
- Map<String, Object> context =
stmt.sqlRequest().context().getMergedParams();
+ Map<String, Object> context = stmt.query().context().getMergedParams();
Assert.assertEquals(2, context.size());
// Statement should contain default query context values
for (String defaultContextKey : defaultQueryConfig.getContext().keySet()) {
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java
index 39029fa9a4..99caee8125 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java
@@ -88,6 +88,9 @@ import java.util.stream.Collectors;
public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
{
+ // Timeout to allow (rapid) debugging, while not blocking tests with errors.
+ private static final int WAIT_TIMEOUT_SECS = 60;
+
private SpecificSegmentsQuerySegmentWalker walker;
private TestServerInventoryView serverView;
private List<ImmutableDruidServer> druidServers;
@@ -931,11 +934,11 @@ public class SegmentMetadataCacheTest extends
SegmentMetadataCacheCommon
segmentDataSourceNames.add("foo");
joinableDataSourceNames.add("foo");
serverView.addSegment(someNewBrokerSegment, ServerType.BROKER);
- Assert.assertTrue(markDataSourceLatch.await(2, TimeUnit.SECONDS));
+ Assert.assertTrue(markDataSourceLatch.await(WAIT_TIMEOUT_SECS,
TimeUnit.SECONDS));
// wait for build twice
- Assert.assertTrue(buildTableLatch.await(2, TimeUnit.SECONDS));
+ Assert.assertTrue(buildTableLatch.await(WAIT_TIMEOUT_SECS,
TimeUnit.SECONDS));
// wait for get again, just to make sure table has been updated (latch
counts down just before tables are updated)
- Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS));
+ Assert.assertTrue(getDatasourcesLatch.await(WAIT_TIMEOUT_SECS,
TimeUnit.SECONDS));
fooTable = schema.getDatasource("foo");
Assert.assertNotNull(fooTable);
@@ -952,11 +955,11 @@ public class SegmentMetadataCacheTest extends
SegmentMetadataCacheCommon
segmentDataSourceNames.remove("foo");
serverView.removeSegment(someNewBrokerSegment, ServerType.BROKER);
- Assert.assertTrue(markDataSourceLatch.await(2, TimeUnit.SECONDS));
+ Assert.assertTrue(markDataSourceLatch.await(WAIT_TIMEOUT_SECS,
TimeUnit.SECONDS));
// wait for build
- Assert.assertTrue(buildTableLatch.await(2, TimeUnit.SECONDS));
+ Assert.assertTrue(buildTableLatch.await(WAIT_TIMEOUT_SECS,
TimeUnit.SECONDS));
// wait for get again, just to make sure table has been updated (latch
counts down just before tables are updated)
- Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS));
+ Assert.assertTrue(getDatasourcesLatch.await(WAIT_TIMEOUT_SECS,
TimeUnit.SECONDS));
fooTable = schema.getDatasource("foo");
Assert.assertNotNull(fooTable);
@@ -996,10 +999,10 @@ public class SegmentMetadataCacheTest extends
SegmentMetadataCacheCommon
segmentDataSourceNames.add("foo");
serverView.addSegment(someNewBrokerSegment, ServerType.BROKER);
- Assert.assertTrue(markDataSourceLatch.await(2, TimeUnit.SECONDS));
- Assert.assertTrue(buildTableLatch.await(2, TimeUnit.SECONDS));
+ Assert.assertTrue(markDataSourceLatch.await(WAIT_TIMEOUT_SECS,
TimeUnit.SECONDS));
+ Assert.assertTrue(buildTableLatch.await(WAIT_TIMEOUT_SECS,
TimeUnit.SECONDS));
// wait for get again, just to make sure table has been updated (latch
counts down just before tables are updated)
- Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS));
+ Assert.assertTrue(getDatasourcesLatch.await(WAIT_TIMEOUT_SECS,
TimeUnit.SECONDS));
fooTable = schema.getDatasource("foo");
Assert.assertNotNull(fooTable);
@@ -1017,11 +1020,11 @@ public class SegmentMetadataCacheTest extends
SegmentMetadataCacheCommon
segmentDataSourceNames.remove("foo");
serverView.removeSegment(someNewBrokerSegment, ServerType.BROKER);
- Assert.assertTrue(markDataSourceLatch.await(2, TimeUnit.SECONDS));
+ Assert.assertTrue(markDataSourceLatch.await(WAIT_TIMEOUT_SECS,
TimeUnit.SECONDS));
// wait for build
- Assert.assertTrue(buildTableLatch.await(2, TimeUnit.SECONDS));
+ Assert.assertTrue(buildTableLatch.await(WAIT_TIMEOUT_SECS,
TimeUnit.SECONDS));
// wait for get again, just to make sure table has been updated (latch
counts down just before tables are updated)
- Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS));
+ Assert.assertTrue(getDatasourcesLatch.await(WAIT_TIMEOUT_SECS,
TimeUnit.SECONDS));
fooTable = schema.getDatasource("foo");
Assert.assertNotNull(fooTable);
diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
index 6560562da2..7fe468a2e7 100644
--- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
@@ -110,6 +110,7 @@ import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
+
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -133,7 +134,8 @@ public class SqlResourceTest extends CalciteTestBase
{
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final String DUMMY_SQL_QUERY_ID = "dummy";
- private static final int WAIT_TIMEOUT_SECS = 3;
+ // Timeout to allow (rapid) debugging, while not blocking tests with errors.
+ private static final int WAIT_TIMEOUT_SECS = 60;
private static final Consumer<DirectStatement> NULL_ACTION = s -> {};
private static final List<String> EXPECTED_COLUMNS_FOR_RESULT_FORMAT_TESTS =
@@ -1917,11 +1919,11 @@ public class SqlResourceTest extends CalciteTestBase
}
@Override
- public PlannerResult plan(DruidPlanner planner)
+ public PlannerResult createPlan(DruidPlanner planner)
{
if (planLatchSupplier.get() != null) {
if (planLatchSupplier.get().rhs) {
- PlannerResult result = super.plan(planner);
+ PlannerResult result = super.createPlan(planner);
planLatchSupplier.get().lhs.countDown();
return result;
} else {
@@ -1933,45 +1935,52 @@ public class SqlResourceTest extends CalciteTestBase
catch (InterruptedException e) {
throw new RuntimeException(e);
}
- return super.plan(planner);
+ return super.createPlan(planner);
}
} else {
- return super.plan(planner);
+ return super.createPlan(planner);
}
}
@Override
- public Sequence<Object[]> execute()
+ public ResultSet plan()
{
onExecute.accept(this);
- return super.execute();
+ return super.plan();
}
@Override
- public Sequence<Object[]> doExecute()
+ public ResultSet createResultSet(PlannerResult plannerResult)
{
- final Function<Sequence<Object[]>, Sequence<Object[]>> sequenceMapFn =
-
Optional.ofNullable(sequenceMapFnSupplier.get()).orElse(Function.identity());
-
- if (executeLatchSupplier.get() != null) {
- if (executeLatchSupplier.get().rhs) {
- Sequence<Object[]> sequence = sequenceMapFn.apply(super.doExecute());
- executeLatchSupplier.get().lhs.countDown();
- return sequence;
- } else {
- try {
- if (!executeLatchSupplier.get().lhs.await(WAIT_TIMEOUT_SECS,
TimeUnit.SECONDS)) {
- throw new RuntimeException("Latch timed out");
+ return new ResultSet(plannerResult)
+ {
+ @Override
+ public Sequence<Object[]> run()
+ {
+ final Function<Sequence<Object[]>, Sequence<Object[]>> sequenceMapFn
=
+
Optional.ofNullable(sequenceMapFnSupplier.get()).orElse(Function.identity());
+
+ if (executeLatchSupplier.get() != null) {
+ if (executeLatchSupplier.get().rhs) {
+ Sequence<Object[]> sequence = sequenceMapFn.apply(super.run());
+ executeLatchSupplier.get().lhs.countDown();
+ return sequence;
+ } else {
+ try {
+ if (!executeLatchSupplier.get().lhs.await(WAIT_TIMEOUT_SECS,
TimeUnit.SECONDS)) {
+ throw new RuntimeException("Latch timed out");
+ }
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return sequenceMapFn.apply(super.run());
}
+ } else {
+ return sequenceMapFn.apply(super.run());
}
- catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- return sequenceMapFn.apply(super.doExecute());
}
- } else {
- return sequenceMapFn.apply(super.doExecute());
- }
+ };
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]