This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch 1.20 in repository https://gitbox.apache.org/repos/asf/drill.git
commit 1cfedd677a4aeeea8e02e9787f4d994739cf62dc Author: James Turton <[email protected]> AuthorDate: Fri Sep 9 05:34:13 2022 +0800 DRILL-8295: Probable resource leak in the HTTP storage plugin (#2641) --- .../apache/drill/exec/store/http/HttpBatchReader.java | 1 + .../drill/exec/store/http/HttpXMLBatchReader.java | 2 +- .../apache/drill/exec/store/http/util/SimpleHttp.java | 16 ++++++++++------ .../apache/drill/exec/store/http/oauth/OAuthUtils.java | 18 ++++++++++-------- 4 files changed, 22 insertions(+), 15 deletions(-) diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java index ad056d7fd0..8bbe855626 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java @@ -116,6 +116,7 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> { buildImplicitColumns(); } + // inStream is expected to be closed by the JsonLoader. 
InputStream inStream = http.getInputStream(); populateImplicitFieldMap(http); diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java index 8e0aa684d4..c7a2857b74 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java @@ -143,7 +143,7 @@ public class HttpXMLBatchReader extends HttpBatchReader { @Override public void close() { - AutoCloseables.closeSilently(inStream); AutoCloseables.closeSilently(xmlReader); + AutoCloseables.closeSilently(inStream); } } diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java index 03e4d5e5cc..e5940de184 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java @@ -28,6 +28,7 @@ import okhttp3.Request; import okhttp3.Response; import org.apache.commons.lang3.StringUtils; +import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.exceptions.CustomErrorContext; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.oauth.PersistentTokenTable; @@ -239,7 +240,8 @@ public class SimpleHttp { /** * Returns an InputStream based on the URL and config in the scanSpec. If anything goes wrong * the method throws a UserException. - * @return An Inputstream of the data from the URL call. + * @return An Inputstream of the data from the URL call. The caller is responsible for calling + * close() on the InputStream. 
*/ public InputStream getInputStream() { @@ -268,15 +270,14 @@ public class SimpleHttp { // Build the request object Request request = requestBuilder.build(); + Response response = null; try { logger.debug("Executing request: {}", request); logger.debug("Headers: {}", request.headers()); // Execute the request - Response response = client - .newCall(request) - .execute(); + response = client.newCall(request).execute(); // Preserve the response responseMessage = response.message(); @@ -291,8 +292,9 @@ public class SimpleHttp { paginator.notifyPartialPage(); } - // If the request is unsuccessful, throw a UserException + // If the request is unsuccessful, clean up and throw a UserException if (!isSuccessful(responseCode)) { + AutoCloseables.closeSilently(response); throw UserException .dataReadError() .message("HTTP request failed") @@ -304,9 +306,11 @@ public class SimpleHttp { logger.debug("HTTP Request for {} successful.", url()); logger.debug("Response Headers: {} ", response.headers()); - // Return the InputStream of the response + // Return the InputStream of the response. Note that it is necessary and + // and sufficient that the caller invokes close() on the returned stream. return Objects.requireNonNull(response.body()).byteStream(); } catch (IOException e) { + // response can only be null at this location so we do not attempt to close it. 
throw UserException .dataReadError(e) .message("Failed to read the HTTP response body") diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java index 573df78cba..8b80499bb6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java @@ -38,13 +38,13 @@ public class OAuthUtils { private static final Logger logger = LoggerFactory.getLogger(OAuthUtils.class); /** - * Craft a GET request to obtain an access token. + * Crafts a POST request to obtain an access token. * @param credentialsProvider A credential provider containing the clientID, clientSecret and authorizationCode * @param authorizationCode The authorization code from the OAuth2.0 enabled API * @param callbackURL The callback URL. For our purposes this is obtained from the incoming Drill request as it all goes to the same place. * @return A Request Body to obtain an access token */ - public static RequestBody getPostResponse(CredentialsProvider credentialsProvider, String authorizationCode, String callbackURL) { + public static RequestBody getPostRequest(CredentialsProvider credentialsProvider, String authorizationCode, String callbackURL) { return new FormBody.Builder() .add("grant_type", "authorization_code") .add("client_id", credentialsProvider.getCredentials().get(OAuthTokenCredentials.CLIENT_ID)) @@ -55,12 +55,12 @@ public class OAuthUtils { } /** - * Crafts a POST response for refreshing an access token when a refresh token is present. + * Crafts a POST request for refreshing an access token when a refresh token is present. 
* @param credentialsProvider A credential provider containing the clientID, clientSecret and refreshToken * @param refreshToken The refresh token * @return A Request Body with the correct parameters for obtaining an access token */ - public static RequestBody getPostResponseForTokenRefresh(CredentialsProvider credentialsProvider, String refreshToken) { + public static RequestBody getPostRequestForTokenRefresh(CredentialsProvider credentialsProvider, String refreshToken) { return new FormBody.Builder() .add("grant_type", "refresh_token") .add("client_id", credentialsProvider.getCredentials().get(OAuthTokenCredentials.CLIENT_ID)) @@ -90,7 +90,7 @@ public class OAuthUtils { .url(buildAccessTokenURL(credentialsProvider)) .header("Content-Type", "application/json") .addHeader("Accept", "application/json") - .post(getPostResponse(credentialsProvider, authenticationCode, callbackURL)) + .post(getPostRequest(credentialsProvider, authenticationCode, callbackURL)) .build(); } @@ -110,7 +110,7 @@ public class OAuthUtils { .url(tokenURI) .header("Content-Type", "application/json") .addHeader("Accept", "application/json") - .post(getPostResponseForTokenRefresh(credentialsProvider, refreshToken)) + .post(getPostRequestForTokenRefresh(credentialsProvider, refreshToken)) .build(); } @@ -127,9 +127,10 @@ public class OAuthUtils { String accessToken; String refreshToken; Map<String, String> tokens = new HashMap<>(); + Response response = null; try { - Response response = client.newCall(request).execute(); + response = client.newCall(request).execute(); String responseBody = response.body().string(); if (!response.isSuccessful()) { @@ -164,13 +165,14 @@ public class OAuthUtils { refreshToken = (String) parsedJson.get("refresh_token"); tokens.put(OAuthTokenCredentials.REFRESH_TOKEN, refreshToken); } - response.close(); return tokens; } catch (NullPointerException | IOException e) { throw UserException.connectionError() .message("Error refreshing access OAuth2 access token. 
" + e.getMessage()) .build(logger); + } finally { + response.close(); } } }
