Author: remm
Date: Thu Mar  9 15:27:10 2017
New Revision: 1786186

URL: http://svn.apache.org/viewvc?rev=1786186&view=rev
Log:
- Pick up an old experiment with IO and NIO2. I decided to revisit it as the 
topic of TomcatCon came along.
- Start using it for HTTP/2 output. It seems to work (h2load, testsuite, 
browser).
- After testing it with h2load, it does provide a benefit over vanilla NIO2 
with only a limited amount of code, so contribute it.
- Add some plumbing to allow extending HTTP/2 upgrade handler to replace its 
IO, and a minor change to the API to pass the socket wrapper to determine the 
capabilities of the endpoint. Since it seems to work and NIO2 isn't the 
default, enable it with NIO2. Although I left it for now, it may be better to 
disable it with SSL (another reason to pass around the socket wrapper).
- Ultimately, the strategy is the exact opposite of Coyote (= zero GC), the 
protocol handler gets control of everything through the read/write API 
(syncing, blocking, etc) and encourages GC abuse to minimize blocking.
- If this is too big a change, I can revert it and discuss it first. Although 
it got discussed in 2015, it's a long time ago.

Added:
    tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java   
(with props)
Modified:
    tomcat/trunk/java/org/apache/coyote/UpgradeProtocol.java
    tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java
    tomcat/trunk/java/org/apache/coyote/http2/Http2Protocol.java
    tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
    tomcat/trunk/webapps/docs/changelog.xml

Modified: tomcat/trunk/java/org/apache/coyote/UpgradeProtocol.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/UpgradeProtocol.java?rev=1786186&r1=1786185&r2=1786186&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/UpgradeProtocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/UpgradeProtocol.java Thu Mar  9 
15:27:10 2017
@@ -70,13 +70,14 @@ public interface UpgradeProtocol {
 
 
     /**
+     * @param socketWrapper The socket
      * @param adapter The Adapter to use to configure the new upgrade handler
      * @param request A copy (may be incomplete) of the request that triggered
      *                the upgrade
      *
      * @return An instance of the HTTP upgrade handler for this protocol
      */
-    public InternalHttpUpgradeHandler getInternalUpgradeHandler(Adapter 
adapter, Request request);
+    public InternalHttpUpgradeHandler 
getInternalUpgradeHandler(SocketWrapperBase<?> socketWrapper, Adapter adapter, 
Request request);
 
 
     /**

Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java?rev=1786186&r1=1786185&r2=1786186&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java Thu Mar  9 
15:27:10 2017
@@ -455,7 +455,7 @@ public class Http11Processor extends Abs
 
                         InternalHttpUpgradeHandler upgradeHandler =
                                 upgradeProtocol.getInternalUpgradeHandler(
-                                        getAdapter(), cloneRequest(request));
+                                        socketWrapper, getAdapter(), 
cloneRequest(request));
                         UpgradeToken upgradeToken = new 
UpgradeToken(upgradeHandler, null, null);
                         action(ActionCode.UPGRADE, upgradeToken);
                         return SocketState.UPGRADING;

Added: tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java?rev=1786186&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java 
(added)
+++ tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java Thu 
Mar  9 15:27:10 2017
@@ -0,0 +1,341 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.coyote.http2;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.CompletionHandler;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.coyote.Adapter;
+import org.apache.coyote.ProtocolException;
+import org.apache.coyote.Request;
+import org.apache.coyote.Response;
+import org.apache.coyote.http2.HpackEncoder.State;
+import org.apache.tomcat.util.net.SocketWrapperBase;
+import org.apache.tomcat.util.net.SocketWrapperBase.BlockingMode;
+
+public class Http2AsyncUpgradeHandler extends Http2UpgradeHandler {
+
+    private static final ByteBuffer[] BYTEBUFFER_ARRAY = new ByteBuffer[0];
+    private Throwable error = null;
+    private IOException applicationIOE = null;
+
+    public Http2AsyncUpgradeHandler(Adapter adapter, Request coyoteRequest) {
+        super (adapter, coyoteRequest);
+    }
+
+    private CompletionHandler<Long, Void> errorCompletion = new 
CompletionHandler<Long, Void>() {
+        @Override
+        public void completed(Long result, Void attachment) {
+        }
+        @Override
+        public void failed(Throwable t, Void attachment) {
+            error = t;
+        }
+    };
+    private CompletionHandler<Long, Void> applicationErrorCompletion = new 
CompletionHandler<Long, Void>() {
+        @Override
+        public void completed(Long result, Void attachment) {
+        }
+        @Override
+        public void failed(Throwable t, Void attachment) {
+            if (t instanceof IOException) {
+                applicationIOE = (IOException) t;
+            }
+            error = t;
+        }
+    };
+
+    @Override
+    protected PingManager getPingManager() {
+        return new AsyncPingManager();
+    }
+
+    @Override
+    protected void writeSettings() {
+        // Send the initial settings frame
+        socketWrapper.write(BlockingMode.SEMI_BLOCK, getWriteTimeout(), 
TimeUnit.MILLISECONDS,
+                null, SocketWrapperBase.COMPLETE_WRITE, errorCompletion,
+                ByteBuffer.wrap(localSettings.getSettingsFrameForPending()));
+        if (error != null) {
+            String msg = sm.getString("upgradeHandler.sendPrefaceFail", 
connectionId);
+            if (log.isDebugEnabled()) {
+                log.debug(msg);
+            }
+            throw new ProtocolException(msg, error);
+        }
+    }
+
+
+    @Override
+    void sendStreamReset(StreamException se) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug(sm.getString("upgradeHandler.rst.debug", connectionId,
+                    Integer.toString(se.getStreamId()), se.getError()));
+        }
+        // Write a RST frame
+        byte[] rstFrame = new byte[13];
+        // Length
+        ByteUtil.setThreeBytes(rstFrame, 0, 4);
+        // Type
+        rstFrame[3] = FrameType.RST.getIdByte();
+        // No flags
+        // Stream ID
+        ByteUtil.set31Bits(rstFrame, 5, se.getStreamId());
+        // Payload
+        ByteUtil.setFourBytes(rstFrame, 9, se.getError().getCode());
+        socketWrapper.write(BlockingMode.SEMI_BLOCK, getWriteTimeout(), 
TimeUnit.MILLISECONDS,
+                null, SocketWrapperBase.COMPLETE_WRITE, errorCompletion,
+                ByteBuffer.wrap(rstFrame));
+        handleAsyncException();
+    }
+
+
+    @Override
+    protected void writeGoAwayFrame(int maxStreamId, long errorCode, byte[] 
debugMsg)
+            throws IOException {
+        byte[] fixedPayload = new byte[8];
+        ByteUtil.set31Bits(fixedPayload, 0, maxStreamId);
+        ByteUtil.setFourBytes(fixedPayload, 4, errorCode);
+        int len = 8;
+        if (debugMsg != null) {
+            len += debugMsg.length;
+        }
+        byte[] payloadLength = new byte[3];
+        ByteUtil.setThreeBytes(payloadLength, 0, len);
+        if (debugMsg != null) {
+            socketWrapper.write(BlockingMode.SEMI_BLOCK, getWriteTimeout(), 
TimeUnit.MILLISECONDS,
+                    null, SocketWrapperBase.COMPLETE_WRITE, errorCompletion,
+                    ByteBuffer.wrap(payloadLength), ByteBuffer.wrap(GOAWAY), 
ByteBuffer.wrap(fixedPayload), ByteBuffer.wrap(debugMsg));
+        } else {
+            socketWrapper.write(BlockingMode.SEMI_BLOCK, getWriteTimeout(), 
TimeUnit.MILLISECONDS,
+                    null, SocketWrapperBase.COMPLETE_WRITE, errorCompletion,
+                    ByteBuffer.wrap(payloadLength), ByteBuffer.wrap(GOAWAY), 
ByteBuffer.wrap(fixedPayload));
+        }
+        handleAsyncException();
+    }
+
+
+    @Override
+    void writeHeaders(Stream stream, Response coyoteResponse, int payloadSize)
+            throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug(sm.getString("upgradeHandler.writeHeaders", connectionId,
+                    stream.getIdentifier()));
+        }
+
+        if (!stream.canWrite()) {
+            return;
+        }
+
+        prepareHeaders(coyoteResponse);
+
+        boolean first = true;
+        State state = null;
+        ArrayList<ByteBuffer> bufs = new ArrayList<>();
+        // This ensures the Stream processing thread has control of the socket.
+        while (state != State.COMPLETE) {
+            byte[] header = new byte[9];
+            ByteBuffer target = ByteBuffer.allocate(payloadSize);
+            state = getHpackEncoder().encode(coyoteResponse.getMimeHeaders(), 
target);
+            target.flip();
+            ByteUtil.setThreeBytes(header, 0, target.limit());
+            if (first) {
+                first = false;
+                header[3] = FrameType.HEADERS.getIdByte();
+                if (stream.getOutputBuffer().hasNoBody()) {
+                    header[4] = FLAG_END_OF_STREAM;
+                }
+            } else {
+                header[3] = FrameType.CONTINUATION.getIdByte();
+            }
+            if (state == State.COMPLETE) {
+                header[4] += FLAG_END_OF_HEADERS;
+            }
+            if (log.isDebugEnabled()) {
+                log.debug(target.limit() + " bytes");
+            }
+            ByteUtil.set31Bits(header, 5, stream.getIdentifier().intValue());
+            bufs.add(ByteBuffer.wrap(header));
+            bufs.add(target);
+        }
+        socketWrapper.write(BlockingMode.SEMI_BLOCK, getWriteTimeout(), 
TimeUnit.MILLISECONDS,
+                null, SocketWrapperBase.COMPLETE_WRITE, 
applicationErrorCompletion,
+                bufs.toArray(BYTEBUFFER_ARRAY));
+        handleAsyncException();
+    }
+
+
+    @Override
+    protected void writePushHeaders(Stream stream, int pushedStreamId, Request 
coyoteRequest, int payloadSize)
+            throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug(sm.getString("upgradeHandler.writePushHeaders", 
connectionId,
+                    stream.getIdentifier(), Integer.toString(pushedStreamId)));
+        }
+        // This ensures the Stream processing thread has control of the socket.
+        boolean first = true;
+        State state = null;
+        ArrayList<ByteBuffer> bufs = new ArrayList<>();
+        byte[] pushedStreamIdBytes = new byte[4];
+        ByteUtil.set31Bits(pushedStreamIdBytes, 0, pushedStreamId);
+        while (state != State.COMPLETE) {
+            byte[] header = new byte[9];
+            ByteBuffer target = ByteBuffer.allocate(payloadSize);
+            target.put(pushedStreamIdBytes);
+            state = getHpackEncoder().encode(coyoteRequest.getMimeHeaders(), 
target);
+            target.flip();
+            ByteUtil.setThreeBytes(header, 0, target.limit());
+            if (first) {
+                first = false;
+                header[3] = FrameType.PUSH_PROMISE.getIdByte();
+            } else {
+                header[3] = FrameType.CONTINUATION.getIdByte();
+            }
+            if (state == State.COMPLETE) {
+                header[4] += FLAG_END_OF_HEADERS;
+            }
+            if (log.isDebugEnabled()) {
+                log.debug(target.limit() + " bytes");
+            }
+            ByteUtil.set31Bits(header, 5, stream.getIdentifier().intValue());
+            bufs.add(ByteBuffer.wrap(header));
+            bufs.add(target);
+        }
+        socketWrapper.write(BlockingMode.SEMI_BLOCK, getWriteTimeout(), 
TimeUnit.MILLISECONDS,
+                null, SocketWrapperBase.COMPLETE_WRITE, 
applicationErrorCompletion,
+                bufs.toArray(BYTEBUFFER_ARRAY));
+        handleAsyncException();
+    }
+
+
+    @Override
+    void writeBody(Stream stream, ByteBuffer data, int len, boolean finished) 
throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug(sm.getString("upgradeHandler.writeBody", connectionId, 
stream.getIdentifier(),
+                    Integer.toString(len)));
+        }
+        // Need to check this now since sending end of stream will change this.
+        boolean writeable = stream.canWrite();
+        byte[] header = new byte[9];
+        ByteUtil.setThreeBytes(header, 0, len);
+        header[3] = FrameType.DATA.getIdByte();
+        if (finished) {
+            header[4] = FLAG_END_OF_STREAM;
+            stream.sentEndOfStream();
+            if (!stream.isActive()) {
+                activeRemoteStreamCount.decrementAndGet();
+            }
+        }
+        if (writeable) {
+            ByteUtil.set31Bits(header, 5, stream.getIdentifier().intValue());
+            int orgLimit = data.limit();
+            data.limit(data.position() + len);
+            socketWrapper.write(BlockingMode.BLOCK, getWriteTimeout(), 
TimeUnit.MILLISECONDS,
+                    null, SocketWrapperBase.COMPLETE_WRITE, 
applicationErrorCompletion,
+                    ByteBuffer.wrap(header), data);
+            data.limit(orgLimit);
+            handleAsyncException();
+        }
+    }
+
+
+    @Override
+    void writeWindowUpdate(Stream stream, int increment, boolean 
applicationInitiated)
+            throws IOException {
+        if (!stream.canWrite()) {
+            return;
+        }
+        // Build window update frame for stream 0
+        byte[] frame = new byte[13];
+        ByteUtil.setThreeBytes(frame, 0,  4);
+        frame[3] = FrameType.WINDOW_UPDATE.getIdByte();
+        ByteUtil.set31Bits(frame, 9, increment);
+        // Change stream Id
+        byte[] frame2 = new byte[13];
+        ByteUtil.setThreeBytes(frame2, 0,  4);
+        frame2[3] = FrameType.WINDOW_UPDATE.getIdByte();
+        ByteUtil.set31Bits(frame2, 9, increment);
+        ByteUtil.set31Bits(frame2, 5, stream.getIdentifier().intValue());
+        socketWrapper.write(BlockingMode.SEMI_BLOCK, getWriteTimeout(), 
TimeUnit.MILLISECONDS,
+                null, SocketWrapperBase.COMPLETE_WRITE, errorCompletion,
+                ByteBuffer.wrap(frame), ByteBuffer.wrap(frame2));
+        handleAsyncException();
+    }
+
+
+    @Override
+    public void settingsEnd(boolean ack) throws IOException {
+        if (ack) {
+            if (!localSettings.ack()) {
+                // Ack was unexpected
+                log.warn(sm.getString("upgradeHandler.unexpectedAck", 
connectionId, getIdentifier()));
+            }
+        } else {
+            socketWrapper.write(BlockingMode.SEMI_BLOCK, getWriteTimeout(), 
TimeUnit.MILLISECONDS,
+                    null, SocketWrapperBase.COMPLETE_WRITE, errorCompletion,
+                    ByteBuffer.wrap(SETTINGS_ACK));
+        }
+        handleAsyncException();
+    }
+
+
+    protected void handleAsyncException() throws IOException {
+        if (applicationIOE != null) {
+            handleAppInitiatedIOException(applicationIOE);
+        } else if (error != null) {
+            throw new IOException(error);
+        }
+    }
+
+
+    protected class AsyncPingManager extends PingManager {
+        @Override
+        public void sendPing(boolean force) throws IOException {
+            long now = System.nanoTime();
+            if (force || now - lastPingNanoTime > pingIntervalNano) {
+                lastPingNanoTime = now;
+                byte[] payload = new byte[8];
+                int sentSequence = ++sequence;
+                PingRecord pingRecord = new PingRecord(sentSequence, now);
+                inflightPings.add(pingRecord);
+                ByteUtil.set31Bits(payload, 4, sentSequence);
+                socketWrapper.write(BlockingMode.SEMI_BLOCK, 
getWriteTimeout(), TimeUnit.MILLISECONDS,
+                        null, SocketWrapperBase.COMPLETE_WRITE, 
errorCompletion,
+                        ByteBuffer.wrap(PING), ByteBuffer.wrap(payload));
+                handleAsyncException();
+            }
+        }
+
+        @Override
+        public void receivePing(byte[] payload, boolean ack) throws 
IOException {
+            if (ack) {
+                super.receivePing(payload, ack);
+            } else {
+                // Client originated ping. Echo it back.
+                socketWrapper.write(BlockingMode.SEMI_BLOCK, 
getWriteTimeout(), TimeUnit.MILLISECONDS,
+                        null, SocketWrapperBase.COMPLETE_WRITE, 
errorCompletion,
+                        ByteBuffer.wrap(PING_ACK), ByteBuffer.wrap(payload));
+                handleAsyncException();
+            }
+        }
+
+    }
+
+}

Propchange: 
tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2Protocol.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2Protocol.java?rev=1786186&r1=1786185&r2=1786186&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Http2Protocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Http2Protocol.java Thu Mar  9 
15:27:10 2017
@@ -92,15 +92,17 @@ public class Http2Protocol implements Up
     @Override
     public Processor getProcessor(SocketWrapperBase<?> socketWrapper, Adapter 
adapter) {
         UpgradeProcessorInternal processor = new 
UpgradeProcessorInternal(socketWrapper,
-                new UpgradeToken(getInternalUpgradeHandler(adapter, null), 
null, null));
+                new UpgradeToken(getInternalUpgradeHandler(socketWrapper, 
adapter, null), null, null));
         return processor;
     }
 
 
     @Override
-    public InternalHttpUpgradeHandler getInternalUpgradeHandler(Adapter 
adapter,
-            Request coyoteRequest) {
-        Http2UpgradeHandler result = new Http2UpgradeHandler(adapter, 
coyoteRequest);
+    public InternalHttpUpgradeHandler 
getInternalUpgradeHandler(SocketWrapperBase<?> socketWrapper,
+            Adapter adapter, Request coyoteRequest) {
+        Http2UpgradeHandler result = (socketWrapper.hasAsyncIO())
+                ? new Http2AsyncUpgradeHandler(adapter, coyoteRequest)
+                : new Http2UpgradeHandler(adapter, coyoteRequest);
 
         result.setReadTimeout(getReadTimeout());
         result.setKeepAliveTimeout(getKeepAliveTimeout());

Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java?rev=1786186&r1=1786185&r2=1786186&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java 
(original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java Thu Mar  
9 15:27:10 2017
@@ -76,36 +76,36 @@ import org.apache.tomcat.util.res.String
 class Http2UpgradeHandler extends AbstractStream implements 
InternalHttpUpgradeHandler,
         Input, Output {
 
-    private static final Log log = 
LogFactory.getLog(Http2UpgradeHandler.class);
-    private static final StringManager sm = 
StringManager.getManager(Http2UpgradeHandler.class);
+    protected static final Log log = 
LogFactory.getLog(Http2UpgradeHandler.class);
+    protected static final StringManager sm = 
StringManager.getManager(Http2UpgradeHandler.class);
 
     private static final AtomicInteger connectionIdGenerator = new 
AtomicInteger(0);
     private static final Integer STREAM_ID_ZERO = Integer.valueOf(0);
 
-    private static final int FLAG_END_OF_STREAM = 1;
-    private static final int FLAG_END_OF_HEADERS = 4;
+    protected static final int FLAG_END_OF_STREAM = 1;
+    protected static final int FLAG_END_OF_HEADERS = 4;
 
-    private static final byte[] PING = { 0x00, 0x00, 0x08, 0x06, 0x00, 0x00, 
0x00, 0x00, 0x00};
-    private static final byte[] PING_ACK = { 0x00, 0x00, 0x08, 0x06, 0x01, 
0x00, 0x00, 0x00, 0x00 };
+    protected static final byte[] PING = { 0x00, 0x00, 0x08, 0x06, 0x00, 0x00, 
0x00, 0x00, 0x00};
+    protected static final byte[] PING_ACK = { 0x00, 0x00, 0x08, 0x06, 0x01, 
0x00, 0x00, 0x00, 0x00 };
 
-    private static final byte[] SETTINGS_ACK = { 0x00, 0x00, 0x00, 0x04, 0x01, 
0x00, 0x00, 0x00, 0x00 };
+    protected static final byte[] SETTINGS_ACK = { 0x00, 0x00, 0x00, 0x04, 
0x01, 0x00, 0x00, 0x00, 0x00 };
 
-    private static final byte[] GOAWAY = { 0x07, 0x00, 0x00, 0x00, 0x00, 0x00 
};
+    protected static final byte[] GOAWAY = { 0x07, 0x00, 0x00, 0x00, 0x00, 
0x00 };
 
-    private static final String HTTP2_SETTINGS_HEADER = "HTTP2-Settings";
+    protected static final String HTTP2_SETTINGS_HEADER = "HTTP2-Settings";
 
     private static final HeaderSink HEADER_SINK = new HeaderSink();
 
-    private final String connectionId;
+    protected final String connectionId;
 
-    private final Adapter adapter;
-    private volatile SocketWrapperBase<?> socketWrapper;
-    private volatile SSLSupport sslSupport;
+    protected final Adapter adapter;
+    protected volatile SocketWrapperBase<?> socketWrapper;
+    protected volatile SSLSupport sslSupport;
 
-    private volatile Http2Parser parser;
+    protected volatile Http2Parser parser;
 
     // Simple state machine (sequence of states)
-    private AtomicReference<ConnectionState> connectionState =
+    protected AtomicReference<ConnectionState> connectionState =
             new AtomicReference<>(ConnectionState.NEW);
     private volatile long pausedNanoTime = Long.MAX_VALUE;
 
@@ -113,12 +113,12 @@ class Http2UpgradeHandler extends Abstra
      * Remote settings are settings defined by the client and sent to Tomcat
      * that Tomcat must use when communicating with the client.
      */
-    private final ConnectionSettingsRemote remoteSettings;
+    protected final ConnectionSettingsRemote remoteSettings;
     /**
      * Local settings are settings defined by Tomcat and sent to the client 
that
      * the client must use when communicating with Tomcat.
      */
-    private final ConnectionSettingsLocal localSettings;
+    protected final ConnectionSettingsLocal localSettings;
 
     private HpackDecoder hpackDecoder;
     private HpackEncoder hpackEncoder;
@@ -129,22 +129,22 @@ class Http2UpgradeHandler extends Abstra
     private long writeTimeout = Http2Protocol.DEFAULT_WRITE_TIMEOUT;
 
     private final Map<Integer,Stream> streams = new HashMap<>();
-    private final AtomicInteger activeRemoteStreamCount = new AtomicInteger(0);
+    protected final AtomicInteger activeRemoteStreamCount = new 
AtomicInteger(0);
     private volatile int maxRemoteStreamId = 0;
     // Start at -1 so the 'add 2' logic in closeIdleStreams() works
     private volatile int maxActiveRemoteStreamId = -1;
     private volatile int maxProcessedStreamId;
     private final AtomicInteger nextLocalStreamId = new AtomicInteger(2);
-    private final PingManager pingManager = new PingManager();
+    protected final PingManager pingManager = getPingManager();
     private volatile int newStreamsSinceLastPrune = 0;
     // Tracking for when the connection is blocked (windowSize < 1)
     private final Map<AbstractStream,int[]> backLogStreams = new 
ConcurrentHashMap<>();
     private long backLogSize = 0;
 
     // Stream concurrency control
-    private int maxConcurrentStreamExecution = 
Http2Protocol.DEFAULT_MAX_CONCURRENT_STREAM_EXECUTION;
-    private AtomicInteger streamConcurrency = null;
-    private Queue<StreamRunnable> queuedRunnable = null;
+    protected int maxConcurrentStreamExecution = 
Http2Protocol.DEFAULT_MAX_CONCURRENT_STREAM_EXECUTION;
+    protected AtomicInteger streamConcurrency = null;
+    protected Queue<StreamRunnable> queuedRunnable = null;
 
     // Limits
     private Set<String> allowedTrailerHeaders = Collections.emptySet();
@@ -177,6 +177,9 @@ class Http2UpgradeHandler extends Abstra
         }
     }
 
+    protected PingManager getPingManager() {
+        return new PingManager();
+    }
 
     @Override
     public void init(WebConnection webConnection) {
@@ -226,17 +229,7 @@ class Http2UpgradeHandler extends Abstra
         }
 
         // Send the initial settings frame
-        try {
-            byte[] settings = localSettings.getSettingsFrameForPending();
-            socketWrapper.write(true, settings, 0, settings.length);
-            socketWrapper.flush(true);
-        } catch (IOException ioe) {
-            String msg = sm.getString("upgradeHandler.sendPrefaceFail", 
connectionId);
-            if (log.isDebugEnabled()) {
-                log.debug(msg);
-            }
-            throw new ProtocolException(msg, ioe);
-        }
+        writeSettings();
 
         // Make sure the client has sent a valid connection preface before we
         // send the response to the original request over HTTP/2.
@@ -245,7 +238,7 @@ class Http2UpgradeHandler extends Abstra
         } catch (Http2Exception e) {
             String msg = sm.getString("upgradeHandler.invalidPreface", 
connectionId);
             if (log.isDebugEnabled()) {
-                log.debug(msg);
+                log.debug(msg, e);
             }
             throw new ProtocolException(msg);
         }
@@ -265,8 +258,7 @@ class Http2UpgradeHandler extends Abstra
         }
     }
 
-
-    private void processStreamOnContainerThread(Stream stream) {
+    protected void processStreamOnContainerThread(Stream stream) {
         StreamProcessor streamProcessor = new StreamProcessor(this, stream, 
adapter, socketWrapper);
         streamProcessor.setSslSupport(sslSupport);
         processStreamOnContainerThread(streamProcessor, SocketEvent.OPEN_READ);
@@ -323,7 +315,7 @@ class Http2UpgradeHandler extends Abstra
                 try {
                     // There is data to read so use the read timeout while
                     // reading frames.
-                   socketWrapper.setReadTimeout(getReadTimeout());
+                    socketWrapper.setReadTimeout(getReadTimeout());
                     while (true) {
                         try {
                             if (!parser.readFrame(false)) {
@@ -493,7 +485,23 @@ class Http2UpgradeHandler extends Abstra
     }
 
 
-    private void writeGoAwayFrame(int maxStreamId, long errorCode, byte[] 
debugMsg)
+    protected void writeSettings() {
+        // Send the initial settings frame
+        try {
+            byte[] settings = localSettings.getSettingsFrameForPending();
+            socketWrapper.write(true, settings, 0, settings.length);
+            socketWrapper.flush(true);
+        } catch (IOException ioe) {
+            String msg = sm.getString("upgradeHandler.sendPrefaceFail", 
connectionId);
+            if (log.isDebugEnabled()) {
+                log.debug(msg);
+            }
+            throw new ProtocolException(msg, ioe);
+        }
+    }
+
+
+    protected void writeGoAwayFrame(int maxStreamId, long errorCode, byte[] 
debugMsg)
             throws IOException {
         byte[] fixedPayload = new byte[8];
         ByteUtil.set31Bits(fixedPayload, 0, maxStreamId);
@@ -567,7 +575,7 @@ class Http2UpgradeHandler extends Abstra
     }
 
 
-    private void prepareHeaders(Response coyoteResponse) {
+    protected void prepareHeaders(Response coyoteResponse) {
         MimeHeaders headers = coyoteResponse.getMimeHeaders();
         int statusCode = coyoteResponse.getStatus();
 
@@ -593,7 +601,7 @@ class Http2UpgradeHandler extends Abstra
     }
 
 
-    private void writePushHeaders(Stream stream, int pushedStreamId, Request 
coyoteRequest, int payloadSize)
+    protected void writePushHeaders(Stream stream, int pushedStreamId, Request 
coyoteRequest, int payloadSize)
             throws IOException {
         if (log.isDebugEnabled()) {
             log.debug(sm.getString("upgradeHandler.writePushHeaders", 
connectionId,
@@ -633,7 +641,7 @@ class Http2UpgradeHandler extends Abstra
     }
 
 
-    private HpackEncoder getHpackEncoder() {
+    protected HpackEncoder getHpackEncoder() {
         if (hpackEncoder == null) {
             hpackEncoder = new HpackEncoder();
         }
@@ -688,7 +696,7 @@ class Http2UpgradeHandler extends Abstra
      * Note: We can not rely on this exception reaching the socket processor
      *       since the application code may swallow it.
      */
-    private void handleAppInitiatedIOException(IOException ioe) throws 
IOException {
+    protected void handleAppInitiatedIOException(IOException ioe) throws 
IOException {
         close();
         throw ioe;
     }
@@ -927,7 +935,7 @@ class Http2UpgradeHandler extends Abstra
     }
 
 
-    private Stream getStream(int streamId, boolean unknownIsError) throws 
ConnectionException {
+    protected Stream getStream(int streamId, boolean unknownIsError) throws 
ConnectionException {
         Integer key = Integer.valueOf(streamId);
         Stream result = streams.get(key);
         if (result == null && unknownIsError) {
@@ -1497,16 +1505,16 @@ class Http2UpgradeHandler extends Abstra
     }
 
 
-    private class PingManager {
+    protected class PingManager {
 
         // 10 seconds
-        private final long pingIntervalNano = 10000000000L;
+        protected final long pingIntervalNano = 10000000000L;
 
-        private int sequence = 0;
-        private long lastPingNanoTime = Long.MIN_VALUE;
+        protected int sequence = 0;
+        protected long lastPingNanoTime = Long.MIN_VALUE;
 
-        private Queue<PingRecord> inflightPings = new 
ConcurrentLinkedQueue<>();
-        private Queue<Long> roundTripTimes = new ConcurrentLinkedQueue<>();
+        protected Queue<PingRecord> inflightPings = new 
ConcurrentLinkedQueue<>();
+        protected Queue<Long> roundTripTimes = new ConcurrentLinkedQueue<>();
 
         /**
          * Check to see if a ping was sent recently and, if not, send one.
@@ -1557,6 +1565,7 @@ class Http2UpgradeHandler extends Abstra
             } else {
                 // Client originated ping. Echo it back.
                 synchronized (socketWrapper) {
+                    // FIXME: extract
                     socketWrapper.write(true, PING_ACK, 0, PING_ACK.length);
                     socketWrapper.write(true, payload, 0, payload.length);
                     socketWrapper.flush(true);
@@ -1570,7 +1579,7 @@ class Http2UpgradeHandler extends Abstra
     }
 
 
-    private static class PingRecord {
+    protected static class PingRecord {
 
         private final int sequence;
         private final long sentNanoTime;
@@ -1590,7 +1599,7 @@ class Http2UpgradeHandler extends Abstra
     }
 
 
-    private enum ConnectionState {
+    protected enum ConnectionState {
 
         NEW(true),
         CONNECTED(true),

Modified: tomcat/trunk/webapps/docs/changelog.xml
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1786186&r1=1786185&r2=1786186&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/trunk/webapps/docs/changelog.xml Thu Mar  9 15:27:10 2017
@@ -45,6 +45,13 @@
   issues do not "pop up" wrt. others).
 -->
 <section name="Tomcat 9.0.0.M19 (markt)" rtext="in development">
+  <subsection name="Coyote">
+    <changelog>
+      <fix>
+        Add async based IO groundwork for HTTP/2. (remm)
+      </fix>
+    </changelog>
+  </subsection>
   <subsection name="Other">
     <changelog>
       <fix>



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to