paul-rogers commented on code in PR #12992:
URL: https://github.com/apache/druid/pull/12992#discussion_r973248371


##########
integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.sql.SqlTaskStatus;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.SqlResourceTestClient;
+import org.apache.druid.testing.clients.msq.MsqOverlordResourceTestClient;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Helper class to aid out ITs for MSQ.
+ * This takes all the clients required to make the necessary calls to the 
client APIs for MSQ and performs the boilerplate
+ * functions for the tests
+ */
+public class MsqTestQueryHelper extends 
AbstractTestQueryHelper<MsqQueryWithResults>
+{
+
+  private final ObjectMapper jsonMapper;
+  private final IntegrationTestingConfig config;
+  private final MsqOverlordResourceTestClient overlordClient;
+  private final SqlResourceTestClient msqClient;
+
+
+  @Inject
+  MsqTestQueryHelper(
+      final ObjectMapper jsonMapper,
+      final SqlResourceTestClient queryClient,
+      final IntegrationTestingConfig config,
+      final MsqOverlordResourceTestClient overlordClient,
+      final SqlResourceTestClient msqClient
+  )
+  {
+    super(jsonMapper, queryClient, config);
+    this.jsonMapper = jsonMapper;
+    this.config = config;
+    this.overlordClient = overlordClient;
+    this.msqClient = msqClient;
+  }
+
+  @Override
+  public String getQueryURL(String schemeAndHost)
+  {
+    return StringUtils.format("%s/druid/v2/sql/task", schemeAndHost);
+  }
+
+  /**
+   * Submits a task to the MSQ API with the given query string, and default 
headers and parameters
+   */
+  public String submitMsqTask(String sqlQueryString) throws 
ExecutionException, InterruptedException
+  {
+    return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, 
false, ImmutableMap.of(), null));
+  }
+
+  // Run the task, wait for it to complete, fetch the reports, verify the 
results,
+
+  /**
+   * Submits a {@link SqlQuery} to the MSQ API for execution. This method 
waits for the task to be accepted by the cluster
+   * and returns the task id associated with the submitted task
+   */
+  public String submitMsqTask(SqlQuery sqlQuery) throws ExecutionException, 
InterruptedException
+  {
+    String queryUrl = getQueryURL(config.getBrokerUrl());
+    Future<StatusResponseHolder> responseHolderFuture = 
msqClient.queryAsync(queryUrl, sqlQuery);
+    // It is okay to block here for the result because MSQ tasks return the 
task Id associated with it, which shouldn't
+    // consume a lot of time
+    StatusResponseHolder statusResponseHolder = responseHolderFuture.get();

Review Comment:
   More to the point: this is a test. There is nothing for the test to do while 
the query runs. So, this code should block so the test doesn't have to include 
the required boilerplate. There should, however, be a timeout in case bad 
things happen.



##########
integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.clients.msq;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.guice.MSQIndexingModule;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.OverlordResourceTestClient;
+import org.apache.druid.testing.guice.TestClient;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+
+import java.util.Map;
+
+/**
+ * Overlord resource client for MSQ Tasks
+ */
+public class MsqOverlordResourceTestClient extends OverlordResourceTestClient
+{
+  ObjectMapper jsonMapper;
+
+  @Inject
+  MsqOverlordResourceTestClient(
+      @Json ObjectMapper jsonMapper,
+      @TestClient HttpClient httpClient,
+      IntegrationTestingConfig config
+  )
+  {
+    super(jsonMapper, httpClient, config);
+    this.jsonMapper = jsonMapper;
+    this.jsonMapper.registerModules(new 
MSQIndexingModule().getJacksonModules());
+  }
+
+  public Map<String, MSQTaskReport> getTaskReportForMsqTask(String taskId)
+  {
+    try {
+      StatusResponseHolder response = makeRequest(
+          HttpMethod.GET,
+          StringUtils.format(
+              "%s%s",
+              getIndexerURL(),
+              StringUtils.format("task/%s/reports", 
StringUtils.urlEncode(taskId))
+          )
+      );
+      return jsonMapper.readValue(response.getContent(), new 
TypeReference<Map<String, MSQTaskReport>>()
+      {
+      });
+    }
+    catch (ISE e) {

Review Comment:
   This can be `RuntimeException`: no need to wrap other runtime exceptions.



##########
integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.sql.SqlTaskStatus;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.SqlResourceTestClient;
+import org.apache.druid.testing.clients.msq.MsqOverlordResourceTestClient;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Helper class to aid out ITs for MSQ.
+ * This takes all the clients required to make the necessary calls to the 
client APIs for MSQ and performs the boilerplate
+ * functions for the tests
+ */
+public class MsqTestQueryHelper extends 
AbstractTestQueryHelper<MsqQueryWithResults>
+{
+
+  private final ObjectMapper jsonMapper;
+  private final IntegrationTestingConfig config;
+  private final MsqOverlordResourceTestClient overlordClient;
+  private final SqlResourceTestClient msqClient;
+
+
+  @Inject
+  MsqTestQueryHelper(
+      final ObjectMapper jsonMapper,
+      final SqlResourceTestClient queryClient,
+      final IntegrationTestingConfig config,
+      final MsqOverlordResourceTestClient overlordClient,
+      final SqlResourceTestClient msqClient
+  )
+  {
+    super(jsonMapper, queryClient, config);
+    this.jsonMapper = jsonMapper;
+    this.config = config;
+    this.overlordClient = overlordClient;
+    this.msqClient = msqClient;
+  }
+
+  @Override
+  public String getQueryURL(String schemeAndHost)
+  {
+    return StringUtils.format("%s/druid/v2/sql/task", schemeAndHost);
+  }
+
+  /**
+   * Submits a task to the MSQ API with the given query string, and default 
headers and parameters
+   */
+  public String submitMsqTask(String sqlQueryString) throws 
ExecutionException, InterruptedException
+  {
+    return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, 
false, ImmutableMap.of(), null));
+  }
+
+  // Run the task, wait for it to complete, fetch the reports, verify the 
results,
+
+  /**
+   * Submits a {@link SqlQuery} to the MSQ API for execution. This method 
waits for the task to be accepted by the cluster
+   * and returns the task id associated with the submitted task
+   */
+  public String submitMsqTask(SqlQuery sqlQuery) throws ExecutionException, 
InterruptedException
+  {
+    String queryUrl = getQueryURL(config.getBrokerUrl());
+    Future<StatusResponseHolder> responseHolderFuture = 
msqClient.queryAsync(queryUrl, sqlQuery);
+    // It is okay to block here for the result because MSQ tasks return the 
task Id associated with it, which shouldn't
+    // consume a lot of time
+    StatusResponseHolder statusResponseHolder = responseHolderFuture.get();
+
+    // Check if the task has been accepted successfully
+    HttpResponseStatus httpResponseStatus = statusResponseHolder.getStatus();
+    if (!httpResponseStatus.equals(HttpResponseStatus.ACCEPTED)) {
+      throw new ISE(
+          "Unable to submit the task successfully. Received response status 
code [%d], and response content:\n[%s]",
+          httpResponseStatus,
+          statusResponseHolder.getContent()
+      );
+    }
+    String content = statusResponseHolder.getContent();
+    SqlTaskStatus sqlTaskStatus;
+    try {
+      sqlTaskStatus = jsonMapper.readValue(content, SqlTaskStatus.class);
+    }
+    catch (JsonProcessingException e) {
+      throw new ISE("Unable to parse the response");
+    }
+    if (sqlTaskStatus.getState().isFailure()) {
+      throw new ISE("Unable to start the task successfully.\nPossible 
exception: %s", sqlTaskStatus.getError());
+    }
+    return sqlTaskStatus.getTaskId();
+  }
+
+  /**
+   * The method retries till the task with taskId gets completed i.e. {@link 
TaskState#isComplete()}} returns true and
+   * returns the last fetched state {@link TaskState} of the task
+   */
+  public TaskState pollTaskIdForCompletion(String taskId) throws Exception
+  {
+    return RetryUtils.retry(

Review Comment:
   I would recommend against using this utility: it masks true errors. If the 
server is down, it will retry 240 times.
   
   Instead, do your own polling. A message timeout is an error here: we don't 
expect the server to be down. Any response other than the ones we expect is an 
error and need not be retried. Only the "not yet ready" response should be 
retried.



##########
integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.sql.SqlTaskStatus;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.SqlResourceTestClient;
+import org.apache.druid.testing.clients.msq.MsqOverlordResourceTestClient;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Helper class to aid out ITs for MSQ.
+ * This takes all the clients required to make the necessary calls to the 
client APIs for MSQ and performs the boilerplate
+ * functions for the tests
+ */
+public class MsqTestQueryHelper extends 
AbstractTestQueryHelper<MsqQueryWithResults>
+{
+
+  private final ObjectMapper jsonMapper;
+  private final IntegrationTestingConfig config;
+  private final MsqOverlordResourceTestClient overlordClient;
+  private final SqlResourceTestClient msqClient;
+
+
+  @Inject
+  MsqTestQueryHelper(
+      final ObjectMapper jsonMapper,
+      final SqlResourceTestClient queryClient,
+      final IntegrationTestingConfig config,
+      final MsqOverlordResourceTestClient overlordClient,
+      final SqlResourceTestClient msqClient
+  )
+  {
+    super(jsonMapper, queryClient, config);
+    this.jsonMapper = jsonMapper;
+    this.config = config;
+    this.overlordClient = overlordClient;
+    this.msqClient = msqClient;
+  }
+
+  @Override
+  public String getQueryURL(String schemeAndHost)
+  {
+    return StringUtils.format("%s/druid/v2/sql/task", schemeAndHost);
+  }
+
+  /**
+   * Submits a task to the MSQ API with the given query string, and default 
headers and parameters
+   */
+  public String submitMsqTask(String sqlQueryString) throws 
ExecutionException, InterruptedException
+  {
+    return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, 
false, ImmutableMap.of(), null));
+  }
+
+  // Run the task, wait for it to complete, fetch the reports, verify the 
results,
+
+  /**
+   * Submits a {@link SqlQuery} to the MSQ API for execution. This method 
waits for the task to be accepted by the cluster
+   * and returns the task id associated with the submitted task
+   */
+  public String submitMsqTask(SqlQuery sqlQuery) throws ExecutionException, 
InterruptedException
+  {
+    String queryUrl = getQueryURL(config.getBrokerUrl());
+    Future<StatusResponseHolder> responseHolderFuture = 
msqClient.queryAsync(queryUrl, sqlQuery);
+    // It is okay to block here for the result because MSQ tasks return the 
task Id associated with it, which shouldn't
+    // consume a lot of time
+    StatusResponseHolder statusResponseHolder = responseHolderFuture.get();
+
+    // Check if the task has been accepted successfully
+    HttpResponseStatus httpResponseStatus = statusResponseHolder.getStatus();
+    if (!httpResponseStatus.equals(HttpResponseStatus.ACCEPTED)) {
+      throw new ISE(
+          "Unable to submit the task successfully. Received response status 
code [%d], and response content:\n[%s]",
+          httpResponseStatus,
+          statusResponseHolder.getContent()
+      );
+    }
+    String content = statusResponseHolder.getContent();
+    SqlTaskStatus sqlTaskStatus;
+    try {
+      sqlTaskStatus = jsonMapper.readValue(content, SqlTaskStatus.class);
+    }
+    catch (JsonProcessingException e) {
+      throw new ISE("Unable to parse the response");
+    }
+    if (sqlTaskStatus.getState().isFailure()) {
+      throw new ISE("Unable to start the task successfully.\nPossible 
exception: %s", sqlTaskStatus.getError());
+    }
+    return sqlTaskStatus.getTaskId();
+  }
+
+  /**
+   * The method retries till the task with taskId gets completed i.e. {@link 
TaskState#isComplete()}} returns true and
+   * returns the last fetched state {@link TaskState} of the task
+   */
+  public TaskState pollTaskIdForCompletion(String taskId) throws Exception
+  {
+    return RetryUtils.retry(
+        () -> {
+          TaskStatusPlus taskStatusPlus = overlordClient.getTaskStatus(taskId);
+          TaskState statusCode = taskStatusPlus.getStatusCode();
+          if (statusCode != null && statusCode.isComplete()) {
+            return taskStatusPlus.getStatusCode();
+          }
+          throw new TaskStillRunningException();
+        },
+        (Throwable t) -> t instanceof TaskStillRunningException,
+        100
+    );
+  }
+
+  /**
+   * Fetches status reports for a given task
+   */
+  public Map<String, MSQTaskReport> fetchStatusReports(String taskId)
+  {
+    return overlordClient.getTaskReportForMsqTask(taskId);
+  }
+
+  /**
+   * Compares the results for a given taskId. It is required that the task has 
produced some results that can be verified
+   */
+  private void compareResults(String taskId, MsqQueryWithResults 
expectedQueryWithResults)
+  {
+    Map<String, MSQTaskReport> statusReport = fetchStatusReports(taskId);
+    MSQTaskReport taskReport = statusReport.get(MSQTaskReport.REPORT_KEY);
+    if (taskReport == null) {
+      throw new ISE("Unable to fetch the status report for the task [%]", 
taskId);
+    }
+    MSQTaskReportPayload taskReportPayload = Preconditions.checkNotNull(
+        taskReport.getPayload(),
+        "payload"
+    );
+    MSQResultsReport resultsReport = Preconditions.checkNotNull(
+        taskReportPayload.getResults(),
+        "Results report for the task id is empty"
+    );
+
+    List<Map<String, Object>> actualResults = new ArrayList<>();
+
+    Yielder<Object[]> yielder = resultsReport.getResultYielder();
+    RowSignature rowSignature = resultsReport.getSignature();
+
+    while (!yielder.isDone()) {
+      Object[] row = yielder.get();
+      Map<String, Object> rowWithFieldNames = new LinkedHashMap<>();
+      for (int i = 0; i < row.length; ++i) {
+        rowWithFieldNames.put(rowSignature.getColumnName(i), row[i]);
+      }
+      actualResults.add(rowWithFieldNames);
+      yielder = yielder.next(null);
+    }
+
+    Optional<String> resultsComparison = QueryResultVerifier.compareResults(
+        actualResults,
+        expectedQueryWithResults.getExpectedResults(),
+        Collections.emptyList()
+    );
+    if (resultsComparison.isPresent()) {
+      throw new IAE(
+          "Expected query result is different from the actual result.\n"
+          + "Query: %s\n"
+          + "Actual Result: %s\n"
+          + "Expected Result: %s\n"
+          + "Mismatch Error: %s\n",
+          expectedQueryWithResults.getQuery(),
+          actualResults,
+          expectedQueryWithResults.getExpectedResults(),
+          resultsComparison.get()
+      );
+    }
+  }
+
+  /**
+   * Runs queries from files using MSQ and compares the results with the ones 
provided
+   */
+  @Override
+  public void testQueriesFromFile(String filePath, String fullDatasourcePath) 
throws Exception
+  {
+    LOG.info("Starting query tests for [%s]", filePath);

Review Comment:
   I believe we'll find that the file format here will be quite hard to 
maintain. Looks like it is JSON, which contains SQL, which contains JSON. While 
JSON is handy, this is a place where it does not serve us well.
   
   There is an open PR that proposes an "Impala-like" file format for query 
tests:
   
   ```text
   === sql
   SELECT ...
   === results
   {"a": 10, "b": "foo"}
   ...
   ```
   
   It is too much to ask to include that format here: the test PR is not yet 
merged. However, moving forward, perhaps we can retrofit that format here to 
make the query files a bit easier to use.
   
   In fact, we could use the entire framework: it already has mechanisms to 
compare expected and actual results, to loop through test cases, to run queries 
with differing context options, etc.



##########
integration-tests/src/main/java/org/apache/druid/testing/utils/QueryResultVerifier.java:
##########
@@ -19,13 +19,19 @@
 
 package org.apache.druid.testing.utils;
 
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public class QueryResultVerifier
 {
-  public static boolean compareResults(
+  private static final Logger LOG = new Logger(QueryResultVerifier.class);
+
+  public static Optional<String> compareResults(

Review Comment:
   The approach taken in the planner test framework is to define a "test case" 
that has the query and expected results. Run that to get the actual results. 
Compare the two. The idea is to focus on the test and make the test code as 
simple as possible. Here:
   
   ```java
   testCase = loadTestCase(...);
   result = testCase.run(...);
   ```
   
   Here, `run()` would verify the results and would fail if the results don't 
match. (Results could be an expected error, or expected rows.)
   



##########
integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.clients.msq;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.guice.MSQIndexingModule;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.OverlordResourceTestClient;
+import org.apache.druid.testing.guice.TestClient;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+
+import java.util.Map;
+
+/**
+ * Overlord resource client for MSQ Tasks
+ */
+public class MsqOverlordResourceTestClient extends OverlordResourceTestClient
+{
+  ObjectMapper jsonMapper;
+
+  @Inject
+  MsqOverlordResourceTestClient(
+      @Json ObjectMapper jsonMapper,
+      @TestClient HttpClient httpClient,
+      IntegrationTestingConfig config
+  )
+  {
+    super(jsonMapper, httpClient, config);
+    this.jsonMapper = jsonMapper;
+    this.jsonMapper.registerModules(new 
MSQIndexingModule().getJacksonModules());
+  }
+
+  public Map<String, MSQTaskReport> getTaskReportForMsqTask(String taskId)
+  {
+    try {
+      StatusResponseHolder response = makeRequest(
+          HttpMethod.GET,
+          StringUtils.format(
+              "%s%s",
+              getIndexerURL(),
+              StringUtils.format("task/%s/reports", 
StringUtils.urlEncode(taskId))
+          )
+      );

Review Comment:
   It is a real nuisance that, with the dozens of places where the "format JSON 
message, send it, and convert the response" code is used, we don't have a 
reusable class. In one of the branches I started to create such a thing; let me 
see if I can find it and perhaps we can use it here to avoid redundancy.



##########
integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.sql.SqlTaskStatus;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.SqlResourceTestClient;
+import org.apache.druid.testing.clients.msq.MsqOverlordResourceTestClient;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Helper class to aid out ITs for MSQ.
+ * This takes all the clients required to make the necessary calls to the 
client APIs for MSQ and performs the boilerplate
+ * functions for the tests
+ */
+public class MsqTestQueryHelper extends 
AbstractTestQueryHelper<MsqQueryWithResults>
+{
+
+  private final ObjectMapper jsonMapper;
+  private final IntegrationTestingConfig config;
+  private final MsqOverlordResourceTestClient overlordClient;
+  private final SqlResourceTestClient msqClient;
+
+
+  @Inject
+  MsqTestQueryHelper(
+      final ObjectMapper jsonMapper,
+      final SqlResourceTestClient queryClient,
+      final IntegrationTestingConfig config,
+      final MsqOverlordResourceTestClient overlordClient,
+      final SqlResourceTestClient msqClient
+  )
+  {
+    super(jsonMapper, queryClient, config);
+    this.jsonMapper = jsonMapper;
+    this.config = config;
+    this.overlordClient = overlordClient;
+    this.msqClient = msqClient;
+  }
+
+  @Override
+  public String getQueryURL(String schemeAndHost)
+  {
+    return StringUtils.format("%s/druid/v2/sql/task", schemeAndHost);
+  }
+
+  /**
+   * Submits a task to the MSQ API with the given query string, and default 
headers and parameters
+   */
+  public String submitMsqTask(String sqlQueryString) throws 
ExecutionException, InterruptedException
+  {
+    return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, 
false, ImmutableMap.of(), null));
+  }
+
+  // Run the task, wait for it to complete, fetch the reports, verify the 
results,
+
+  /**
+   * Submits a {@link SqlQuery} to the MSQ API for execution. This method 
waits for the task to be accepted by the cluster
+   * and returns the task id associated with the submitted task
+   */
+  public String submitMsqTask(SqlQuery sqlQuery) throws ExecutionException, 
InterruptedException
+  {
+    String queryUrl = getQueryURL(config.getBrokerUrl());
+    Future<StatusResponseHolder> responseHolderFuture = 
msqClient.queryAsync(queryUrl, sqlQuery);
+    // It is okay to block here for the result because MSQ tasks return the 
task Id associated with it, which shouldn't
+    // consume a lot of time
+    StatusResponseHolder statusResponseHolder = responseHolderFuture.get();
+
+    // Check if the task has been accepted successfully
+    HttpResponseStatus httpResponseStatus = statusResponseHolder.getStatus();
+    if (!httpResponseStatus.equals(HttpResponseStatus.ACCEPTED)) {
+      throw new ISE(
+          "Unable to submit the task successfully. Received response status 
code [%d], and response content:\n[%s]",
+          httpResponseStatus,
+          statusResponseHolder.getContent()
+      );
+    }
+    String content = statusResponseHolder.getContent();
+    SqlTaskStatus sqlTaskStatus;
+    try {
+      sqlTaskStatus = jsonMapper.readValue(content, SqlTaskStatus.class);
+    }
+    catch (JsonProcessingException e) {
+      throw new ISE("Unable to parse the response");
+    }
+    if (sqlTaskStatus.getState().isFailure()) {

Review Comment:
   Perhaps return the response, error or not. This lets us write tests that 
verify that something fails when it should. For example, if we want to verify 
permissions or that the engine rejects certain invalid syntax, etc.



##########
integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java:
##########
@@ -146,24 +147,29 @@ private void testQueries(String url, 
List<QueryResultType> queries) throws Excep
     for (QueryResultType queryWithResult : queries) {
       LOG.info("Running Query %s", queryWithResult.getQuery());
       List<Map<String, Object>> result = queryClient.query(url, 
queryWithResult.getQuery());
-      if (!QueryResultVerifier.compareResults(result, 
queryWithResult.getExpectedResults(),
-                                              queryWithResult.getFieldsToTest()
-      )) {
+      Optional<String> resultsComparison = 
QueryResultVerifier.compareResults(result, queryWithResult.getExpectedResults(),
+                                                                               
              queryWithResult.getFieldsToTest());
+      if (resultsComparison.isPresent()) {
         LOG.error(
             "Failed while executing query %s \n expectedResults: %s \n 
actualResults : %s",
             queryWithResult.getQuery(),
             
jsonMapper.writeValueAsString(queryWithResult.getExpectedResults()),
             jsonMapper.writeValueAsString(result)
         );
-        failed = true;
+        Assert.fail(StringUtils.format(
+            "Results mismatch while executing the query %s.\n"
+            + "Expected results: %s\n"
+            + "Actual results: %s\n"
+            + "Mismatch error: %s\n",
+            queryWithResult.getQuery(),
+            
jsonMapper.writeValueAsString(queryWithResult.getExpectedResults()),
+            jsonMapper.writeValueAsString(result),
+            resultsComparison.get()
+        ));

Review Comment:
   The above will be very hard to debug: the JSON payloads are large. This is 
already a problem in SQL tests when we compare queries: if they don't match, 
we're left with two big JSON blobs and it is tedious to determine where they 
differ.
   
   Results are a set of rows. So, it would be great to report, say, the number 
of the first row that differs, and display just those values. Then, display the 
actual results in the same format that they will appear in the "expected 
results" file. This will allow us to develop a new test by starting with empty 
results, then copy the failed test results into the file, then watch the test 
pass.



##########
integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.sql.SqlTaskStatus;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.SqlResourceTestClient;
+import org.apache.druid.testing.clients.msq.MsqOverlordResourceTestClient;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Helper class to aid out ITs for MSQ.
+ * This takes all the clients required to make the necessary calls to the 
client APIs for MSQ and performs the boilerplate
+ * functions for the tests
+ */
+public class MsqTestQueryHelper extends 
AbstractTestQueryHelper<MsqQueryWithResults>
+{
+
+  private final ObjectMapper jsonMapper;
+  private final IntegrationTestingConfig config;
+  private final MsqOverlordResourceTestClient overlordClient;
+  private final SqlResourceTestClient msqClient;
+
+
+  @Inject
+  MsqTestQueryHelper(
+      final ObjectMapper jsonMapper,
+      final SqlResourceTestClient queryClient,
+      final IntegrationTestingConfig config,
+      final MsqOverlordResourceTestClient overlordClient,
+      final SqlResourceTestClient msqClient
+  )
+  {
+    super(jsonMapper, queryClient, config);
+    this.jsonMapper = jsonMapper;
+    this.config = config;
+    this.overlordClient = overlordClient;
+    this.msqClient = msqClient;
+  }
+
+  @Override
+  public String getQueryURL(String schemeAndHost)
+  {
+    return StringUtils.format("%s/druid/v2/sql/task", schemeAndHost);
+  }
+
+  /**
+   * Submits a task to the MSQ API with the given query string, and default 
headers and parameters
+   */
+  public String submitMsqTask(String sqlQueryString) throws 
ExecutionException, InterruptedException
+  {
+    return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, 
false, ImmutableMap.of(), null));
+  }
+
+  // Run the task, wait for it to complete, fetch the reports, verify the 
results,
+
+  /**
+   * Submits a {@link SqlQuery} to the MSQ API for execution. This method 
waits for the task to be accepted by the cluster
+   * and returns the task id associated with the submitted task
+   */
+  public String submitMsqTask(SqlQuery sqlQuery) throws ExecutionException, 
InterruptedException
+  {
+    String queryUrl = getQueryURL(config.getBrokerUrl());
+    Future<StatusResponseHolder> responseHolderFuture = 
msqClient.queryAsync(queryUrl, sqlQuery);
+    // It is okay to block here for the result because MSQ tasks return the 
task Id associated with it, which shouldn't
+    // consume a lot of time
+    StatusResponseHolder statusResponseHolder = responseHolderFuture.get();
+
+    // Check if the task has been accepted successfully
+    HttpResponseStatus httpResponseStatus = statusResponseHolder.getStatus();
+    if (!httpResponseStatus.equals(HttpResponseStatus.ACCEPTED)) {
+      throw new ISE(
+          "Unable to submit the task successfully. Received response status 
code [%d], and response content:\n[%s]",
+          httpResponseStatus,
+          statusResponseHolder.getContent()
+      );
+    }
+    String content = statusResponseHolder.getContent();
+    SqlTaskStatus sqlTaskStatus;
+    try {
+      sqlTaskStatus = jsonMapper.readValue(content, SqlTaskStatus.class);
+    }
+    catch (JsonProcessingException e) {
+      throw new ISE("Unable to parse the response");
+    }
+    if (sqlTaskStatus.getState().isFailure()) {
+      throw new ISE("Unable to start the task successfully.\nPossible 
exception: %s", sqlTaskStatus.getError());
+    }
+    return sqlTaskStatus.getTaskId();
+  }
+
+  /**
+   * The method retries till the task with taskId gets completed i.e. {@link 
TaskState#isComplete()}} returns true and
+   * returns the last fetched state {@link TaskState} of the task
+   */
+  public TaskState pollTaskIdForCompletion(String taskId) throws Exception
+  {
+    return RetryUtils.retry(
+        () -> {
+          TaskStatusPlus taskStatusPlus = overlordClient.getTaskStatus(taskId);
+          TaskState statusCode = taskStatusPlus.getStatusCode();
+          if (statusCode != null && statusCode.isComplete()) {
+            return taskStatusPlus.getStatusCode();
+          }
+          throw new TaskStillRunningException();
+        },
+        (Throwable t) -> t instanceof TaskStillRunningException,
+        100
+    );
+  }
+
+  /**
+   * Fetches status reports for a given task
+   */
+  public Map<String, MSQTaskReport> fetchStatusReports(String taskId)
+  {
+    return overlordClient.getTaskReportForMsqTask(taskId);
+  }
+
+  /**
+   * Compares the results for a given taskId. It is required that the task has 
produced some results that can be verified
+   */
+  private void compareResults(String taskId, MsqQueryWithResults 
expectedQueryWithResults)
+  {
+    Map<String, MSQTaskReport> statusReport = fetchStatusReports(taskId);
+    MSQTaskReport taskReport = statusReport.get(MSQTaskReport.REPORT_KEY);
+    if (taskReport == null) {
+      throw new ISE("Unable to fetch the status report for the task [%]", 
taskId);
+    }
+    MSQTaskReportPayload taskReportPayload = Preconditions.checkNotNull(
+        taskReport.getPayload(),
+        "payload"
+    );
+    MSQResultsReport resultsReport = Preconditions.checkNotNull(
+        taskReportPayload.getResults(),
+        "Results report for the task id is empty"
+    );
+
+    List<Map<String, Object>> actualResults = new ArrayList<>();
+
+    Yielder<Object[]> yielder = resultsReport.getResultYielder();
+    RowSignature rowSignature = resultsReport.getSignature();
+
+    while (!yielder.isDone()) {
+      Object[] row = yielder.get();
+      Map<String, Object> rowWithFieldNames = new LinkedHashMap<>();
+      for (int i = 0; i < row.length; ++i) {
+        rowWithFieldNames.put(rowSignature.getColumnName(i), row[i]);
+      }
+      actualResults.add(rowWithFieldNames);
+      yielder = yielder.next(null);
+    }
+
+    Optional<String> resultsComparison = QueryResultVerifier.compareResults(
+        actualResults,
+        expectedQueryWithResults.getExpectedResults(),
+        Collections.emptyList()
+    );
+    if (resultsComparison.isPresent()) {
+      throw new IAE(
+          "Expected query result is different from the actual result.\n"
+          + "Query: %s\n"
+          + "Actual Result: %s\n"
+          + "Expected Result: %s\n"
+          + "Mismatch Error: %s\n",

Review Comment:
   See note above about format.



##########
integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.sql.SqlTaskStatus;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.SqlResourceTestClient;
+import org.apache.druid.testing.clients.msq.MsqOverlordResourceTestClient;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Helper class to aid out ITs for MSQ.
+ * This takes all the clients required to make the necessary calls to the 
client APIs for MSQ and performs the boilerplate
+ * functions for the tests
+ */
+public class MsqTestQueryHelper extends 
AbstractTestQueryHelper<MsqQueryWithResults>
+{
+
+  private final ObjectMapper jsonMapper;
+  private final IntegrationTestingConfig config;
+  private final MsqOverlordResourceTestClient overlordClient;
+  private final SqlResourceTestClient msqClient;
+
+
+  @Inject
+  MsqTestQueryHelper(
+      final ObjectMapper jsonMapper,
+      final SqlResourceTestClient queryClient,
+      final IntegrationTestingConfig config,
+      final MsqOverlordResourceTestClient overlordClient,
+      final SqlResourceTestClient msqClient
+  )
+  {
+    super(jsonMapper, queryClient, config);
+    this.jsonMapper = jsonMapper;
+    this.config = config;
+    this.overlordClient = overlordClient;
+    this.msqClient = msqClient;
+  }
+
+  @Override
+  public String getQueryURL(String schemeAndHost)
+  {
+    return StringUtils.format("%s/druid/v2/sql/task", schemeAndHost);
+  }
+
+  /**
+   * Submits a task to the MSQ API with the given query string, and default 
headers and parameters
+   */
+  public String submitMsqTask(String sqlQueryString) throws 
ExecutionException, InterruptedException
+  {
+    return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, 
false, ImmutableMap.of(), null));
+  }
+
+  // Run the task, wait for it to complete, fetch the reports, verify the 
results,
+
+  /**
+   * Submits a {@link SqlQuery} to the MSQ API for execution. This method 
waits for the task to be accepted by the cluster
+   * and returns the task id associated with the submitted task
+   */
+  public String submitMsqTask(SqlQuery sqlQuery) throws ExecutionException, 
InterruptedException
+  {
+    String queryUrl = getQueryURL(config.getBrokerUrl());
+    Future<StatusResponseHolder> responseHolderFuture = 
msqClient.queryAsync(queryUrl, sqlQuery);
+    // It is okay to block here for the result because MSQ tasks return the 
task Id associated with it, which shouldn't
+    // consume a lot of time
+    StatusResponseHolder statusResponseHolder = responseHolderFuture.get();
+
+    // Check if the task has been accepted successfully
+    HttpResponseStatus httpResponseStatus = statusResponseHolder.getStatus();
+    if (!httpResponseStatus.equals(HttpResponseStatus.ACCEPTED)) {
+      throw new ISE(
+          "Unable to submit the task successfully. Received response status 
code [%d], and response content:\n[%s]",
+          httpResponseStatus,
+          statusResponseHolder.getContent()
+      );
+    }
+    String content = statusResponseHolder.getContent();
+    SqlTaskStatus sqlTaskStatus;
+    try {
+      sqlTaskStatus = jsonMapper.readValue(content, SqlTaskStatus.class);
+    }
+    catch (JsonProcessingException e) {
+      throw new ISE("Unable to parse the response");
+    }
+    if (sqlTaskStatus.getState().isFailure()) {
+      throw new ISE("Unable to start the task successfully.\nPossible 
exception: %s", sqlTaskStatus.getError());
+    }
+    return sqlTaskStatus.getTaskId();
+  }
+
+  /**
+   * The method retries till the task with taskId gets completed i.e. {@link 
TaskState#isComplete()}} returns true and
+   * returns the last fetched state {@link TaskState} of the task
+   */
+  public TaskState pollTaskIdForCompletion(String taskId) throws Exception
+  {
+    return RetryUtils.retry(
+        () -> {
+          TaskStatusPlus taskStatusPlus = overlordClient.getTaskStatus(taskId);
+          TaskState statusCode = taskStatusPlus.getStatusCode();
+          if (statusCode != null && statusCode.isComplete()) {
+            return taskStatusPlus.getStatusCode();
+          }
+          throw new TaskStillRunningException();
+        },
+        (Throwable t) -> t instanceof TaskStillRunningException,
+        100
+    );
+  }
+
+  /**
+   * Fetches status reports for a given task
+   */
+  public Map<String, MSQTaskReport> fetchStatusReports(String taskId)

Review Comment:
   There is only ever one report for MSQ, correct? I've never seen more than 
one.



##########
integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.sql.SqlTaskStatus;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.SqlResourceTestClient;
+import org.apache.druid.testing.clients.msq.MsqOverlordResourceTestClient;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Helper class to aid out ITs for MSQ.
+ * This takes all the clients required to make the necessary calls to the 
client APIs for MSQ and performs the boilerplate
+ * functions for the tests
+ */
+public class MsqTestQueryHelper extends 
AbstractTestQueryHelper<MsqQueryWithResults>
+{
+
+  private final ObjectMapper jsonMapper;
+  private final IntegrationTestingConfig config;
+  private final MsqOverlordResourceTestClient overlordClient;
+  private final SqlResourceTestClient msqClient;
+
+
+  @Inject
+  MsqTestQueryHelper(
+      final ObjectMapper jsonMapper,
+      final SqlResourceTestClient queryClient,
+      final IntegrationTestingConfig config,
+      final MsqOverlordResourceTestClient overlordClient,
+      final SqlResourceTestClient msqClient
+  )
+  {
+    super(jsonMapper, queryClient, config);
+    this.jsonMapper = jsonMapper;
+    this.config = config;
+    this.overlordClient = overlordClient;
+    this.msqClient = msqClient;
+  }
+
+  @Override
+  public String getQueryURL(String schemeAndHost)
+  {
+    return StringUtils.format("%s/druid/v2/sql/task", schemeAndHost);
+  }
+
+  /**
+   * Submits a task to the MSQ API with the given query string, and default 
headers and parameters
+   */
+  public String submitMsqTask(String sqlQueryString) throws 
ExecutionException, InterruptedException
+  {
+    return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, 
false, ImmutableMap.of(), null));
+  }
+
+  // Run the task, wait for it to complete, fetch the reports, verify the 
results,
+
+  /**
+   * Submits a {@link SqlQuery} to the MSQ API for execution. This method 
waits for the task to be accepted by the cluster
+   * and returns the task id associated with the submitted task
+   */
+  public String submitMsqTask(SqlQuery sqlQuery) throws ExecutionException, 
InterruptedException
+  {
+    String queryUrl = getQueryURL(config.getBrokerUrl());
+    Future<StatusResponseHolder> responseHolderFuture = 
msqClient.queryAsync(queryUrl, sqlQuery);
+    // It is okay to block here for the result because MSQ tasks return the 
task Id associated with it, which shouldn't
+    // consume a lot of time
+    StatusResponseHolder statusResponseHolder = responseHolderFuture.get();
+
+    // Check if the task has been accepted successfully
+    HttpResponseStatus httpResponseStatus = statusResponseHolder.getStatus();
+    if (!httpResponseStatus.equals(HttpResponseStatus.ACCEPTED)) {
+      throw new ISE(
+          "Unable to submit the task successfully. Received response status 
code [%d], and response content:\n[%s]",
+          httpResponseStatus,
+          statusResponseHolder.getContent()
+      );
+    }
+    String content = statusResponseHolder.getContent();
+    SqlTaskStatus sqlTaskStatus;
+    try {
+      sqlTaskStatus = jsonMapper.readValue(content, SqlTaskStatus.class);
+    }
+    catch (JsonProcessingException e) {
+      throw new ISE("Unable to parse the response");
+    }
+    if (sqlTaskStatus.getState().isFailure()) {
+      throw new ISE("Unable to start the task successfully.\nPossible 
exception: %s", sqlTaskStatus.getError());
+    }
+    return sqlTaskStatus.getTaskId();
+  }
+
+  /**
+   * The method retries till the task with taskId gets completed i.e. {@link 
TaskState#isComplete()}} returns true and
+   * returns the last fetched state {@link TaskState} of the task
+   */
+  public TaskState pollTaskIdForCompletion(String taskId) throws Exception
+  {
+    return RetryUtils.retry(
+        () -> {
+          TaskStatusPlus taskStatusPlus = overlordClient.getTaskStatus(taskId);
+          TaskState statusCode = taskStatusPlus.getStatusCode();
+          if (statusCode != null && statusCode.isComplete()) {
+            return taskStatusPlus.getStatusCode();
+          }
+          throw new TaskStillRunningException();
+        },
+        (Throwable t) -> t instanceof TaskStillRunningException,
+        100
+    );
+  }
+
+  /**
+   * Fetches status reports for a given task
+   */
+  public Map<String, MSQTaskReport> fetchStatusReports(String taskId)

Review Comment:
   This would be a handy place to have a `MsqResult` object that holds the 
reports and pulls out the relevant bits as needed.



##########
integration-tests-ex/cases/src/test/resources/multi-stage-query/wikipedia_msq_select_query1.json:
##########
@@ -0,0 +1,31 @@
+[

Review Comment:
   The above file is empty. Intended?



##########
integration-tests/src/main/java/org/apache/druid/testing/utils/QueryResultVerifier.java:
##########
@@ -19,13 +19,19 @@
 
 package org.apache.druid.testing.utils;
 
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public class QueryResultVerifier
 {
-  public static boolean compareResults(
+  private static final Logger LOG = new Logger(QueryResultVerifier.class);
+
+  public static Optional<String> compareResults(

Review Comment:
   Since this is used in a test, we won't do anything with the results. The 
call will be either:
   
   ```java
   Assert.assertTrue(QueryResultVerifier.compareResults(...));
   ```
   
   Or just:
   
   ```java
   QueryResultVerifier.compareResults(...);
   ```
   
   Given the fact that this is a test, perhaps rename the method:
   
   ```java
   results = // run the query
   QueryResultVerifier.assertEqual(...);
   ```
   



##########
integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.clients.msq;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.guice.MSQIndexingModule;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.OverlordResourceTestClient;
+import org.apache.druid.testing.guice.TestClient;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+
+import java.util.Map;
+
+/**
+ * Overlord resource client for MSQ Tasks
+ */
+public class MsqOverlordResourceTestClient extends OverlordResourceTestClient

Review Comment:
   I wonder if these should simply be merged into the Overlord client. Or, be 
add-ons that take the overlord client as a parameter. Otherwise, we may find 
ourselves in need of Overlord Client X and Overlord Client Y, both as 
subclasses, and thus need two clients when one would do.



##########
integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.clients.msq;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.guice.MSQIndexingModule;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.OverlordResourceTestClient;
+import org.apache.druid.testing.guice.TestClient;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+
+import java.util.Map;
+
+/**
+ * Overlord resource client for MSQ Tasks
+ */
+public class MsqOverlordResourceTestClient extends OverlordResourceTestClient
+{
+  ObjectMapper jsonMapper;
+
+  @Inject
+  MsqOverlordResourceTestClient(
+      @Json ObjectMapper jsonMapper,
+      @TestClient HttpClient httpClient,
+      IntegrationTestingConfig config
+  )
+  {
+    super(jsonMapper, httpClient, config);
+    this.jsonMapper = jsonMapper;
+    this.jsonMapper.registerModules(new 
MSQIndexingModule().getJacksonModules());
+  }
+
+  public Map<String, MSQTaskReport> getTaskReportForMsqTask(String taskId)

Review Comment:
   At present, MSQ is somewhat awkward when running a SELECT: one has to paw 
through the reports to get the results. One can safely predict that this will 
change later to have some direct way to get results. So, let's not bake the 
current report-based results too deeply into the tests.
   
   To anticipate this, perhaps the code here should have a method to return 
just results. Or, a `MsqResponse` object that holds reports and lets us pull 
out the error (if any) and the results. The results would ideally include 
schema and data (pulled from the various places in the reports.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to