Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1417#discussion_r79412490
  
    --- Diff: storm-core/src/jvm/org/apache/storm/messaging/local/Context.java 
---
    @@ -82,31 +86,73 @@ public void close() {
     
         private static class LocalClient implements IConnection {
             private final LocalServer _server;
    +        //Messages sent before the server registered a callback
    +        private final LinkedBlockingQueue<TaskMessage> 
_pendingDueToUnregisteredServer;
    +        private final ScheduledExecutorService _pendingFlusher;
     
             public LocalClient(LocalServer server) {
                 _server = server;
    +            _pendingDueToUnregisteredServer = new LinkedBlockingQueue<>();
    +            _pendingFlusher = Executors.newScheduledThreadPool(1, new 
ThreadFactory(){
    +                @Override
    +                public Thread newThread(Runnable runnable) {
    +                    Thread thread = new Thread(runnable);
    +                    thread.setName("LocalClientFlusher-" + thread.getId());
    +                    thread.setDaemon(true);
    +                    return thread;
    +                }
    +            });
    +            _pendingFlusher.scheduleAtFixedRate(new Runnable(){
    +                @Override
    +                public void run(){
    +                    try {
    +                        //Ensure messages are flushed even if no more 
sends are performed
    +                        flushPending();
    +                    } catch (Throwable t) {
    +                        LOG.error("Uncaught throwable in pending message 
flusher thread, messages may be lost", t);
    +                        throw t;
    +                    }
    +                }
    +            }, 5, 5, TimeUnit.SECONDS);
             }
     
             @Override
             public void registerRecv(IConnectionCallback cb) {
                 throw new IllegalArgumentException("SHOULD NOT HAPPEN");
             }
    -
    +        
    +        private void flushPending(){
    +            if (_server._cb != null && 
!_pendingDueToUnregisteredServer.isEmpty()) {
    +                ArrayList<TaskMessage> ret = new ArrayList<>();
    +                _pendingDueToUnregisteredServer.drainTo(ret);
    +                _server._cb.recv(ret);
    +            }
    +        }
    +        
             @Override
             public void send(int taskId,  byte[] payload) {
    +            TaskMessage message = new TaskMessage(taskId, payload);
                 if (_server._cb != null) {
    -                _server._cb.recv(Arrays.asList(new TaskMessage(taskId, 
payload)));
    +                flushPending();
    +                _server._cb.recv(Arrays.asList(message));
    +            } else {
    +                _pendingDueToUnregisteredServer.add(message);
                 }
             }
      
             @Override
             public void send(Iterator<TaskMessage> msgs) {
                 if (_server._cb != null) {
    +                flushPending();
                     ArrayList<TaskMessage> ret = new ArrayList<>();
                     while (msgs.hasNext()) {
                         ret.add(msgs.next());
                     }
                     _server._cb.recv(ret);
    --- End diff --
    
    cache _cb before using it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to