This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 9e3923372fff72dce07bd539ea031a2b49eaa951 Author: Michael Blow <[email protected]> AuthorDate: Mon May 4 14:35:19 2020 -0400 [NO ISSUE][HYR][HTTP] Utility method to enable interruptable http requests Change-Id: Id880af7e1e60195c53a043752d9339818c002a2c Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6164 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Reviewed-by: Till Westmann <[email protected]> --- .../hyracks/api/util/HyracksThrowingConsumer.java | 26 ++++++++++++++++ .../apache/hyracks/http/server/utils/HttpUtil.java | 35 ++++++++++++++++++++++ .../org/apache/hyracks/util/ThrowingConsumer.java | 24 +++++++++++++++ 3 files changed, 85 insertions(+) diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingConsumer.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingConsumer.java new file mode 100644 index 0000000..b677132 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingConsumer.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.util; + +import org.apache.hyracks.api.exceptions.HyracksDataException; + +@FunctionalInterface +public interface HyracksThrowingConsumer<V> { + void process(V value) throws HyracksDataException; +} diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java index 2c9790e..77f6c6a 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java @@ -19,6 +19,7 @@ package org.apache.hyracks.http.server.utils; import java.io.IOException; +import java.io.InputStreamReader; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.HashMap; @@ -27,14 +28,22 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.OptionalDouble; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; +import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; import org.apache.hyracks.http.server.BaseRequest; import org.apache.hyracks.http.server.FormUrlEncodedRequest; +import org.apache.hyracks.util.ThrowingConsumer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -184,6 +193,32 @@ public class HttpUtil { 
 return i < 0 ? uri : uri.substring(0, i); } + public static void handleStreamInterruptibly(CloseableHttpResponse response, + ThrowingConsumer<InputStreamReader> streamProcessor, ExecutorService executor, + Supplier<String> taskDescription) throws IOException, InterruptedException, ExecutionException { + // we have to consume the stream in a separate thread, as it does not stop on interrupt; we need to + // instead close the connection to achieve the interrupt + Future<Void> readFuture = executor.submit(() -> { + InputStreamReader reader = new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8); + streamProcessor.process(reader); + return null; + }); + try { + readFuture.get(); + } catch (InterruptedException ex) { // NOSONAR -- interrupt or rethrow + response.close(); + try { + readFuture.get(1, TimeUnit.SECONDS); + } catch (TimeoutException te) { + LOGGER.warn("{} did not exit on stream close due to interrupt after 1s", taskDescription); + readFuture.cancel(true); + } catch (ExecutionException ee) { + LOGGER.debug("ignoring exception awaiting aborted {} shutdown", taskDescription, ee); + } + throw ex; + } + } + public static class ContentType { public static final String APPLICATION_ADM = "application/x-adm"; public static final String APPLICATION_JSON = "application/json"; diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java new file mode 100644 index 0000000..bfc5fdd --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.util; + +@FunctionalInterface +public interface ThrowingConsumer<V> { + void process(V value) throws Exception; +}
