[
https://issues.apache.org/jira/browse/KUDU-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Grant Henke resolved KUDU-2116.
-------------------------------
Fix Version/s: NA
Resolution: Duplicate
> kudu client multi thread scanner throw exception:Invalid call sequence ID in
> scan request
> -----------------------------------------------------------------------------------------
>
> Key: KUDU-2116
> URL: https://issues.apache.org/jira/browse/KUDU-2116
> Project: Kudu
> Issue Type: Bug
> Components: client, java
> Affects Versions: 1.4.0
> Environment: java8
> Development environment win 7
> kudu server version is 'kudu 1.4.0-cdh5.12.0'
> kudu client version is '1.4.0'
> Reporter: WangBo
> Assignee: Todd Lipcon
> Priority: Major
> Fix For: NA
>
>
> I want to use kudu-client to fetch data. To improve efficiency, I use
> multiple threads: I call KuduScanToken.intoScanner to get a KuduScanner, then
> use the KuduScanner to fetch data. In a single-threaded environment this works
> fine, but with multiple threads, each holding its own KuduScanner, the
> following exception occurs:
> {panel:title=My title}
> org.apache.kudu.client.NonRecoverableException: Invalid call sequence ID in scan request
>     at org.apache.kudu.client.TabletClient.dispatchTSErrorOrReturnException(TabletClient.java:526)
>     at org.apache.kudu.client.TabletClient.messageReceived(TabletClient.java:456)
>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
>     at org.apache.kudu.client.TabletClient.handleUpstream(TabletClient.java:603)
>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
>     at org.apache.kudu.client.shaded.org.jboss.netty.handler.timeout.ReadTimeoutHandler.messageReceived(ReadTimeoutHandler.java:184)
>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
>     at org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70)
>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
>     at org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
>     at org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
>     at org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
>     at org.apache.kudu.client.shaded.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
>     at org.apache.kudu.client.shaded.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {panel}
> The client code is as follows:
> {code:java}
> public class ScannerTest {
>     ExecutorService pool = Executors.newFixedThreadPool(8);
>
>     @Test
>     public void scan() throws Exception {
>         long begin = System.currentTimeMillis();
>         String tableName = "impala::test.test_table";
>         KuduClient kuduClient = getKuduClient();
>         KuduTable kuduTable = kuduClient.openTable(tableName);
>         List<KuduScanToken> kuduScanTokens = kuduClient.newScanTokenBuilder(kuduTable).build();
>         List<Future> list = Lists.newArrayList();
>         for (int i = 0; i < kuduScanTokens.size(); i++) {
>             KuduScanToken kuduScanToken = kuduScanTokens.get(i);
>             KuduScanner kuduScanner = kuduScanToken.intoScanner(getKuduClient());
>             list.add(pool.submit(new Callable<Object>() {
>                 public Object call() {
>                     String name = Thread.currentThread().getName();
>                     try {
>                         System.out.println("scan start:" + name);
>                         doKuduScan(kuduScanner, name);
>                         return name;
>                     } catch (KuduException e) {
>                         e.printStackTrace();
>                         return e.getMessage();
>                     }
>                 }
>             }));
>         }
>         list.forEach(future -> {
>             try {
>                 System.out.println(future.get());
>             } catch (InterruptedException e) {
>                 e.printStackTrace();
>             } catch (ExecutionException e) {
>                 e.printStackTrace();
>             }
>         });
>         pool.shutdown();
>     }
>
>     public void doKuduScan(KuduScanner kuduScanner, String threadName) throws KuduException {
>         while (kuduScanner.hasMoreRows()) {
>             RowResultIterator results = kuduScanner.nextRows();
>             System.out.println(threadName + ",num=" + results.getNumRows());
>             while (results.hasNext()) {
>                 RowResult rowResult = results.next();
>             }
>         }
>     }
>
>     public static KuduClient getKuduClient() {
>         KuduClient kuduClient = new KuduClient.KuduClientBuilder(hostName).build();
>         return kuduClient;
>     }
> }
> {code}
> The more threads there are, the more easily the problem occurs.
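
For context on the error message: Kudu's scan RPC carries a per-scanner call sequence ID that the client increments after each successful request, and the tablet server rejects a request whose ID is not the one it expects. The sketch below is a toy model of that check only. `ToyScanSession` and `handle` are hypothetical names, not Kudu classes, and the sketch does not attempt to diagnose the cause of this particular report.

```java
/** Toy stand-in for the server-side state of one open scanner (hypothetical, not a Kudu class). */
final class ToyScanSession {
    // Sequence ID the next request for this scanner is expected to carry.
    private int expectedSeqId = 0;

    /** Accepts a request only if it carries the expected sequence ID, then advances the counter. */
    synchronized boolean handle(int callSeqId) {
        if (callSeqId != expectedSeqId) {
            return false; // analogous to "Invalid call sequence ID in scan request"
        }
        expectedSeqId++;
        return true;
    }
}

final class CallSeqIdDemo {
    public static void main(String[] args) {
        ToyScanSession session = new ToyScanSession();
        System.out.println(session.handle(0)); // in order: accepted
        System.out.println(session.handle(1)); // in order: accepted
        System.out.println(session.handle(3)); // skips ID 2: rejected
        System.out.println(session.handle(2)); // expected ID: accepted
    }
}
```

In this model, requests for a single scanner are accepted only in strict order, so a request that arrives with a skipped or repeated ID is rejected.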