WangBo created KUDU-2116:
----------------------------
Summary: kudu client multi thread scanner throw exception
Key: KUDU-2116
URL: https://issues.apache.org/jira/browse/KUDU-2116
Project: Kudu
Issue Type: Bug
Components: client, java
Affects Versions: 1.4.0
Environment: Java 8
Development environment: Windows 7
Kudu server version is 'kudu 1.4.0-cdh5.12.0'
Kudu client version is '1.4.0'
Reporter: WangBo
I want to use kudu-client to fetch data. To improve efficiency, I use multiple
threads: I call KuduScanToken.intoScanner to get a KuduScanner, then use that
KuduScanner to fetch data. In a single-threaded environment this works. With
multiple threads, where each thread holds its own KuduScanner, the following
exception occurs:
{panel:title=My title}
org.apache.kudu.client.NonRecoverableException: Invalid call sequence ID in scan request
    at org.apache.kudu.client.TabletClient.dispatchTSErrorOrReturnException(TabletClient.java:526)
    at org.apache.kudu.client.TabletClient.messageReceived(TabletClient.java:456)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.apache.kudu.client.TabletClient.handleUpstream(TabletClient.java:603)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.apache.kudu.client.shaded.org.jboss.netty.handler.timeout.ReadTimeoutHandler.messageReceived(ReadTimeoutHandler.java:184)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
    at org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
    at org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at org.apache.kudu.client.shaded.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.apache.kudu.client.shaded.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{panel}
The client code is as follows:
{code:java}
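import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduScanToken;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;
import org.junit.Test;

public class ScannerTest {
    ExecutorService pool = Executors.newFixedThreadPool(8);

    @Test
    public void scan() throws Exception {
        long begin = System.currentTimeMillis();
        String tableName = "impala::test.test_table";
        KuduClient kuduClient = getKuduClient();
        KuduTable kuduTable = kuduClient.openTable(tableName);
        List<KuduScanToken> kuduScanTokens =
            kuduClient.newScanTokenBuilder(kuduTable)
                .build();
        List<Future<Object>> list = Lists.newArrayList();
        // One scanner per scan token; each scanner is handed to its own pool task.
        for (int i = 0; i < kuduScanTokens.size(); i++) {
            KuduScanToken kuduScanToken = kuduScanTokens.get(i);
            KuduScanner kuduScanner = kuduScanToken.intoScanner(kuduClient);
            list.add(pool.submit(new Callable<Object>() {
                public Object call() {
                    String name = Thread.currentThread().getName();
                    try {
                        System.out.println("scan start:" + name);
                        doKuduScan(kuduScanner, name);
                        return name;
                    } catch (KuduException e) {
                        e.printStackTrace();
                        return e.getMessage();
                    }
                }
            }));
        }
        // Wait for every task and print its result (thread name or error message).
        list.forEach(future -> {
            try {
                System.out.println(future.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });
        pool.shutdown();
    }

    public void doKuduScan(KuduScanner kuduScanner, String threadName) throws KuduException {
        while (kuduScanner.hasMoreRows()) {
            RowResultIterator results = kuduScanner.nextRows();
            System.out.println(threadName + ",num=" + results.getNumRows());
            while (results.hasNext()) {
                RowResult rowResult = results.next();
            }
        }
    }

    public static KuduClient getKuduClient() {
        // hostName (the Kudu master address) is not defined in this snippet.
        KuduClient kuduClient = new KuduClient.KuduClientBuilder(hostName).build();
        return kuduClient;
    }
}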
{code}
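For readers without a cluster at hand, the fan-out shape used above (one pool task per scan token, results collected from futures) can be sketched with the standard library alone. This is only an illustration of the threading pattern, not a fix or a diagnosis for the exception: `FanOutSketch`, `scanPartition`, and `scanAll` are hypothetical names, and `scanPartition` is a stand-in that returns a made-up row count instead of calling Kudu.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class FanOutSketch {

    /** Hypothetical stand-in for scanning one token; returns a fake row count. */
    static long scanPartition(int partition) {
        if (partition < 0) {
            throw new IllegalArgumentException("scan failed for partition " + partition);
        }
        return partition * 10L;
    }

    /** Runs one task per partition and sums the counts; any task failure is rethrown. */
    static long scanAll(List<Integer> partitions, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (final int p : partitions) {
                Callable<Long> task = () -> scanPartition(p);
                futures.add(pool.submit(task));
            }
            long total = 0;
            for (Future<Long> f : futures) {
                total += f.get(); // blocks until that task has finished
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for scans", e);
        } catch (ExecutionException e) {
            // Surface the worker's failure instead of printing it and moving on.
            throw new IllegalStateException("a scan task failed", e.getCause());
        } finally {
            pool.shutdown(); // runs on both the success and the failure path
        }
    }

    public static void main(String[] args) {
        System.out.println(scanAll(Arrays.asList(1, 2, 3), 2));
    }
}
```

Unlike the test above, this sketch lets a task failure propagate to the caller rather than returning the error message as a result, and it shuts the pool down in a `finally` block so the pool is released even when a task fails.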