[
https://issues.apache.org/jira/browse/STORM-2231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144262#comment-16144262
]
Kevin Conaway commented on STORM-2231:
--------------------------------------
[~kabhwan] btw, I can confirm that your patch fixes the issue with this test
case that I wrote.
Given that the underlying issue is a race condition, you may need to run it a
few times, but the error always shows up for me.
Per my comment above, attempting to _synchronize_ on the OutputCollector also
doesn't work when the disruptor-queue producer type is single-threaded. I'm
not sure what the workaround should be without your patch in place.
{code:java}
package org.apache.storm;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.testing.AckFailMapTracker;
import org.apache.storm.testing.FeederSpout;
import org.apache.storm.testing.MkClusterParam;
import org.apache.storm.testing.TestJob;
import org.apache.storm.testing.TrackedTopology;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class Storm2231 {

    @Test
    public void runTest() throws Exception {
        Config daemonConfig = new Config();
        daemonConfig.put(Config.STORM_LOCAL_HOSTNAME, "localhost");
        MkClusterParam clusterParams = new MkClusterParam();
        clusterParams.setDaemonConf(daemonConfig);
        Testing.withTrackedCluster(clusterParams, new TestJob() {
            @Override
            public void run(ILocalCluster cluster) throws Exception {
                AckFailMapTracker tracker = new AckFailMapTracker();
                final FeederSpout spout = new FeederSpout(new Fields("message"));
                spout.setAckFailDelegate(tracker);

                TopologyBuilder builder = new TopologyBuilder();
                builder.setSpout("spout", spout, 50);
                builder.setBolt("bolt", new ExampleBolt(), 10).shuffleGrouping("spout");

                Config topologyConfig = new Config();
                topologyConfig.setDebug(false);

                TrackedTopology tracked = Testing.mkTrackedTopology(cluster, builder.createTopology());
                cluster.submitTopology(String.valueOf(System.nanoTime()),
                        topologyConfig, tracked.getTopology());

                // Feed tuples from many threads to put enough load on the
                // topology to hit the race.
                ExecutorService service = Executors.newCachedThreadPool();
                for (int i = 0; i < 20; i++) {
                    service.submit(new Runnable() {
                        @Override
                        public void run() {
                            for (int j = 0; j <= 500_000; j++) {
                                String messageId = Thread.currentThread().getId()
                                        + "-" + UUID.randomUUID().toString();
                                synchronized (spout) {
                                    spout.feed(new Values(messageId), messageId);
                                }
                            }
                        }
                    });
                }
                service.shutdown();
                service.awaitTermination(1, TimeUnit.DAYS);
            }
        });
    }

    public static class ExampleBolt extends BaseRichBolt {

        private transient ExecutorService executorService;
        private transient BlockingQueue<Runnable> callbackQueue;
        private transient OutputCollector outputCollector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            outputCollector = collector;
            callbackQueue = new LinkedBlockingQueue<>();
            executorService = Executors.newCachedThreadPool();
            // Five background threads drain the queue and ack from outside
            // the executor thread -- the multi-threaded ack that triggers
            // the NULL-in-DisruptorQueue error.
            for (int i = 0; i < 5; i++) {
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        while (!Thread.currentThread().isInterrupted()) {
                            List<Runnable> callbacks = new ArrayList<>();
                            callbackQueue.drainTo(callbacks);
                            for (Runnable callback : callbacks) {
                                callback.run();
                            }
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException ex) {
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
                });
            }
        }

        @Override
        public void execute(final Tuple input) {
            try {
                // Defer the ack to one of the background threads.
                callbackQueue.put(new Runnable() {
                    @Override
                    public void run() {
                        outputCollector.ack(input);
                    }
                });
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                outputCollector.fail(input);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }

        @Override
        public void cleanup() {
            executorService.shutdownNow();
        }
    }
}
{code}
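For reference, the synchronized-wrapper attempt mentioned above can be sketched in plain Java without any Storm classes. The `Collector` interface and `SynchronizedCollector` name here are illustrative stand-ins, not Storm API; the point is that the wrapper itself is perfectly consistent, so when it still fails in Storm the race must live inside the send queue's publish path rather than in the callers:

```java
import java.util.ArrayList;
import java.util.List;

public class SyncWrapperSketch {

    // Illustrative stand-in for the ack surface of Storm's OutputCollector;
    // not the real Storm API.
    interface Collector {
        void ack(Object tuple);
    }

    // Serializing every call through one lock was the attempted fix. In Storm
    // it did not help: with a single-threaded disruptor-queue producer type,
    // the queue itself assumes one publisher, no matter what callers lock on.
    static class SynchronizedCollector implements Collector {
        private final Collector delegate;
        private final Object lock = new Object();

        SynchronizedCollector(Collector delegate) {
            this.delegate = delegate;
        }

        @Override
        public void ack(Object tuple) {
            synchronized (lock) {
                delegate.ack(tuple);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        List<Object> acked = new ArrayList<>();
        Collector collector = new SynchronizedCollector(acked::add);

        // Four threads ack concurrently through the wrapper.
        Thread[] ackers = new Thread[4];
        for (int t = 0; t < ackers.length; t++) {
            ackers[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    collector.ack("tuple-" + i);
                }
            });
            ackers[t].start();
        }
        for (Thread t : ackers) {
            t.join();
        }

        // The lock keeps the delegate itself consistent: all 4 * 1000 acks land.
        System.out.println(acked.size()); // 4000
    }
}
```

This shows why the wrapper looked like it should work: the delegate never sees concurrent calls. The failure in Storm is downstream of the collector.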
> NULL in DisruptorQueue while multi-threaded ack
> -----------------------------------------------
>
> Key: STORM-2231
> URL: https://issues.apache.org/jira/browse/STORM-2231
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-core
> Affects Versions: 1.0.1, 1.1.0
> Reporter: Alexander Kharitonov
> Assignee: Jungtaek Lim
> Priority: Critical
> Fix For: 2.0.0, 1.2.0, 1.1.2, 1.0.5
>
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> I use a simple topology with one spout (9 workers) and one bolt (9 workers).
> I have topology.backpressure.enable: false in storm.yaml.
> The spouts send about 10,000,000 tuples in 10 minutes. Pending for the spout
> is 80,000.
> The bolts buffer their tuples for 60 seconds, then flush to a database and
> ack the tuples in parallel (10 threads).
> I read that OutputCollector can be used safely from many threads, so I do.
> I don't have any bottleneck in the bolts (flushing to the database) or the
> spouts (Kafka spout), but about 2% of tuples fail due to tuple-processing
> timeout (the failures are recorded in the spout stats only).
> I am sure that the bolts ack all tuples, but some of the acks never reach
> the spouts.
> While acking from multiple threads I see many errors in the worker logs like
> this:
> 2016-12-01 13:21:10.741 o.a.s.u.DisruptorQueue [ERROR] NULL found in
> disruptor-executor[3 3]-send-queue:853877
> I tried a synchronized wrapper around OutputCollector to fix the error, but
> it didn't help.
> I found a workaround that works for me: I do all the processing in the bolt
> in multiple threads, but call the OutputCollector.ack method from a single
> separate thread.
> I think Storm has a bug in the multi-threaded use of OutputCollector.
> If my topology has much less load, like 500,000 tuples per 10 minutes, then
> I don't lose any acks.
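The single-ack-thread workaround the reporter describes can be sketched in plain Java, again without Storm classes. The queue, poison-pill constant, and counter here are illustrative (a real bolt would call `collector.ack(tuple)` where the counter is incremented); the pattern is that only one thread ever touches the collector, so the send queue only ever sees one publisher:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class SingleAckThreadSketch {

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> ackQueue = new LinkedBlockingQueue<>();
        AtomicInteger acked = new AtomicInteger();
        final String POISON = "__done__";

        // The single thread allowed to ack. In a real bolt this is the only
        // place that calls collector.ack(...); here it just counts.
        Thread ackThread = new Thread(() -> {
            try {
                String id;
                while (!(id = ackQueue.take()).equals(POISON)) {
                    acked.incrementAndGet(); // collector.ack(tuple) in a real bolt
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        ackThread.start();

        // Many processing threads hand finished tuple ids to the ack thread
        // instead of acking themselves.
        Thread[] workers = new Thread[8];
        for (int t = 0; t < workers.length; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    ackQueue.add("tuple-" + i);
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            w.join();
        }

        ackQueue.add(POISON);
        ackThread.join();
        System.out.println(acked.get()); // 8000
    }
}
```

The trade-off is an extra hop per tuple through the queue, but every ack reaches the collector from one thread, which sidesteps the single-producer race until the patch lands.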
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)