Author: remm
Date: Thu Nov 5 17:29:20 2015
New Revision: 1712826
URL: http://svn.apache.org/viewvc?rev=1712826&view=rev
Log:
- Add (another, sorry) classloader-keyed static map to retrieve the
InstanceManager from anywhere, as a last resort.
- Pass along the application classloader during upgrade, using a token object.
- Don't use the InstanceManager for internal upgrade handlers (no naming, etc.);
this would optimize websockets and HTTP/2 upgrades.
- Set a proper context classloader environment for upgrade handler init/destroy
and use the InstanceManager, except for internal handlers, which are
responsible for it themselves.
- Support the InstanceManager for the websockets client endpoints.
Added:
tomcat/trunk/java/org/apache/coyote/UpgradeToken.java
tomcat/trunk/java/org/apache/coyote/http2/AsyncHttp2UpgradeHandler.jav
tomcat/trunk/java/org/apache/tomcat/InstanceManagerBindings.java (with
props)
Modified:
tomcat/trunk/java/org/apache/catalina/connector/Request.java
tomcat/trunk/java/org/apache/catalina/core/StandardContext.java
tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
tomcat/trunk/java/org/apache/coyote/Processor.java
tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProtocol.java
tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java
tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java
tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorBase.java
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorInternal.java
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java
tomcat/trunk/java/org/apache/coyote/http2/Http2Protocol.java
tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java
tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java
tomcat/trunk/res/checkstyle/org-import-control.xml
Modified: tomcat/trunk/java/org/apache/catalina/connector/Request.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/Request.java?rev=1712826&r1=1712825&r2=1712826&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/Request.java (original)
+++ tomcat/trunk/java/org/apache/catalina/connector/Request.java Thu Nov 5
17:29:20 2015
@@ -80,6 +80,8 @@ import org.apache.catalina.core.AsyncCon
import org.apache.catalina.mapper.MappingData;
import org.apache.catalina.util.ParameterMap;
import org.apache.coyote.ActionCode;
+import org.apache.coyote.UpgradeToken;
+import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.ExceptionUtils;
@@ -1844,12 +1846,18 @@ public class Request implements HttpServ
T handler;
try {
- handler = (T)
getContext().getInstanceManager().newInstance(httpUpgradeHandlerClass);
+ // Do not go through the instance manager for internal Tomcat
classes since they don't need injection
+ if
(InternalHttpUpgradeHandler.class.isAssignableFrom(httpUpgradeHandlerClass)) {
+ handler = (T) httpUpgradeHandlerClass.newInstance();
+ } else {
+ handler = (T)
getContext().getInstanceManager().newInstance(httpUpgradeHandlerClass);
+ }
} catch (InstantiationException | IllegalAccessException |
InvocationTargetException | NamingException e) {
throw new ServletException(e);
}
+ UpgradeToken upgradeToken = new UpgradeToken(handler,
getContext().getLoader().getClassLoader());
- coyoteRequest.action(ActionCode.UPGRADE, handler);
+ coyoteRequest.action(ActionCode.UPGRADE, upgradeToken);
// Output required by RFC2616. Protocol specific headers should have
// already been set.
Modified: tomcat/trunk/java/org/apache/catalina/core/StandardContext.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/core/StandardContext.java?rev=1712826&r1=1712825&r2=1712826&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/core/StandardContext.java (original)
+++ tomcat/trunk/java/org/apache/catalina/core/StandardContext.java Thu Nov 5
17:29:20 2015
@@ -113,6 +113,7 @@ import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.naming.ContextBindings;
import org.apache.tomcat.InstanceManager;
+import org.apache.tomcat.InstanceManagerBindings;
import org.apache.tomcat.JarScanner;
import org.apache.tomcat.util.ExceptionUtils;
import org.apache.tomcat.util.IntrospectionUtils;
@@ -5147,9 +5148,10 @@ public class StandardContext extends Con
getIgnoreAnnotations() ? new
NamingResourcesImpl(): getNamingResources());
setInstanceManager(new DefaultInstanceManager(context,
injectionMap, this,
this.getClass().getClassLoader()));
- getServletContext().setAttribute(
- InstanceManager.class.getName(),
getInstanceManager());
}
+ getServletContext().setAttribute(
+ InstanceManager.class.getName(), getInstanceManager());
+ InstanceManagerBindings.bind(getLoader().getClassLoader(),
getInstanceManager());
}
// Create context attributes that will be required
@@ -5437,7 +5439,11 @@ public class StandardContext extends Con
}
Loader loader = getLoader();
if (loader instanceof Lifecycle) {
+ ClassLoader classLoader = loader.getClassLoader();
((Lifecycle) loader).stop();
+ if (classLoader != null) {
+ InstanceManagerBindings.unbind(classLoader);
+ }
}
// Stop resources
Modified: tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java?rev=1712826&r1=1712825&r2=1712826&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java Thu Nov 5
17:29:20 2015
@@ -35,7 +35,9 @@ import javax.management.ObjectName;
import javax.servlet.http.HttpUpgradeHandler;
import javax.servlet.http.WebConnection;
+import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler;
import org.apache.juli.logging.Log;
+import org.apache.tomcat.InstanceManagerBindings;
import org.apache.tomcat.util.ExceptionUtils;
import org.apache.tomcat.util.collections.SynchronizedStack;
import org.apache.tomcat.util.modeler.Registry;
@@ -738,14 +740,15 @@ public abstract class AbstractProtocol<S
if (state == SocketState.UPGRADING) {
// Get the HTTP upgrade handler
- HttpUpgradeHandler httpUpgradeHandler =
processor.getHttpUpgradeHandler();
+ UpgradeToken upgradeToken =
processor.getUpgradeToken();
+ HttpUpgradeHandler httpUpgradeHandler =
upgradeToken.getHttpUpgradeHandler();
// Retrieve leftover input
ByteBuffer leftoverInput =
processor.getLeftoverInput();
// Release the Http11 processor to be re-used
release(wrapper, processor, false);
// Create the upgrade processor
processor = createUpgradeProcessor(
- wrapper, leftoverInput, httpUpgradeHandler);
+ wrapper, leftoverInput, upgradeToken);
// Mark the connection as upgraded
wrapper.setUpgraded(true);
// Associate with the processor with the connection
@@ -756,7 +759,19 @@ public abstract class AbstractProtocol<S
// This cast should be safe. If it fails the error
// handling for the surrounding try/catch will deal
with
// it.
- httpUpgradeHandler.init((WebConnection) processor);
+ if (httpUpgradeHandler instanceof
InternalHttpUpgradeHandler) {
+ httpUpgradeHandler.init((WebConnection) processor);
+ } else {
+ Thread thread = Thread.currentThread();
+ // Set context class loader environment for user
class call
+ ClassLoader originalClassLoader =
thread.getContextClassLoader();
+ try {
+
thread.setContextClassLoader(upgradeToken.getApplicationClassLoader());
+ httpUpgradeHandler.init((WebConnection)
processor);
+ } finally {
+
thread.setContextClassLoader(originalClassLoader);
+ }
+ }
}
} while ( state == SocketState.UPGRADING);
@@ -794,7 +809,23 @@ public abstract class AbstractProtocol<S
// processors are not recycled.
connections.remove(socket);
if (processor.isUpgrade()) {
- processor.getHttpUpgradeHandler().destroy();
+ UpgradeToken upgradeToken =
processor.getUpgradeToken();
+ HttpUpgradeHandler httpUpgradeHandler =
upgradeToken.getHttpUpgradeHandler();
+ if (httpUpgradeHandler instanceof
InternalHttpUpgradeHandler) {
+ httpUpgradeHandler.destroy();
+ } else {
+ Thread thread = Thread.currentThread();
+ // Set context class loader environment for user
class call
+ ClassLoader originalClassLoader =
thread.getContextClassLoader();
+ try {
+
thread.setContextClassLoader(upgradeToken.getApplicationClassLoader());
+ httpUpgradeHandler.destroy();
+
InstanceManagerBindings.get(upgradeToken.getApplicationClassLoader())
+ .destroyInstance(httpUpgradeHandler);
+ } finally {
+
thread.setContextClassLoader(originalClassLoader);
+ }
+ }
} else {
release(wrapper, processor, false);
}
@@ -891,7 +922,7 @@ public abstract class AbstractProtocol<S
protected abstract Processor createUpgradeProcessor(
SocketWrapperBase<?> socket, ByteBuffer leftoverInput,
- HttpUpgradeHandler httpUpgradeHandler) throws IOException;
+ UpgradeToken upgradeToken) throws IOException;
protected void register(AbstractProcessor processor) {
Modified: tomcat/trunk/java/org/apache/coyote/Processor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/Processor.java?rev=1712826&r1=1712825&r2=1712826&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/Processor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/Processor.java Thu Nov 5 17:29:20 2015
@@ -19,8 +19,6 @@ package org.apache.coyote;
import java.io.IOException;
import java.nio.ByteBuffer;
-import javax.servlet.http.HttpUpgradeHandler;
-
import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
import org.apache.tomcat.util.net.SSLSupport;
import org.apache.tomcat.util.net.SocketStatus;
@@ -48,7 +46,7 @@ public interface Processor {
*/
SocketState process(SocketWrapperBase<?> socketWrapper, SocketStatus
status) throws IOException;
- HttpUpgradeHandler getHttpUpgradeHandler();
+ UpgradeToken getUpgradeToken();
boolean isUpgrade();
boolean isAsync();
Added: tomcat/trunk/java/org/apache/coyote/UpgradeToken.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/UpgradeToken.java?rev=1712826&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/UpgradeToken.java (added)
+++ tomcat/trunk/java/org/apache/coyote/UpgradeToken.java Thu Nov 5 17:29:20
2015
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.coyote;
+
+import javax.servlet.http.HttpUpgradeHandler;
+
+/**
+ * Token used during the upgrade process.
+ */
+public final class UpgradeToken {
+
+ private final ClassLoader applicationClassLoader;
+ private final HttpUpgradeHandler httpUpgradeHandler;
+
+ public UpgradeToken(HttpUpgradeHandler httpUpgradeHandler,
+ ClassLoader applicationClassLoader) {
+ this.applicationClassLoader = applicationClassLoader;
+ this.httpUpgradeHandler = httpUpgradeHandler;
+ }
+
+ public final ClassLoader getApplicationClassLoader() {
+ return applicationClassLoader;
+ }
+
+ public final HttpUpgradeHandler getHttpUpgradeHandler() {
+ return httpUpgradeHandler;
+ }
+
+}
Modified: tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProtocol.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProtocol.java?rev=1712826&r1=1712825&r2=1712826&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProtocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProtocol.java Thu Nov 5
17:29:20 2015
@@ -18,10 +18,9 @@ package org.apache.coyote.ajp;
import java.nio.ByteBuffer;
-import javax.servlet.http.HttpUpgradeHandler;
-
import org.apache.coyote.AbstractProtocol;
import org.apache.coyote.UpgradeProtocol;
+import org.apache.coyote.UpgradeToken;
import org.apache.tomcat.util.net.AbstractEndpoint;
import org.apache.tomcat.util.net.SSLHostConfig;
import org.apache.tomcat.util.net.SocketWrapperBase;
@@ -188,7 +187,7 @@ public abstract class AbstractAjpProtoco
@Override
protected AjpProcessor createUpgradeProcessor(SocketWrapperBase<?>
socket,
- ByteBuffer leftoverInput, HttpUpgradeHandler
httpUpgradeHandler) {
+ ByteBuffer leftoverInput, UpgradeToken upgradeToken) {
// TODO should fail - throw IOE
return null;
}
Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java?rev=1712826&r1=1712825&r2=1712826&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java Thu Nov 5
17:29:20 2015
@@ -28,7 +28,6 @@ import java.security.cert.X509Certificat
import java.util.concurrent.atomic.AtomicBoolean;
import javax.servlet.http.HttpServletResponse;
-import javax.servlet.http.HttpUpgradeHandler;
import org.apache.coyote.AbstractProcessor;
import org.apache.coyote.ActionCode;
@@ -37,6 +36,7 @@ import org.apache.coyote.ErrorState;
import org.apache.coyote.InputBuffer;
import org.apache.coyote.OutputBuffer;
import org.apache.coyote.RequestInfo;
+import org.apache.coyote.UpgradeToken;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.ExceptionUtils;
@@ -814,7 +814,7 @@ public class AjpProcessor extends Abstra
@Override
- public HttpUpgradeHandler getHttpUpgradeHandler() {
+ public UpgradeToken getUpgradeToken() {
// Should never reach this code but in case we do...
throw new IllegalStateException(
sm.getString("ajpprocessor.httpupgrade.notsupported"));
Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java?rev=1712826&r1=1712825&r2=1712826&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java
(original)
+++ tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java Thu
Nov 5 17:29:20 2015
@@ -34,6 +34,7 @@ import javax.servlet.http.HttpUpgradeHan
import org.apache.coyote.AbstractProtocol;
import org.apache.coyote.Processor;
import org.apache.coyote.UpgradeProtocol;
+import org.apache.coyote.UpgradeToken;
import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler;
import org.apache.coyote.http11.upgrade.UpgradeProcessorExternal;
import org.apache.coyote.http11.upgrade.UpgradeProcessorInternal;
@@ -662,13 +663,13 @@ public abstract class AbstractHttp11Prot
@Override
protected Processor createUpgradeProcessor(
SocketWrapperBase<?> socket, ByteBuffer leftoverInput,
- HttpUpgradeHandler httpUpgradeHandler)
+ UpgradeToken upgradeToken)
throws IOException {
+ HttpUpgradeHandler httpUpgradeHandler =
upgradeToken.getHttpUpgradeHandler();
if (httpUpgradeHandler instanceof InternalHttpUpgradeHandler) {
- return new UpgradeProcessorInternal(socket, leftoverInput,
- (InternalHttpUpgradeHandler) httpUpgradeHandler);
+ return new UpgradeProcessorInternal(socket, leftoverInput,
upgradeToken);
} else {
- return new UpgradeProcessorExternal(socket, leftoverInput,
httpUpgradeHandler);
+ return new UpgradeProcessorExternal(socket, leftoverInput,
upgradeToken);
}
}
}
Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java?rev=1712826&r1=1712825&r2=1712826&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java Thu Nov 5
17:29:20 2015
@@ -27,7 +27,6 @@ import java.util.concurrent.atomic.Atomi
import java.util.regex.Pattern;
import javax.servlet.http.HttpServletResponse;
-import javax.servlet.http.HttpUpgradeHandler;
import org.apache.coyote.AbstractProcessor;
import org.apache.coyote.ActionCode;
@@ -36,6 +35,7 @@ import org.apache.coyote.ErrorState;
import org.apache.coyote.Request;
import org.apache.coyote.RequestInfo;
import org.apache.coyote.UpgradeProtocol;
+import org.apache.coyote.UpgradeToken;
import org.apache.coyote.http11.filters.BufferedInputFilter;
import org.apache.coyote.http11.filters.ChunkedInputFilter;
import org.apache.coyote.http11.filters.ChunkedOutputFilter;
@@ -203,7 +203,7 @@ public class Http11Processor extends Abs
* Instance of the new protocol to use after the HTTP connection has been
* upgraded.
*/
- protected HttpUpgradeHandler httpUpgradeHandler = null;
+ protected UpgradeToken upgradeToken = null;
/**
@@ -752,7 +752,7 @@ public class Http11Processor extends Abs
break;
}
case UPGRADE: {
- httpUpgradeHandler = (HttpUpgradeHandler) param;
+ upgradeToken = (UpgradeToken) param;
// Stop further HTTP output
outputBuffer.finished = true;
break;
@@ -942,7 +942,7 @@ public class Http11Processor extends Abs
boolean keptAlive = false;
while (!getErrorState().isError() && keepAlive && !isAsync() &&
- httpUpgradeHandler == null && !endpoint.isPaused()) {
+ upgradeToken == null && !endpoint.isPaused()) {
// Parsing the request header
try {
@@ -1020,7 +1020,9 @@ public class Http11Processor extends Abs
InternalHttpUpgradeHandler upgradeHandler =
upgradeProtocol.getInternalUpgradeHandler(
getAdapter(), cloneRequest(request));
- action(ActionCode.UPGRADE, upgradeHandler);
+ UpgradeToken upgradeToken = new UpgradeToken(
+ upgradeHandler,
Http11Processor.class.getClassLoader());
+ action(ActionCode.UPGRADE, upgradeToken);
return SocketState.UPGRADING;
}
}
@@ -1693,14 +1695,14 @@ public class Http11Processor extends Abs
@Override
public boolean isUpgrade() {
- return httpUpgradeHandler != null;
+ return upgradeToken != null;
}
@Override
- public HttpUpgradeHandler getHttpUpgradeHandler() {
- return httpUpgradeHandler;
+ public UpgradeToken getUpgradeToken() {
+ return upgradeToken;
}
@@ -1797,7 +1799,7 @@ public class Http11Processor extends Abs
super.recycle();
inputBuffer.recycle();
outputBuffer.recycle();
- httpUpgradeHandler = null;
+ upgradeToken = null;
socketWrapper = null;
sendfileData = null;
}
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorBase.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorBase.java?rev=1712826&r1=1712825&r2=1712826&view=diff
==============================================================================
---
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorBase.java
(original)
+++
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorBase.java
Thu Nov 5 17:29:20 2015
@@ -19,11 +19,11 @@ package org.apache.coyote.http11.upgrade
import java.io.IOException;
import java.nio.ByteBuffer;
-import javax.servlet.http.HttpUpgradeHandler;
import javax.servlet.http.WebConnection;
import org.apache.coyote.AbstractProcessorLight;
import org.apache.coyote.Request;
+import org.apache.coyote.UpgradeToken;
import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
import org.apache.tomcat.util.net.SocketWrapperBase;
@@ -31,11 +31,11 @@ public abstract class UpgradeProcessorBa
protected static final int INFINITE_TIMEOUT = -1;
- private final HttpUpgradeHandler httpUpgradeHandler;
+ private final UpgradeToken upgradeToken;
public UpgradeProcessorBase(SocketWrapperBase<?> wrapper, ByteBuffer
leftOverInput,
- HttpUpgradeHandler httpUpgradeHandler) {
- this.httpUpgradeHandler = httpUpgradeHandler;
+ UpgradeToken upgradeToken) {
+ this.upgradeToken = upgradeToken;
wrapper.unRead(leftOverInput);
}
@@ -49,8 +49,8 @@ public abstract class UpgradeProcessorBa
@Override
- public HttpUpgradeHandler getHttpUpgradeHandler() {
- return httpUpgradeHandler;
+ public UpgradeToken getUpgradeToken() {
+ return upgradeToken;
}
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java?rev=1712826&r1=1712825&r2=1712826&view=diff
==============================================================================
---
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java
(original)
+++
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java
Thu Nov 5 17:29:20 2015
@@ -21,8 +21,8 @@ import java.nio.ByteBuffer;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
-import javax.servlet.http.HttpUpgradeHandler;
+import org.apache.coyote.UpgradeToken;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
@@ -41,8 +41,8 @@ public class UpgradeProcessorExternal ex
public UpgradeProcessorExternal(SocketWrapperBase<?> wrapper, ByteBuffer
leftOverInput,
- HttpUpgradeHandler httpUpgradeHandler) {
- super(wrapper, leftOverInput, httpUpgradeHandler);
+ UpgradeToken upgradeToken) {
+ super(wrapper, leftOverInput, upgradeToken);
this.upgradeServletInputStream = new UpgradeServletInputStream(this,
wrapper);
this.upgradeServletOutputStream = new UpgradeServletOutputStream(this,
wrapper);
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorInternal.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorInternal.java?rev=1712826&r1=1712825&r2=1712826&view=diff
==============================================================================
---
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorInternal.java
(original)
+++
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorInternal.java
Thu Nov 5 17:29:20 2015
@@ -22,8 +22,10 @@ import java.nio.ByteBuffer;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
+import org.apache.coyote.UpgradeToken;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.InstanceManagerBindings;
import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
import org.apache.tomcat.util.net.SSLSupport;
import org.apache.tomcat.util.net.SocketStatus;
@@ -36,9 +38,9 @@ public class UpgradeProcessorInternal ex
private final InternalHttpUpgradeHandler internalHttpUpgradeHandler;
public UpgradeProcessorInternal(SocketWrapperBase<?> wrapper, ByteBuffer
leftOverInput,
- InternalHttpUpgradeHandler internalHttpUpgradeHandler) {
- super(wrapper, leftOverInput, internalHttpUpgradeHandler);
- this.internalHttpUpgradeHandler = internalHttpUpgradeHandler;
+ UpgradeToken upgradeToken) {
+ super(wrapper, leftOverInput, upgradeToken);
+ this.internalHttpUpgradeHandler = (InternalHttpUpgradeHandler)
upgradeToken.getHttpUpgradeHandler();
/*
* Leave timeouts in the hands of the upgraded protocol.
*/
@@ -77,7 +79,12 @@ public class UpgradeProcessorInternal ex
@Override
public void close() throws Exception {
- internalHttpUpgradeHandler.destroy();
+ try {
+ internalHttpUpgradeHandler.destroy();
+ } finally {
+
InstanceManagerBindings.get(getUpgradeToken().getApplicationClassLoader())
+ .destroyInstance(internalHttpUpgradeHandler);
+ }
}
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java?rev=1712826&r1=1712825&r2=1712826&view=diff
==============================================================================
---
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java
(original)
+++
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java
Thu Nov 5 17:29:20 2015
@@ -43,7 +43,6 @@ public class UpgradeServletInputStream e
// Start in blocking-mode
private volatile Boolean ready = Boolean.TRUE;
private volatile ReadListener listener = null;
- private volatile ClassLoader applicationLoader = null;
public UpgradeServletInputStream(UpgradeProcessorBase processor,
@@ -110,7 +109,6 @@ public class UpgradeServletInputStream e
}
this.listener = listener;
- this.applicationLoader =
Thread.currentThread().getContextClassLoader();
// Switching to non-blocking. Don't know if data is available.
ready = null;
}
@@ -211,7 +209,7 @@ public class UpgradeServletInputStream e
Thread thread = Thread.currentThread();
ClassLoader originalClassLoader = thread.getContextClassLoader();
try {
- thread.setContextClassLoader(applicationLoader);
+
thread.setContextClassLoader(processor.getUpgradeToken().getApplicationClassLoader());
if (!eof) {
listener.onDataAvailable();
}
@@ -234,7 +232,7 @@ public class UpgradeServletInputStream e
Thread thread = Thread.currentThread();
ClassLoader originalClassLoader = thread.getContextClassLoader();
try {
- thread.setContextClassLoader(applicationLoader);
+
thread.setContextClassLoader(processor.getUpgradeToken().getApplicationClassLoader());
listener.onError(t);
} catch (Throwable t2) {
ExceptionUtils.handleThrowable(t2);
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java?rev=1712826&r1=1712825&r2=1712826&view=diff
==============================================================================
---
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java
(original)
+++
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java
Thu Nov 5 17:29:20 2015
@@ -59,7 +59,6 @@ public class UpgradeServletOutputStream
// Guarded by registeredLock
private boolean registered = false;
- private volatile ClassLoader applicationLoader = null;
public UpgradeServletOutputStream(UpgradeProcessorBase processor,
@@ -125,7 +124,6 @@ public class UpgradeServletOutputStream
}
this.listener = listener;
- this.applicationLoader =
Thread.currentThread().getContextClassLoader();
}
@@ -250,7 +248,7 @@ public class UpgradeServletOutputStream
Thread thread = Thread.currentThread();
ClassLoader originalClassLoader = thread.getContextClassLoader();
try {
- thread.setContextClassLoader(applicationLoader);
+
thread.setContextClassLoader(processor.getUpgradeToken().getApplicationClassLoader());
listener.onWritePossible();
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
@@ -269,7 +267,7 @@ public class UpgradeServletOutputStream
Thread thread = Thread.currentThread();
ClassLoader originalClassLoader = thread.getContextClassLoader();
try {
- thread.setContextClassLoader(applicationLoader);
+
thread.setContextClassLoader(processor.getUpgradeToken().getApplicationClassLoader());
listener.onError(t);
} catch (Throwable t2) {
ExceptionUtils.handleThrowable(t2);
Added: tomcat/trunk/java/org/apache/coyote/http2/AsyncHttp2UpgradeHandler.jav
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/AsyncHttp2UpgradeHandler.jav?rev=1712826&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/AsyncHttp2UpgradeHandler.jav
(added)
+++ tomcat/trunk/java/org/apache/coyote/http2/AsyncHttp2UpgradeHandler.jav Thu
Nov 5 17:29:20 2015
@@ -0,0 +1,1276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.coyote.http2;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.servlet.http.WebConnection;
+
+import org.apache.coyote.Adapter;
+import org.apache.coyote.ProtocolException;
+import org.apache.coyote.Request;
+import org.apache.coyote.Response;
+import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler;
+import org.apache.coyote.http2.HpackDecoder.HeaderEmitter;
+import org.apache.coyote.http2.HpackEncoder.State;
+import org.apache.coyote.http2.Http2Parser.Input;
+import org.apache.coyote.http2.Http2Parser.Output;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.codec.binary.Base64;
+import org.apache.tomcat.util.http.MimeHeaders;
+import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
+import org.apache.tomcat.util.net.SSLSupport;
+import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.SocketWrapperBase;
+import org.apache.tomcat.util.res.StringManager;
+
+/**
+ * This represents an HTTP/2 connection from a client to Tomcat. It is designed
+ * on the basis that there will never be more than one thread performing I/O at
+ * a time.
+ * <br>
+ * For reading, this implementation is blocking within frames and non-blocking
+ * between frames.
+ * <br>
+ * Note:
+ * <ul>
+ * <li>Unless Tomcat is configured with an ECC certificate, Firefox (tested
+ *     with v37.0.2) needs to be configured with
+ *     network.http.spdy.enforce-tls-profile=false in order for Firefox to be
+ *     able to connect.</li>
+ * <li>You will need to nest an &lt;UpgradeProtocol
+ *     className="org.apache.coyote.http2.Http2Protocol" /&gt; element inside
+ *     a TLS enabled Connector element in server.xml to enable HTTP/2 support.
+ *     </li>
+ * </ul>
+ *
+ * TODO: Review cookie parsing
+ */
+public class AsyncHttp2UpgradeHandler extends AbstractStream implements
InternalHttpUpgradeHandler,
+ Input, Output {
+
+ private static final Log log =
LogFactory.getLog(Http2UpgradeHandler.class);
+ private static final StringManager sm =
StringManager.getManager(Http2UpgradeHandler.class);
+
+ private static final AtomicInteger connectionIdGenerator = new
AtomicInteger(0);
+ private static final Integer STREAM_ID_ZERO = Integer.valueOf(0);
+
+ private static final int FLAG_END_OF_STREAM = 1;
+ private static final int FLAG_END_OF_HEADERS = 4;
+
+ private static final byte[] PING = { 0x00, 0x00, 0x08, 0x06, 0x00, 0x00,
0x00, 0x00, 0x00};
+ private static final byte[] PING_ACK = { 0x00, 0x00, 0x08, 0x06, 0x01,
0x00, 0x00, 0x00, 0x00 };
+
+ private static final byte[] SETTINGS_ACK = { 0x00, 0x00, 0x00, 0x04, 0x01,
0x00, 0x00, 0x00, 0x00 };
+
+ private static final byte[] GOAWAY = { 0x07, 0x00, 0x00, 0x00, 0x00, 0x00
};
+
+ private static final String HTTP2_SETTINGS_HEADER = "HTTP2-Settings";
+ private static final byte[] HTTP2_UPGRADE_ACK = ("HTTP/1.1 101 Switching
Protocols\r\n" +
+ "Connection: Upgrade\r\nUpgrade:
h2c\r\n\r\n").getBytes(StandardCharsets.ISO_8859_1);
+
+ private static final HeaderSink HEADER_SINK = new HeaderSink();
+
+ private final String connectionId;
+
+ private final Adapter adapter;
+ private volatile SocketWrapperBase<?> socketWrapper;
+ private volatile SSLSupport sslSupport;
+
+ private volatile Http2Parser parser;
+
+ // Simple state machine (sequence of states)
+ private AtomicReference<ConnectionState> connectionState =
+ new AtomicReference<>(ConnectionState.NEW);
+ private volatile long pausedNanoTime = Long.MAX_VALUE;
+
+ private final ConnectionSettingsRemote remoteSettings = new
ConnectionSettingsRemote();
+ private final ConnectionSettingsLocal localSettings = new
ConnectionSettingsLocal();
+
+ private HpackDecoder hpackDecoder;
+ private HpackEncoder hpackEncoder;
+
+ // All timeouts in milliseconds
+ private long readTimeout = Http2Protocol.DEFAULT_READ_TIMEOUT;
+ private long keepAliveTimeout = Http2Protocol.DEFAULT_KEEP_ALIVE_TIMEOUT;
+ private long writeTimeout = Http2Protocol.DEFAULT_WRITE_TIMEOUT;
+
+ private final Map<Integer,Stream> streams = new HashMap<>();
+ private final AtomicInteger activeRemoteStreamCount = new AtomicInteger(0);
+ private volatile int maxRemoteStreamId = 0;
+ // Start at -1 so the 'add 2' logic in closeIdleStreams() works
+ private volatile int maxActiveRemoteStreamId = -1;
+ private volatile int maxProcessedStreamId;
+ private final PingManager pingManager = new PingManager();
+ private volatile int newStreamsSinceLastPrune = 0;
+ // Tracking for when the connection is blocked (windowSize < 1)
+ private final Map<AbstractStream,int[]> backLogStreams = new
ConcurrentHashMap<>();
+ private long backLogSize = 0;
+
+
+ public AsyncHttp2UpgradeHandler(Adapter adapter, Request coyoteRequest) {
+ super (STREAM_ID_ZERO);
+ this.adapter = adapter;
+ this.connectionId =
Integer.toString(connectionIdGenerator.getAndIncrement());
+
+ // Initial HTTP request becomes stream 1.
+ if (coyoteRequest != null) {
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("upgradeHandler.upgrade",
connectionId));
+ }
+ Integer key = Integer.valueOf(1);
+ Stream stream = new Stream(key, this, coyoteRequest);
+ streams.put(key, stream);
+ maxRemoteStreamId = 1;
+ maxActiveRemoteStreamId = 1;
+ activeRemoteStreamCount.set(1);
+ maxProcessedStreamId = 1;
+ }
+ }
+
+
+ /**
+  * Moves this connection from NEW to CONNECTED and performs the HTTP/2
+  * start-up exchange. If the connection has already left the NEW state the
+  * call returns without doing anything further.
+  * <p>
+  * When a WebConnection is supplied, HTTP/2 was started via HTTP upgrade:
+  * the 101 response is written, the settings carried in the HTTP2-Settings
+  * header of the original request are applied to the remote settings and,
+  * after the preface and initial ping, the original request is handed to a
+  * container thread as stream 1.
+  *
+  * @param webConnection non-null when HTTP/2 was started via HTTP upgrade,
+  *        otherwise null
+  *
+  * @throws ProtocolException if the upgrade settings cannot be processed,
+  *         the client connection preface is invalid or the initial ping
+  *         cannot be sent
+  */
+ @Override
+ public void init(WebConnection webConnection) {
+     if (log.isDebugEnabled()) {
+         log.debug(sm.getString("upgradeHandler.init", connectionId));
+     }
+
+     // Only the first caller performs the initialisation
+     if (!connectionState.compareAndSet(ConnectionState.NEW, ConnectionState.CONNECTED)) {
+         return;
+     }
+
+     parser = new Http2Parser(connectionId, this, this);
+
+     Stream stream = null;
+
+     socketWrapper.setReadTimeout(getReadTimeout());
+     socketWrapper.setWriteTimeout(getWriteTimeout());
+
+     if (webConnection != null) {
+         // HTTP/2 started via HTTP upgrade.
+         // The initial HTTP/1.1 request is available as Stream 1.
+
+         try {
+             // Acknowledge the upgrade request. The single vectored write
+             // replaces the blocking write + flush pair used previously.
+             socketWrapper.write(true, getWriteTimeout(), TimeUnit.MILLISECONDS, null,
+                     SocketWrapperBase.COMPLETE_WRITE, null, ByteBuffer.wrap(HTTP2_UPGRADE_ACK));
+
+             // Process the initial settings frame
+             stream = getStream(1, true);
+             String base64Settings = stream.getCoyoteRequest().getHeader(HTTP2_SETTINGS_HEADER);
+             byte[] settings = Base64.decodeBase64(base64Settings);
+
+             // Settings are only valid on stream 0
+             FrameType.SETTINGS.check(0, settings.length);
+
+             // Each setting occupies 6 bytes: a 2 byte identifier followed
+             // by a 4 byte value. The number of settings is therefore
+             // length / 6 (using the remainder would skip every setting of
+             // a well-formed payload).
+             for (int i = 0; i < settings.length / 6; i++) {
+                 int id = ByteUtil.getTwoBytes(settings, i * 6);
+                 long value = ByteUtil.getFourBytes(settings, (i * 6) + 2);
+                 remoteSettings.set(Setting.valueOf(id), value);
+             }
+         } catch (Http2Exception e) {
+             // Keep the cause so the reason for the failure is not lost
+             throw new ProtocolException(
+                     sm.getString("upgradeHandler.upgrade.fail", connectionId), e);
+         }
+     }
+
+     // Send the initial settings frame
+     socketWrapper.write(true, getWriteTimeout(), TimeUnit.MILLISECONDS, null,
+             SocketWrapperBase.COMPLETE_WRITE, null,
+             ByteBuffer.wrap(localSettings.getSettingsFrameForPending()));
+
+     // Make sure the client has sent a valid connection preface before we
+     // send the response to the original request over HTTP/2.
+     try {
+         parser.readConnectionPreface();
+     } catch (Http2Exception e) {
+         throw new ProtocolException(
+                 sm.getString("upgradeHandler.invalidPreface", connectionId), e);
+     }
+
+     // Send a ping to get an idea of round trip time as early as possible
+     try {
+         pingManager.sendPing(true);
+     } catch (IOException ioe) {
+         throw new ProtocolException(sm.getString("upgradeHandler.pingFailed"), ioe);
+     }
+
+     if (webConnection != null) {
+         // Process the initial request on a container thread
+         StreamProcessor streamProcessor = new StreamProcessor(stream, adapter, socketWrapper);
+         streamProcessor.setSslSupport(sslSupport);
+         socketWrapper.getEndpoint().getExecutor().execute(streamProcessor);
+     }
+ }
+
+
+ @Override
+ public void setSocketWrapper(SocketWrapperBase<?> wrapper) {
+ this.socketWrapper = wrapper;
+ }
+
+
+ @Override
+ public void setSslSupport(SSLSupport sslSupport) {
+ this.sslSupport = sslSupport;
+ }
+
+
+ /**
+  * Handles a socket event for this connection. Every call first invokes
+  * init(null), then sends a ping via the ping manager and checks the pause
+  * state before acting on the supplied status.
+  *
+  * @param status the socket event to process
+  *
+  * @return SocketState.UPGRADED after a read or write event was processed
+  *         without a connection level error, otherwise SocketState.CLOSED
+  */
+ @Override
+ public SocketState upgradeDispatch(SocketStatus status) {
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("upgradeHandler.upgradeDispatch.entry",
connectionId, status));
+ }
+
+ // WebConnection is not used so passing null here is fine
+ // Might not be necessary. init() will handle that.
+ init(null);
+
+
+ // Stays CLOSED unless one of the branches below completes normally
+ SocketState result = SocketState.CLOSED;
+
+ try {
+ pingManager.sendPing(false);
+
+ checkPauseState();
+
+ switch(status) {
+ case OPEN_READ:
+ try {
+
+ // Keep reading frames until readFrame() reports false
+ while (true) {
+ try {
+ if (!parser.readFrame(false)) {
+ break;
+ }
+ } catch (StreamException se) {
+ // Stream errors are not fatal to the connection so
+ // continue reading frames
+ closeStream(se);
+ }
+ }
+ } catch (Http2Exception ce) {
+ // Really ConnectionError
+ if (log.isDebugEnabled()) {
+
log.debug(sm.getString("upgradeHandler.connectionError"), ce);
+ }
+ closeConnection(ce);
+ // Leaves the switch with result still CLOSED
+ break;
+ }
+
+ result = SocketState.UPGRADED;
+ break;
+
+ case OPEN_WRITE:
+ processWrites();
+
+ result = SocketState.UPGRADED;
+ break;
+
+ case ASYNC_READ_ERROR:
+ case ASYNC_WRITE_ERROR:
+ case CLOSE_NOW:
+ // This should never happen and will be fatal for this
connection.
+ // Add the exception to trace how this point was reached.
+ log.error(sm.getString("upgradeHandler.unexpectedStatus",
status),
+ new IllegalStateException());
+ //$FALL-THROUGH$
+ case DISCONNECT:
+ case ERROR:
+ case TIMEOUT:
+ case STOP:
+ // For all of the above, including the unexpected values,
close the
+ // connection.
+ close();
+ break;
+ }
+ } catch (IOException ioe) {
+ // An I/O failure is raised before result is assigned, so the
+ // connection is closed and CLOSED is returned
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("upgradeHandler.ioerror",
connectionId), ioe);
+ }
+ close();
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("upgradeHandler.upgradeDispatch.exit",
connectionId, result));
+ }
+ return result;
+ }
+
+
+ ConnectionSettingsRemote getRemoteSettings() {
+ return remoteSettings;
+ }
+
+
+ @Override
+ public void pause() {
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("upgradeHandler.pause.entry",
connectionId));
+ }
+
+ if (connectionState.compareAndSet(ConnectionState.CONNECTED,
ConnectionState.PAUSING)) {
+ pausedNanoTime = System.nanoTime();
+
+ // Write a GOAWAY frame.
+ byte[] fixedPayload = new byte[8];
+ ByteUtil.set31Bits(fixedPayload, 0, (1 << 31) - 1);
+ ByteUtil.setFourBytes(fixedPayload, 4,
Http2Error.NO_ERROR.getCode());
+ byte[] payloadLength = new byte[3];
+ ByteUtil.setThreeBytes(payloadLength, 0, 8);
+
+ /*try {
+ synchronized (socketWrapper) {
+ socketWrapper.write(true, payloadLength, 0,
payloadLength.length);
+ socketWrapper.write(true, GOAWAY, 0, GOAWAY.length);
+ socketWrapper.write(true, fixedPayload, 0, 8);
+ socketWrapper.flush(true);
+ }
+ } catch (IOException ioe) {
+ // This is fatal for the connection. Ignore it here. There
will be
+ // further attempts at I/O in upgradeDispatch() and it can
better
+ // handle the IO errors.
+ }*/
+ socketWrapper.write(true, getWriteTimeout(),
TimeUnit.MILLISECONDS, null, SocketWrapperBase.COMPLETE_WRITE, null,
+ ByteBuffer.wrap(payloadLength), ByteBuffer.wrap(GOAWAY),
ByteBuffer.wrap(fixedPayload));
+ }
+ }
+
+
+ @Override
+ public void destroy() {
+ // NO-OP
+ }
+
+
+ /**
+  * Completes a pause that is in progress. Once more than one round trip
+  * time (as reported by the ping manager) has elapsed since pause() recorded
+  * pausedNanoTime, the connection moves from PAUSING to PAUSED and a GOAWAY
+  * frame carrying the highest processed stream ID and NO_ERROR is written.
+  * Does nothing unless the connection is in the PAUSING state.
+  *
+  * @throws IOException declared for callers; see the write below
+  */
+ private void checkPauseState() throws IOException {
+     if (connectionState.get() != ConnectionState.PAUSING) {
+         return;
+     }
+
+     // System.nanoTime() values may only be compared by subtraction. Adding
+     // the round trip time to an absolute value and comparing the result
+     // with the current time is not safe against numerical overflow.
+     long elapsed = System.nanoTime() - pausedNanoTime;
+     if (elapsed <= pingManager.getRoundTripTimeNano()) {
+         return;
+     }
+
+     connectionState.compareAndSet(ConnectionState.PAUSING, ConnectionState.PAUSED);
+
+     // Write a GOAWAY frame.
+     byte[] fixedPayload = new byte[8];
+     ByteUtil.set31Bits(fixedPayload, 0, maxProcessedStreamId);
+     ByteUtil.setFourBytes(fixedPayload, 4, Http2Error.NO_ERROR.getCode());
+     byte[] payloadLength = new byte[3];
+     ByteUtil.setThreeBytes(payloadLength, 0, 8);
+
+     // Length, fixed GOAWAY header and payload go out as one vectored write
+     socketWrapper.write(true, getWriteTimeout(), TimeUnit.MILLISECONDS, null,
+             SocketWrapperBase.COMPLETE_WRITE, null,
+             ByteBuffer.wrap(payloadLength), ByteBuffer.wrap(GOAWAY),
+             ByteBuffer.wrap(fixedPayload));
+ }
+
+
+ private void closeStream(StreamException se) throws ConnectionException,
IOException {
+
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("upgradeHandler.rst.debug", connectionId,
+ Integer.toString(se.getStreamId()), se.getError()));
+ }
+
+ Stream stream = getStream(se.getStreamId(), false);
+ if (stream != null) {
+ stream.sendRst();
+ }
+
+ // Write a RST frame
+ byte[] rstFrame = new byte[13];
+ // Length
+ ByteUtil.setThreeBytes(rstFrame, 0, 4);
+ // Type
+ rstFrame[3] = FrameType.RST.getIdByte();
+ // No flags
+ // Stream ID
+ ByteUtil.set31Bits(rstFrame, 5, se.getStreamId());
+ // Payload
+ ByteUtil.setFourBytes(rstFrame, 9, se.getError().getCode());
+
+ /*synchronized (socketWrapper) {
+ socketWrapper.write(true, rstFrame, 0, rstFrame.length);
+ socketWrapper.flush(true);
+ }*/
+ socketWrapper.write(true, getWriteTimeout(), TimeUnit.MILLISECONDS,
null, SocketWrapperBase.COMPLETE_WRITE, null,
+ ByteBuffer.wrap(rstFrame));
+ }
+
+
+ private void closeConnection(Http2Exception ce) {
+ // Write a GOAWAY frame.
+ byte[] fixedPayload = new byte[8];
+ ByteUtil.set31Bits(fixedPayload, 0, maxProcessedStreamId);
+ ByteUtil.setFourBytes(fixedPayload, 4, ce.getError().getCode());
+ byte[] debugMessage = ce.getMessage().getBytes(StandardCharsets.UTF_8);
+ byte[] payloadLength = new byte[3];
+ ByteUtil.setThreeBytes(payloadLength, 0, debugMessage.length + 8);
+
+ /*try {
+ synchronized (socketWrapper) {
+ socketWrapper.write(true, payloadLength, 0,
payloadLength.length);
+ socketWrapper.write(true, GOAWAY, 0, GOAWAY.length);
+ socketWrapper.write(true, fixedPayload, 0, 8);
+ socketWrapper.write(true, debugMessage, 0,
debugMessage.length);
+ socketWrapper.flush(true);
+ }
+ } catch (IOException ioe) {
+ // Ignore. GOAWAY is sent on a best efforts basis and the original
+ // error has already been logged.
+ }*/
+ socketWrapper.write(true, getWriteTimeout(), TimeUnit.MILLISECONDS,
null, SocketWrapperBase.COMPLETE_WRITE, null,
+ ByteBuffer.wrap(payloadLength), ByteBuffer.wrap(GOAWAY),
ByteBuffer.wrap(fixedPayload), ByteBuffer.wrap(debugMessage));
+ close();
+ }
+
+
+ /**
+  * Writes the response headers for the given stream. A ":status" pseudo
+  * header is added to the response headers, which are then HPACK encoded
+  * and written as a HEADERS frame followed by CONTINUATION frames until the
+  * encoder reports State.COMPLETE.
+  *
+  * @param stream the stream the headers belong to
+  * @param coyoteResponse the response whose headers are written
+  *
+  * @throws IOException declared for callers
+  */
+ void writeHeaders(Stream stream, Response coyoteResponse) throws
IOException {
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("upgradeHandler.writeHeaders", connectionId,
+ stream.getIdentifier()));
+ }
+ MimeHeaders headers = coyoteResponse.getMimeHeaders();
+ // Add the pseudo header for status
+
headers.addValue(":status").setString(Integer.toString(coyoteResponse.getStatus()));
+ // This ensures the Stream processing thread has control of the socket.
+ synchronized (socketWrapper) {
+ // Frame sizes are allowed to be bigger than 4k but for headers
that
+ // should be plenty
+ byte[] header = new byte[9];
+ ByteBuffer target = ByteBuffer.allocate(4 * 1024);
+ boolean first = true;
+ State state = null;
+ // NOTE(review): header and target are reused on every iteration. The
+ // flag byte header[4] is never reset, so a CONTINUATION frame appears
+ // to inherit FLAG_END_OF_STREAM from the first frame, and target is
+ // not cleared after the write unless encode() or write() does so.
+ // Confirm before relying on responses that need more than one frame.
+ while (state != State.COMPLETE) {
+ state =
getHpackEncoder().encode(coyoteResponse.getMimeHeaders(), target);
+ target.flip();
+ // Frame length is the number of encoded bytes
+ ByteUtil.setThreeBytes(header, 0, target.limit());
+ if (first) {
+ first = false;
+ header[3] = FrameType.HEADERS.getIdByte();
+ if (stream.getOutputBuffer().hasNoBody()) {
+ header[4] = FLAG_END_OF_STREAM;
+ }
+ } else {
+ header[3] = FrameType.CONTINUATION.getIdByte();
+ }
+ if (state == State.COMPLETE) {
+ header[4] += FLAG_END_OF_HEADERS;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug(target.limit() + " bytes");
+ }
+ ByteUtil.set31Bits(header, 5,
stream.getIdentifier().intValue());
+ /*socketWrapper.write(true, header, 0, header.length);
+ socketWrapper.write(true, target.array(),
target.arrayOffset(), target.limit());
+ socketWrapper.flush(true);*/
+ // Frame header and encoded payload are passed to a single write call
+ socketWrapper.write(true, getWriteTimeout(),
TimeUnit.MILLISECONDS, null, SocketWrapperBase.COMPLETE_WRITE, null,
+ ByteBuffer.wrap(header), target);
+ }
+ }
+ }
+
+
+ private HpackEncoder getHpackEncoder() {
+ if (hpackEncoder == null) {
+ hpackEncoder = new
HpackEncoder(localSettings.getHeaderTableSize());
+ }
+ return hpackEncoder;
+ }
+
+
+ void writeBody(Stream stream, ByteBuffer data, int len, boolean finished)
throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("upgradeHandler.writeBody", connectionId,
stream.getIdentifier(),
+ Integer.toString(len)));
+ }
+ synchronized (socketWrapper) {
+ byte[] header = new byte[9];
+ ByteUtil.setThreeBytes(header, 0, len);
+ header[3] = FrameType.DATA.getIdByte();
+ if (finished) {
+ header[4] = FLAG_END_OF_STREAM;
+ stream.sentEndOfStream();
+ if (!stream.isActive()) {
+ activeRemoteStreamCount.decrementAndGet();
+ }
+ }
+ ByteUtil.set31Bits(header, 5, stream.getIdentifier().intValue());
+ /*socketWrapper.write(true, header, 0, header.length);
+ socketWrapper.write(true, data.array(), data.arrayOffset() +
data.position(),
+ len);
+ socketWrapper.flush(true);*/
+ socketWrapper.write(true, getWriteTimeout(),
TimeUnit.MILLISECONDS, null, SocketWrapperBase.COMPLETE_WRITE, null,
+ ByteBuffer.wrap(header), ByteBuffer.wrap(data.array(),
data.arrayOffset() + data.position(), len));
+ }
+ }
+
+
+ void writeWindowUpdate(Stream stream, int increment) throws IOException {
+ synchronized (socketWrapper) {
+ // Build window update frame for stream 0
+ byte[] frame = new byte[13];
+ ByteUtil.setThreeBytes(frame, 0, 4);
+ frame[3] = FrameType.WINDOW_UPDATE.getIdByte();
+ ByteUtil.set31Bits(frame, 9, increment);
+ //socketWrapper.write(true, frame, 0, frame.length);
+ socketWrapper.write(true, getWriteTimeout(),
TimeUnit.MILLISECONDS, null, SocketWrapperBase.COMPLETE_WRITE, null,
+ ByteBuffer.wrap(frame));
+ // Change stream Id and re-use
+ ByteUtil.set31Bits(frame, 5, stream.getIdentifier().intValue());
+ /*socketWrapper.write(true, frame, 0, frame.length);
+ socketWrapper.flush(true);*/
+ socketWrapper.write(true, getWriteTimeout(),
TimeUnit.MILLISECONDS, null, SocketWrapperBase.COMPLETE_WRITE, null,
+ ByteBuffer.wrap(frame));
+ }
+ }
+
+
+ private void processWrites() throws IOException {
+ synchronized (socketWrapper) {
+ if (socketWrapper.flush(false)) {
+ socketWrapper.registerWriteInterest();
+ return;
+ }
+ }
+ }
+
+
+ /**
+  * Reserves part of the connection window for the given stream, waiting on
+  * the stream's monitor until a non-zero allocation is available.
+  * <p>
+  * Back log entries are two element arrays: index 0 holds the outstanding
+  * reservation and index 1 the allocation that has been granted.
+  *
+  * @param stream the stream that wants to write
+  * @param reservation the number of bytes the stream would like to reserve
+  *
+  * @return the number of bytes allocated; greater than zero and, when taken
+  *         directly from the window, never more than the reservation
+  *
+  * @throws IOException if the thread is interrupted while waiting
+  */
+ int reserveWindowSize(Stream stream, int reservation) throws IOException {
+ // Need to be holding the stream lock so releaseBacklog() can't notify
+ // this thread until after this thread enters wait()
+ int allocation = 0;
+ synchronized (stream) {
+ do {
+ synchronized (this) {
+ long windowSize = getWindowSize();
+ // Window exhausted or other streams already queued: go through
+ // the back log rather than taking from the window directly
+ if (windowSize < 1 || backLogSize > 0) {
+ // Has this stream been granted an allocation
+ int[] value = backLogStreams.remove(stream);
+ if (value != null && value[1] > 0) {
+ allocation = value[1];
+ decrementWindowSize(allocation);
+ } else {
+ // NOTE(review): if an entry existed without an allocation it has
+ // been removed above and is re-added here, so backLogSize appears
+ // to be incremented again for the same stream - confirm.
+ value = new int[] { reservation, 0 };
+ backLogStreams.put(stream, value);
+ backLogSize += reservation;
+ // Add the parents as well
+ AbstractStream parent = stream.getParentStream();
+ while (parent != null &&
backLogStreams.putIfAbsent(parent, new int[2]) == null) {
+ parent = parent.getParentStream();
+ }
+ }
+ } else if (windowSize < reservation) {
+ // Partial allocation: hand out whatever is left in the window
+ allocation = (int) windowSize;
+ decrementWindowSize(allocation);
+ } else {
+ allocation = reservation;
+ decrementWindowSize(allocation);
+ }
+ }
+ if (allocation == 0) {
+ try {
+ // Woken via notifyAll() on the stream in releaseBackLog()
+ stream.wait();
+ } catch (InterruptedException e) {
+ throw new IOException(sm.getString(
+
"upgradeHandler.windowSizeReservationInterrupted", connectionId,
+ stream.getIdentifier(),
Integer.toString(reservation)), e);
+ }
+ }
+ } while (allocation == 0);
+ }
+ return allocation;
+ }
+
+
+
+ @Override
+ protected synchronized void incrementWindowSize(int increment) throws
Http2Exception {
+ long windowSize = getWindowSize();
+ if (windowSize < 1 && windowSize + increment > 0) {
+ releaseBackLog(increment);
+ }
+ super.incrementWindowSize(increment);
+ }
+
+
+ private synchronized void releaseBackLog(int increment) {
+ if (backLogSize < increment) {
+ // Can clear the whole backlog
+ for (AbstractStream stream : backLogStreams.keySet()) {
+ synchronized (stream) {
+ stream.notifyAll();
+ }
+ }
+ backLogStreams.clear();
+ backLogSize = 0;
+ } else {
+ int leftToAllocate = increment;
+ while (leftToAllocate > 0) {
+ leftToAllocate = allocate(this, leftToAllocate);
+ }
+ for (Entry<AbstractStream,int[]> entry :
backLogStreams.entrySet()) {
+ int allocation = entry.getValue()[1];
+ if (allocation > 0) {
+ backLogSize -= allocation;
+ synchronized (entry.getKey()) {
+ entry.getKey().notifyAll();
+ }
+ }
+ }
+ }
+ }
+
+
+ private int allocate(AbstractStream stream, int allocation) {
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("upgradeHandler.allocate.debug",
getConnectionId(),
+ stream.getIdentifier(), Integer.toString(allocation)));
+ }
+ // Allocate to the specified stream
+ int[] value = backLogStreams.get(stream);
+ if (value[0] >= allocation) {
+ value[0] -= allocation;
+ value[1] = allocation;
+ return 0;
+ }
+
+ // There was some left over so allocate that to the children of the
+ // stream.
+ int leftToAllocate = allocation;
+ value[1] = value[0];
+ value[0] = 0;
+ leftToAllocate -= value[1];
+
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("upgradeHandler.allocate.left",
+ getConnectionId(), stream.getIdentifier(),
Integer.toString(leftToAllocate)));
+ }
+
+ // Recipients are children of the current stream that are in the
+ // backlog.
+ Set<AbstractStream> recipients = new HashSet<>();
+ recipients.addAll(stream.getChildStreams());
+ recipients.retainAll(backLogStreams.keySet());
+
+ // Loop until we run out of allocation or recipients
+ while (leftToAllocate > 0) {
+ if (recipients.size() == 0) {
+ backLogStreams.remove(stream);
+ return leftToAllocate;
+ }
+
+ int totalWeight = 0;
+ for (AbstractStream recipient : recipients) {
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("upgradeHandler.allocate.recipient",
+ getConnectionId(), stream.getIdentifier(),
recipient.getIdentifier(),
+ Integer.toString(recipient.getWeight())));
+ }
+ totalWeight += recipient.getWeight();
+ }
+
+ // Use an Iterator so fully allocated children/recipients can be
+ // removed.
+ Iterator<AbstractStream> iter = recipients.iterator();
+ int allocated = 0;
+ while (iter.hasNext()) {
+ AbstractStream recipient = iter.next();
+ int share = leftToAllocate * recipient.getWeight() /
totalWeight;
+ if (share == 0) {
+ // This is to avoid rounding issues triggering an infinite
+ // loop. It will cause a very slight over allocation but
+ // HTTP/2 should cope with that.
+ share = 1;
+ }
+ int remainder = allocate(recipient, share);
+ // Remove recipients that receive their full allocation so that
+ // they are excluded from the next allocation round.
+ if (remainder > 0) {
+ iter.remove();
+ }
+ allocated += (share - remainder);
+ }
+ leftToAllocate -= allocated;
+ }
+
+ return 0;
+ }
+
+
+ private Stream getStream(int streamId, boolean unknownIsError) throws
ConnectionException {
+ Integer key = Integer.valueOf(streamId);
+ Stream result = streams.get(key);
+ if (result == null && unknownIsError) {
+ // Stream has been closed and removed from the map
+ throw new
ConnectionException(sm.getString("upgradeHandler.stream.closed", key),
+ Http2Error.PROTOCOL_ERROR);
+ }
+ return result;
+ }
+
+
+ private Stream createRemoteStream(int streamId) throws ConnectionException
{
+ Integer key = Integer.valueOf(streamId);
+
+ if (streamId %2 != 1) {
+ throw new ConnectionException(
+ sm.getString("upgradeHandler.stream.even", key),
Http2Error.PROTOCOL_ERROR);
+ }
+
+ if (streamId <= maxRemoteStreamId) {
+ throw new
ConnectionException(sm.getString("upgradeHandler.stream.old", key,
+ Integer.valueOf(maxRemoteStreamId)),
Http2Error.PROTOCOL_ERROR);
+ }
+
+ pruneClosedStreams();
+
+ Stream result = new Stream(key, this);
+ streams.put(key, result);
+ maxRemoteStreamId = streamId;
+ return result;
+ }
+
+
+ private void close() {
+ connectionState.set(ConnectionState.CLOSED);
+ try {
+ socketWrapper.close();
+ } catch (IOException ioe) {
+ log.debug(sm.getString("upgradeHandler.socketCloseFailed"), ioe);
+ }
+ }
+
+
+ /**
+  * Removes closed streams from the stream map once the map has grown beyond
+  * the configured maximum number of concurrent streams plus 10%. The check
+  * only runs on every tenth call. Active streams and streams with children
+  * are never removed. Streams for which isClosedFinal() is true are only
+  * removed, highest stream ID first, if removing the other candidates was
+  * not sufficient. A warning is logged if the target could not be reached.
+  */
+ private void pruneClosedStreams() {
+     // Only prune every 10 new streams
+     if (newStreamsSinceLastPrune < 9) {
+         newStreamsSinceLastPrune++;
+         return;
+     }
+     // Reset counter
+     newStreamsSinceLastPrune = 0;
+
+     // RFC 7540, 5.3.4 endpoints should maintain state for at least the
+     // maximum number of concurrent streams
+     long max = localSettings.getMaxConcurrentStreams();
+
+     if (log.isDebugEnabled()) {
+         log.debug(sm.getString("upgradeHandler.pruneStart", connectionId,
+                 Long.toString(max), Integer.toString(streams.size())));
+     }
+
+     // Allow an additional 10% for closed streams that are used in the
+     // priority tree
+     max = max + max / 10;
+     if (max > Integer.MAX_VALUE) {
+         max = Integer.MAX_VALUE;
+     }
+
+     int toClose = streams.size() - (int) max;
+     if (toClose < 1) {
+         return;
+     }
+
+     // Need to try and close some streams.
+     // Use this Set to keep track of streams that might be part of the
+     // priority tree. Only remove these if we absolutely have to.
+     TreeSet<Integer> additionalCandidates = new TreeSet<>();
+
+     Iterator<Entry<Integer,Stream>> entryIter = streams.entrySet().iterator();
+     while (entryIter.hasNext() && toClose > 0) {
+         Entry<Integer,Stream> entry = entryIter.next();
+         Stream stream = entry.getValue();
+         // Never remove active streams or streams with children
+         if (stream.isActive() || stream.getChildStreams().size() > 0) {
+             continue;
+         }
+         if (stream.isClosedFinal()) {
+             // This stream went from IDLE to CLOSED and is likely to have
+             // been created by the client as part of the priority tree. Keep
+             // it if possible.
+             additionalCandidates.add(entry.getKey());
+         } else {
+             if (log.isDebugEnabled()) {
+                 log.debug(sm.getString("upgradeHandler.pruned", connectionId, entry.getKey()));
+             }
+             entryIter.remove();
+             toClose--;
+         }
+     }
+
+     // Still over the limit: fall back to the priority tree candidates,
+     // highest stream ID first. Each candidate has to be removed from the
+     // map and counted as closed, otherwise nothing is pruned and the
+     // warning below reports a count that grows instead of shrinking.
+     while (toClose > 0 && additionalCandidates.size() > 0) {
+         Integer pruned = additionalCandidates.pollLast();
+         if (log.isDebugEnabled()) {
+             log.debug(sm.getString("upgradeHandler.prunedPriority", connectionId, pruned));
+         }
+         streams.remove(pruned);
+         toClose--;
+     }
+
+     if (toClose > 0) {
+         log.warn(sm.getString("upgradeHandler.pruneIncomplete", connectionId,
+                 Integer.toString(toClose)));
+     }
+ }
+
+
+ String getProperty(String key) {
+ return socketWrapper.getEndpoint().getProperty(key);
+ }
+
+
+ @Override
+ protected final String getConnectionId() {
+ return connectionId;
+ }
+
+
+ @Override
+ protected final int getWeight() {
+ return 0;
+ }
+
+
+ // ------------------------------------------- Configuration
getters/setters
+
+ public long getReadTimeout() {
+ return readTimeout;
+ }
+
+
+ public void setReadTimeout(long readTimeout) {
+ this.readTimeout = readTimeout;
+ }
+
+
+ public long getKeepAliveTimeout() {
+ return keepAliveTimeout;
+ }
+
+
+ public void setKeepAliveTimeout(long keepAliveTimeout) {
+ this.keepAliveTimeout = keepAliveTimeout;
+ }
+
+
+ public long getWriteTimeout() {
+ return writeTimeout;
+ }
+
+
+ public void setWriteTimeout(long writeTimeout) {
+ this.writeTimeout = writeTimeout;
+ }
+
+
+ public void setMaxConcurrentStreams(long maxConcurrentStreams) {
+ localSettings.set(Setting.MAX_CONCURRENT_STREAMS,
maxConcurrentStreams);
+ }
+
+
+ public void setInitialWindowSize(int initialWindowSize) {
+ localSettings.set(Setting.INITIAL_WINDOW_SIZE, initialWindowSize);
+ }
+
+
+ // ----------------------------------------------- Http2Parser.Input
methods
+
+ /**
+  * Reads exactly {@code length} bytes from the socket into {@code data}
+  * starting at {@code offset}. Only the first read honours {@code block};
+  * once some data has been received the remaining reads are blocking.
+  *
+  * @param block whether the first read should block
+  * @param data the destination array
+  * @param offset the position in the array at which to start writing
+  * @param length the number of bytes to read
+  *
+  * @return true if the requested number of bytes was read, false if a
+  *         non-blocking read returned no data
+  *
+  * @throws EOFException if the end of the stream is reached
+  * @throws IOException if the underlying read fails
+  */
+ @Override
+ public boolean fill(boolean block, byte[] data, int offset, int length) throws IOException {
+     int remaining = length;
+     int writePos = offset;
+     boolean blocking = block;
+
+     while (remaining > 0) {
+         final int count = socketWrapper.read(blocking, data, writePos, remaining);
+         if (count == -1) {
+             throw new EOFException();
+         }
+         if (count == 0) {
+             if (blocking) {
+                 // Should never happen
+                 throw new IllegalStateException();
+             }
+             return false;
+         }
+         writePos += count;
+         remaining -= count;
+         // Part of the data has arrived so wait for the rest
+         blocking = true;
+     }
+
+     return true;
+ }
+
+
+ @Override
+ public int getMaxFrameSize() {
+ return localSettings.getMaxFrameSize();
+ }
+
+
+ // ---------------------------------------------- Http2Parser.Output
methods
+
+ @Override
+ public HpackDecoder getHpackDecoder() {
+ if (hpackDecoder == null) {
+ hpackDecoder = new
HpackDecoder(remoteSettings.getHeaderTableSize());
+ }
+ return hpackDecoder;
+ }
+
+
+ @Override
+ public ByteBuffer getInputByteBuffer(int streamId, int payloadSize) throws
Http2Exception {
+ Stream stream = getStream(streamId, true);
+ stream.checkState(FrameType.DATA);
+ return stream.getInputByteBuffer();
+ }
+
+
+ @Override
+ public void receiveEndOfStream(int streamId) throws ConnectionException {
+ Stream stream = getStream(streamId,
connectionState.get().isNewStreamAllowed());
+ if (stream != null) {
+ stream.receivedEndOfStream();
+ if (!stream.isActive()) {
+ activeRemoteStreamCount.decrementAndGet();
+ }
+ }
+ }
+
+
+ @Override
+ public void swallowedPadding(int streamId, int paddingLength) throws
+ ConnectionException, IOException {
+ Stream stream = getStream(streamId, true);
+ // +1 is for the payload byte used to define the padding length
+ writeWindowUpdate(stream, paddingLength + 1);
+ }
+
+
+ @Override
+ public HeaderEmitter headersStart(int streamId) throws Http2Exception {
+ if (connectionState.get().isNewStreamAllowed()) {
+ Stream stream = getStream(streamId, false);
+ if (stream == null) {
+ stream = createRemoteStream(streamId);
+ }
+ stream.checkState(FrameType.HEADERS);
+ stream.receivedStartOfHeaders();
+ closeIdleStreams(streamId);
+ if (localSettings.getMaxConcurrentStreams() <
activeRemoteStreamCount.incrementAndGet()) {
+ activeRemoteStreamCount.decrementAndGet();
+ throw new
StreamException(sm.getString("upgradeHandler.tooManyRemoteStreams",
+
Long.toString(localSettings.getMaxConcurrentStreams())),
+ Http2Error.REFUSED_STREAM, streamId);
+ }
+ return stream;
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("upgradeHandler.noNewStreams",
+ connectionId, Integer.toString(streamId)));
+ }
+ // Stateless so a static can be used to save on GC
+ return HEADER_SINK;
+ }
+ }
+
+
+ private void closeIdleStreams(int newMaxActiveRemoteStreamId) throws Http2Exception {
+ for (int i = maxActiveRemoteStreamId + 2; i < newMaxActiveRemoteStreamId; i += 2) {
+ Stream stream = getStream(i, false);
+ if (stream != null) {
+ stream.closeIfIdle();
+ }
+ }
+ maxActiveRemoteStreamId = newMaxActiveRemoteStreamId;
+ }
+
+
+ @Override
+ public void reprioritise(int streamId, int parentStreamId,
+ boolean exclusive, int weight) throws Http2Exception {
+ Stream stream = getStream(streamId, false);
+ if (stream == null) {
+ stream = createRemoteStream(streamId);
+ }
+ stream.checkState(FrameType.PRIORITY);
+ AbstractStream parentStream = getStream(parentStreamId, false);
+ if (parentStream == null) {
+ parentStream = this;
+ }
+ stream.rePrioritise(parentStream, exclusive, weight);
+ }
+
+
+ @Override
+ public void headersEnd(int streamId) throws ConnectionException {
+ setMaxProcessedStream(streamId);
+ Stream stream = getStream(streamId, connectionState.get().isNewStreamAllowed());
+ if (stream != null) {
+ // Process this stream on a container thread
+ StreamProcessor streamProcessor = new StreamProcessor(stream, adapter, socketWrapper);
+ streamProcessor.setSslSupport(sslSupport);
+ socketWrapper.getEndpoint().getExecutor().execute(streamProcessor);
+ }
+ }
+
+
+ private void setMaxProcessedStream(int streamId) {
+ if (maxProcessedStreamId < streamId) {
+ maxProcessedStreamId = streamId;
+ }
+ }
+
+
+ @Override
+ public void reset(int streamId, long errorCode) throws Http2Exception {
+ Stream stream = getStream(streamId, true);
+ stream.checkState(FrameType.RST);
+ stream.reset(errorCode);
+ }
+
+
+ @Override
+ public void setting(Setting setting, long value) throws ConnectionException {
+ // Special handling required
+ if (setting == Setting.INITIAL_WINDOW_SIZE) {
+ long oldValue = remoteSettings.getInitialWindowSize();
+ // Do this first in case new value is invalid
+ remoteSettings.set(setting, value);
+ int diff = (int) (value - oldValue);
+ for (Stream stream : streams.values()) {
+ try {
+ stream.incrementWindowSize(diff);
+ } catch (Http2Exception h2e) {
+ try {
+ closeStream(new StreamException(sm.getString(
+ "upgradeHandler.windowSizeTooBig", connectionId,
+ stream.getIdentifier()),
+ h2e.getError(), stream.getIdentifier().intValue()));
+ } catch (IOException ioe) {
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("upgradeHandler.socketCloseFailed"), ioe);
+ }
+ }
+ }
+ }
+ } else {
+ remoteSettings.set(setting, value);
+ }
+ }
+
+
+ @Override
+ public void settingsEnd(boolean ack) throws IOException {
+ if (ack) {
+ if (!localSettings.ack()) {
+ // Ack was unexpected
+ log.warn(sm.getString(
+ "upgradeHandler.unexpectedAck", connectionId, getIdentifier()));
+ }
+ } else {
+ /*synchronized (socketWrapper) {
+ socketWrapper.write(true, SETTINGS_ACK, 0, SETTINGS_ACK.length);
+ socketWrapper.flush(true);
+ }*/
+ socketWrapper.write(true, getWriteTimeout(), TimeUnit.MILLISECONDS, null, SocketWrapperBase.COMPLETE_WRITE, null,
+ ByteBuffer.wrap(SETTINGS_ACK));
+ }
+ }
+
+
+ @Override
+ public void pingReceive(byte[] payload, boolean ack) throws IOException {
+ pingManager.receivePing(payload, ack);
+ }
+
+
+ @Override
+ public void goaway(int lastStreamId, long errorCode, String debugData) {
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("upgradeHandler.goaway.debug", connectionId,
+ Integer.toString(lastStreamId), Long.toHexString(errorCode), debugData));
+ }
+ }
+
+
+ @Override
+ public void incrementWindowSize(int streamId, int increment) throws Http2Exception {
+ if (streamId == 0) {
+ incrementWindowSize(increment);
+ } else {
+ Stream stream = getStream(streamId, true);
+ stream.checkState(FrameType.WINDOW_UPDATE);
+ stream.incrementWindowSize(increment);
+ }
+ }
+
+
+ @Override
+ public void swallowed(int streamId, FrameType frameType, int flags, int size)
+ throws IOException {
+ // NO-OP.
+ }
+
+
+ private class PingManager {
+
+ // 10 seconds
+ private final long pingIntervalNano = 10000000000L;
+
+ private int sequence = 0;
+ private long lastPingNanoTime = Long.MIN_VALUE;
+
+ private Queue<PingRecord> inflightPings = new ConcurrentLinkedQueue<>();
+ private Queue<Long> roundTripTimes = new ConcurrentLinkedQueue<>();
+
+ /**
+ * Check to see if a ping was sent recently and, if not, send one.
+ *
+ * @param force Send a ping, even if one was sent recently
+ *
+ * @throws IOException If an I/O issue prevents the ping from being sent
+ */
+ public void sendPing(boolean force) throws IOException {
+ long now = System.nanoTime();
+ if (force || now - lastPingNanoTime > pingIntervalNano) {
+ lastPingNanoTime = now;
+ byte[] payload = new byte[8];
+ synchronized (socketWrapper) {
+ int sentSequence = ++sequence;
+ PingRecord pingRecord = new PingRecord(sentSequence, now);
+ inflightPings.add(pingRecord);
+ ByteUtil.set31Bits(payload, 4, sentSequence);
+ /*socketWrapper.write(true, PING, 0, PING.length);
+ socketWrapper.write(true, payload, 0, payload.length);
+ socketWrapper.flush(true);*/
+ }
+ socketWrapper.write(true, getWriteTimeout(), TimeUnit.MILLISECONDS, null, SocketWrapperBase.COMPLETE_WRITE, null,
+ ByteBuffer.wrap(PING), ByteBuffer.wrap(payload));
+ }
+ }
+
+ public void receivePing(byte[] payload, boolean ack) throws IOException {
+ if (ack) {
+ // Extract the sequence from the payload
+ int receivedSequence = ByteUtil.get31Bits(payload, 4);
+ PingRecord pingRecord = inflightPings.poll();
+ while (pingRecord != null && pingRecord.getSequence() < receivedSequence) {
+ pingRecord = inflightPings.poll();
+ }
+ if (pingRecord == null) {
+ // Unexpected ACK. Log it.
+ } else {
+ long roundTripTime = System.nanoTime() - pingRecord.getSentNanoTime();
+ roundTripTimes.add(Long.valueOf(roundTripTime));
+ while (roundTripTimes.size() > 3) {
+ roundTripTimes.poll();
+ }
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("pingManager.roundTripTime",
+ connectionId, Long.valueOf(roundTripTime)));
+ }
+ }
+
+ } else {
+ // Client originated ping. Echo it back.
+ /*synchronized (socketWrapper) {
+ socketWrapper.write(true, PING_ACK, 0, PING_ACK.length);
+ socketWrapper.write(true, payload, 0, payload.length);
+ socketWrapper.flush(true);
+ }*/
+ socketWrapper.write(true, getWriteTimeout(), TimeUnit.MILLISECONDS, null, SocketWrapperBase.COMPLETE_WRITE, null,
+ ByteBuffer.wrap(PING_ACK), ByteBuffer.wrap(payload));
+ }
+ }
+
+ public long getRoundTripTimeNano() {
+ return (long) roundTripTimes.stream().mapToLong(x -> x.longValue()).average().orElse(0);
+ }
+ }
+
+
+ private static class PingRecord {
+
+ private final int sequence;
+ private final long sentNanoTime;
+
+ public PingRecord(int sequence, long sentNanoTime) {
+ this.sequence = sequence;
+ this.sentNanoTime = sentNanoTime;
+ }
+
+ public int getSequence() {
+ return sequence;
+ }
+
+ public long getSentNanoTime() {
+ return sentNanoTime;
+ }
+ }
+
+
+ private enum ConnectionState {
+
+ NEW(true),
+ CONNECTED(true),
+ PAUSING(true),
+ PAUSED(false),
+ CLOSED(false);
+
+ private final boolean newStreamsAllowed;
+
+ private ConnectionState(boolean newStreamsAllowed) {
+ this.newStreamsAllowed = newStreamsAllowed;
+ }
+
+ public boolean isNewStreamAllowed() {
+ return newStreamsAllowed;
+ }
+ }
+}
Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2Protocol.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2Protocol.java?rev=1712826&r1=1712825&r2=1712826&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Http2Protocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Http2Protocol.java Thu Nov 5 17:29:20 2015
@@ -23,6 +23,7 @@ import org.apache.coyote.Adapter;
import org.apache.coyote.Processor;
import org.apache.coyote.Request;
import org.apache.coyote.UpgradeProtocol;
+import org.apache.coyote.UpgradeToken;
import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler;
import org.apache.coyote.http11.upgrade.UpgradeProcessorInternal;
import org.apache.tomcat.util.net.SocketWrapperBase;
@@ -72,7 +73,7 @@ public class Http2Protocol implements Up
@Override
public Processor getProcessor(SocketWrapperBase<?> socketWrapper, Adapter adapter) {
UpgradeProcessorInternal processor = new UpgradeProcessorInternal(socketWrapper, null,
- getInternalUpgradeHandler(adapter, null));
+ new UpgradeToken(getInternalUpgradeHandler(adapter, null), Http2Protocol.class.getClassLoader()));
return processor;
}
Modified: tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java?rev=1712826&r1=1712825&r2=1712826&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java Thu Nov 5 17:29:20 2015
@@ -20,8 +20,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
-import javax.servlet.http.HttpUpgradeHandler;
-
import org.apache.coyote.AbstractProcessor;
import org.apache.coyote.ActionCode;
import org.apache.coyote.Adapter;
@@ -29,6 +27,7 @@ import org.apache.coyote.AsyncContextCal
import org.apache.coyote.ContainerThreadMarker;
import org.apache.coyote.ErrorState;
import org.apache.coyote.Request;
+import org.apache.coyote.UpgradeToken;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.buf.ByteChunk;
@@ -445,7 +444,7 @@ public class StreamProcessor extends Abs
@Override
- public HttpUpgradeHandler getHttpUpgradeHandler() {
+ public UpgradeToken getUpgradeToken() {
// Should never happen
throw new IllegalStateException(sm.getString("streamProcessor.httpupgrade.notsupported"));
}
Added: tomcat/trunk/java/org/apache/tomcat/InstanceManagerBindings.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/InstanceManagerBindings.java?rev=1712826&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/InstanceManagerBindings.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/InstanceManagerBindings.java Thu Nov 5 17:29:20 2015
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public final class InstanceManagerBindings {
+
+ private static final ConcurrentHashMap<ClassLoader, InstanceManager> bindings =
+ new ConcurrentHashMap<>();
+
+ public static final void bind(ClassLoader classLoader, InstanceManager instanceManager) {
+ bindings.put(classLoader, instanceManager);
+ }
+ public static final void unbind(ClassLoader classLoader) {
+ bindings.remove(classLoader);
+ }
+ public static final InstanceManager get(ClassLoader classLoader) {
+ return bindings.get(classLoader);
+ }
+
+}
Propchange: tomcat/trunk/java/org/apache/tomcat/InstanceManagerBindings.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java?rev=1712826&r1=1712825&r2=1712826&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java Thu Nov 5 17:29:20 2015
@@ -48,6 +48,7 @@ import javax.websocket.WebSocketContaine
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.InstanceManager;
+import org.apache.tomcat.InstanceManagerBindings;
import org.apache.tomcat.util.ExceptionUtils;
import org.apache.tomcat.util.res.StringManager;
@@ -181,6 +182,9 @@ public class WsSession implements Sessio
this.id = Long.toHexString(ids.getAndIncrement());
InstanceManager instanceManager = webSocketContainer.getInstanceManager();
+ if (instanceManager == null) {
+ instanceManager = InstanceManagerBindings.get(applicationClassLoader);
+ }
if (instanceManager != null) {
try {
instanceManager.newInstance(localEndpoint);
@@ -535,6 +539,9 @@ public class WsSession implements Sessio
t.setContextClassLoader(applicationClassLoader);
try {
localEndpoint.onClose(this, closeReason);
+ if (instanceManager == null) {
+ instanceManager = InstanceManagerBindings.get(applicationClassLoader);
+ }
if (instanceManager != null) {
instanceManager.destroyInstance(localEndpoint);
}
Modified: tomcat/trunk/res/checkstyle/org-import-control.xml
URL:
http://svn.apache.org/viewvc/tomcat/trunk/res/checkstyle/org-import-control.xml?rev=1712826&r1=1712825&r2=1712826&view=diff
==============================================================================
--- tomcat/trunk/res/checkstyle/org-import-control.xml (original)
+++ tomcat/trunk/res/checkstyle/org-import-control.xml Thu Nov 5 17:29:20 2015
@@ -87,6 +87,7 @@
<allow pkg="javax.servlet"/>
<allow pkg="org.apache.coyote"/>
<allow pkg="org.apache.juli"/>
+ <allow pkg="org.apache.tomcat"/>
<allow pkg="org.apache.tomcat.jni"/>
<allow pkg="org.apache.tomcat.util"/>
</subpackage>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]