lichenglin created FLUME-3024:
---------------------------------

             Summary: TestThriftRpcClient.testMultipleThreads  is not really 
multipleThreads  
                 Key: FLUME-3024
                 URL: https://issues.apache.org/jira/browse/FLUME-3024
             Project: Flume
          Issue Type: Bug
          Components: Test
    Affects Versions: v1.6.0
            Reporter: lichenglin


{code}
ThriftRpcClient client = (ThriftRpcClient) 
RpcClientFactory.getThriftInstance("172.17.0.12", 4444, 10);
                int threadCount = 100;
                ExecutorService submissionSvc = 
Executors.newFixedThreadPool(threadCount);
                ArrayList<Future<?>> futures = new 
ArrayList<Future<?>>(threadCount);
                for (int i = 0; i < threadCount; i++) {
                        futures.add(submissionSvc.submit(new Runnable() {
                                public void run() {
                                        try {
                                                insertAsBatch(client, 0, 9);
                                        } catch (Exception e) {
                                                e.printStackTrace(); // To change body of catch statement use
                                                // File | Settings | File Templates.
                                        }
                                }
                        }));
                }
                for (int i = 0; i < threadCount; i++) {
                        futures.get(i).get();
                }
{code}

Although insertAsBatch is submitted to a thread pool, the actual insert only 
occurs when futures.get(i).get() is called.

That call is not asynchronous, so the inserts end up running synchronously (one after another) as well.

when changing to 
{code}
submissionSvc.submit(new Runnable() {
                                public void run() {
                                        try {
                                                insertAsBatch(client, 0, 9);
                                        } catch (Exception e) {
                                                e.printStackTrace(); 
                                        }
                                }
                        }).get()
{code}
an exception occurs.
Maybe a real multi-threaded test should look like this:

{code}
package thrift;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.tomcat.jni.Thread;
import org.junit.internal.runners.statements.RunAfters;

/**
 * Stand-alone client that appends events to a Thrift source from several
 * threads at once.
 *
 * <p>{@link #main} submits 1000 append tasks to a fixed pool of 10 threads.
 * Each pool thread lazily opens its own connection to 172.17.0.12:4444 and
 * keeps it in a {@link ThreadLocal}, so no connection is ever used by two
 * threads.
 */
public class Client {

        /** Number of append attempts started so far, shared by all worker threads. */
        static AtomicInteger i = new AtomicInteger(0);

        /** The Thrift client owned by the current thread; unset until first use. */
        static ThreadLocal<ThriftSourceProtocol.Client> l = new ThreadLocal<>();

        /**
         * Submits 1000 append tasks to a pool of 10 threads and then requests an
         * orderly shutdown of the pool.
         *
         * @param args ignored
         * @throws TException declared for compatibility with existing callers
         */
        public static void main(String[] args) throws TException {
                ExecutorService submissionSvc = Executors.newFixedThreadPool(10);
                // Empty header map shared by every event; the tasks only read it.
                Map<String, String> map = new HashMap<>();
                try {
                        for (int j = 0; j < 1000; j++) {
                                submissionSvc.submit(new Runnable() {
                                        @Override
                                        public void run() {
                                                appendOne(map);
                                        }
                                });
                        }
                } finally {
                        // Without this the pool's non-daemon threads keep the JVM alive
                        // forever. shutdown() still runs every task already submitted.
                        submissionSvc.shutdown();
                }
        }

        /**
         * Sends one event whose body is the current thread's name, using the
         * current thread's own connection (opened on first use).
         *
         * <p>If the connection cannot be opened the error is printed and the
         * task ends, instead of failing again with a NullPointerException on the
         * missing client. If the append itself fails, the thread's client is
         * discarded so that the next task on this thread opens a new connection.
         *
         * @param headers headers to attach to the event
         */
        private static void appendOne(Map<String, String> headers) {
                ThriftSourceProtocol.Client client = l.get();
                if (client == null) {
                        try {
                                client = create();
                                l.set(client);
                        } catch (TTransportException e) {
                                e.printStackTrace();
                                // No usable connection on this thread; a later task retries.
                                return;
                        }
                }
                try {
                        int k = i.incrementAndGet();
                        // java.lang.Thread is spelled out because the file imports
                        // org.apache.tomcat.jni.Thread under the simple name Thread.
                        String threadName = java.lang.Thread.currentThread().getName();
                        client.append(new ThriftFlumeEvent(headers,
                                        ByteBuffer.wrap(threadName.getBytes())));
                        System.out.println(k + "****" + threadName);
                } catch (Exception e) {
                        l.remove();
                        e.printStackTrace();
                }
        }

        /**
         * Opens a new framed, compact-protocol connection to 172.17.0.12:4444.
         *
         * @return a client bound to the freshly opened transport
         * @throws TTransportException if the transport cannot be opened
         */
        private static ThriftSourceProtocol.Client create() throws TTransportException {
                TSocket tt = new TSocket("172.17.0.12", 4444);
                TTransport transport = new TFastFramedTransport(tt);
                ThriftSourceProtocol.Client client = new ThriftSourceProtocol.Client(
                                new TCompactProtocol(transport));
                transport.open();
                return client;
        }
}
{code}

We must make sure each thread has its own connection, so use a ThreadLocal.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to