Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1417#discussion_r79412490 --- Diff: storm-core/src/jvm/org/apache/storm/messaging/local/Context.java --- @@ -82,31 +86,73 @@ public void close() { private static class LocalClient implements IConnection { private final LocalServer _server; + //Messages sent before the server registered a callback + private final LinkedBlockingQueue<TaskMessage> _pendingDueToUnregisteredServer; + private final ScheduledExecutorService _pendingFlusher; public LocalClient(LocalServer server) { _server = server; + _pendingDueToUnregisteredServer = new LinkedBlockingQueue<>(); + _pendingFlusher = Executors.newScheduledThreadPool(1, new ThreadFactory(){ + @Override + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable); + thread.setName("LocalClientFlusher-" + thread.getId()); + thread.setDaemon(true); + return thread; + } + }); + _pendingFlusher.scheduleAtFixedRate(new Runnable(){ + @Override + public void run(){ + try { + //Ensure messages are flushed even if no more sends are performed + flushPending(); + } catch (Throwable t) { + LOG.error("Uncaught throwable in pending message flusher thread, messages may be lost", t); + throw t; + } + } + }, 5, 5, TimeUnit.SECONDS); } @Override public void registerRecv(IConnectionCallback cb) { throw new IllegalArgumentException("SHOULD NOT HAPPEN"); } - + + private void flushPending(){ + if (_server._cb != null && !_pendingDueToUnregisteredServer.isEmpty()) { + ArrayList<TaskMessage> ret = new ArrayList<>(); + _pendingDueToUnregisteredServer.drainTo(ret); + _server._cb.recv(ret); + } + } + @Override public void send(int taskId, byte[] payload) { + TaskMessage message = new TaskMessage(taskId, payload); if (_server._cb != null) { - _server._cb.recv(Arrays.asList(new TaskMessage(taskId, payload))); + flushPending(); + _server._cb.recv(Arrays.asList(message)); + } else { + _pendingDueToUnregisteredServer.add(message); } } 
@Override public void send(Iterator<TaskMessage> msgs) { if (_server._cb != null) { + flushPending(); ArrayList<TaskMessage> ret = new ArrayList<>(); while (msgs.hasNext()) { ret.add(msgs.next()); } _server._cb.recv(ret); --- End diff -- Cache `_server._cb` in a local variable before using it, so that the null check and the subsequent `recv` call operate on the same value.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it to be, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---