This is an automated email from the ASF dual-hosted git repository. spmallette pushed a commit to branch driver-35 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 41be16d92ae609c096a0ba0ad18be08ba5a2768a Author: Divij Vaidya <[email protected]> AuthorDate: Wed Oct 2 09:50:59 2019 -0700 Upgrade Netty containing fix for proper close of FixedChannelPool --- .../gremlin/driver/ConnectionPoolImpl.java | 21 +- .../gremlin/driver/TinkerpopFixedChannelPool.java | 508 --------------------- 2 files changed, 11 insertions(+), 518 deletions(-) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolImpl.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolImpl.java index 63cab7e..018faed 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolImpl.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolImpl.java @@ -24,6 +24,7 @@ import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.pool.ChannelHealthChecker; import io.netty.channel.pool.ChannelPoolHandler; +import io.netty.channel.pool.FixedChannelPool; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.concurrent.GlobalEventExecutor; @@ -52,7 +53,7 @@ public class ConnectionPoolImpl implements ConnectionPool { * Netty's implementation of channel management with an upper bound. This connection * pool is responsible for attaching a channel with each request. */ - private TinkerpopFixedChannelPool channelPool; + private FixedChannelPool channelPool; /** * Channel initializer that is safe to be re-used across multiple channels. */ @@ -129,17 +130,17 @@ public class ConnectionPoolImpl implements ConnectionPool { logger.debug("Initialized {} successfully.", this); } - private TinkerpopFixedChannelPool createChannelPool(final Bootstrap b, + private FixedChannelPool createChannelPool(final Bootstrap b, final Settings.ConnectionPoolSettings connectionPoolSettings, final ChannelPoolHandler handler) { - return new TinkerpopFixedChannelPool(b, - handler, - ChannelHealthChecker.ACTIVE, - TinkerpopFixedChannelPool.AcquireTimeoutAction.FAIL, // throw an exception on acquire timeout - connectionPoolSettings.maxWaitForConnection, - calculateMaxPoolSize(connectionPoolSettings), /*maxConnections*/ - 1, /*maxPendingAcquires*/ - true);/*releaseHealthCheck*/ + return new FixedChannelPool(b, + handler, + ChannelHealthChecker.ACTIVE, + FixedChannelPool.AcquireTimeoutAction.FAIL, // throw an exception on acquire timeout + connectionPoolSettings.maxWaitForConnection, + calculateMaxPoolSize(connectionPoolSettings), /*maxConnections*/ + 1, /*maxPendingAcquires*/ + true);/*releaseHealthCheck*/ } @Override diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/TinkerpopFixedChannelPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/TinkerpopFixedChannelPool.java deleted file mode 100644 index ac77b38..0000000 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/TinkerpopFixedChannelPool.java +++ /dev/null @@ -1,508 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.tinkerpop.gremlin.driver; - -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; -import io.netty.channel.pool.*; -import io.netty.util.concurrent.*; -import io.netty.util.internal.ObjectUtil; -import io.netty.util.internal.ThrowableUtil; - -import java.nio.channels.ClosedChannelException; -import java.util.ArrayDeque; -import java.util.Queue; -import java.util.concurrent.Callable; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * TODO - * This class is a fork of Netty's {@link FixedChannelPool}. This should be removed once - * https://github.com/netty/netty/pull/9226 is resolved and we start consuming the release containing - * the fix. - */ -public class TinkerpopFixedChannelPool extends SimpleChannelPool { - private static final IllegalStateException FULL_EXCEPTION = ThrowableUtil.unknownStackTrace( - new IllegalStateException("Too many outstanding acquire operations"), - TinkerpopFixedChannelPool.class, "acquire0(...)"); - private static final TimeoutException TIMEOUT_EXCEPTION = ThrowableUtil.unknownStackTrace( - new TimeoutException("Acquire operation took longer then configured maximum time"), - TinkerpopFixedChannelPool.class, "<init>(...)"); - static final IllegalStateException POOL_CLOSED_ON_RELEASE_EXCEPTION = ThrowableUtil.unknownStackTrace( - new IllegalStateException("FixedChannelPool was closed"), - TinkerpopFixedChannelPool.class, "release(...)"); - static final IllegalStateException POOL_CLOSED_ON_ACQUIRE_EXCEPTION = ThrowableUtil.unknownStackTrace( - new IllegalStateException("FixedChannelPool was closed"), - TinkerpopFixedChannelPool.class, "acquire0(...)"); - public enum AcquireTimeoutAction { - /** - * Create a new connection when the timeout is detected. - */ - NEW, - - /** - * Fail the {@link Future} of the acquire call with a {@link TimeoutException}. - */ - FAIL - } - - private final EventExecutor executor; - private final long acquireTimeoutNanos; - private final Runnable timeoutTask; - - // There is no need to worry about synchronization as everything that modified the queue or counts is done - // by the above EventExecutor. - private final Queue<TinkerpopFixedChannelPool.AcquireTask> pendingAcquireQueue = new ArrayDeque<TinkerpopFixedChannelPool.AcquireTask>(); - private final int maxConnections; - private final int maxPendingAcquires; - private final AtomicInteger acquiredChannelCount = new AtomicInteger(); - private int pendingAcquireCount; - private boolean closed; - - /** - * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}. - * - * @param bootstrap the {@link Bootstrap} that is used for connections - * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions - * @param maxConnections the number of maximal active connections, once this is reached new tries to acquire - * a {@link Channel} will be delayed until a connection is returned to the pool again. - */ - public TinkerpopFixedChannelPool(Bootstrap bootstrap, - ChannelPoolHandler handler, int maxConnections) { - this(bootstrap, handler, maxConnections, Integer.MAX_VALUE); - } - - /** - * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}. - * - * @param bootstrap the {@link Bootstrap} that is used for connections - * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions - * @param maxConnections the number of maximal active connections, once this is reached new tries to - * acquire a {@link Channel} will be delayed until a connection is returned to the - * pool again. - * @param maxPendingAcquires the maximum number of pending acquires. Once this is exceed acquire tries will - * be failed. - */ - public TinkerpopFixedChannelPool(Bootstrap bootstrap, - ChannelPoolHandler handler, int maxConnections, int maxPendingAcquires) { - this(bootstrap, handler, ChannelHealthChecker.ACTIVE, null, -1, maxConnections, maxPendingAcquires); - } - - /** - * Creates a new instance. - * - * @param bootstrap the {@link Bootstrap} that is used for connections - * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions - * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is - * still healthy when obtain from the {@link ChannelPool} - * @param action the {@link TinkerpopFixedChannelPool.AcquireTimeoutAction} to use or {@code null} if non should be used. - * In this case {@param acquireTimeoutMillis} must be {@code -1}. - * @param acquireTimeoutMillis the time (in milliseconds) after which an pending acquire must complete or - * the {@link TinkerpopFixedChannelPool.AcquireTimeoutAction} takes place. - * @param maxConnections the number of maximal active connections, once this is reached new tries to - * acquire a {@link Channel} will be delayed until a connection is returned to the - * pool again. - * @param maxPendingAcquires the maximum number of pending acquires. Once this is exceed acquire tries will - * be failed. - */ - public TinkerpopFixedChannelPool(Bootstrap bootstrap, - ChannelPoolHandler handler, - ChannelHealthChecker healthCheck, TinkerpopFixedChannelPool.AcquireTimeoutAction action, - final long acquireTimeoutMillis, - int maxConnections, int maxPendingAcquires) { - this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires, true); - } - - /** - * Creates a new instance. - * - * @param bootstrap the {@link Bootstrap} that is used for connections - * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions - * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is - * still healthy when obtain from the {@link ChannelPool} - * @param action the {@link TinkerpopFixedChannelPool.AcquireTimeoutAction} to use or {@code null} if non should be used. - * In this case {@param acquireTimeoutMillis} must be {@code -1}. - * @param acquireTimeoutMillis the time (in milliseconds) after which an pending acquire must complete or - * the {@link TinkerpopFixedChannelPool.AcquireTimeoutAction} takes place. - * @param maxConnections the number of maximal active connections, once this is reached new tries to - * acquire a {@link Channel} will be delayed until a connection is returned to the - * pool again. - * @param maxPendingAcquires the maximum number of pending acquires. Once this is exceed acquire tries will - * be failed. - * @param releaseHealthCheck will check channel health before offering back if this parameter set to - * {@code true}. - */ - public TinkerpopFixedChannelPool(Bootstrap bootstrap, - ChannelPoolHandler handler, - ChannelHealthChecker healthCheck, TinkerpopFixedChannelPool.AcquireTimeoutAction action, - final long acquireTimeoutMillis, - int maxConnections, int maxPendingAcquires, final boolean releaseHealthCheck) { - this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires, - releaseHealthCheck, true); - } - - /** - * Creates a new instance. - * - * @param bootstrap the {@link Bootstrap} that is used for connections - * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions - * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is - * still healthy when obtain from the {@link ChannelPool} - * @param action the {@link TinkerpopFixedChannelPool.AcquireTimeoutAction} to use or {@code null} if non should be used. - * In this case {@param acquireTimeoutMillis} must be {@code -1}. - * @param acquireTimeoutMillis the time (in milliseconds) after which an pending acquire must complete or - * the {@link TinkerpopFixedChannelPool.AcquireTimeoutAction} takes place. - * @param maxConnections the number of maximal active connections, once this is reached new tries to - * acquire a {@link Channel} will be delayed until a connection is returned to the - * pool again. - * @param maxPendingAcquires the maximum number of pending acquires. Once this is exceed acquire tries will - * be failed. - * @param releaseHealthCheck will check channel health before offering back if this parameter set to - * {@code true}. - * @param lastRecentUsed {@code true} {@link Channel} selection will be LIFO, if {@code false} FIFO. - */ - public TinkerpopFixedChannelPool(Bootstrap bootstrap, - ChannelPoolHandler handler, - ChannelHealthChecker healthCheck, TinkerpopFixedChannelPool.AcquireTimeoutAction action, - final long acquireTimeoutMillis, - int maxConnections, int maxPendingAcquires, - boolean releaseHealthCheck, boolean lastRecentUsed) { - super(bootstrap, handler, healthCheck, releaseHealthCheck, lastRecentUsed); - if (maxConnections < 1) { - throw new IllegalArgumentException("maxConnections: " + maxConnections + " (expected: >= 1)"); - } - if (maxPendingAcquires < 1) { - throw new IllegalArgumentException("maxPendingAcquires: " + maxPendingAcquires + " (expected: >= 1)"); - } - if (action == null && acquireTimeoutMillis == -1) { - timeoutTask = null; - acquireTimeoutNanos = -1; - } else if (action == null && acquireTimeoutMillis != -1) { - throw new NullPointerException("action"); - } else if (action != null && acquireTimeoutMillis < 0) { - throw new IllegalArgumentException("acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 0)"); - } else { - acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis); - switch (action) { - case FAIL: - timeoutTask = new TinkerpopFixedChannelPool.TimeoutTask() { - @Override - public void onTimeout(TinkerpopFixedChannelPool.AcquireTask task) { - // Fail the promise as we timed out. - task.promise.setFailure(TIMEOUT_EXCEPTION); - } - }; - break; - case NEW: - timeoutTask = new TinkerpopFixedChannelPool.TimeoutTask() { - @Override - public void onTimeout(TinkerpopFixedChannelPool.AcquireTask task) { - // Increment the acquire count and delegate to super to actually acquire a Channel which will - // create a new connection. - task.acquired(); - - TinkerpopFixedChannelPool.super.acquire(task.promise); - } - }; - break; - default: - throw new Error(); - } - } - executor = bootstrap.config().group().next(); - this.maxConnections = maxConnections; - this.maxPendingAcquires = maxPendingAcquires; - } - - /** Returns the number of acquired channels that this pool thinks it has. */ - public int acquiredChannelCount() { - return acquiredChannelCount.get(); - } - - @Override - public Future<Channel> acquire(final Promise<Channel> promise) { - try { - if (executor.inEventLoop()) { - acquire0(promise); - } else { - executor.execute(new Runnable() { - @Override - public void run() { - acquire0(promise); - } - }); - } - } catch (Throwable cause) { - promise.setFailure(cause); - } - return promise; - } - - private void acquire0(final Promise<Channel> promise) { - assert executor.inEventLoop(); - - if (closed) { - promise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION); - return; - } - if (acquiredChannelCount.get() < maxConnections) { - assert acquiredChannelCount.get() >= 0; - - // We need to create a new promise as we need to ensure the AcquireListener runs in the correct - // EventLoop - Promise<Channel> p = executor.newPromise(); - TinkerpopFixedChannelPool.AcquireListener l = new TinkerpopFixedChannelPool.AcquireListener(promise); - l.acquired(); - p.addListener(l); - super.acquire(p); - } else { - if (pendingAcquireCount >= maxPendingAcquires) { - promise.setFailure(FULL_EXCEPTION); - } else { - TinkerpopFixedChannelPool.AcquireTask task = new TinkerpopFixedChannelPool.AcquireTask(promise); - if (pendingAcquireQueue.offer(task)) { - ++pendingAcquireCount; - - if (timeoutTask != null) { - task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS); - } - } else { - promise.setFailure(FULL_EXCEPTION); - } - } - - assert pendingAcquireCount > 0; - } - } - - @Override - public Future<Void> release(final Channel channel, final Promise<Void> promise) { - ObjectUtil.checkNotNull(promise, "promise"); - final Promise<Void> p = executor.newPromise(); - super.release(channel, p.addListener(new FutureListener<Void>() { - - @Override - public void operationComplete(Future<Void> future) throws Exception { - assert executor.inEventLoop(); - - if (closed) { - // Since the pool is closed, we have no choice but to close the channel - channel.close(); - promise.setFailure(POOL_CLOSED_ON_RELEASE_EXCEPTION); - return; - } - - if (future.isSuccess()) { - decrementAndRunTaskQueue(); - promise.setSuccess(null); - } else { - Throwable cause = future.cause(); - // Check if the exception was not because of we passed the Channel to the wrong pool. - if (!(cause instanceof IllegalArgumentException)) { - decrementAndRunTaskQueue(); - } - promise.setFailure(future.cause()); - } - } - })); - return promise; - } - - private void decrementAndRunTaskQueue() { - // We should never have a negative value. - int currentCount = acquiredChannelCount.decrementAndGet(); - assert currentCount >= 0; - - // Run the pending acquire tasks before notify the original promise so if the user would - // try to acquire again from the ChannelFutureListener and the pendingAcquireCount is >= - // maxPendingAcquires we may be able to run some pending tasks first and so allow to add - // more. - runTaskQueue(); - } - - private void runTaskQueue() { - while (acquiredChannelCount.get() < maxConnections) { - TinkerpopFixedChannelPool.AcquireTask task = pendingAcquireQueue.poll(); - if (task == null) { - break; - } - - // Cancel the timeout if one was scheduled - ScheduledFuture<?> timeoutFuture = task.timeoutFuture; - if (timeoutFuture != null) { - timeoutFuture.cancel(false); - } - - --pendingAcquireCount; - task.acquired(); - - super.acquire(task.promise); - } - - // We should never have a negative value. - assert pendingAcquireCount >= 0; - assert acquiredChannelCount.get() >= 0; - } - - // AcquireTask extends AcquireListener to reduce object creations and so GC pressure - private final class AcquireTask extends TinkerpopFixedChannelPool.AcquireListener { - final Promise<Channel> promise; - final long expireNanoTime = System.nanoTime() + acquireTimeoutNanos; - ScheduledFuture<?> timeoutFuture; - - public AcquireTask(Promise<Channel> promise) { - super(promise); - // We need to create a new promise as we need to ensure the AcquireListener runs in the correct - // EventLoop. - this.promise = executor.<Channel>newPromise().addListener(this); - } - } - - private abstract class TimeoutTask implements Runnable { - @Override - public final void run() { - assert executor.inEventLoop(); - long nanoTime = System.nanoTime(); - for (;;) { - TinkerpopFixedChannelPool.AcquireTask task = pendingAcquireQueue.peek(); - // Compare nanoTime as descripted in the javadocs of System.nanoTime() - // - // See https://docs.oracle.com/javase/7/docs/api/java/lang/System.html#nanoTime() - // See https://github.com/netty/netty/issues/3705 - if (task == null || nanoTime - task.expireNanoTime < 0) { - break; - } - pendingAcquireQueue.remove(); - - --pendingAcquireCount; - onTimeout(task); - } - } - - public abstract void onTimeout(TinkerpopFixedChannelPool.AcquireTask task); - } - - private class AcquireListener implements FutureListener<Channel> { - private final Promise<Channel> originalPromise; - protected boolean acquired; - - AcquireListener(Promise<Channel> originalPromise) { - this.originalPromise = originalPromise; - } - - @Override - public void operationComplete(Future<Channel> future) throws Exception { - assert executor.inEventLoop(); - - if (closed) { - if (future.isSuccess()) { - // Since the pool is closed, we have no choice but to close the channel - future.getNow().close(); - } - originalPromise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION); - return; - } - - if (future.isSuccess()) { - originalPromise.setSuccess(future.getNow()); - } else { - if (acquired) { - decrementAndRunTaskQueue(); - } else { - runTaskQueue(); - } - - originalPromise.setFailure(future.cause()); - } - } - - public void acquired() { - if (acquired) { - return; - } - acquiredChannelCount.incrementAndGet(); - acquired = true; - } - } - - /** - * Closes the pool in an async manner. - * - * @return Future which represents completion of the close task - */ - public Future<Void> closeAsync() { - if (executor.inEventLoop()) { - return close0(); - } else { - final Promise<Void> closeComplete = executor.newPromise(); - executor.execute(new Runnable() { - @Override - public void run() { - close0().addListener(new FutureListener<Void>() { - @Override - public void operationComplete(Future<Void> f) throws Exception { - if (f.isSuccess()) { - closeComplete.setSuccess(null); - } else { - closeComplete.setFailure(f.cause()); - } - } - }); - } - }); - return closeComplete; - } - } - - private Future<Void> close0() { - assert executor.inEventLoop(); - - if (!closed) { - closed = true; - for (;;) { - TinkerpopFixedChannelPool.AcquireTask task = pendingAcquireQueue.poll(); - if (task == null) { - break; - } - ScheduledFuture<?> f = task.timeoutFuture; - if (f != null) { - f.cancel(false); - } - task.promise.setFailure(new ClosedChannelException()); - } - acquiredChannelCount.set(0); - pendingAcquireCount = 0; - - // Ensure we dispatch this on another Thread as close0 will be called from the EventExecutor and we need - // to ensure we will not block in a EventExecutor. - return GlobalEventExecutor.INSTANCE.submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - TinkerpopFixedChannelPool.super.close(); - return null; - } - }); - } - - return GlobalEventExecutor.INSTANCE.newSucceededFuture(null); - } -}
