On Fri, 2012-07-27 at 15:20 -0400, Daniel Kulp wrote:
> I've committed a few experiments I've been working on to:
> 
> https://svn.apache.org/repos/asf/cxf/sandbox/dkulp_async_clients
> 
> 
> Basically, I've been trying to find an async client that is somewhat usable 
> for CXF without completely re-writing all of CXF.   Not exactly an easy 
> task.  For "POST"s, they pretty much are all designed around being able to 
> blast out pre-rendered content (like File's or byte[]).   Doesn't really fit 
> with CXF's way of streaming out the soap messages as they are created.   
> 

...

> 4) Apache HTTP Components (HC)- this was the first one I tried, ran into 
> performance issues, abandoned it to test the others, then came back to it 
> and figured out the performance issue.  :-)   I had this "working", but a 
> simple "hello world" echo in a loop resulted in VERY VERY slow operation, 
> about 20x slower than the URLConnection in the JDK.   Couldn't figure out 
> what was going on which is why I started looking at the others.   I came 
> back to it and started doing wireshark captures and discovered that it was 
> waiting for ACK packets whereas the other clients were not.   The main issue 
> was that the docs for how to set the TCP_NO_DELAY flag (which, to me, should 
> be the default) seem to be more geared toward the 3.x or non-NIO versions.   
> Anyway, once I managed to get that set, things improved significantly.  For 
> non-chunked data, it seems to be working very well.  For chunked data, it 
> seems to work well 99% of the time.   It's that last 1% that's going to 
> drive me nuts.  :-(   It's occasionally writing out bad chunk headers, and 
> I have no idea why.   A raw wireshark look certainly shows bad chunk headers 
> heading out.   I don't know if it's something I'm doing or a bug in their 
> stuff.  Don't really know yet.
> 

Hi Daniel

If my memory serves me well, we have not had a single confirmed case of
message corruption in many years. I took a cursory look at your code and
could not spot anything obviously wrong. I am attaching a patch that
adds wire and I/O event logging to your HTTP conduit. If you set the
'org.apache.http' category to DEBUG priority you should be able to see
exactly what gets written to and read from the underlying NIO channel
and compare it with what you see with Wireshark.
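
To make that comparison less tedious, something along these lines may
help. It is only a minimal sketch and not part of the attached patch:
the class and method names are made up for illustration. It assumes the
request body bytes have been copied out of the wire log or the capture
into a byte array, walks them as HTTP/1.1 chunk-coded data, and reports
the offset of the first chunk header that does not parse. Chunk
extensions after ';' are skipped and trailers are ignored.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of the patch): checks chunked framing of captured bytes.
final class ChunkFramingCheck {

    private ChunkFramingCheck() {
    }

    /**
     * Returns -1 if the body is a well-formed sequence of chunks ending with a
     * zero-size chunk; otherwise returns the offset at which parsing failed.
     */
    static int firstBadChunkOffset(byte[] body) {
        int pos = 0;
        while (pos < body.length) {
            int headerStart = pos;
            int eol = indexOfCrlf(body, pos);
            if (eol < 0) {
                return headerStart; // chunk-size line is not terminated by CRLF
            }
            String line = new String(body, pos, eol - pos, StandardCharsets.US_ASCII);
            int semi = line.indexOf(';');
            String hex = semi >= 0 ? line.substring(0, semi) : line;
            // Limit to 7 hex digits so the value always fits into an int.
            if (!hex.matches("[0-9A-Fa-f]{1,7}")) {
                return headerStart; // chunk size is not a plain hex number
            }
            int size = Integer.parseInt(hex, 16);
            pos = eol + 2;
            if (size == 0) {
                return -1; // last chunk reached; trailers are not inspected
            }
            // Chunk data must be followed by CRLF.
            if (pos + size + 2 > body.length
                    || body[pos + size] != '\r'
                    || body[pos + size + 1] != '\n') {
                return headerStart;
            }
            pos += size + 2;
        }
        return pos; // ran out of data before the terminating zero-size chunk
    }

    private static int indexOfCrlf(byte[] b, int from) {
        for (int i = from; i + 1 < b.length; i++) {
            if (b[i] == '\r' && b[i + 1] == '\n') {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        byte[] good = "5\r\nhello\r\n0\r\n\r\n".getBytes(StandardCharsets.US_ASCII);
        byte[] bad = "5\r\nhello\r\nZZ\r\nabc\r\n0\r\n\r\n".getBytes(StandardCharsets.US_ASCII);
        System.out.println(firstBadChunkOffset(good)); // prints -1
        System.out.println(firstBadChunkOffset(bad));  // prints 10
    }
}
```

If the offset it reports lines up with the bad chunk header visible in
Wireshark, and the wire log shows the same bytes, that would at least
tell us the corruption happens before the data reaches the channel.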

If you let me know how to reproduce the issue I'll happily investigate
and try to find out what causes it. If there is anything wrong with
HttpCore I'll get it fixed. 


> In any case, I'm likely going to pursue option #4 a bit more and see if I can 
> figure out the last issue with it.
> 
> From a performance standpoint, for synchronous request/response, none of 
> them perform as well as the in-jdk HttpURLConnection for what we do.   Netty 
> came the closest at about 5% slower.  HC was about 10%, Jetty about 12%.  
> Gave up on Ning before running benchmarks.   
> 

I am quite surprised the difference compared to HttpURLConnection is
relatively small. In my experience a decent blocking HTTP client
outperforms a decent non-blocking HTTP client by as much as 50% as long
as the number of concurrent connections is moderate (<500). NIO starts
paying off only with concurrency well over 2000 or when connections stay
idle most of the time.

Cheers

Oleg

Index: pom.xml
===================================================================
--- pom.xml	(revision 1366514)
+++ pom.xml	(working copy)
@@ -67,8 +67,11 @@
             <artifactId>cxf-rt-transports-http</artifactId>
             <version>${project.version}</version>
         </dependency>
-
         <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
             <version>4.2.1</version>
Index: src/main/java/org/apache/cxf/transport/http/asyncclient/LoggingNHttpClientConnection.java
===================================================================
--- src/main/java/org/apache/cxf/transport/http/asyncclient/LoggingNHttpClientConnection.java	(revision 0)
+++ src/main/java/org/apache/cxf/transport/http/asyncclient/LoggingNHttpClientConnection.java	(revision 0)
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.http.Header;
+import org.apache.http.HttpException;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpResponseFactory;
+import org.apache.http.impl.nio.DefaultNHttpClientConnection;
+import org.apache.http.nio.NHttpClientEventHandler;
+import org.apache.http.nio.reactor.IOSession;
+import org.apache.http.nio.util.ByteBufferAllocator;
+import org.apache.http.params.HttpParams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LoggingNHttpClientConnection extends DefaultNHttpClientConnection {
+
+    private static final AtomicLong COUNT = new AtomicLong();
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultNHttpClientConnection.class);
+    private static final Logger IOLOG = LoggerFactory.getLogger(IOSession.class);
+    private static final Logger HEADERLOG  = LoggerFactory.getLogger("org.apache.http.headers");
+    private static final Logger WIRELOG = LoggerFactory.getLogger("org.apache.http.wire");
+
+    private final String id;
+
+    public LoggingNHttpClientConnection(
+            final IOSession session,
+            final HttpResponseFactory responseFactory,
+            final ByteBufferAllocator allocator,
+            final HttpParams params) {
+        super(session, responseFactory, allocator, params);
+        this.id = "http-outgoing-" + COUNT.incrementAndGet();
+        if (IOLOG.isDebugEnabled() || WIRELOG.isDebugEnabled()) {
+            this.session = new LoggingIOSession(session, this.id);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(this.id + ": Close connection");
+        }
+        super.close();
+    }
+
+    @Override
+    public void shutdown() throws IOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(this.id + ": Shutdown connection");
+        }
+        super.shutdown();
+    }
+
+    @Override
+    public void submitRequest(final HttpRequest request) throws IOException, HttpException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(this.id + ": "  + request.getRequestLine().toString());
+        }
+        super.submitRequest(request);
+    }
+
+    @Override
+    public void consumeInput(final NHttpClientEventHandler handler) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(this.id + ": Consume input");
+        }
+        super.consumeInput(handler);
+    }
+
+    @Override
+    public void produceOutput(final NHttpClientEventHandler handler) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(this.id + ": Produce output");
+        }
+        super.produceOutput(handler);
+    }
+
+    @Override
+    protected void onResponseReceived(final HttpResponse response) {
+        if (response != null && HEADERLOG.isDebugEnabled()) {
+            HEADERLOG.debug(this.id + " << " + response.getStatusLine().toString());
+            Header[] headers = response.getAllHeaders();
+            for (int i = 0; i < headers.length; i++) {
+                HEADERLOG.debug(this.id + " << " + headers[i].toString());
+            }
+        }
+    }
+
+    @Override
+    protected void onRequestSubmitted(final HttpRequest request) {
+        if (request != null && HEADERLOG.isDebugEnabled()) {
+            HEADERLOG.debug(id + " >> " + request.getRequestLine().toString());
+            Header[] headers = request.getAllHeaders();
+            for (int i = 0; i < headers.length; i++) {
+                HEADERLOG.debug(this.id + " >> " + headers[i].toString());
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return this.id;
+    }
+
+}

Property changes on: src/main/java/org/apache/cxf/transport/http/asyncclient/LoggingNHttpClientConnection.java
___________________________________________________________________
Added: svn:eol-style
   + native
Added: svn:mime-type
   + text/plain
Added: svn:keywords
   + Date Revision

Index: src/main/java/org/apache/cxf/transport/http/asyncclient/Wire.java
===================================================================
--- src/main/java/org/apache/cxf/transport/http/asyncclient/Wire.java	(revision 0)
+++ src/main/java/org/apache/cxf/transport/http/asyncclient/Wire.java	(revision 0)
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient;
+
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class Wire {
+
+    private static final Logger WIRELOG = LoggerFactory.getLogger("org.apache.http.wire");
+
+    private final String id;
+
+    public Wire(final String id) {
+        super();
+        this.id = id;
+    }
+
+    private void wire(final String header, final byte[] b, int pos, int off) {
+        StringBuilder buffer = new StringBuilder();
+        for (int i = 0; i < off; i++) {
+            int ch = b[pos + i] & 0xFF; // mask: Java bytes are signed
+            if (ch == 13) {
+                buffer.append("[\\r]");
+            } else if (ch == 10) {
+                buffer.append("[\\n]\"");
+                buffer.insert(0, "\"");
+                buffer.insert(0, header);
+                WIRELOG.debug(this.id + " " + buffer.toString());
+                buffer.setLength(0);
+            } else if ((ch < 32) || (ch > 127)) {
+                buffer.append("[0x");
+                buffer.append(Integer.toHexString(ch));
+                buffer.append("]");
+            } else {
+                buffer.append((char) ch);
+            }
+        }
+        if (buffer.length() > 0) {
+            buffer.append('\"');
+            buffer.insert(0, '\"');
+            buffer.insert(0, header);
+            WIRELOG.debug(this.id + " " + buffer.toString());
+        }
+    }
+
+
+    public boolean isEnabled() {
+        return WIRELOG.isDebugEnabled();
+    }
+
+    public void output(final byte[] b, int pos, int off) {
+        wire(">> ", b, pos, off);
+    }
+
+    public void input(final byte[] b, int pos, int off) {
+        wire("<< ", b, pos, off);
+    }
+
+    public void output(byte[] b) {
+        output(b, 0, b.length);
+    }
+
+    public void input(byte[] b) {
+        input(b, 0, b.length);
+    }
+
+    public void output(int b) {
+        output(new byte[] {(byte) b});
+    }
+
+    public void input(int b) {
+        input(new byte[] {(byte) b});
+    }
+
+    public void output(final ByteBuffer b) {
+        if (b.hasArray()) {
+            output(b.array(), b.arrayOffset() + b.position(), b.remaining());
+        } else {
+            byte[] tmp = new byte[b.remaining()];
+            b.get(tmp);
+            output(tmp);
+        }
+    }
+
+    public void input(final ByteBuffer b) {
+        if (b.hasArray()) {
+            input(b.array(), b.arrayOffset() + b.position(), b.remaining());
+        } else {
+            byte[] tmp = new byte[b.remaining()];
+            b.get(tmp);
+            input(tmp);
+        }
+    }
+
+}

Property changes on: src/main/java/org/apache/cxf/transport/http/asyncclient/Wire.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain
Added: svn:keywords
   + Date Revision
Added: svn:eol-style
   + native

Index: src/main/java/org/apache/cxf/transport/http/asyncclient/LoggingNHttpClientConnectionFactory.java
===================================================================
--- src/main/java/org/apache/cxf/transport/http/asyncclient/LoggingNHttpClientConnectionFactory.java	(revision 0)
+++ src/main/java/org/apache/cxf/transport/http/asyncclient/LoggingNHttpClientConnectionFactory.java	(revision 0)
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient;
+
+import org.apache.http.HttpResponseFactory;
+import org.apache.http.annotation.Immutable;
+import org.apache.http.impl.DefaultHttpResponseFactory;
+import org.apache.http.impl.nio.DefaultNHttpClientConnection;
+import org.apache.http.impl.nio.DefaultNHttpClientConnectionFactory;
+import org.apache.http.nio.reactor.IOSession;
+import org.apache.http.nio.util.ByteBufferAllocator;
+import org.apache.http.nio.util.HeapByteBufferAllocator;
+import org.apache.http.params.HttpParams;
+
+@Immutable
+public class LoggingNHttpClientConnectionFactory extends DefaultNHttpClientConnectionFactory {
+
+    public LoggingNHttpClientConnectionFactory(
+            final HttpResponseFactory responseFactory,
+            final ByteBufferAllocator allocator,
+            final HttpParams params) {
+        super(responseFactory, allocator, params);
+    }
+
+    public LoggingNHttpClientConnectionFactory(final HttpParams params) {
+        this(new DefaultHttpResponseFactory(), new HeapByteBufferAllocator(), params);
+    }
+
+    @Override
+    protected DefaultNHttpClientConnection createConnection(
+            final IOSession session,
+            final HttpResponseFactory responseFactory, 
+            final ByteBufferAllocator allocator, 
+            final HttpParams params) {
+        return new LoggingNHttpClientConnection(session, responseFactory, allocator, params);
+    }
+
+}

Property changes on: src/main/java/org/apache/cxf/transport/http/asyncclient/LoggingNHttpClientConnectionFactory.java
___________________________________________________________________
Added: svn:eol-style
   + native
Added: svn:mime-type
   + text/plain
Added: svn:keywords
   + Date Revision

Index: src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java
===================================================================
--- src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java	(revision 1366514)
+++ src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java	(working copy)
@@ -115,7 +115,8 @@
         // Create client-side HTTP protocol handler
         HttpAsyncRequestExecutor protocolHandler = new HttpAsyncRequestExecutor();
         // Create client-side I/O event dispatch
-        final IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(protocolHandler, params);
+        final IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(protocolHandler, 
+                new LoggingNHttpClientConnectionFactory(params));
         // Create client-side I/O reactor
         IOReactorConfig config = new IOReactorConfig();
         config.setTcpNoDelay(true);
Index: src/main/java/org/apache/cxf/transport/http/asyncclient/LoggingIOSession.java
===================================================================
--- src/main/java/org/apache/cxf/transport/http/asyncclient/LoggingIOSession.java	(revision 0)
+++ src/main/java/org/apache/cxf/transport/http/asyncclient/LoggingIOSession.java	(revision 0)
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.SelectionKey;
+
+import org.apache.http.nio.reactor.IOSession;
+import org.apache.http.nio.reactor.SessionBufferStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Decorator class intended to transparently extend an {@link IOSession}
+ * with basic event logging capabilities using SLF4J.
+ *
+ */
+public class LoggingIOSession implements IOSession {
+
+    private static final Logger LOG = LoggerFactory.getLogger(IOSession.class);
+
+    private final String id;
+    private final IOSession session;
+    private final ByteChannel channel;
+    private final Wire wirelog;
+    
+    public LoggingIOSession(final IOSession session, final String id) {
+        super();
+        if (session == null) {
+            throw new IllegalArgumentException("I/O session may not be null");
+        }
+        this.session = session;
+        this.channel = new LoggingByteChannel();
+        this.id = id;
+        this.wirelog = new Wire(this.id);
+    }
+
+    public ByteChannel channel() {
+        return this.channel;
+    }
+
+    public SocketAddress getLocalAddress() {
+        return this.session.getLocalAddress();
+    }
+
+    public SocketAddress getRemoteAddress() {
+        return this.session.getRemoteAddress();
+    }
+
+    public int getEventMask() {
+        return this.session.getEventMask();
+    }
+
+    private static String formatOps(int ops) {
+        StringBuilder buffer = new StringBuilder(6);
+        buffer.append('[');
+        if ((ops & SelectionKey.OP_READ) > 0) {
+            buffer.append('r');
+        }
+        if ((ops & SelectionKey.OP_WRITE) > 0) {
+            buffer.append('w');
+        }
+        if ((ops & SelectionKey.OP_ACCEPT) > 0) {
+            buffer.append('a');
+        }
+        if ((ops & SelectionKey.OP_CONNECT) > 0) {
+            buffer.append('c');
+        }
+        buffer.append(']');
+        return buffer.toString();
+    }
+
+    public void setEventMask(int ops) {
+        this.session.setEventMask(ops);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(this.id + " " + this.session + ": Event mask set " + formatOps(ops));
+        }
+    }
+
+    public void setEvent(int op) {
+        this.session.setEvent(op);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(this.id + " " + this.session + ": Event set " + formatOps(op));
+        }
+    }
+
+    public void clearEvent(int op) {
+        this.session.clearEvent(op);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(this.id + " " + this.session + ": Event cleared " + formatOps(op));
+        }
+    }
+
+    public void close() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(this.id + " " + this.session + ": Close");
+        }
+        this.session.close();
+    }
+
+    public int getStatus() {
+        return this.session.getStatus();
+    }
+
+    public boolean isClosed() {
+        return this.session.isClosed();
+    }
+
+    public void shutdown() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(this.id + " " + this.session + ": Shutdown");
+        }
+        this.session.shutdown();
+    }
+
+    public int getSocketTimeout() {
+        return this.session.getSocketTimeout();
+    }
+
+    public void setSocketTimeout(int timeout) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(this.id + " " + this.session + ": Set timeout " + timeout);
+        }
+        this.session.setSocketTimeout(timeout);
+    }
+
+    public void setBufferStatus(final SessionBufferStatus status) {
+        this.session.setBufferStatus(status);
+    }
+
+    public boolean hasBufferedInput() {
+        return this.session.hasBufferedInput();
+    }
+
+    public boolean hasBufferedOutput() {
+        return this.session.hasBufferedOutput();
+    }
+
+    public Object getAttribute(final String name) {
+        return this.session.getAttribute(name);
+    }
+
+    public void setAttribute(final String name, final Object obj) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(this.id + " " + this.session + ": Set attribute " + name);
+        }
+        this.session.setAttribute(name, obj);
+    }
+
+    public Object removeAttribute(final String name) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(this.id + " " + this.session + ": Remove attribute " + name);
+        }
+        return this.session.removeAttribute(name);
+    }
+
+    @Override
+    public String toString() {
+        return this.id + " " + this.session.toString();
+    }
+
+    class LoggingByteChannel implements ByteChannel {
+
+        public int read(final ByteBuffer dst) throws IOException {
+            int bytesRead = session.channel().read(dst);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(id + " " + session + ": " + bytesRead + " bytes read");
+            }
+            if (bytesRead > 0 && wirelog.isEnabled()) {
+                ByteBuffer b = dst.duplicate();
+                int p = b.position();
+                b.limit(p);
+                b.position(p - bytesRead);
+                wirelog.input(b);
+            }
+            return bytesRead;
+        }
+
+        public int write(final ByteBuffer src) throws IOException {
+            int byteWritten = session.channel().write(src);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(id + " " + session + ": " + byteWritten + " bytes written");
+            }
+            if (byteWritten > 0 && wirelog.isEnabled()) {
+                ByteBuffer b = src.duplicate();
+                int p = b.position();
+                b.limit(p);
+                b.position(p - byteWritten);
+                wirelog.output(b);
+            }
+            return byteWritten;
+        }
+
+        public void close() throws IOException {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(id + " " + session + ": Channel close");
+            }
+            session.channel().close();
+        }
+
+        public boolean isOpen() {
+            return session.channel().isOpen();
+        }
+
+    }
+
+}
\ No newline at end of file

Property changes on: src/main/java/org/apache/cxf/transport/http/asyncclient/LoggingIOSession.java
___________________________________________________________________
Added: svn:eol-style
   + native
Added: svn:mime-type
   + text/plain
Added: svn:keywords
   + Date Revision
