Updated Branches:
  refs/heads/master 300d27edf -> 7fca871c1

[HELIX-281] Prevent message callbacks from indefinitely starving other callbacks


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d5a4caff
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d5a4caff
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d5a4caff

Branch: refs/heads/master
Commit: d5a4caffdf6ed267237309cac9f70fb7ebc14be1
Parents: 180e5b4
Author: Kanak Biscuitwala <[email protected]>
Authored: Wed Jan 8 17:18:35 2014 -0800
Committer: Kanak Biscuitwala <[email protected]>
Committed: Wed Jan 22 14:45:22 2014 -0800

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      |  57 +++++++--
 .../stages/ClusterEventBlockingQueue.java       | 123 +++++++++++++++++++
 .../stages/TestClusterEventBlockingQueue.java   |  95 ++++++++++++++
 .../helix/integration/TestHelixConnection.java  |   1 +
 .../helix/integration/TestSchedulerMessage.java |  12 +-
 5 files changed, 274 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/d5a4caff/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 9fef2da..7b19574 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -28,6 +28,7 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.apache.helix.ConfigChangeListener;
 import org.apache.helix.ControllerChangeListener;
 import org.apache.helix.CurrentStateChangeListener;
@@ -46,6 +47,7 @@ import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.PipelineRegistry;
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.ClusterEventBlockingQueue;
 import org.apache.helix.controller.stages.CompatibilityCheckStage;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.ExternalViewComputeStage;
@@ -56,8 +58,8 @@ import 
org.apache.helix.controller.stages.PersistAssignmentStage;
 import org.apache.helix.controller.stages.PersistContextStage;
 import org.apache.helix.controller.stages.ReadClusterDataStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
-import org.apache.helix.controller.stages.TaskAssignmentStage;
 import org.apache.helix.controller.stages.ResourceValidationStage;
+import org.apache.helix.controller.stages.TaskAssignmentStage;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HealthStat;
@@ -96,6 +98,12 @@ public class GenericHelixController implements 
ConfigChangeListener, IdealStateC
   ClusterStatusMonitor _clusterStatusMonitor;
 
   /**
+   * A queue for controller events and a thread that will consume it
+   */
+  private final ClusterEventBlockingQueue _eventQueue;
+  private final ClusterEventProcessor _eventThread;
+
+  /**
    * The _paused flag is checked by function handleEvent(), while if the flag 
is set
    * handleEvent() will be no-op. Other event handling logic keeps the same 
when the flag
    * is set.
@@ -135,7 +143,7 @@ public class GenericHelixController implements 
ConfigChangeListener, IdealStateC
       List<ZNRecord> dummy = new ArrayList<ZNRecord>();
       event.addAttribute("eventData", dummy);
       // Should be able to process
-      handleEvent(event);
+      _eventQueue.put(event);
     }
   }
 
@@ -217,6 +225,9 @@ public class GenericHelixController implements 
ConfigChangeListener, IdealStateC
     _registry = registry;
     _lastSeenInstances = new AtomicReference<Map<String, LiveInstance>>();
     _lastSeenSessions = new AtomicReference<Map<String, LiveInstance>>();
+    _eventQueue = new ClusterEventBlockingQueue();
+    _eventThread = new ClusterEventProcessor();
+    _eventThread.start();
   }
 
   /**
@@ -273,6 +284,8 @@ public class GenericHelixController implements 
ConfigChangeListener, IdealStateC
       return;
     }
 
+    logger.info("START: Invoking controller pipeline for event: " + 
event.getName());
+    long startTime = System.currentTimeMillis();
     for (Pipeline pipeline : pipelines) {
       try {
         pipeline.handle(event);
@@ -283,6 +296,9 @@ public class GenericHelixController implements 
ConfigChangeListener, IdealStateC
         break;
       }
     }
+    long endTime = System.currentTimeMillis();
+    logger.info("END: Invoking controller pipeline for event: " + 
event.getName() + ", took "
+        + (endTime - startTime) + " ms");
   }
 
   // TODO since we read data in pipeline, we can get rid of reading from 
zookeeper in
@@ -296,7 +312,7 @@ public class GenericHelixController implements 
ConfigChangeListener, IdealStateC
     // event.addAttribute("helixmanager", changeContext.getManager());
     // event.addAttribute("changeContext", changeContext);
     // event.addAttribute("eventData", externalViewList);
-    // // handleEvent(event);
+    // _eventQueue.put(event);
     // logger.info("END: GenericClusterController.onExternalViewChange()");
   }
 
@@ -309,7 +325,7 @@ public class GenericHelixController implements 
ConfigChangeListener, IdealStateC
     event.addAttribute("instanceName", instanceName);
     event.addAttribute("changeContext", changeContext);
     event.addAttribute("eventData", statesInfo);
-    handleEvent(event);
+    _eventQueue.put(event);
     logger.info("END: GenericClusterController.onStateChange()");
   }
 
@@ -333,7 +349,7 @@ public class GenericHelixController implements 
ConfigChangeListener, IdealStateC
     event.addAttribute("instanceName", instanceName);
     event.addAttribute("changeContext", changeContext);
     event.addAttribute("eventData", messages);
-    handleEvent(event);
+    _eventQueue.put(event);
 
     if (_clusterStatusMonitor != null && messages != null) {
       _clusterStatusMonitor.addMessageQueueSize(instanceName, messages.size());
@@ -367,7 +383,7 @@ public class GenericHelixController implements 
ConfigChangeListener, IdealStateC
     event.addAttribute("helixmanager", changeContext.getManager());
     event.addAttribute("changeContext", changeContext);
     event.addAttribute("eventData", liveInstances);
-    handleEvent(event);
+    _eventQueue.put(event);
     logger.info("END: Generic 
GenericClusterController.onLiveInstanceChange()");
   }
 
@@ -393,13 +409,13 @@ public class GenericHelixController implements 
ConfigChangeListener, IdealStateC
     event.addAttribute("helixmanager", changeContext.getManager());
     event.addAttribute("changeContext", changeContext);
     event.addAttribute("eventData", idealStates);
-    handleEvent(event);
+    _eventQueue.put(event);
 
     if (changeContext.getType() != Type.FINALIZE) {
       checkRebalancingTimer(changeContext.getManager(), idealStates);
     }
 
-    logger.info("END: Generic GenericClusterController.onIdealStateChange()");
+    logger.info("END: GenericClusterController.onIdealStateChange()");
   }
 
   @Override
@@ -409,7 +425,7 @@ public class GenericHelixController implements 
ConfigChangeListener, IdealStateC
     event.addAttribute("changeContext", changeContext);
     event.addAttribute("helixmanager", changeContext.getManager());
     event.addAttribute("eventData", configs);
-    handleEvent(event);
+    _eventQueue.put(event);
     logger.info("END: GenericClusterController.onConfigChange()");
   }
 
@@ -452,7 +468,7 @@ public class GenericHelixController implements 
ConfigChangeListener, IdealStateC
         event.addAttribute("changeContext", changeContext);
         event.addAttribute("helixmanager", changeContext.getManager());
         event.addAttribute("eventData", pauseSignal);
-        handleEvent(event);
+        _eventQueue.put(event);
       } else {
         _paused = false;
       }
@@ -534,4 +550,25 @@ public class GenericHelixController implements 
ConfigChangeListener, IdealStateC
 
   }
 
+  /**
+   * Single consumer of the controller event queue: repeatedly takes the next (coalesced) event and
+   * runs the controller pipeline on it via {@code handleEvent}, so listener callbacks only enqueue
+   * events and never run the pipeline on their own threads. The loop ends once this thread is
+   * interrupted.
+   */
+  private class ClusterEventProcessor extends Thread {
+    ClusterEventProcessor() {
+      // Name the thread so it is identifiable in thread dumps. It is started from the controller
+      // constructor and nothing in this class stops it, so mark it as a daemon: a thread parked
+      // in take() must not keep the JVM alive after the controller has been discarded.
+      super("GenericHelixController-event_process");
+      setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      logger.info("START ClusterEventProcessor thread");
+      while (!isInterrupted()) {
+        try {
+          ClusterEvent event = _eventQueue.take();
+          handleEvent(event);
+        } catch (InterruptedException e) {
+          // take() clears the interrupt flag when it throws; restore it so the loop terminates
+          logger.warn("ClusterEventProcessor interrupted", e);
+          interrupt();
+        } catch (ZkInterruptedException e) {
+          logger.warn("ClusterEventProcessor caught a ZK connection interrupt", e);
+          interrupt();
+        } catch (Throwable t) {
+          // keep consuming: one failed pipeline run must not kill the only event consumer
+          logger.error("ClusterEventProcessor failed while running the controller pipeline",
+              t);
+        }
+      }
+      logger.info("END ClusterEventProcessor thread");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/d5a4caff/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java
new file mode 100644
index 0000000..54c9ca2
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java
@@ -0,0 +1,123 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * A blocking queue of ClusterEvent objects to be used by the controller pipeline. This prevents
+ * multiple events of the same type from flooding the controller and preventing progress from being
+ * made. Events are coalesced by name: putting an event whose name is already pending replaces the
+ * pending event without adding a new entry or changing its position in the queue. The queue is
+ * unbounded. This class is meant to be a limited implementation of the {@link BlockingQueue}
+ * interface.
+ * <p>
+ * Every method synchronizes on this instance, so a single queue may be shared between producer
+ * and consumer threads.
+ */
+public class ClusterEventBlockingQueue {
+  private static final Logger LOG = Logger.getLogger(ClusterEventBlockingQueue.class);
+  // most recent event seen for each pending event name; guarded by this
+  private final Map<String, ClusterEvent> _eventMap;
+  // FIFO of pending events, at most one per distinct name; guarded by this
+  private final Queue<ClusterEvent> _eventQueue;
+
+  /**
+   * Instantiate an empty queue
+   */
+  public ClusterEventBlockingQueue() {
+    _eventMap = Maps.newHashMap();
+    _eventQueue = Lists.newLinkedList();
+  }
+
+  /**
+   * Remove all events from the queue
+   */
+  public synchronized void clear() {
+    _eventMap.clear();
+    _eventQueue.clear();
+  }
+
+  /**
+   * Add a single event to the queue, overwriting any pending event with the same name. A new name
+   * is appended at the tail; a name that is already pending keeps its position, but the newest
+   * event replaces the older one. Wakes up threads blocked in {@link #take()}.
+   * @param event ClusterEvent event to add
+   */
+  public synchronized void put(ClusterEvent event) {
+    String name = event.getName();
+    if (!_eventMap.containsKey(name)) {
+      // only insert if there isn't a same-named event already present; the backing LinkedList is
+      // unbounded, so offer() always succeeds
+      _eventQueue.offer(event);
+    }
+    // always overwrite in case this is a FINALIZE
+    _eventMap.put(name, event);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Putting event " + name);
+      LOG.debug("Event queue size: " + _eventQueue.size());
+    }
+    // notifyAll is the conservative choice when more than one thread may wait on this monitor
+    notifyAll();
+  }
+
+  /**
+   * Remove an element from the front of the queue, blocking if none is available. This method
+   * will return the most recent event seen with the oldest enqueued event name.
+   * @return ClusterEvent at the front of the queue
+   * @throws InterruptedException if the wait for elements was interrupted
+   */
+  public synchronized ClusterEvent take() throws InterruptedException {
+    // re-check after every wakeup: wait() may return spuriously
+    while (_eventQueue.isEmpty()) {
+      wait();
+    }
+    ClusterEvent queuedEvent = _eventQueue.poll();
+    if (queuedEvent != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Taking event " + queuedEvent.getName());
+        LOG.debug("Event queue size: " + _eventQueue.size());
+      }
+      // the map holds the newest event for this name, which may be newer than queuedEvent
+      return _eventMap.remove(queuedEvent.getName());
+    }
+    return null;
+  }
+
+  /**
+   * Get at the head of the queue without removing it
+   * @return the most recent event for the oldest pending name, or null if none available
+   */
+  public synchronized ClusterEvent peek() {
+    ClusterEvent queuedEvent = _eventQueue.peek();
+    if (queuedEvent != null) {
+      return _eventMap.get(queuedEvent.getName());
+    }
+    return queuedEvent;
+  }
+
+  /**
+   * Get the queue size (number of distinct pending event names). Synchronized so that callers on
+   * other threads see a consistent, up-to-date value.
+   * @return integer size of the queue
+   */
+  public synchronized int size() {
+    return _eventQueue.size();
+  }
+
+  /**
+   * Check if the queue is empty. Synchronized so that callers on other threads see a consistent,
+   * up-to-date value.
+   * @return true if events are not present, false otherwise
+   */
+  public synchronized boolean isEmpty() {
+    return _eventQueue.isEmpty();
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/d5a4caff/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java
new file mode 100644
index 0000000..2dba7b6
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java
@@ -0,0 +1,95 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * Test {@link ClusterEventBlockingQueue} to ensure that it coalesces events while keeping them in
+ * FIFO order.
+ */
+public class TestClusterEventBlockingQueue {
+  @Test
+  public void testEventQueue() throws Exception {
+    // initialize the queue
+    ClusterEventBlockingQueue queue = new ClusterEventBlockingQueue();
+
+    // add an event
+    ClusterEvent event1 = new ClusterEvent("event1");
+    queue.put(event1);
+    Assert.assertEquals(queue.size(), 1);
+
+    // add an event with a different name
+    ClusterEvent event2 = new ClusterEvent("event2");
+    queue.put(event2);
+    Assert.assertEquals(queue.size(), 2);
+
+    // add an event with the same name as event1 (should not change queue size)
+    ClusterEvent newEvent1 = new ClusterEvent("event1");
+    newEvent1.addAttribute("attr", 1);
+    queue.put(newEvent1);
+    Assert.assertEquals(queue.size(), 2);
+
+    // test peek
+    ClusterEvent peeked = queue.peek();
+    Assert.assertEquals(peeked.getName(), "event1");
+    Assert.assertEquals(peeked.getAttribute("attr"), 1);
+    Assert.assertEquals(queue.size(), 2);
+
+    // take() runs on a pool thread so that a regression which blocks cannot hang the test run;
+    // the pool is always shut down afterwards so its threads are not leaked
+    ListeningExecutorService service =
+        MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+    try {
+      // test take the head
+      ClusterEvent takenEvent1 = safeTake(queue, service);
+      Assert.assertEquals(takenEvent1.getName(), "event1");
+      Assert.assertEquals(takenEvent1.getAttribute("attr"), 1);
+      Assert.assertEquals(queue.size(), 1);
+
+      // test take the tail
+      ClusterEvent takenEvent2 = safeTake(queue, service);
+      Assert.assertEquals(takenEvent2.getName(), "event2");
+      Assert.assertEquals(queue.size(), 0);
+    } finally {
+      service.shutdownNow();
+    }
+  }
+
+  /**
+   * Call {@link ClusterEventBlockingQueue#take()} on a pool thread, giving up after 30 seconds.
+   * @param queue the queue to take from
+   * @param service executor that runs the blocking take()
+   * @return the event taken from the queue
+   * @throws TimeoutException if no event became available within 30 seconds
+   */
+  private ClusterEvent safeTake(final ClusterEventBlockingQueue queue,
+      final ListeningExecutorService service) throws InterruptedException, ExecutionException,
+      TimeoutException {
+    // the take() in ClusterEventBlockingQueue will wait indefinitely
+    // for this test, stop waiting after 30 seconds
+    ListenableFuture<ClusterEvent> future = service.submit(new Callable<ClusterEvent>() {
+      @Override
+      public ClusterEvent call() throws InterruptedException {
+        return queue.take();
+      }
+    });
+    try {
+      return future.get(30, TimeUnit.SECONDS);
+    } catch (TimeoutException e) {
+      // don't leave the worker blocked inside take() after giving up on it
+      future.cancel(true);
+      throw e;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/d5a4caff/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
index b415393..318ab66 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
@@ -142,6 +142,7 @@ public class TestHelixConnection extends ZkUnitTestBase {
         StateModelDefId.from("MasterSlave"), new MockStateModelFactory());
 
     participant.startAsync();
+    Thread.sleep(1000);
 
     // verify
     final HelixDataAccessor accessor = 
connection.createDataAccessor(clusterId);

http://git-wip-us.apache.org/repos/asf/helix/blob/d5a4caff/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index 30f5807..6066859 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -38,10 +38,10 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
 import org.apache.helix.messaging.AsyncCallback;
 import org.apache.helix.messaging.handling.HelixTaskResult;
@@ -196,6 +196,7 @@ public class TestSchedulerMessage extends 
ZkStandAloneCMTestBaseWithPropertyServ
   public void testSchedulerMsgUsingQueue() throws Exception {
     Logger.getRootLogger().setLevel(Level.INFO);
     _factory._results.clear();
+    Thread.sleep(2000);
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
       _participants[i].getMessagingService().registerMessageHandlerFactory(
@@ -345,6 +346,7 @@ public class TestSchedulerMessage extends 
ZkStandAloneCMTestBaseWithPropertyServ
 
     int messageResultCount = 0;
     for (int i = 0; i < 10; i++) {
+      Thread.sleep(1000);
       ZNRecord statusUpdate = 
helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
       
Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
           .equals("" + (_PARTITIONS * 3)));
@@ -422,6 +424,7 @@ public class TestSchedulerMessage extends 
ZkStandAloneCMTestBaseWithPropertyServ
   @Test()
   public void testSchedulerMsg2() throws Exception {
     _factory._results.clear();
+    Thread.sleep(2000);
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
       _participants[i].getMessagingService().registerMessageHandlerFactory(
@@ -583,6 +586,7 @@ public class TestSchedulerMessage extends 
ZkStandAloneCMTestBaseWithPropertyServ
   @Test()
   public void testSchedulerMsg3() throws Exception {
     _factory._results.clear();
+    Thread.sleep(2000);
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
       _participants[i].getMessagingService().registerMessageHandlerFactory(
@@ -690,7 +694,6 @@ public class TestSchedulerMessage extends 
ZkStandAloneCMTestBaseWithPropertyServ
       }
       Assert.assertEquals(messageResultCount, _PARTITIONS * 3 / 5);
 
-
       boolean success = false;
       for (int j = 0; j < 6; j++) {
         int count = 0;
@@ -763,7 +766,8 @@ public class TestSchedulerMessage extends 
ZkStandAloneCMTestBaseWithPropertyServ
     String crString = sw.toString();
 
     schedulerMessage.getRecord().setSimpleField("Criteria", crString);
-    schedulerMessage.getRecord().setMapField("MessageTemplate", 
msgTemplate.getRecord().getSimpleFields());
+    schedulerMessage.getRecord().setMapField("MessageTemplate",
+        msgTemplate.getRecord().getSimpleFields());
     schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
     schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
 
@@ -1029,4 +1033,4 @@ public class TestSchedulerMessage extends 
ZkStandAloneCMTestBaseWithPropertyServ
         ConstraintType.MESSAGE_CONSTRAINT, "constraint1");
 
   }
-}
\ No newline at end of file
+}

Reply via email to