This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 5fcd0d1 [Performance] Optimize CompletableFuture timeout handling (#10065) 5fcd0d1 is described below commit 5fcd0d18f8b33474fd8fb5faa3e2330fd88c554a Author: Lari Hotari <lhot...@users.noreply.github.com> AuthorDate: Mon Mar 29 18:23:47 2021 +0300 [Performance] Optimize CompletableFuture timeout handling (#10065) --- .../pulsar/broker/admin/impl/BrokersBase.java | 17 ++-- .../pulsar/broker/service/BrokerService.java | 38 ++++----- .../pulsar/compaction/TwoPhaseCompactor.java | 18 ++-- .../admin/internal/http/AsyncHttpConnector.java | 19 ++--- .../org/apache/pulsar/common/util/FutureUtil.java | 73 ++++++++++++++++ .../apache/pulsar/common/util/FutureUtilTest.java | 98 ++++++++++++++++++++++ 6 files changed, 206 insertions(+), 57 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 5a41037..abac85b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -23,15 +23,13 @@ import com.google.common.collect.Maps; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; +import java.time.Duration; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import javax.ws.rs.DELETE; import javax.ws.rs.GET; import javax.ws.rs.POST; @@ -60,6 +58,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.conf.InternalConfigurationData; import org.apache.pulsar.common.policies.data.BrokerInfo; import 
org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; +import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,6 +67,7 @@ import org.slf4j.LoggerFactory; */ public class BrokersBase extends PulsarWebResource { private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class); + private static final Duration HEALTHCHECK_READ_TIMEOUT = Duration.ofSeconds(10); @GET @Path("/{cluster}") @@ -346,13 +346,10 @@ public class BrokersBase extends PulsarWebResource { healthcheckReadLoop(readerFuture, completePromise, messageStr); // timeout read loop after 10 seconds - ScheduledFuture<?> timeout = pulsar().getExecutor().schedule(() -> { - completePromise.completeExceptionally(new TimeoutException("Timed out reading")); - }, 10, TimeUnit.SECONDS); - // don't leave timeout dangling - completePromise.whenComplete((ignore2, exception2) -> { - timeout.cancel(false); - }); + FutureUtil.addTimeoutHandling(completePromise, + HEALTHCHECK_READ_TIMEOUT, pulsar().getExecutor(), + () -> FutureUtil.createTimeoutException("Timed out reading", getClass(), + "healthcheck(...)")); } }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 2af76e9..1de8d9f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -45,6 +45,7 @@ import java.io.IOException; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -179,6 +180,13 @@ import org.slf4j.LoggerFactory; @Setter(AccessLevel.PROTECTED) public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies> { private static final Logger log = 
LoggerFactory.getLogger(BrokerService.class); + private static final Duration FUTURE_DEADLINE_TIMEOUT_DURATION = Duration.ofSeconds(60); + private static final TimeoutException FUTURE_DEADLINE_TIMEOUT_EXCEPTION = + FutureUtil.createTimeoutException("Future didn't finish within deadline", BrokerService.class, + "futureWithDeadline(...)"); + private static final TimeoutException FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION = + FutureUtil.createTimeoutException("Failed to load topic within timeout", BrokerService.class, + "futureWithDeadline(...)"); private final PulsarService pulsar; private final ManagedLedgerFactory managedLedgerFactory; @@ -847,7 +855,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies } } catch (IllegalArgumentException e) { log.warn("[{}] Illegalargument exception when loading topic", topic, e); - return failedFuture(e); + return FutureUtil.failedFuture(e); } catch (RuntimeException e) { Throwable cause = e.getCause(); if (cause instanceof ServiceUnitNotReadyException) { @@ -856,7 +864,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies log.warn("[{}] Unexpected exception when loading topic: {}", topic, e.getMessage(), e); } - return failedFuture(cause); + return FutureUtil.failedFuture(cause); } } @@ -964,25 +972,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies return topicFuture; } - private static <T> CompletableFuture<T> failedFuture(Throwable t) { - CompletableFuture<T> future = new CompletableFuture<>(); - future.completeExceptionally(t); - return future; - } - - private <T> CompletableFuture<T> futureWithDeadline(Long delay, TimeUnit unit, Exception exp) { - CompletableFuture<T> future = new CompletableFuture<T>(); - executor().schedule(() -> { - if (!future.isDone()) { - future.completeExceptionally(exp); - } - }, delay, unit); - return future; - } - private <T> CompletableFuture<T> futureWithDeadline() { - return futureWithDeadline(60000L, 
TimeUnit.MILLISECONDS, - new TimeoutException("Future didn't finish within deadline")); + return FutureUtil.createFutureWithTimeout(FUTURE_DEADLINE_TIMEOUT_DURATION, executor(), + () -> FUTURE_DEADLINE_TIMEOUT_EXCEPTION); } public PulsarClient getReplicationClient(String cluster) { @@ -1093,9 +1085,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies */ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final String topic, boolean createIfMissing) throws RuntimeException { - final CompletableFuture<Optional<Topic>> topicFuture = futureWithDeadline( - pulsar.getConfiguration().getTopicLoadTimeoutSeconds(), - TimeUnit.SECONDS, new TimeoutException("Failed to load topic within timeout")); + final CompletableFuture<Optional<Topic>> topicFuture = FutureUtil.createFutureWithTimeout( + Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(), + () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION); if (!pulsar.getConfiguration().isEnablePersistentTopics()) { if (log.isDebugEnabled()) { log.debug("Broker is unable to load persistent topic {}", topic); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index bee9878..800182e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -21,15 +21,13 @@ package org.apache.pulsar.compaction; import com.google.common.collect.ImmutableMap; import io.netty.buffer.ByteBuf; import java.io.IOException; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; 
-import java.util.concurrent.TimeoutException; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; @@ -62,6 +60,7 @@ public class TwoPhaseCompactor extends Compactor { private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class); private static final int MAX_OUTSTANDING = 500; private static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger"; + public static final Duration PHASE_ONE_LOOP_READ_TIMEOUT = Duration.ofSeconds(10); public TwoPhaseCompactor(ServiceConfiguration conf, PulsarClient pulsar, @@ -116,7 +115,9 @@ public class TwoPhaseCompactor extends Compactor { return; } CompletableFuture<RawMessage> future = reader.readNextAsync(); - scheduleTimeout(future); + FutureUtil.addTimeoutHandling(future, + PHASE_ONE_LOOP_READ_TIMEOUT, scheduler, + () -> FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)")); future.thenAcceptAsync(m -> { try { @@ -172,15 +173,6 @@ public class TwoPhaseCompactor extends Compactor { }); } - private void scheduleTimeout(CompletableFuture<RawMessage> future) { - Future<?> timeout = scheduler.schedule(() -> { - future.completeExceptionally(new TimeoutException("Timeout")); - }, 10, TimeUnit.SECONDS); - future.whenComplete((res, exception) -> { - timeout.cancel(true); - }); - } - private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId to, MessageId lastReadId, Map<String, MessageId> latestForKey, BookKeeper bk) { Map<String, byte[]> metadata = diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java index bdd5920..bf8492c 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java +++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java @@ -26,13 +26,13 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; +import java.time.Duration; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.function.Supplier; @@ -50,6 +50,7 @@ import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.client.api.KeyStoreParams; import org.apache.pulsar.client.impl.PulsarServiceNameResolver; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.SecurityUtility; import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext; import org.asynchttpclient.AsyncHttpClient; @@ -71,10 +72,11 @@ import org.glassfish.jersey.client.spi.Connector; */ @Slf4j public class AsyncHttpConnector implements Connector { - + private static final TimeoutException READ_TIMEOUT_EXCEPTION = + FutureUtil.createTimeoutException("Read timeout", AsyncHttpConnector.class, "retryOrTimeout(...)"); @Getter private final AsyncHttpClient httpClient; - private final int readTimeout; + private final Duration readTimeout; private final int maxRetries; private final PulsarServiceNameResolver serviceNameResolver; private final ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1, @@ -156,7 +158,7 @@ public class AsyncHttpConnector implements Connector { } } httpClient = new DefaultAsyncHttpClient(confBuilder.build()); - this.readTimeout = readTimeoutMs; + this.readTimeout = 
Duration.ofMillis(readTimeoutMs); this.maxRetries = httpClient.getConfig().getMaxRequestRetry(); } @@ -216,7 +218,8 @@ public class AsyncHttpConnector implements Connector { private CompletableFuture<Response> retryOrTimeOut(ClientRequest request) { final CompletableFuture<Response> resultFuture = new CompletableFuture<>(); retryOperation(resultFuture, () -> oneShot(serviceNameResolver.resolveHost(), request), maxRetries); - CompletableFuture<Response> timeoutAfter = timeoutAfter(readTimeout, TimeUnit.MILLISECONDS); + CompletableFuture<Response> timeoutAfter = FutureUtil.createFutureWithTimeout(readTimeout, delayer, + () -> READ_TIMEOUT_EXCEPTION); return resultFuture.applyToEither(timeoutAfter, Function.identity()); } @@ -297,12 +300,6 @@ public class AsyncHttpConnector implements Connector { return builder.execute().toCompletableFuture(); } - public <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) { - CompletableFuture<T> result = new CompletableFuture<>(); - delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), timeout, unit); - return result; - } - @Override public String getName() { return "Pulsar-Admin"; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index b86ee10..53b6deb 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -18,9 +18,15 @@ */ package org.apache.pulsar.common.util; +import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; /** * This class is aimed at simplifying work 
with {@code CompletableFuture}. @@ -50,4 +56,71 @@ public class FutureUtil { return t; } } + + /** + * Creates a new {@link CompletableFuture} instance with timeout handling. + * + * @param timeout the duration of the timeout + * @param executor the executor to use for scheduling the timeout + * @param exceptionSupplier the supplier for creating the exception + * @param <T> type parameter for the future + * @return the new {@link CompletableFuture} instance + */ + public static <T> CompletableFuture<T> createFutureWithTimeout(Duration timeout, + ScheduledExecutorService executor, + Supplier<Throwable> exceptionSupplier) { + return addTimeoutHandling(new CompletableFuture<>(), timeout, executor, exceptionSupplier); + } + + /** + * Adds timeout handling to an existing {@link CompletableFuture}. + * + * @param future the target future + * @param timeout the duration of the timeout + * @param executor the executor to use for scheduling the timeout + * @param exceptionSupplier the supplier for creating the exception + * @param <T> type parameter for the future + * @return returns the original target future + */ + public static <T> CompletableFuture<T> addTimeoutHandling(CompletableFuture<T> future, Duration timeout, + ScheduledExecutorService executor, + Supplier<Throwable> exceptionSupplier) { + ScheduledFuture<?> scheduledFuture = executor.schedule(() -> { + if (!future.isDone()) { + future.completeExceptionally(exceptionSupplier.get()); + } + }, timeout.toMillis(), TimeUnit.MILLISECONDS); + future.whenComplete((res, exception) -> scheduledFuture.cancel(false)); + return future; + } + + /** + * Creates a low-overhead timeout exception which is performance optimized to minimize allocations + * and cpu consumption. It sets the stacktrace of the exception to the given source class and + * source method name. The instances of this class can be cached or stored as constants and reused + * multiple times. 
+ * + * @param message exception message + * @param sourceClass source class for manually filled in stacktrace + * @param sourceMethod source method name for manually filled in stacktrace + * @return new TimeoutException instance + */ + public static TimeoutException createTimeoutException(String message, Class<?> sourceClass, String sourceMethod) { + return new LowOverheadTimeoutException(message, sourceClass, sourceMethod); + } + + private static class LowOverheadTimeoutException extends TimeoutException { + private static final long serialVersionUID = 1L; + + LowOverheadTimeoutException(String message, Class<?> sourceClass, String sourceMethod) { + super(message); + setStackTrace(new StackTraceElement[]{new StackTraceElement(sourceClass.getName(), sourceMethod, + null, -1)}); + } + + @Override + public synchronized Throwable fillInStackTrace() { + return this; + } + } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java new file mode 100644 index 0000000..8378aa5 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.common.util; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.fail; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeoutException; +import org.testng.annotations.Test; + +public class FutureUtilTest { + + @Test + public void testCreateTimeoutException() { + TimeoutException timeoutException = FutureUtil.createTimeoutException("hello world", getClass(), "test(...)"); + assertNotNull(timeoutException); + assertEquals(timeoutException.getMessage(), "hello world"); + StringWriter stringWriter = new StringWriter(); + timeoutException.printStackTrace(new PrintWriter(stringWriter, true)); + assertEquals(stringWriter.toString(), + "org.apache.pulsar.common.util.FutureUtil$LowOverheadTimeoutException: " + + "hello world\n" + + "\tat org.apache.pulsar.common.util.FutureUtilTest.test(...)(Unknown Source)\n"); + } + + @Test + public void testTimeoutHandling() { + CompletableFuture<Void> future = new CompletableFuture<>(); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + Exception e = new Exception(); + try { + FutureUtil.addTimeoutHandling(future, Duration.ofMillis(1), executor, () -> e); + future.get(); + fail("Should have failed."); + } catch (InterruptedException interruptedException) { + fail("Shouldn't occur"); + } catch (ExecutionException executionException) { + assertEquals(executionException.getCause(), e); + } finally { + executor.shutdownNow(); + } + } + + @Test + public void testTimeoutHandlingNoTimeout() throws ExecutionException, InterruptedException { + 
CompletableFuture<Void> future = new CompletableFuture<>(); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + try { + FutureUtil.addTimeoutHandling(future, Duration.ofMillis(100), executor, () -> new Exception()); + future.complete(null); + future.get(); + } finally { + executor.shutdownNow(); + } + } + + @Test + public void testCreatingFutureWithTimeoutHandling() { + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + Exception e = new Exception(); + try { + CompletableFuture<Void> future = FutureUtil.createFutureWithTimeout(Duration.ofMillis(1), executor, + () -> e); + future.get(); + fail("Should have failed."); + } catch (InterruptedException interruptedException) { + fail("Shouldn't occur"); + } catch (ExecutionException executionException) { + assertEquals(executionException.getCause(), e); + } finally { + executor.shutdownNow(); + } + } +} \ No newline at end of file