kfaraz commented on code in PR #12992: URL: https://github.com/apache/druid/pull/12992#discussion_r957105315
########## integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java: ########## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.utils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.http.client.response.StatusResponseHolder; +import org.apache.druid.msq.indexing.report.MSQResultsReport; +import org.apache.druid.msq.indexing.report.MSQTaskReport; +import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; +import org.apache.druid.msq.sql.SqlTaskStatus; +import org.apache.druid.segment.column.RowSignature; +import 
org.apache.druid.sql.http.SqlQuery; +import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.druid.testing.clients.MsqOverlordResourceTestClient; +import org.apache.druid.testing.clients.MsqTestClient; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +/** + * Helper class to aid out ITs for MSQ + */ +public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResults> +{ + + private final ObjectMapper jsonMapper; + private final IntegrationTestingConfig config; + private final MsqOverlordResourceTestClient overlordClient; + private final MsqTestClient msqClient; + + + @Inject + MsqTestQueryHelper( + final ObjectMapper jsonMapper, + final MsqTestClient queryClient, + final IntegrationTestingConfig config, + final MsqOverlordResourceTestClient overlordClient, + final MsqTestClient msqClient + ) + { + super(jsonMapper, queryClient, config); + this.jsonMapper = jsonMapper; + this.config = config; + this.overlordClient = overlordClient; + this.msqClient = msqClient; + } + + @Override + public String getQueryURL(String schemeAndHost) + { + return StringUtils.format("%s/druid/v2/sql/task", schemeAndHost); + } + + /** + * Submits a task to the MSQ API with the given query string, and default headers and parameters + */ + public String submitMsqTask(String sqlQueryString) throws ExecutionException, InterruptedException + { + return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, false, ImmutableMap.of(), null)); + } + + // Run the task, wait for it to complete, fetch the reports, verify the results, + + /** + * Submits a {@link SqlQuery} to the MSQ API for execution. 
This method waits for the task to be accepted by the cluster + * and returns the task id associated with the submitted task + */ + public String submitMsqTask(SqlQuery sqlQuery) throws ExecutionException, InterruptedException + { + String queryUrl = getQueryURL(config.getBrokerUrl()); + Future<StatusResponseHolder> responseHolderFuture = msqClient.queryAsync(queryUrl, sqlQuery); + // It is okay to block here for the result because MSQ tasks return the task Id associated with it, which shouldn't + // consume a lot of time + StatusResponseHolder statusResponseHolder = responseHolderFuture.get(); + + // Check if the task has been accepted successfully + HttpResponseStatus httpResponseStatus = statusResponseHolder.getStatus(); + if (!httpResponseStatus.equals(HttpResponseStatus.ACCEPTED)) { + throw new ISE( + "Unable to submit the task successfully. Received response status code [%d], and response content:\n[%s]", + httpResponseStatus, + statusResponseHolder.getContent() + ); + } + String content = statusResponseHolder.getContent(); + SqlTaskStatus sqlTaskStatus; + try { + sqlTaskStatus = jsonMapper.readValue(content, SqlTaskStatus.class); + } + catch (JsonProcessingException e) { + throw new ISE("Unable to parse the response"); + } + if (sqlTaskStatus.getState().isFailure()) { + throw new ISE("Unable to start the task successfully.\nPossible exception: %s", sqlTaskStatus.getError()); + } + return sqlTaskStatus.getTaskId(); + } + + /** + * Polls the overlord API every 1 second and waits for a submitted MSQ task to be completed. Alternatively, one can + * specify the maximum time to poll. 
The method returns the last fetched {@link TaskState} of the task + */ + public TaskState pollTaskIdForCompletion(String taskId, long maxTimeoutSeconds) + { + if (maxTimeoutSeconds < 0) { + throw new IAE("Timeout cannot be negative"); + } else if (maxTimeoutSeconds == 0) { + maxTimeoutSeconds = Long.MAX_VALUE; + } + long time = 0; + do { + TaskStatusPlus taskStatusPlus = overlordClient.getTaskStatus(taskId); + TaskState statusCode = taskStatusPlus.getStatusCode(); + if (statusCode != null && statusCode.isComplete()) { + return taskStatusPlus.getStatusCode(); + } + try { + Thread.sleep(1000); + } + catch (InterruptedException e) { + throw new ISE(e, "Interrupted while polling for task [%s] completion", taskId); + } + } while (time++ < maxTimeoutSeconds); + return overlordClient.getTaskStatus(taskId).getStatusCode(); + } + + /** + * Fetches status reports for a given task + */ + public Map<String, MSQTaskReport> fetchStatusReports(String taskId) + { + return overlordClient.getTaskReportForMsqTask(taskId); + } + + /** + * Compares the results for a given taskId. It is required that the task has produced some results that can be verified + */ + public void compareResults(String taskId, MsqQueryWithResults expectedQueryWithResults) Review Comment: This method should not be public. ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testsEx.msq; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.druid.testing.clients.CoordinatorResourceTestClient; +import org.apache.druid.testing.clients.MsqTestClient; +import org.apache.druid.testing.utils.DataLoaderHelper; +import org.apache.druid.testing.utils.MsqTestQueryHelper; +import org.apache.druid.testsEx.categories.MultiStageQuery; +import org.apache.druid.testsEx.config.DruidTestRunner; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +@RunWith(DruidTestRunner.class) +@Category(MultiStageQuery.class) +public class ITMultiStageQuery +{ + @Inject + private MsqTestQueryHelper msqHelper; + + @Inject + private MsqTestClient msqClient; + + @Inject + private IntegrationTestingConfig config; + + @Inject + private ObjectMapper jsonMapper; + + @Inject + private DataLoaderHelper dataLoaderHelper; + + @Inject + private CoordinatorResourceTestClient coordinatorClient; + + private static final String QUERY_FILE = "/indexer/wikipedia_index_data1_query.json"; + + @Test + public void testMsqIngestionAndQuerying() throws Exception + { + String datasource = "dst"; + + // Clear up the datasource from the previous runs + coordinatorClient.unloadSegmentsForDataSource(datasource); + + String queryLocal = + StringUtils.format( + "INSERT INTO %s\n" Review Comment: I suppose it would be cleaner 
to have the INSERT query in a separate resource file, similar to the SELECT query. ########## integration-tests/src/main/java/org/apache/druid/testing/clients/MsqOverlordResourceTestClient.java: ########## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testing.clients; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.inject.Inject; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.java.util.http.client.response.StatusResponseHolder; +import org.apache.druid.msq.guice.MSQIndexingModule; +import org.apache.druid.msq.indexing.report.MSQTaskReport; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.druid.testing.guice.TestClient; +import org.jboss.netty.handler.codec.http.HttpMethod; + +import java.util.Map; + +/** + * Overlord resource client for MSQ Tasks + */ +public class MsqOverlordResourceTestClient extends OverlordResourceTestClient Review Comment: This and other related classes should probably be in the `msq` package as they will only ever be used for MSQ ITs. ########## integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java: ########## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.utils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.http.client.response.StatusResponseHolder; +import org.apache.druid.msq.indexing.report.MSQResultsReport; +import org.apache.druid.msq.indexing.report.MSQTaskReport; +import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; +import org.apache.druid.msq.sql.SqlTaskStatus; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.http.SqlQuery; +import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.druid.testing.clients.MsqOverlordResourceTestClient; +import org.apache.druid.testing.clients.MsqTestClient; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +/** + 
* Helper class to aid out ITs for MSQ + */ +public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResults> Review Comment: Design-wise, the contract of this class is not very clear. If it is just meant to be a query helper, it should not be submitting tasks and fetching reports. Maybe that functionality should be retained by the `MsqTestClient` itself. ########## integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java: ########## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testing.utils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.http.client.response.StatusResponseHolder; +import org.apache.druid.msq.indexing.report.MSQResultsReport; +import org.apache.druid.msq.indexing.report.MSQTaskReport; +import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; +import org.apache.druid.msq.sql.SqlTaskStatus; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.http.SqlQuery; +import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.druid.testing.clients.MsqOverlordResourceTestClient; +import org.apache.druid.testing.clients.MsqTestClient; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +/** + * Helper class to aid out ITs for MSQ + */ +public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResults> +{ + + private final ObjectMapper jsonMapper; + private final IntegrationTestingConfig config; + private final MsqOverlordResourceTestClient overlordClient; + private final MsqTestClient msqClient; + + + @Inject + MsqTestQueryHelper( + final ObjectMapper jsonMapper, + final MsqTestClient 
queryClient, + final IntegrationTestingConfig config, + final MsqOverlordResourceTestClient overlordClient, + final MsqTestClient msqClient + ) + { + super(jsonMapper, queryClient, config); + this.jsonMapper = jsonMapper; + this.config = config; + this.overlordClient = overlordClient; + this.msqClient = msqClient; + } + + @Override + public String getQueryURL(String schemeAndHost) + { + return StringUtils.format("%s/druid/v2/sql/task", schemeAndHost); + } + + /** + * Submits a task to the MSQ API with the given query string, and default headers and parameters + */ + public String submitMsqTask(String sqlQueryString) throws ExecutionException, InterruptedException + { + return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, false, ImmutableMap.of(), null)); + } + + // Run the task, wait for it to complete, fetch the reports, verify the results, + + /** + * Submits a {@link SqlQuery} to the MSQ API for execution. This method waits for the task to be accepted by the cluster + * and returns the task id associated with the submitted task + */ + public String submitMsqTask(SqlQuery sqlQuery) throws ExecutionException, InterruptedException + { + String queryUrl = getQueryURL(config.getBrokerUrl()); + Future<StatusResponseHolder> responseHolderFuture = msqClient.queryAsync(queryUrl, sqlQuery); + // It is okay to block here for the result because MSQ tasks return the task Id associated with it, which shouldn't + // consume a lot of time + StatusResponseHolder statusResponseHolder = responseHolderFuture.get(); + + // Check if the task has been accepted successfully + HttpResponseStatus httpResponseStatus = statusResponseHolder.getStatus(); + if (!httpResponseStatus.equals(HttpResponseStatus.ACCEPTED)) { + throw new ISE( + "Unable to submit the task successfully. 
Received response status code [%d], and response content:\n[%s]", + httpResponseStatus, + statusResponseHolder.getContent() + ); + } + String content = statusResponseHolder.getContent(); + SqlTaskStatus sqlTaskStatus; + try { + sqlTaskStatus = jsonMapper.readValue(content, SqlTaskStatus.class); + } + catch (JsonProcessingException e) { + throw new ISE("Unable to parse the response"); + } + if (sqlTaskStatus.getState().isFailure()) { + throw new ISE("Unable to start the task successfully.\nPossible exception: %s", sqlTaskStatus.getError()); + } + return sqlTaskStatus.getTaskId(); + } + + /** + * Polls the overlord API every 1 second and waits for a submitted MSQ task to be completed. Alternatively, one can + * specify the maximum time to poll. The method returns the last fetched {@link TaskState} of the task + */ + public TaskState pollTaskIdForCompletion(String taskId, long maxTimeoutSeconds) + { + if (maxTimeoutSeconds < 0) { + throw new IAE("Timeout cannot be negative"); + } else if (maxTimeoutSeconds == 0) { + maxTimeoutSeconds = Long.MAX_VALUE; + } + long time = 0; + do { + TaskStatusPlus taskStatusPlus = overlordClient.getTaskStatus(taskId); + TaskState statusCode = taskStatusPlus.getStatusCode(); + if (statusCode != null && statusCode.isComplete()) { + return taskStatusPlus.getStatusCode(); + } + try { + Thread.sleep(1000); + } + catch (InterruptedException e) { + throw new ISE(e, "Interrupted while polling for task [%s] completion", taskId); + } + } while (time++ < maxTimeoutSeconds); + return overlordClient.getTaskStatus(taskId).getStatusCode(); + } + + /** + * Fetches status reports for a given task + */ + public Map<String, MSQTaskReport> fetchStatusReports(String taskId) + { + return overlordClient.getTaskReportForMsqTask(taskId); + } + + /** + * Compares the results for a given taskId. 
It is required that the task has produced some results that can be verified + */ + public void compareResults(String taskId, MsqQueryWithResults expectedQueryWithResults) + { + Map<String, MSQTaskReport> statusReport = fetchStatusReports(taskId); + MSQTaskReport taskReport = statusReport.get(MSQTaskReport.REPORT_KEY); + if (taskReport == null) { + throw new ISE("Unable to fetch the status report for the task [%]", taskId); + } + MSQTaskReportPayload taskReportPayload = Preconditions.checkNotNull( + taskReport.getPayload(), + "payload" + ); + MSQResultsReport resultsReport = Preconditions.checkNotNull( + taskReportPayload.getResults(), + "Results report for the task id is empty" + ); + + List<Map<String, Object>> actualResults = new ArrayList<>(); + + Yielder<Object[]> yielder = resultsReport.getResultYielder(); + RowSignature rowSignature = resultsReport.getSignature(); + + while (!yielder.isDone()) { + Object[] row = yielder.get(); + Map<String, Object> rowWithFieldNames = new LinkedHashMap<>(); + for (int i = 0; i < row.length; ++i) { + rowWithFieldNames.put(rowSignature.getColumnName(i), row[i]); + } + actualResults.add(rowWithFieldNames); + yielder = yielder.next(null); + } + + boolean resultsComparison = QueryResultVerifier.compareResults( + actualResults, + expectedQueryWithResults.getExpectedResults(), + Collections.emptyList() + ); + if (!resultsComparison) { + throw new IAE("Expected query result is different from the actual result"); Review Comment: Maybe `QueryResultVerifier` itself should do the assertions or at least give back messages detailing the mismatch. The caller getting a boolean and then throwing an IAE makes for tests that are very difficult to debug. ########## integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java: ########## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.utils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.http.client.response.StatusResponseHolder; +import org.apache.druid.msq.indexing.report.MSQResultsReport; +import org.apache.druid.msq.indexing.report.MSQTaskReport; +import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; +import org.apache.druid.msq.sql.SqlTaskStatus; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.http.SqlQuery; +import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.druid.testing.clients.MsqOverlordResourceTestClient; +import org.apache.druid.testing.clients.MsqTestClient; +import 
org.jboss.netty.handler.codec.http.HttpResponseStatus; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +/** + * Helper class to aid out ITs for MSQ + */ +public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResults> +{ + + private final ObjectMapper jsonMapper; + private final IntegrationTestingConfig config; + private final MsqOverlordResourceTestClient overlordClient; + private final MsqTestClient msqClient; + + + @Inject + MsqTestQueryHelper( + final ObjectMapper jsonMapper, + final MsqTestClient queryClient, + final IntegrationTestingConfig config, + final MsqOverlordResourceTestClient overlordClient, + final MsqTestClient msqClient + ) + { + super(jsonMapper, queryClient, config); + this.jsonMapper = jsonMapper; + this.config = config; + this.overlordClient = overlordClient; + this.msqClient = msqClient; + } + + @Override + public String getQueryURL(String schemeAndHost) + { + return StringUtils.format("%s/druid/v2/sql/task", schemeAndHost); + } + + /** + * Submits a task to the MSQ API with the given query string, and default headers and parameters + */ + public String submitMsqTask(String sqlQueryString) throws ExecutionException, InterruptedException + { + return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, false, ImmutableMap.of(), null)); + } + + // Run the task, wait for it to complete, fetch the reports, verify the results, + + /** + * Submits a {@link SqlQuery} to the MSQ API for execution. 
This method waits for the task to be accepted by the cluster + * and returns the task id associated with the submitted task + */ + public String submitMsqTask(SqlQuery sqlQuery) throws ExecutionException, InterruptedException + { + String queryUrl = getQueryURL(config.getBrokerUrl()); + Future<StatusResponseHolder> responseHolderFuture = msqClient.queryAsync(queryUrl, sqlQuery); + // It is okay to block here for the result because MSQ tasks return the task Id associated with it, which shouldn't + // consume a lot of time + StatusResponseHolder statusResponseHolder = responseHolderFuture.get(); + + // Check if the task has been accepted successfully + HttpResponseStatus httpResponseStatus = statusResponseHolder.getStatus(); + if (!httpResponseStatus.equals(HttpResponseStatus.ACCEPTED)) { + throw new ISE( + "Unable to submit the task successfully. Received response status code [%d], and response content:\n[%s]", + httpResponseStatus, + statusResponseHolder.getContent() + ); + } + String content = statusResponseHolder.getContent(); + SqlTaskStatus sqlTaskStatus; + try { + sqlTaskStatus = jsonMapper.readValue(content, SqlTaskStatus.class); + } + catch (JsonProcessingException e) { + throw new ISE("Unable to parse the response"); + } + if (sqlTaskStatus.getState().isFailure()) { + throw new ISE("Unable to start the task successfully.\nPossible exception: %s", sqlTaskStatus.getError()); + } + return sqlTaskStatus.getTaskId(); + } + + /** + * Polls the overlord API every 1 second and waits for a submitted MSQ task to be completed. Alternatively, one can + * specify the maximum time to poll. 
The method returns the last fetched {@link TaskState} of the task + */ + public TaskState pollTaskIdForCompletion(String taskId, long maxTimeoutSeconds) + { + if (maxTimeoutSeconds < 0) { + throw new IAE("Timeout cannot be negative"); + } else if (maxTimeoutSeconds == 0) { + maxTimeoutSeconds = Long.MAX_VALUE; + } + long time = 0; + do { + TaskStatusPlus taskStatusPlus = overlordClient.getTaskStatus(taskId); + TaskState statusCode = taskStatusPlus.getStatusCode(); + if (statusCode != null && statusCode.isComplete()) { + return taskStatusPlus.getStatusCode(); + } + try { + Thread.sleep(1000); + } + catch (InterruptedException e) { + throw new ISE(e, "Interrupted while polling for task [%s] completion", taskId); + } + } while (time++ < maxTimeoutSeconds); + return overlordClient.getTaskStatus(taskId).getStatusCode(); + } + + /** + * Fetches status reports for a given task + */ + public Map<String, MSQTaskReport> fetchStatusReports(String taskId) + { + return overlordClient.getTaskReportForMsqTask(taskId); + } + + /** + * Compares the results for a given taskId. 
It is required that the task has produced some results that can be verified + */ + public void compareResults(String taskId, MsqQueryWithResults expectedQueryWithResults) + { + Map<String, MSQTaskReport> statusReport = fetchStatusReports(taskId); + MSQTaskReport taskReport = statusReport.get(MSQTaskReport.REPORT_KEY); + if (taskReport == null) { + throw new ISE("Unable to fetch the status report for the task [%]", taskId); + } + MSQTaskReportPayload taskReportPayload = Preconditions.checkNotNull( + taskReport.getPayload(), + "payload" + ); + MSQResultsReport resultsReport = Preconditions.checkNotNull( + taskReportPayload.getResults(), + "Results report for the task id is empty" + ); + + List<Map<String, Object>> actualResults = new ArrayList<>(); + + Yielder<Object[]> yielder = resultsReport.getResultYielder(); + RowSignature rowSignature = resultsReport.getSignature(); + + while (!yielder.isDone()) { + Object[] row = yielder.get(); + Map<String, Object> rowWithFieldNames = new LinkedHashMap<>(); + for (int i = 0; i < row.length; ++i) { + rowWithFieldNames.put(rowSignature.getColumnName(i), row[i]); + } + actualResults.add(rowWithFieldNames); + yielder = yielder.next(null); + } + + boolean resultsComparison = QueryResultVerifier.compareResults( + actualResults, + expectedQueryWithResults.getExpectedResults(), + Collections.emptyList() + ); + if (!resultsComparison) { + throw new IAE("Expected query result is different from the actual result"); + } + } + + /** + * Runs queries from files using MSQ and compares the results with the ones provided + */ + public void testQueriesFromFileUsingMsq(String filePath, String fullDatasourcePath) Review Comment: The `UsingMsq` suffix is probably redundant as this class is `MsqTestQueryHelper` and will only be used with MSQ. You need not worry about clashing with the method in the parent class, you can always override it. 
########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testsEx.msq; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.druid.testing.clients.CoordinatorResourceTestClient; +import org.apache.druid.testing.clients.MsqTestClient; +import org.apache.druid.testing.utils.DataLoaderHelper; +import org.apache.druid.testing.utils.MsqTestQueryHelper; +import org.apache.druid.testsEx.categories.MultiStageQuery; +import org.apache.druid.testsEx.config.DruidTestRunner; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +@RunWith(DruidTestRunner.class) +@Category(MultiStageQuery.class) +public class ITMultiStageQuery +{ + @Inject + private MsqTestQueryHelper msqHelper; + + @Inject + private MsqTestClient msqClient; + + @Inject + private IntegrationTestingConfig config; + + @Inject + private ObjectMapper jsonMapper; + + @Inject + private 
DataLoaderHelper dataLoaderHelper; + + @Inject + private CoordinatorResourceTestClient coordinatorClient; + + private static final String QUERY_FILE = "/indexer/wikipedia_index_data1_query.json"; Review Comment: I think we should use more specific names for the resource files that we add. Over time, these file names become difficult to tell apart and we often end up duplicating the content. You could either name them based on the tests they are used in or the function they perform (e.g. select_orderby_xyz_query.json). It would also be nice if they were kept in folders that corresponded to the test package. ########## integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java: ########## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.testing.utils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.http.client.response.StatusResponseHolder; +import org.apache.druid.msq.indexing.report.MSQResultsReport; +import org.apache.druid.msq.indexing.report.MSQTaskReport; +import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; +import org.apache.druid.msq.sql.SqlTaskStatus; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.http.SqlQuery; +import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.druid.testing.clients.MsqOverlordResourceTestClient; +import org.apache.druid.testing.clients.MsqTestClient; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +/** + * Helper class to aid out ITs for MSQ + */ +public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResults> +{ + + private final ObjectMapper jsonMapper; + private final IntegrationTestingConfig config; + private final MsqOverlordResourceTestClient overlordClient; + private final MsqTestClient msqClient; + + + @Inject + MsqTestQueryHelper( + final ObjectMapper jsonMapper, + final MsqTestClient 
queryClient, + final IntegrationTestingConfig config, + final MsqOverlordResourceTestClient overlordClient, + final MsqTestClient msqClient + ) + { + super(jsonMapper, queryClient, config); + this.jsonMapper = jsonMapper; + this.config = config; + this.overlordClient = overlordClient; + this.msqClient = msqClient; + } + + @Override + public String getQueryURL(String schemeAndHost) + { + return StringUtils.format("%s/druid/v2/sql/task", schemeAndHost); + } + + /** + * Submits a task to the MSQ API with the given query string, and default headers and parameters + */ + public String submitMsqTask(String sqlQueryString) throws ExecutionException, InterruptedException + { + return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, false, ImmutableMap.of(), null)); + } + + // Run the task, wait for it to complete, fetch the reports, verify the results, + + /** + * Submits a {@link SqlQuery} to the MSQ API for execution. This method waits for the task to be accepted by the cluster + * and returns the task id associated with the submitted task + */ + public String submitMsqTask(SqlQuery sqlQuery) throws ExecutionException, InterruptedException + { + String queryUrl = getQueryURL(config.getBrokerUrl()); + Future<StatusResponseHolder> responseHolderFuture = msqClient.queryAsync(queryUrl, sqlQuery); + // It is okay to block here for the result because MSQ tasks return the task Id associated with it, which shouldn't + // consume a lot of time + StatusResponseHolder statusResponseHolder = responseHolderFuture.get(); + + // Check if the task has been accepted successfully + HttpResponseStatus httpResponseStatus = statusResponseHolder.getStatus(); + if (!httpResponseStatus.equals(HttpResponseStatus.ACCEPTED)) { + throw new ISE( + "Unable to submit the task successfully. 
Received response status code [%d], and response content:\n[%s]", + httpResponseStatus, + statusResponseHolder.getContent() + ); + } + String content = statusResponseHolder.getContent(); + SqlTaskStatus sqlTaskStatus; + try { + sqlTaskStatus = jsonMapper.readValue(content, SqlTaskStatus.class); + } + catch (JsonProcessingException e) { + throw new ISE("Unable to parse the response"); + } + if (sqlTaskStatus.getState().isFailure()) { + throw new ISE("Unable to start the task successfully.\nPossible exception: %s", sqlTaskStatus.getError()); + } + return sqlTaskStatus.getTaskId(); + } + + /** + * Polls the overlord API every 1 second and waits for a submitted MSQ task to be completed. Alternatively, one can + * specify the maximum time to poll. The method returns the last fetched {@link TaskState} of the task + */ + public TaskState pollTaskIdForCompletion(String taskId, long maxTimeoutSeconds) + { + if (maxTimeoutSeconds < 0) { + throw new IAE("Timeout cannot be negative"); + } else if (maxTimeoutSeconds == 0) { + maxTimeoutSeconds = Long.MAX_VALUE; + } + long time = 0; + do { + TaskStatusPlus taskStatusPlus = overlordClient.getTaskStatus(taskId); + TaskState statusCode = taskStatusPlus.getStatusCode(); + if (statusCode != null && statusCode.isComplete()) { + return taskStatusPlus.getStatusCode(); + } + try { + Thread.sleep(1000); + } + catch (InterruptedException e) { + throw new ISE(e, "Interrupted while polling for task [%s] completion", taskId); + } + } while (time++ < maxTimeoutSeconds); + return overlordClient.getTaskStatus(taskId).getStatusCode(); + } + + /** + * Fetches status reports for a given task + */ + public Map<String, MSQTaskReport> fetchStatusReports(String taskId) + { + return overlordClient.getTaskReportForMsqTask(taskId); + } + + /** + * Compares the results for a given taskId. 
It is required that the task has produced some results that can be verified + */ + public void compareResults(String taskId, MsqQueryWithResults expectedQueryWithResults) + { + Map<String, MSQTaskReport> statusReport = fetchStatusReports(taskId); + MSQTaskReport taskReport = statusReport.get(MSQTaskReport.REPORT_KEY); + if (taskReport == null) { + throw new ISE("Unable to fetch the status report for the task [%]", taskId); + } + MSQTaskReportPayload taskReportPayload = Preconditions.checkNotNull( + taskReport.getPayload(), + "payload" + ); + MSQResultsReport resultsReport = Preconditions.checkNotNull( + taskReportPayload.getResults(), + "Results report for the task id is empty" + ); + + List<Map<String, Object>> actualResults = new ArrayList<>(); + + Yielder<Object[]> yielder = resultsReport.getResultYielder(); + RowSignature rowSignature = resultsReport.getSignature(); + + while (!yielder.isDone()) { + Object[] row = yielder.get(); + Map<String, Object> rowWithFieldNames = new LinkedHashMap<>(); + for (int i = 0; i < row.length; ++i) { + rowWithFieldNames.put(rowSignature.getColumnName(i), row[i]); + } + actualResults.add(rowWithFieldNames); + yielder = yielder.next(null); + } + + boolean resultsComparison = QueryResultVerifier.compareResults( + actualResults, + expectedQueryWithResults.getExpectedResults(), + Collections.emptyList() + ); + if (!resultsComparison) { + throw new IAE("Expected query result is different from the actual result"); Review Comment: I know that `AbstractTestQueryHelper` also does this in a similar way but we should avoid this if we want to write cleaner tests. Assertions should preferably be done using the appropriate JUnit/TestNG constructs. ########## integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java: ########## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.utils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.http.client.response.StatusResponseHolder; +import org.apache.druid.msq.indexing.report.MSQResultsReport; +import org.apache.druid.msq.indexing.report.MSQTaskReport; +import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; +import org.apache.druid.msq.sql.SqlTaskStatus; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.http.SqlQuery; +import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.druid.testing.clients.MsqOverlordResourceTestClient; +import org.apache.druid.testing.clients.MsqTestClient; +import 
org.jboss.netty.handler.codec.http.HttpResponseStatus; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +/** + * Helper class to aid out ITs for MSQ + */ +public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResults> +{ + + private final ObjectMapper jsonMapper; + private final IntegrationTestingConfig config; + private final MsqOverlordResourceTestClient overlordClient; + private final MsqTestClient msqClient; + + + @Inject + MsqTestQueryHelper( + final ObjectMapper jsonMapper, + final MsqTestClient queryClient, + final IntegrationTestingConfig config, + final MsqOverlordResourceTestClient overlordClient, + final MsqTestClient msqClient + ) + { + super(jsonMapper, queryClient, config); + this.jsonMapper = jsonMapper; + this.config = config; + this.overlordClient = overlordClient; + this.msqClient = msqClient; + } + + @Override + public String getQueryURL(String schemeAndHost) + { + return StringUtils.format("%s/druid/v2/sql/task", schemeAndHost); + } + + /** + * Submits a task to the MSQ API with the given query string, and default headers and parameters + */ + public String submitMsqTask(String sqlQueryString) throws ExecutionException, InterruptedException + { + return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, false, ImmutableMap.of(), null)); + } + + // Run the task, wait for it to complete, fetch the reports, verify the results, + + /** + * Submits a {@link SqlQuery} to the MSQ API for execution. 
This method waits for the task to be accepted by the cluster + * and returns the task id associated with the submitted task + */ + public String submitMsqTask(SqlQuery sqlQuery) throws ExecutionException, InterruptedException + { + String queryUrl = getQueryURL(config.getBrokerUrl()); + Future<StatusResponseHolder> responseHolderFuture = msqClient.queryAsync(queryUrl, sqlQuery); + // It is okay to block here for the result because MSQ tasks return the task Id associated with it, which shouldn't + // consume a lot of time + StatusResponseHolder statusResponseHolder = responseHolderFuture.get(); + + // Check if the task has been accepted successfully + HttpResponseStatus httpResponseStatus = statusResponseHolder.getStatus(); + if (!httpResponseStatus.equals(HttpResponseStatus.ACCEPTED)) { + throw new ISE( + "Unable to submit the task successfully. Received response status code [%d], and response content:\n[%s]", + httpResponseStatus, + statusResponseHolder.getContent() + ); + } + String content = statusResponseHolder.getContent(); + SqlTaskStatus sqlTaskStatus; + try { + sqlTaskStatus = jsonMapper.readValue(content, SqlTaskStatus.class); + } + catch (JsonProcessingException e) { + throw new ISE("Unable to parse the response"); + } + if (sqlTaskStatus.getState().isFailure()) { + throw new ISE("Unable to start the task successfully.\nPossible exception: %s", sqlTaskStatus.getError()); + } + return sqlTaskStatus.getTaskId(); + } + + /** + * Polls the overlord API every 1 second and waits for a submitted MSQ task to be completed. Alternatively, one can + * specify the maximum time to poll. 
The method returns the last fetched {@link TaskState} of the task + */ + public TaskState pollTaskIdForCompletion(String taskId, long maxTimeoutSeconds) + { + if (maxTimeoutSeconds < 0) { + throw new IAE("Timeout cannot be negative"); + } else if (maxTimeoutSeconds == 0) { + maxTimeoutSeconds = Long.MAX_VALUE; + } + long time = 0; + do { + TaskStatusPlus taskStatusPlus = overlordClient.getTaskStatus(taskId); + TaskState statusCode = taskStatusPlus.getStatusCode(); + if (statusCode != null && statusCode.isComplete()) { + return taskStatusPlus.getStatusCode(); + } + try { + Thread.sleep(1000); + } + catch (InterruptedException e) { + throw new ISE(e, "Interrupted while polling for task [%s] completion", taskId); + } + } while (time++ < maxTimeoutSeconds); Review Comment: This is a very weird and error-prone way of enforcing the timeout. The `RetryUtils` are probably already doing this in a cleaner way. Please try to reuse the approach used in the other ITs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
