Hi Oleg,

Thanks for merging!

Reading code like:

 ioSession.lock().unlock();

is quite confusing, how about:

 ioSession.getLock().unlock();

?

Gary

On Tue, Nov 20, 2018 at 12:39 AM <[email protected]> wrote:

> Repository: httpcomponents-core
> Updated Branches:
>   refs/heads/master b6b71d90d -> 0a31b7216
>
>
> HTTP/1.1 and HTTP/2 async protocol handlers to use I/O session lock for
> output synchronization
>
>
> Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
> Commit:
> http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/0a31b721
> Tree:
> http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/0a31b721
> Diff:
> http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/0a31b721
>
> Branch: refs/heads/master
> Commit: 0a31b7216933795836a6f057886674d24843615a
> Parents: e7d0a16
> Author: Oleg Kalnichevski <[email protected]>
> Authored: Mon Nov 19 15:10:21 2018 +0100
> Committer: Oleg Kalnichevski <[email protected]>
> Committed: Tue Nov 20 08:37:03 2018 +0100
>
> ----------------------------------------------------------------------
>  .../nio/AbstractHttp2StreamMultiplexer.java     | 40 +++++++++-----------
>  .../impl/nio/AbstractHttp1StreamDuplexer.java   | 40 ++++++++------------
>  2 files changed, 33 insertions(+), 47 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/0a31b721/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
> ----------------------------------------------------------------------
> diff --git
> a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
> b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
> index e0b37f0..6d2cd3c 100644
> ---
> a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
> +++
> b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
> @@ -41,8 +41,6 @@ import java.util.concurrent.ConcurrentHashMap;
>  import java.util.concurrent.ConcurrentLinkedDeque;
>  import java.util.concurrent.ConcurrentLinkedQueue;
>  import java.util.concurrent.atomic.AtomicInteger;
> -import java.util.concurrent.locks.Lock;
> -import java.util.concurrent.locks.ReentrantLock;
>
>  import javax.net.ssl.SSLSession;
>
> @@ -117,7 +115,6 @@ abstract class AbstractHttp2StreamMultiplexer
> implements Identifiable, HttpConne
>      private final Queue<AsyncPingHandler> pingHandlers;
>      private final AtomicInteger connInputWindow;
>      private final AtomicInteger connOutputWindow;
> -    private final Lock outputLock;
>      private final AtomicInteger outputRequests;
>      private final AtomicInteger lastStreamId;
>      private final Http2StreamListener streamListener;
> @@ -153,7 +150,6 @@ abstract class AbstractHttp2StreamMultiplexer
> implements Identifiable, HttpConne
>          this.outputBuffer = new FrameOutputBuffer(this.outputMetrics,
> this.localConfig.getMaxFrameSize());
>          this.outputQueue = new ConcurrentLinkedDeque<>();
>          this.pingHandlers = new ConcurrentLinkedQueue<>();
> -        this.outputLock = new ReentrantLock();
>          this.outputRequests = new AtomicInteger(0);
>          this.lastStreamId = new AtomicInteger(0);
>          this.hPackEncoder = new
> HPackEncoder(CharCodingSupport.createEncoder(charCodingConfig));
> @@ -246,11 +242,11 @@ abstract class AbstractHttp2StreamMultiplexer
> implements Identifiable, HttpConne
>
>      private void commitFrame(final RawFrame frame) throws IOException {
>          Args.notNull(frame, "Frame");
> -        outputLock.lock();
> +        ioSession.lock().lock();
>          try {
>              commitFrameInternal(frame);
>          } finally {
> -            outputLock.unlock();
> +            ioSession.lock().unlock();
>          }
>      }
>
> @@ -434,7 +430,7 @@ abstract class AbstractHttp2StreamMultiplexer
> implements Identifiable, HttpConne
>      }
>
>      public final void onOutput() throws HttpException, IOException {
> -        outputLock.lock();
> +        ioSession.lock().lock();
>          try {
>              if (!outputBuffer.isEmpty()) {
>                  outputBuffer.flush(ioSession.channel());
> @@ -451,7 +447,7 @@ abstract class AbstractHttp2StreamMultiplexer
> implements Identifiable, HttpConne
>                  }
>              }
>          } finally {
> -            outputLock.unlock();
> +            ioSession.lock().unlock();
>          }
>
>          final int connWinSize = connInputWindow.get();
> @@ -483,7 +479,7 @@ abstract class AbstractHttp2StreamMultiplexer
> implements Identifiable, HttpConne
>                      }
>                  }
>              }
> -            outputLock.lock();
> +            ioSession.lock().lock();
>              try {
>                  if (!outputPending && outputBuffer.isEmpty() &&
> outputQueue.isEmpty()
>                          &&
> outputRequests.compareAndSet(pendingOutputRequests, 0)) {
> @@ -492,7 +488,7 @@ abstract class AbstractHttp2StreamMultiplexer
> implements Identifiable, HttpConne
>                      outputRequests.addAndGet(-pendingOutputRequests);
>                  }
>              } finally {
> -                outputLock.unlock();
> +                ioSession.lock().unlock();
>              }
>          }
>
> @@ -513,13 +509,13 @@ abstract class AbstractHttp2StreamMultiplexer
> implements Identifiable, HttpConne
>              }
>          }
>          if (connState.compareTo(ConnectionHandshake.SHUTDOWN) >= 0) {
> -            outputLock.lock();
> +            ioSession.lock().lock();
>              try {
>                  if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
>                      ioSession.close();
>                  }
>              } finally {
> -                outputLock.unlock();
> +                ioSession.lock().unlock();
>              }
>          }
>      }
> @@ -1307,7 +1303,7 @@ abstract class AbstractHttp2StreamMultiplexer
> implements Identifiable, HttpConne
>
>          @Override
>          public void submit(final List<Header> headers, final boolean
> endStream) throws IOException {
> -            outputLock.lock();
> +            ioSession.lock().lock();
>              try {
>                  if (headers == null || headers.isEmpty()) {
>                      throw new
> H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are
> missing");
> @@ -1321,7 +1317,7 @@ abstract class AbstractHttp2StreamMultiplexer
> implements Identifiable, HttpConne
>                      localEndStream = true;
>                  }
>              } finally {
> -                outputLock.unlock();
> +                ioSession.lock().unlock();
>              }
>          }
>
> @@ -1342,7 +1338,7 @@ abstract class AbstractHttp2StreamMultiplexer
> implements Identifiable, HttpConne
>              final Http2Stream stream = new Http2Stream(channel,
> streamHandler, false);
>              streamMap.put(promisedStreamId, stream);
>
> -            outputLock.lock();
> +            ioSession.lock().lock();
>              try {
>                  if (localEndStream) {
>                      stream.releaseResources();
> @@ -1351,7 +1347,7 @@ abstract class AbstractHttp2StreamMultiplexer
> implements Identifiable, HttpConne
>                  commitPushPromise(id, promisedStreamId, headers);
>                  idle = false;
>              } finally {
> -                outputLock.unlock();
> +                ioSession.lock().unlock();
>              }
>          }
>
> @@ -1365,20 +1361,20 @@ abstract class AbstractHttp2StreamMultiplexer
> implements Identifiable, HttpConne
>
>          @Override
>          public int write(final ByteBuffer payload) throws IOException {
> -            outputLock.lock();
> +            ioSession.lock().lock();
>              try {
>                  if (localEndStream) {
>                      return 0;
>                  }
>                  return streamData(id, outputWindow, payload);
>              } finally {
> -                outputLock.unlock();
> +                ioSession.lock().unlock();
>              }
>          }
>
>          @Override
>          public void endStream(final List<? extends Header> trailers)
> throws IOException {
> -            outputLock.lock();
> +            ioSession.lock().lock();
>              try {
>                  if (localEndStream) {
>                      return;
> @@ -1391,7 +1387,7 @@ abstract class AbstractHttp2StreamMultiplexer
> implements Identifiable, HttpConne
>                      commitFrameInternal(frame);
>                  }
>              } finally {
> -                outputLock.unlock();
> +                ioSession.lock().unlock();
>              }
>          }
>
> @@ -1431,7 +1427,7 @@ abstract class AbstractHttp2StreamMultiplexer
> implements Identifiable, HttpConne
>          }
>
>          boolean localReset(final int code) throws IOException {
> -            outputLock.lock();
> +            ioSession.lock().lock();
>              try {
>                  if (localEndStream) {
>                      return false;
> @@ -1445,7 +1441,7 @@ abstract class AbstractHttp2StreamMultiplexer
> implements Identifiable, HttpConne
>                  }
>                  return false;
>              } finally {
> -                outputLock.unlock();
> +                ioSession.lock().unlock();
>              }
>          }
>
>
>
> http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/0a31b721/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
> ----------------------------------------------------------------------
> diff --git
> a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
> b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
> index 5b549ba..585cbed 100644
> ---
> a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
> +++
> b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
> @@ -36,8 +36,6 @@ import java.nio.channels.SelectionKey;
>  import java.nio.channels.WritableByteChannel;
>  import java.util.List;
>  import java.util.concurrent.atomic.AtomicInteger;
> -import java.util.concurrent.locks.Lock;
> -import java.util.concurrent.locks.ReentrantLock;
>
>  import javax.net.ssl.SSLSession;
>
> @@ -97,7 +95,6 @@ abstract class
> AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
>      private final ContentLengthStrategy outgoingContentStrategy;
>      private final AtomicInteger inputWindow;
>      private final ByteBuffer contentBuffer;
> -    private final Lock outputLock;
>      private final AtomicInteger outputRequests;
>
>      private volatile Message<IncomingMessage, ContentDecoder>
> incomingMessage;
> @@ -134,7 +131,6 @@ abstract class
> AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
>                  DefaultContentLengthStrategy.INSTANCE;
>          this.inputWindow = new AtomicInteger(0);
>          this.contentBuffer =
> ByteBuffer.allocate(this.h1Config.getBufferSize());
> -        this.outputLock = new ReentrantLock();
>          this.outputRequests = new AtomicInteger(0);
>          this.connState = ConnectionState.READY;
>      }
> @@ -362,7 +358,7 @@ abstract class
> AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
>      }
>
>      public final void onOutput() throws IOException, HttpException {
> -        outputLock.lock();
> +        ioSession.lock().lock();
>          try {
>              if (outbuf.hasData()) {
>                  final int bytesWritten =
> outbuf.flush(ioSession.channel());
> @@ -371,29 +367,23 @@ abstract class
> AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
>                  }
>              }
>          } finally {
> -            outputLock.unlock();
> +            ioSession.lock().unlock();
>          }
>          if (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
>              produceOutput();
>              final int pendingOutputRequests = outputRequests.get();
>              final boolean outputPending = isOutputReady();
> -            outputLock.lock();
> +            final boolean outputEnd;
> +            ioSession.lock().lock();
>              try {
>                  if (!outputPending && !outbuf.hasData() &&
> outputRequests.compareAndSet(pendingOutputRequests, 0)) {
>                      ioSession.clearEvent(SelectionKey.OP_WRITE);
>                  } else {
>                      outputRequests.addAndGet(-pendingOutputRequests);
>                  }
> -            } finally {
> -                outputLock.unlock();
> -            }
> -
> -            outputLock.lock();
> -            final boolean outputEnd;
> -            try {
>                  outputEnd = outgoingMessage == null && !outbuf.hasData();
>              } finally {
> -                outputLock.unlock();
> +                ioSession.lock().unlock();
>              }
>              if (outputEnd) {
>                  outputEnd();
> @@ -469,7 +459,7 @@ abstract class
> AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
>              final OutgoingMessage messageHead,
>              final boolean endStream,
>              final FlushMode flushMode) throws HttpException, IOException {
> -        outputLock.lock();
> +        ioSession.lock().lock();
>          try {
>              outgoingMessageWriter.write(messageHead, outbuf);
>              updateOutputMetrics(messageHead, connMetrics);
> @@ -491,7 +481,7 @@ abstract class
> AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
>              }
>              ioSession.setEvent(EventMask.WRITE);
>          } finally {
> -            outputLock.unlock();
> +            ioSession.lock().unlock();
>          }
>      }
>
> @@ -513,7 +503,7 @@ abstract class
> AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
>      }
>
>      void suspendSessionOutput() throws IOException {
> -        outputLock.lock();
> +        ioSession.lock().lock();
>          try {
>              if (outbuf.hasData()) {
>                  outbuf.flush(ioSession.channel());
> @@ -521,12 +511,12 @@ abstract class
> AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
>                  ioSession.clearEvent(SelectionKey.OP_WRITE);
>              }
>          } finally {
> -            outputLock.unlock();
> +            ioSession.lock().unlock();
>          }
>      }
>
>      int streamOutput(final ByteBuffer src) throws IOException {
> -        outputLock.lock();
> +        ioSession.lock().lock();
>          try {
>              if (outgoingMessage == null) {
>                  throw new ClosedChannelException();
> @@ -538,14 +528,14 @@ abstract class
> AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
>              }
>              return bytesWritten;
>          } finally {
> -            outputLock.unlock();
> +            ioSession.lock().unlock();
>          }
>      }
>
>      enum MessageDelineation { NONE, CHUNK_CODED, MESSAGE_HEAD}
>
>      MessageDelineation endOutputStream(final List<? extends Header>
> trailers) throws IOException {
> -        outputLock.lock();
> +        ioSession.lock().lock();
>          try {
>              if (outgoingMessage == null) {
>                  return MessageDelineation.NONE;
> @@ -558,12 +548,12 @@ abstract class
> AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
>                              ? MessageDelineation.CHUNK_CODED
>                              : MessageDelineation.MESSAGE_HEAD;
>          } finally {
> -            outputLock.unlock();
> +            ioSession.lock().unlock();
>          }
>      }
>
>      boolean isOutputCompleted() {
> -        outputLock.lock();
> +        ioSession.lock().lock();
>          try {
>              if (outgoingMessage == null) {
>                  return true;
> @@ -571,7 +561,7 @@ abstract class
> AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
>              final ContentEncoder contentEncoder =
> outgoingMessage.getBody();
>              return contentEncoder.isCompleted();
>          } finally {
> -            outputLock.unlock();
> +            ioSession.lock().unlock();
>          }
>      }
>
>
>

Reply via email to