[ https://issues.apache.org/jira/browse/STORM-2231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144262#comment-16144262 ]

Kevin Conaway commented on STORM-2231:
--------------------------------------

[~kabhwan] btw I can confirm that your patch fixes the issue with this test 
case that I wrote:

Given that the underlying issue is a race condition, you may need to run it a 
few times but the error always shows up for me.

Per my comment above, attempting to _synchronize_ on the OutputCollector also 
doesn't work when the disruptor-queue producer type is single-threaded. I'm 
not sure what the workaround should be without your patch in place.

{code:java}
package org.apache.storm;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.testing.AckFailMapTracker;
import org.apache.storm.testing.FeederSpout;
import org.apache.storm.testing.MkClusterParam;
import org.apache.storm.testing.TestJob;
import org.apache.storm.testing.TrackedTopology;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class Storm2231 {

    @Test
    public void runTest() {
        Config daemonConfig = new Config();
        daemonConfig.put(Config.STORM_LOCAL_HOSTNAME, "localhost");

        MkClusterParam clusterParams = new MkClusterParam();
        clusterParams.setDaemonConf(daemonConfig);

        Testing.withTrackedCluster(clusterParams, new TestJob() {
            @Override
            public void run(ILocalCluster cluster) throws Exception {
                AckFailMapTracker tracker = new AckFailMapTracker();

                final FeederSpout spout = new FeederSpout(new Fields("message"));
                spout.setAckFailDelegate(tracker);

                TopologyBuilder builder = new TopologyBuilder();
                builder.setSpout("spout", spout, 50);
                builder.setBolt("bolt", new ExampleBolt(), 10).shuffleGrouping("spout");

                Config topologyConfig = new Config();
                topologyConfig.setDebug(false);

                TrackedTopology tracked = Testing.mkTrackedTopology(cluster, builder.createTopology());
                cluster.submitTopology(String.valueOf(System.nanoTime()), topologyConfig, tracked.getTopology());

                ExecutorService service = Executors.newCachedThreadPool();

                for (int i = 0; i < 20; i++) {
                    service.submit(new Runnable() {
                        @Override
                        public void run() {
                            for (int j = 0; j <= 500_000; j++) {
                                String messageId = Thread.currentThread().getId() + "-" + UUID.randomUUID().toString();
                                synchronized (spout) {
                                    spout.feed(new Values(messageId), messageId);
                                }
                            }
                        }
                    });
                }

                service.shutdown();
                service.awaitTermination(1, TimeUnit.DAYS);
            }
        });
    }

    public static class ExampleBolt extends BaseRichBolt {

        private transient ExecutorService executorService;
        private transient BlockingQueue<Runnable> callbackQueue;
        private transient OutputCollector outputCollector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            outputCollector = collector;
            callbackQueue = new LinkedBlockingQueue<>();
            executorService = Executors.newCachedThreadPool();
            for (int i = 0; i < 5; i++) {
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        while (!Thread.currentThread().isInterrupted()) {
                            List<Runnable> callbacks = new ArrayList<>();
                            callbackQueue.drainTo(callbacks);
                            for (Runnable callback : callbacks) {
                                callback.run();
                            }
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException ex) {
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
                });
            }
        }

        @Override
        public void execute(final Tuple input) {
            try {
                callbackQueue.put(new Runnable() {
                    @Override
                    public void run() {
                        outputCollector.ack(input);
                    }
                });
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                outputCollector.fail(input);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }

        @Override
        public void cleanup() {
            executorService.shutdownNow();
        }
    }
}
{code}
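
For completeness, the synchronized-wrapper approach mentioned above can be sketched roughly like this. This is a standalone sketch, not Storm code: {{Collector}} is a stand-in for OutputCollector, and the names are illustrative. Even with every caller strictly serialized on one monitor like this, acks still get lost in my runs, which suggests the problem is not simple call interleaving:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SyncedCollector {
    // Stand-in for Storm's OutputCollector (illustrative only).
    interface Collector { void ack(String id); }

    private final Collector delegate;

    SyncedCollector(Collector delegate) { this.delegate = delegate; }

    // Every caller takes the same monitor, so ack calls are strictly
    // serialized -- this is the wrapper that does NOT fix STORM-2231.
    synchronized void ack(String id) { delegate.ack(id); }

    public static void main(String[] args) throws Exception {
        AtomicInteger acked = new AtomicInteger();
        SyncedCollector collector = new SyncedCollector(id -> acked.incrementAndGet());

        // Hammer the wrapper from several threads, as a bolt's worker
        // threads would.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 100; i++) {
            final String id = "tuple-" + i;
            pool.submit(() -> collector.ack(id));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(acked.get());
    }
}
```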

> NULL in DisruptorQueue while multi-threaded ack
> -----------------------------------------------
>
>                 Key: STORM-2231
>                 URL: https://issues.apache.org/jira/browse/STORM-2231
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-core
>    Affects Versions: 1.0.1, 1.1.0
>            Reporter: Alexander Kharitonov
>            Assignee: Jungtaek Lim
>            Priority: Critical
>             Fix For: 2.0.0, 1.2.0, 1.1.2, 1.0.5
>
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> I use a simple topology with one spout (9 workers) and one bolt (9 workers).
> I have topology.backpressure.enable: false in storm.yaml.
> The spouts send about 10 000 000 tuples in 10 minutes. Pending for the spout 
> is 80 000.
> The bolts buffer their tuples for 60 seconds, then flush to the database and 
> ack the tuples in parallel (10 threads).
> I read that OutputCollector can be used safely from many threads, so I use 
> it that way.
> I don't have any bottleneck in the bolts (flushing to the database) or the 
> spouts (Kafka spout), but about 2% of tuples fail due to the tuple processing 
> timeout (the failures are recorded in the spout stats only).
> I am sure that the bolts ack all tuples, but some of the acks never reach 
> the spouts.
> While acking from multiple threads I see many errors in the worker logs 
> like this:
> 2016-12-01 13:21:10.741 o.a.s.u.DisruptorQueue [ERROR] NULL found in 
> disruptor-executor[3 3]-send-queue:853877
> I tried to use a synchronized wrapper around OutputCollector to fix the 
> error, but it didn't help.
> I found a workaround that helps me: I still do all processing in the bolt in 
> multiple threads, but I call the OutputCollector.ack method from one single 
> separate thread.
> I think Storm has a bug in the multi-threaded use of OutputCollector.
> If my topology has much less load, like 500 000 tuples per 10 minutes, then 
> I don't lose any acks.
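
The single-ack-thread workaround described in the report can be sketched like this. Again a standalone sketch under illustrative names: {{Acker}} stands in for the OutputCollector.ack call, and the single-threaded executor is the "one separate thread" that serializes every ack:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleAckThread {
    // Stand-in for OutputCollector.ack (illustrative only).
    interface Acker { void ack(String id); }

    // One dedicated thread owns every ack call.
    private final ExecutorService ackThread = Executors.newSingleThreadExecutor();
    private final Acker acker;

    SingleAckThread(Acker acker) { this.acker = acker; }

    // Safe to call from any worker thread: the ack itself always runs on
    // the single dedicated ack thread, never on the caller's thread.
    void ackLater(String id) {
        ackThread.submit(() -> acker.ack(id));
    }

    void shutdown() throws InterruptedException {
        ackThread.shutdown();
        ackThread.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        Set<String> acked = ConcurrentHashMap.newKeySet();
        SingleAckThread funnel = new SingleAckThread(acked::add);

        // Processing stays multi-threaded; only the acks are funneled.
        ExecutorService workers = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            final String id = "tuple-" + i;
            workers.submit(() -> funnel.ackLater(id));
        }
        workers.shutdown();
        workers.awaitTermination(10, TimeUnit.SECONDS);
        funnel.shutdown();
        System.out.println(acked.size());
    }
}
```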



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
