[ 
https://issues.apache.org/jira/browse/KUDU-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

WangBo updated KUDU-2116:
-------------------------
    Description: 
I want to use kudu-client to fetch data. To improve efficiency, I use multiple 
threads to do this: I call KuduScanToken.intoScanner to get a KuduScanner, then 
use the KuduScanner to fetch data. In a single-threaded environment this works, 
but when using multiple threads, where every thread holds a KuduScanner to fetch 
data, the exception below occurs:
{panel:title=My title}
org.apache.kudu.client.NonRecoverableException: Invalid call sequence ID in 
scan request
        at 
org.apache.kudu.client.TabletClient.dispatchTSErrorOrReturnException(TabletClient.java:526)
        at 
org.apache.kudu.client.TabletClient.messageReceived(TabletClient.java:456)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at 
org.apache.kudu.client.TabletClient.handleUpstream(TabletClient.java:603)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.handler.timeout.ReadTimeoutHandler.messageReceived(ReadTimeoutHandler.java:184)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
{panel}
The client code is as below:

{code:java}
public class ScannerTest {

    ExecutorService pool = Executors.newFixedThreadPool(8);

    @Test
    public void scan() throws Exception {
        long begin = System.currentTimeMillis();
        String tableName = "impala::test.test_table";
        KuduClient kuduClient = getKuduClient();
        KuduTable kuduTable = kuduClient.openTable(tableName);
        List<KuduScanToken> kuduScanTokens = 
kuduClient.newScanTokenBuilder(kuduTable)
                .build();
        List<Future> list = Lists.newArrayList();
        for (int i = 0;i < kuduScanTokens.size();i ++) {
            KuduScanToken kuduScanToken = kuduScanTokens.get(i);
            KuduScanner kuduScanner = 
kuduScanToken.intoScanner(getKuduClient());
            list.add(pool.submit(new Callable<Object>() {
                public Object call() {
                    String name = Thread.currentThread().getName();
                    try {
                        System.out.println("scan start:" + name);
                        doKuduScan(kuduScanner, name);
                        return name;
                    } catch (KuduException e) {
                        e.printStackTrace();
                        return e.getMessage();
                    }
                }
            }));
        }
        list.forEach(future -> {
            try {
                System.out.println(future.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });
        pool.shutdown();
    }

    public void doKuduScan(KuduScanner kuduScanner, String threadName) throws 
KuduException {
        while (kuduScanner.hasMoreRows()) {
            RowResultIterator results = kuduScanner.nextRows();
            System.out.println(threadName + ",num=" + results.getNumRows());
            while (results.hasNext()) {
                RowResult rowResult = results.next();
            }
        }
    }

    public static KuduClient getKuduClient() {
        KuduClient kuduClient = new 
KuduClient.KuduClientBuilder(hostName).build();
        return kuduClient;
    }

}
{code}
The more threads there are, the more easily the problem occurs.


  was:
I want to use kudu-client to fetch data. To improve efficiency, I use multiple 
threads to do this: I call KuduScanToken.intoScanner to get a KuduScanner, then 
use the KuduScanner to fetch data. In a single-threaded environment this works, 
but when using multiple threads, where every thread holds a KuduScanner to fetch 
data, the exception below occurs:
{panel:title=My title}
org.apache.kudu.client.NonRecoverableException: Invalid call sequence ID in 
scan request
        at 
org.apache.kudu.client.TabletClient.dispatchTSErrorOrReturnException(TabletClient.java:526)
        at 
org.apache.kudu.client.TabletClient.messageReceived(TabletClient.java:456)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at 
org.apache.kudu.client.TabletClient.handleUpstream(TabletClient.java:603)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.handler.timeout.ReadTimeoutHandler.messageReceived(ReadTimeoutHandler.java:184)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at 
org.apache.kudu.client.shaded.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
{panel}
The client code is as below:

{code:java}
public class ScannerTest {

    ExecutorService pool = Executors.newFixedThreadPool(8);

    @Test
    public void scan() throws Exception {
        long begin = System.currentTimeMillis();
        String tableName = "impala::test.test_table";
        KuduClient kuduClient = getKuduClient();
        KuduTable kuduTable = kuduClient.openTable(tableName);
        List<KuduScanToken> kuduScanTokens = 
kuduClient.newScanTokenBuilder(kuduTable)
                .build();
        List<Future> list = Lists.newArrayList();
        for (int i = 0;i < kuduScanTokens.size();i ++) {
            KuduScanToken kuduScanToken = kuduScanTokens.get(i);
            KuduScanner kuduScanner = 
kuduScanToken.intoScanner(kuduScanToken.intoScanner(getKuduClient()));
            list.add(pool.submit(new Callable<Object>() {
                public Object call() {
                    String name = Thread.currentThread().getName();
                    try {
                        System.out.println("scan start:" + name);
                        doKuduScan(kuduScanner, name);
                        return name;
                    } catch (KuduException e) {
                        e.printStackTrace();
                        return e.getMessage();
                    }
                }
            }));
        }
        list.forEach(future -> {
            try {
                System.out.println(future.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });
        pool.shutdown();
    }

    public void doKuduScan(KuduScanner kuduScanner, String threadName) throws 
KuduException {
        while (kuduScanner.hasMoreRows()) {
            RowResultIterator results = kuduScanner.nextRows();
            System.out.println(threadName + ",num=" + results.getNumRows());
            while (results.hasNext()) {
                RowResult rowResult = results.next();
            }
        }
    }

    public static KuduClient getKuduClient() {
        KuduClient kuduClient = new 
KuduClient.KuduClientBuilder(hostName).build();
        return kuduClient;
    }

}
{code}
The more threads there are, the more easily the problem occurs.



> kudu client multi thread scanner throw exception:Invalid call sequence ID in 
> scan request
> -----------------------------------------------------------------------------------------
>
>                 Key: KUDU-2116
>                 URL: https://issues.apache.org/jira/browse/KUDU-2116
>             Project: Kudu
>          Issue Type: Bug
>          Components: client, java
>    Affects Versions: 1.4.0
>         Environment: java8
> Development environment win 7
> kudu server version is 'kudu 1.4.0-cdh5.12.0'
> kudu client version is '1.4.0'
>            Reporter: WangBo
>            Assignee: Todd Lipcon
>
> I want to use kudu-client to fetch data. To improve efficiency, I use multiple 
> threads to do this: I call KuduScanToken.intoScanner to get a KuduScanner, then 
> use the KuduScanner to fetch data. In a single-threaded environment this works, 
> but when using multiple threads, where every thread holds a KuduScanner to fetch 
> data, the exception below occurs:
> {panel:title=My title}
> org.apache.kudu.client.NonRecoverableException: Invalid call sequence ID in 
> scan request
>       at 
> org.apache.kudu.client.TabletClient.dispatchTSErrorOrReturnException(TabletClient.java:526)
>       at 
> org.apache.kudu.client.TabletClient.messageReceived(TabletClient.java:456)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
>       at 
> org.apache.kudu.client.TabletClient.handleUpstream(TabletClient.java:603)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.handler.timeout.ReadTimeoutHandler.messageReceived(ReadTimeoutHandler.java:184)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
>       at 
> org.apache.kudu.client.shaded.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {panel}
> The client code is as below:
> {code:java}
> public class ScannerTest {
>     ExecutorService pool = Executors.newFixedThreadPool(8);
>     @Test
>     public void scan() throws Exception {
>         long begin = System.currentTimeMillis();
>         String tableName = "impala::test.test_table";
>         KuduClient kuduClient = getKuduClient();
>         KuduTable kuduTable = kuduClient.openTable(tableName);
>         List<KuduScanToken> kuduScanTokens = 
> kuduClient.newScanTokenBuilder(kuduTable)
>                 .build();
>         List<Future> list = Lists.newArrayList();
>         for (int i = 0;i < kuduScanTokens.size();i ++) {
>             KuduScanToken kuduScanToken = kuduScanTokens.get(i);
>             KuduScanner kuduScanner = 
> kuduScanToken.intoScanner(getKuduClient());
>             list.add(pool.submit(new Callable<Object>() {
>                 public Object call() {
>                     String name = Thread.currentThread().getName();
>                     try {
>                         System.out.println("scan start:" + name);
>                         doKuduScan(kuduScanner, name);
>                         return name;
>                     } catch (KuduException e) {
>                         e.printStackTrace();
>                         return e.getMessage();
>                     }
>                 }
>             }));
>         }
>         list.forEach(future -> {
>             try {
>                 System.out.println(future.get());
>             } catch (InterruptedException e) {
>                 e.printStackTrace();
>             } catch (ExecutionException e) {
>                 e.printStackTrace();
>             }
>         });
>         pool.shutdown();
>     }
>     public void doKuduScan(KuduScanner kuduScanner, String threadName) throws 
> KuduException {
>         while (kuduScanner.hasMoreRows()) {
>             RowResultIterator results = kuduScanner.nextRows();
>             System.out.println(threadName + ",num=" + results.getNumRows());
>             while (results.hasNext()) {
>                 RowResult rowResult = results.next();
>             }
>         }
>     }
>     public static KuduClient getKuduClient() {
>         KuduClient kuduClient = new 
> KuduClient.KuduClientBuilder(hostName).build();
>         return kuduClient;
>     }
> }
> {code}
> The more threads there are, the more easily the problem occurs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to