Hello. I'm making numerous requests to a search API, but I'm interested
only in one particular field (a JSON / HTML attribute) that usually appears
near the beginning of the search response document. Parsing is done with the
findWithinHorizon() method of Scanner, which lets me iterate over the stream
efficiently. I have decided to drop the connection as soon as the required
field has been obtained. To do this I close the source end of the pipe that
is used as a bridge between the httpclient consumer and the actual parsing
logic.
Parsing code
    Scanner s = new Scanner(pipe.source());
    String match = s.findWithinHorizon(patternTotalResults, 0);
    this.buildResult = Integer.parseInt(match);
    // Send close() to the Pipe
    pipe.source().close();
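For reference, here is a minimal, self-contained sketch of the same scanning step. The real patternTotalResults is not shown in this message, so the pattern, the field name "totalResults" and the class name below are assumptions made only for illustration. It also shows one detail that may matter: Integer.parseInt(match) only works if the pattern matches digits alone, so the sketch reads the number from a capture group instead.

```java
import java.nio.channels.ReadableByteChannel;
import java.util.Scanner;
import java.util.regex.Pattern;

public class TotalResultsScan {
    // Hypothetical pattern: the real field name and format depend on the search API.
    static final Pattern TOTAL_RESULTS =
            Pattern.compile("\"totalResults\"\\s*:\\s*(\\d+)");

    /** Returns the first captured number, or -1 if the field never appears. */
    public static int firstTotalResults(ReadableByteChannel source) {
        Scanner s = new Scanner(source, "UTF-8");
        // A horizon of 0 means "no bound": the scanner reads on until it
        // finds a match or reaches the end of the input.
        if (s.findWithinHorizon(TOTAL_RESULTS, 0) == null) {
            return -1;
        }
        // match() exposes the groups of the last successful find;
        // group(1) holds only the digits, so parseInt is safe here.
        return Integer.parseInt(s.match().group(1));
    }
}
```

The caller stays responsible for closing the channel afterwards, as in the snippet above.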
Once the "bridge" pipe is closed, the onByteReceived() method that consumes
the data gets an IOException on write and proceeds to drop the active
connection.
Consumer code
    public abstract class CrawlerParsingConsumer extends
            AsyncByteConsumer<Integer> {
        @Override
        protected void onByteReceived(ByteBuffer buffer, IOControl control)
                throws IOException {
            boolean isForcedTermination = false;
            log.debug("Try to process block of " + buffer.remaining() + " size");
            try {
                while (buffer.hasRemaining()) {
                    pipe.sink().write(buffer);
                }
            } catch (IOException e) {
                log.debug("Seems that pipe has been intensionally closed: " + e);
                isForcedTermination = true;
            } finally {
                if (isForcedTermination) {
                    log.debug("Some data of block may been discarded. "
                            + "Drop the connection as well since the rest of data is not needed");
                    super.cancel(); // HERE I WANT TO DROP THE CONNECTION
                } else {
                    log.debug("Block has been processed");
                }
            }
        }
    }
The problem is that, although the result (buildResult) has already been
calculated, the future returned by httpclient.execute() is NULL, because of
an NPE inside HttpClient.
Execution code
    HttpAsyncGet httpAsyncGet = new HttpAsyncGet(request);
    Future<Integer> future = httpclient.execute(httpAsyncGet,
            new CrawlerParsingConsumer(), null);
    ....
    // NPE here, since the connection has actually been terminated
    // in the middle of the data transfer
    future.get();
Log
Aug 9, 2011 6:59:05 PM weblab.webometric.crawler.parse.CrawlerParsingFutureTask onByteReceived
INFO: Seems that pipe has been intensionally closed: java.io.IOException: An established connection was aborted by the software in your host machine
Aug 9, 2011 6:59:05 PM weblab.webometric.crawler.parse.CrawlerParsingFutureTask onByteReceived
INFO: Some data of block may been discarded. Drop the connection as well since the rest of data is not needed in this document
In finally of Google
Aug 9, 2011 6:59:05 PM org.apache.http.impl.nio.client.AbstractHttpAsyncClient doExecute
SEVERE: I/O reactor terminated abnormally
org.apache.http.nio.reactor.IOReactorException: I/O dispatch worker terminated abnormally
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:321)
    at org.apache.http.impl.nio.conn.PoolingClientConnectionManager.execute(PoolingClientConnectionManager.java:91)
    at org.apache.http.impl.nio.client.AbstractHttpAsyncClient.doExecute(AbstractHttpAsyncClient.java:441)
    at org.apache.http.impl.nio.client.AbstractHttpAsyncClient.access$000(AbstractHttpAsyncClient.java:95)
    at org.apache.http.impl.nio.client.AbstractHttpAsyncClient$1.run(AbstractHttpAsyncClient.java:462)
Caused by: java.lang.NullPointerException
    at org.apache.http.nio.client.methods.AsyncByteConsumer.onContentReceived(AsyncByteConsumer.java:65)
    at org.apache.http.nio.client.methods.AbstractHttpAsyncResponseConsumer.consumeContent(AbstractHttpAsyncResponseConsumer.java:71)
    at org.apache.http.impl.nio.client.DefaultAsyncRequestDirector.consumeContent(DefaultAsyncRequestDirector.java:309)
    at org.apache.http.impl.nio.client.NHttpClientProtocolHandler.inputReady(NHttpClientProtocolHandler.java:194)
    at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:174)
    at org.apache.http.impl.nio.client.InternalClientEventDispatch.inputReady(InternalClientEventDispatch.java:80)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:158)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:340)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:318)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:278)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:542)
    at java.lang.Thread.run(Thread.java:619)
The NPE occurs in asyncclient-4.0, in
http/nio/client/methods/AsyncByteConsumer.java:
    @Override
    protected void onContentReceived(
            final ContentDecoder decoder, final IOControl ioctrl) throws
            IOException {
        if (this.bbuf == null) {
            this.bbuf = ByteBuffer.allocate(this.bufSize);
        }
        for (;;) {
            int bytesRead = decoder.read(this.bbuf);
            if (bytesRead <= 0) {
                break;
            }
            this.bbuf.flip();
            onByteReceived(this.bbuf, ioctrl);
            this.bbuf.clear();
        }
    }
This happens because, once the ByteConsumer receives the cancel() signal,
its methods set this.bbuf to null. See below for the details.
AbstractHttpAsyncResponseConsumer
    public synchronized void cancel() {
        if (this.completed) {
            return;
        }
        this.completed = true;
        this.response = null;
        releaseResources();
    }
AsyncByteConsumer
    @Override
    void releaseResources() {
        this.bbuf = null;
        super.releaseResources();
    }
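To make the suspected mechanism concrete, here is a small stand-alone sketch that mimics the loop above without using HttpAsyncClient at all. BufferResetDemo and its methods are hypothetical stand-ins: consume() copies the shape of onContentReceived(), and onBytes() plays the role of onByteReceived() calling cancel(). It assumes that the NPE comes from the loop touching the buffer again after the callback has released it.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

/** Stand-in for the consumer; this is not the HttpAsyncClient class itself. */
public class BufferResetDemo {
    private ByteBuffer bbuf;

    // Same shape as the onContentReceived() loop quoted above.
    void consume(ReadableByteChannel in) throws IOException {
        if (bbuf == null) {
            bbuf = ByteBuffer.allocate(8);
        }
        for (;;) {
            int bytesRead = in.read(bbuf);
            if (bytesRead <= 0) {
                break;
            }
            bbuf.flip();
            onBytes(bbuf);
            bbuf.clear(); // throws NPE when the callback has nulled the field
        }
    }

    // Plays the role of onByteReceived() calling cancel().
    void onBytes(ByteBuffer block) {
        bbuf = null; // same effect as releaseResources()
    }

    /** Returns true if running the loop ends in a NullPointerException. */
    public static boolean reproducesNpe() {
        ReadableByteChannel in =
                Channels.newChannel(new ByteArrayInputStream(new byte[32]));
        try {
            new BufferResetDemo().consume(in);
            return false;
        } catch (NullPointerException expected) {
            return true;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In this model the failure does not depend on the network at all: releasing the buffer from inside the callback is enough to break the loop that invoked the callback.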
My question is the following: can I somehow stop further processing
(onByteReceived()) after a certain event, while keeping the future returned
from httpclient in a normal state?
Also, can you recommend another approach for partial NIO data consumption
(n threads) combined with parsing (m threads)?
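One possible direction, shown only as a minimal sketch: instead of calling cancel() from inside onByteReceived(), remember that the pipe is gone and silently discard every later block, so the consumer's internal state is never released in the middle of its own loop. DrainingForwarder is a hypothetical helper written against plain java.nio, not part of HttpAsyncClient, and I have not verified how the library behaves with it. The obvious cost is that the rest of the response is still downloaded.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

/** Hypothetical helper: forwards bytes to a sink until it fails, then discards. */
public class DrainingForwarder {
    private final WritableByteChannel sink;
    private boolean sinkClosed = false;

    public DrainingForwarder(WritableByteChannel sink) {
        this.sink = sink;
    }

    /**
     * Always leaves the buffer fully consumed.
     * Returns true if the block reached the sink, false if it was discarded.
     */
    public boolean forward(ByteBuffer buffer) {
        if (!sinkClosed) {
            try {
                while (buffer.hasRemaining()) {
                    sink.write(buffer);
                }
                return true;
            } catch (IOException e) {
                // The reader closed its end: stop forwarding from now on.
                sinkClosed = true;
            }
        }
        // Skip whatever is left so the caller sees the block as consumed.
        buffer.position(buffer.limit());
        return false;
    }

    public boolean isSinkClosed() {
        return sinkClosed;
    }
}
```

In the consumer above, forward(buffer) would take the place of the write loop, with pipe.sink() passed to the constructor; the result computed by the parser could then be returned when the response completes normally.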
--
Best regards,
~ Xasima ~