This is an automated email from the ASF dual-hosted git repository. dzamo pushed a commit to branch 1.20 in repository https://gitbox.apache.org/repos/asf/drill.git
commit 3b949bc31ae0b0ece2cf9b8e00e7862a95efb88e Author: James Turton <[email protected]> AuthorDate: Sun Sep 18 14:54:14 2022 +0800 DRILL-8307: Ensure thread safety in the Druid plugin HTTP client (#2650) --- .../exec/store/druid/rest/DruidAdminClient.java | 30 ++++---- .../exec/store/druid/rest/DruidQueryClient.java | 32 ++++---- .../drill/exec/store/druid/rest/RestClient.java | 20 ++++- .../exec/store/druid/rest/RestClientWrapper.java | 48 ++++++------ .../drill/exec/store/druid/DruidTestBase.java | 4 +- .../{DruidTestSuit.java => DruidTestSuite.java} | 4 +- .../drill/exec/store/druid/TestDataGenerator.java | 85 +++++++++++----------- .../store/druid/rest/DruidQueryClientTest.java | 37 ++++------ 8 files changed, 135 insertions(+), 125 deletions(-) diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidAdminClient.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidAdminClient.java index bbfd336a42..09c99fc834 100755 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidAdminClient.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidAdminClient.java @@ -21,12 +21,12 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.store.druid.druid.SimpleDatasourceInfo; -import org.apache.http.HttpResponse; -import org.apache.http.HttpStatus; -import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import okhttp3.Response; + +import java.io.InputStream; import java.io.IOException; import java.util.List; @@ -34,7 +34,6 @@ public class DruidAdminClient { private static final Logger logger = LoggerFactory.getLogger(DruidAdminClient.class); private static final String DATASOURCES_BASE_URI = "/druid/coordinator/v1/datasources?simple"; - private static final String DEFAULT_ENCODING = "UTF-8"; private static final ObjectMapper mapper = new ObjectMapper(); private final String coordinatorAddress; @@ -47,18 +46,19 @@ public class DruidAdminClient { public List<SimpleDatasourceInfo> getDataSources() throws IOException { String url = this.coordinatorAddress + DATASOURCES_BASE_URI; - HttpResponse response = restClient.get(url); + try (Response response = restClient.get(url)) { + if (!response.isSuccessful()) { + // TODO: Add a CustomErrorContext when this plugin is converted to EVF. + throw UserException + .dataReadError() + .message("Error getting druid datasources. HTTP request failed") + .addContext("Response code", response.code()) + .addContext("Response message", response.message()) + .build(logger); + } - if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { - throw UserException - .dataReadError() - .message("Error getting druid datasources. HTTP request failed") - .addContext("Response code", response.getStatusLine().getStatusCode()) - .addContext("Response message", response.getStatusLine().getReasonPhrase()) - .build(logger); + InputStream responseStream = response.body().byteStream(); + return mapper.readValue(responseStream, new TypeReference<List<SimpleDatasourceInfo>>(){}); } - - String responseJson = EntityUtils.toString(response.getEntity(), DEFAULT_ENCODING); - return mapper.readValue(responseJson, new TypeReference<List<SimpleDatasourceInfo>>(){}); } } diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidQueryClient.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidQueryClient.java index bd05c3ee5d..fe82650199 100755 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidQueryClient.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidQueryClient.java @@ -22,12 +22,12 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.store.druid.druid.DruidScanResponse; -import org.apache.http.HttpResponse; -import org.apache.http.HttpStatus; -import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import okhttp3.Response; + +import java.io.InputStream; import java.util.ArrayList; public class DruidQueryClient { @@ -48,20 +48,22 @@ public class DruidQueryClient { public DruidScanResponse executeQuery(String query) throws Exception { logger.debug("Executing Query - {}", query); - HttpResponse response = restClient.post(queryUrl, query); - if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { - throw UserException - .dataReadError() - .message("Error executing druid query. HTTP request failed") - .addContext("Response code", response.getStatusLine().getStatusCode()) - .addContext("Response message", response.getStatusLine().getReasonPhrase()) - .build(logger); - } + try (Response response = restClient.post(queryUrl, query)) { + if (!response.isSuccessful()) { + // TODO: Add a CustomErrorContext when this plugin is converted to EVF. + throw UserException + .dataReadError() + .message("Error executing druid query. HTTP request failed") + .addContext("Response code", response.code()) + .addContext("Response message", response.message()) + .build(logger); + } - String data = EntityUtils.toString(response.getEntity()); - ArrayNode responses = mapper.readValue(data, ArrayNode.class); - return parseResponse(responses); + InputStream responseStream = response.body().byteStream(); + ArrayNode responses = mapper.readValue(responseStream, ArrayNode.class); + return parseResponse(responses); + } } private DruidScanResponse parseResponse(ArrayNode responses) { diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClient.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClient.java index d88a41f5b4..fa8a4cf5e1 100644 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClient.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClient.java @@ -17,11 +17,25 @@ */ package org.apache.drill.exec.store.druid.rest; -import org.apache.http.HttpResponse; +import okhttp3.Response; import java.io.IOException; public interface RestClient { - HttpResponse get(String url) throws IOException; - HttpResponse post(String url, String body) throws IOException; + /** + * Executes an HTTP GET. + * @param url request URL + * @return a Response object that the caller is responsible for closing. + * @throws IOException + */ + Response get(String url) throws IOException; + + /** + * Executes an HTTP POST. + * @param url request URL. + * @param body request body. + * @return a Response object that the caller is responsible for closing. + * @throws IOException + */ + Response post(String url, String body) throws IOException; } diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClientWrapper.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClientWrapper.java index 3dd47ee261..a5cb6e73f1 100644 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClientWrapper.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClientWrapper.java @@ -17,36 +17,36 @@ */ package org.apache.drill.exec.store.druid.rest; -import org.apache.http.HttpEntity; -import org.apache.http.HttpResponse; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.ByteArrayEntity; -import org.apache.http.impl.client.DefaultHttpClient; - -import javax.ws.rs.core.HttpHeaders; -import java.io.IOException; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; -import static javax.ws.rs.core.MediaType.APPLICATION_JSON; -import static org.apache.http.protocol.HTTP.CONTENT_TYPE; +import java.nio.charset.StandardCharsets; +import java.io.IOException; public class RestClientWrapper implements RestClient { - private static final HttpClient httpClient = new DefaultHttpClient(); - private static final String DEFAULT_ENCODING = "UTF-8"; + // OkHttp client is designed to be shared across threads. + private final OkHttpClient httpClient = new OkHttpClient(); - public HttpResponse get(String url) throws IOException { - HttpGet httpget = new HttpGet(url); - httpget.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON); - return httpClient.execute(httpget); + public Response get(String url) throws IOException { + Request get = new Request.Builder() + .url(url) + .addHeader("Content-Type", "application/json") + .build(); + + return httpClient.newCall(get).execute(); } - public HttpResponse post(String url, String body) throws IOException { - HttpPost httppost = new HttpPost(url); - httppost.addHeader(CONTENT_TYPE, APPLICATION_JSON); - HttpEntity entity = new ByteArrayEntity(body.getBytes(DEFAULT_ENCODING)); - httppost.setEntity(entity); + public Response post(String url, String body) throws IOException { + RequestBody postBody = RequestBody.create(body.getBytes(StandardCharsets.UTF_8)); + + Request post = new Request.Builder() + .url(url) + .addHeader("Content-Type", "application/json") + .post(postBody) + .build(); - return httpClient.execute(httppost); + return httpClient.newCall(post).execute(); } } diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestBase.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestBase.java index ef47e26c0e..d52a8fc28e 100644 --- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestBase.java +++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestBase.java @@ -35,7 +35,7 @@ public class DruidTestBase extends ClusterTest implements DruidTestConstants { startCluster(ClusterFixture.builder(dirTestWatcher)); pluginRegistry = cluster.drillbit().getContext().getStorage(); - DruidTestSuit.initDruid(); + DruidTestSuite.initDruid(); initDruidStoragePlugin(); } @@ -43,7 +43,7 @@ public class DruidTestBase extends ClusterTest implements DruidTestConstants { pluginRegistry .put( DruidStoragePluginConfig.NAME, - DruidTestSuit.getDruidStoragePluginConfig()); + DruidTestSuite.getDruidStoragePluginConfig()); } @AfterClass diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuit.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuite.java similarity index 98% rename from contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuit.java rename to contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuite.java index 5e1c149556..92d3c3486a 100644 --- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuit.java +++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuite.java @@ -43,8 +43,8 @@ import java.io.File; DruidScanSpecBuilderTest.class }) @Category({SlowTest.class, DruidStorageTest.class}) -public class DruidTestSuit { - private static final Logger logger = LoggerFactory.getLogger(DruidTestSuit.class); +public class DruidTestSuite { + private static final Logger logger = LoggerFactory.getLogger(DruidTestSuite.class); private static final ObjectMapper mapper = new ObjectMapper(); diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDataGenerator.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDataGenerator.java index f4abcd3ca5..3b96db7cdb 100644 --- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDataGenerator.java +++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDataGenerator.java @@ -23,38 +23,28 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.drill.shaded.guava.com.google.common.io.Resources; -import org.apache.http.HttpEntity; -import org.apache.http.HttpResponse; -import org.apache.http.HttpStatus; -import org.apache.http.StatusLine; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.ByteArrayEntity; -import org.apache.http.impl.client.DefaultHttpClient; -import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.core.HttpHeaders; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; + +import java.io.InputStream; import java.io.IOException; import java.net.URISyntaxException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.concurrent.TimeUnit; -import static javax.ws.rs.core.MediaType.APPLICATION_JSON; -import static org.apache.http.protocol.HTTP.CONTENT_TYPE; - public class TestDataGenerator { private static final Logger logger = LoggerFactory.getLogger(TestDataGenerator.class); - private static final HttpClient httpClient = new DefaultHttpClient(); + private static final OkHttpClient httpClient = new OkHttpClient(); private static final ObjectMapper mapper = new ObjectMapper(); - private static final String DEFAULT_ENCODING = "UTF-8"; - private static final String RESPONSE_SUCCESS = "SUCCESS"; public static void startImport(DruidStoragePluginConfig druidStoragePluginConfig) throws Exception { @@ -72,11 +62,13 @@ public class TestDataGenerator { private static boolean isDruidRunning(DruidStoragePluginConfig druidStoragePluginConfig) { try { String healthCheckUrl = druidStoragePluginConfig.getCoordinatorAddress() + "/status/health"; - HttpGet httpGet = new HttpGet(healthCheckUrl); - HttpResponse response = httpClient.execute(httpGet); - StatusLine statusLine = response.getStatusLine(); - String status = EntityUtils.toString(response.getEntity(), DEFAULT_ENCODING); - return statusLine.getStatusCode() == HttpStatus.SC_OK && status.equalsIgnoreCase("true"); + Request get = new Request.Builder() + .url(healthCheckUrl) + .build(); + + try (Response resp = httpClient.newCall(get).execute()) { + return resp.isSuccessful() && resp.body().string().equalsIgnoreCase("true"); + } } catch (Exception ex) { logger.error("Error getting druid status", ex); return false; @@ -90,18 +82,24 @@ public class TestDataGenerator { private static String startImportTask(DruidStoragePluginConfig druidStoragePluginConfig) throws URISyntaxException, IOException { try { String url = taskUrl(druidStoragePluginConfig); - byte[] taskConfig = Files.readAllBytes(Paths.get(Resources.getResource("wikipedia-index.json").toURI())); - - HttpPost httpPost = new HttpPost(url); - httpPost.addHeader(CONTENT_TYPE, APPLICATION_JSON); - HttpEntity entity = new ByteArrayEntity(taskConfig); - httpPost.setEntity(entity); - - HttpResponse response = httpClient.execute(httpPost); - String data = EntityUtils.toString(response.getEntity()); - TaskStartResponse taskStartResponse = mapper.readValue(data, TaskStartResponse.class); - logger.debug("Started Indexing Task - " + taskStartResponse.getTaskId()); - return taskStartResponse.getTaskId(); + RequestBody postBody = RequestBody.create( + Files.readAllBytes(Paths.get(Resources.getResource("wikipedia-index.json").toURI())) + ); + Request post = new Request.Builder() + .url(url) + .addHeader("Content-Type", "application/json") + .post(postBody) + .build(); + + try (Response resp = httpClient.newCall(post).execute()) { + String respBodyStr = resp.body().string(); + TaskStartResponse taskStartResponse = mapper.readValue( + respBodyStr, + TaskStartResponse.class + ); + logger.debug("Started Indexing Task - {}", taskStartResponse.getTaskId()); + return taskStartResponse.getTaskId(); + } } catch (Exception ex) { logger.error("Error starting Indexing Task"); throw ex; @@ -114,14 +112,17 @@ public class TestDataGenerator { Thread.sleep(TimeUnit.MINUTES.toMillis(sleepMinutes)); String url = taskUrl(druidStoragePluginConfig) + "/" + taskId + "/status"; - HttpGet httpget = new HttpGet(url); - httpget.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON); - - HttpResponse response = httpClient.execute(httpget); - String responseJson = EntityUtils.toString(response.getEntity(), DEFAULT_ENCODING); - TaskStatusResponse taskStatusResponse = mapper.readValue(responseJson, TaskStatusResponse.class); - if (!taskStatusResponse.taskStatus.status.equalsIgnoreCase(RESPONSE_SUCCESS)) { - throw new Exception(String.format("Task %s finished with status %s", taskId, taskStatusResponse.taskStatus.status)); + Request get = new Request.Builder() + .url(url) + .addHeader("Content-Type", "application/json") + .build(); + + try (Response resp = httpClient.newCall(get).execute()) { + InputStream jsonStream = resp.body().byteStream(); + TaskStatusResponse taskStatusResponse = mapper.readValue(jsonStream, TaskStatusResponse.class); + if (!taskStatusResponse.taskStatus.status.equalsIgnoreCase(RESPONSE_SUCCESS)) { + throw new Exception(String.format("Task %s finished with status %s", taskId, taskStatusResponse.taskStatus.status)); + } } logger.debug("Task {} finished successfully", taskId); diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java index de560d9557..0fe579c7ef 100644 --- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java +++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java @@ -18,10 +18,6 @@ package org.apache.drill.exec.store.druid.rest; import org.apache.drill.exec.store.druid.druid.DruidScanResponse; -import org.apache.http.HttpStatus; -import org.apache.http.HttpResponse; -import org.apache.http.StatusLine; -import org.apache.http.HttpEntity; import org.apache.http.Header; import org.apache.http.HttpHeaders; import org.apache.http.message.BasicHeader; @@ -29,6 +25,9 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mock; +import okhttp3.Response; +import okhttp3.ResponseBody; + import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -45,13 +44,10 @@ public class DruidQueryClientTest { private RestClient restClient; @Mock - private HttpResponse httpResponse; - - @Mock - private StatusLine statusLine; + private Response httpResponse; @Mock - private HttpEntity httpEntity; + private ResponseBody httpResponseBody; private DruidQueryClient druidQueryClient; private static final String BROKER_URI = "some broker uri"; @@ -62,14 +58,11 @@ public class DruidQueryClientTest { @Before public void setup() throws IOException { restClient = mock(RestClient.class); - httpResponse = mock(HttpResponse.class); - statusLine = mock(StatusLine.class); - httpEntity = mock(HttpEntity.class); - - when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK); - when(httpEntity.getContentEncoding()).thenReturn(ENCODING_HEADER); - when(httpResponse.getStatusLine()).thenReturn(statusLine); - when(httpResponse.getEntity()).thenReturn(httpEntity); + httpResponse = mock(Response.class); + httpResponseBody = mock(ResponseBody.class); + + when(httpResponse.isSuccessful()).thenReturn(true); + when(httpResponse.body()).thenReturn(httpResponseBody); when(restClient.post(BROKER_URI + "/druid/v2", QUERY)) .thenReturn(httpResponse); @@ -79,7 +72,7 @@ public class DruidQueryClientTest { @Test(expected=Exception.class) public void executeQueryCalledDruidReturnsNon200ShouldThrowError() throws Exception { - when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR); + when(httpResponse.isSuccessful()).thenReturn(false); druidQueryClient.executeQuery(QUERY); } @@ -87,8 +80,8 @@ public class DruidQueryClientTest { public void executeQueryCalledNoResponsesFoundReturnsEmptyEventList() throws Exception { InputStream inputStream = - new ByteArrayInputStream("[]".getBytes(StandardCharsets.UTF_8.name())); - when(httpEntity.getContent()).thenReturn(inputStream); + new ByteArrayInputStream("[]".getBytes(StandardCharsets.UTF_8)); + when(httpResponseBody.byteStream()).thenReturn(inputStream); DruidScanResponse response = druidQueryClient.executeQuery(QUERY); assertThat(response.getEvents()).isEmpty(); @@ -99,8 +92,8 @@ public class DruidQueryClientTest { throws Exception { String result = "[{\"segmentId\":\"wikipedia_2016-06-27T14:00:00.000Z_2016-06-27T15:00:00.000Z_2021-12-11T11:12:16.106Z\",\"columns\":[\"__time\",\"channel\",\"cityName\",\"comment\",\"countryIsoCode\",\"countryName\",\"diffUrl\",\"flags\",\"isAnonymous\",\"isMinor\",\"isNew\",\"isRobot\",\"isUnpatrolled\",\"metroCode\",\"namespace\",\"page\",\"regionIsoCode\",\"regionName\",\"user\",\"sum_deleted\",\"sum_deltaBucket\",\"sum_added\",\"sum_commentLength\",\"count\",\"sum_delta\"],\"ev [...] InputStream inputStream = - new ByteArrayInputStream(result.getBytes(StandardCharsets.UTF_8.name())); - when(httpEntity.getContent()).thenReturn(inputStream); + new ByteArrayInputStream(result.getBytes(StandardCharsets.UTF_8)); + when(httpResponseBody.byteStream()).thenReturn(inputStream); DruidScanResponse response = druidQueryClient.executeQuery(QUERY); assertThat(response.getEvents()).isNotEmpty();
