Updated Branches: refs/heads/helix-0.6.2-release f678f79f3 -> c92428023
[HELIX-281] Prevent message callbacks from indefinitely starving other callbacks Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c9242802 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c9242802 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c9242802 Branch: refs/heads/helix-0.6.2-release Commit: c92428023a6b8456c0e0ecce0649e61ea2575863 Parents: f678f79 Author: Kanak Biscuitwala <[email protected]> Authored: Wed Jan 8 17:18:35 2014 -0800 Committer: Kanak Biscuitwala <[email protected]> Committed: Wed Jan 22 15:18:02 2014 -0800 ---------------------------------------------------------------------- .../controller/GenericHelixController.java | 67 +++++++--- .../stages/ClusterEventBlockingQueue.java | 123 +++++++++++++++++++ .../stages/TestClusterEventBlockingQueue.java | 95 ++++++++++++++ .../helix/integration/TestSchedulerMessage.java | 6 +- 4 files changed, 276 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/c9242802/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 3b522e6..7e28399 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -28,6 +28,7 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.atomic.AtomicReference; +import org.I0Itec.zkclient.exception.ZkInterruptedException; import org.apache.helix.ConfigChangeListener; import org.apache.helix.ControllerChangeListener; import org.apache.helix.CurrentStateChangeListener; @@ -39,13 
+40,14 @@ import org.apache.helix.IdealStateChangeListener; import org.apache.helix.LiveInstanceChangeListener; import org.apache.helix.MessageListener; import org.apache.helix.NotificationContext; -import org.apache.helix.ZNRecord; import org.apache.helix.NotificationContext.Type; import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.ZNRecord; import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.controller.pipeline.PipelineRegistry; import org.apache.helix.controller.stages.BestPossibleStateCalcStage; import org.apache.helix.controller.stages.ClusterEvent; +import org.apache.helix.controller.stages.ClusterEventBlockingQueue; import org.apache.helix.controller.stages.CompatibilityCheckStage; import org.apache.helix.controller.stages.CurrentStateComputationStage; import org.apache.helix.controller.stages.ExternalViewComputeStage; @@ -55,8 +57,8 @@ import org.apache.helix.controller.stages.MessageThrottleStage; import org.apache.helix.controller.stages.ReadClusterDataStage; import org.apache.helix.controller.stages.RebalanceIdealStateStage; import org.apache.helix.controller.stages.ResourceComputationStage; -import org.apache.helix.controller.stages.TaskAssignmentStage; import org.apache.helix.controller.stages.ResourceValidationStage; +import org.apache.helix.controller.stages.TaskAssignmentStage; import org.apache.helix.model.CurrentState; import org.apache.helix.model.ExternalView; import org.apache.helix.model.HealthStat; @@ -95,6 +97,12 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC ClusterStatusMonitor _clusterStatusMonitor; /** + * A queue for controller events and a thread that will consume it + */ + private final ClusterEventBlockingQueue _eventQueue; + private final ClusterEventProcessor _eventThread; + + /** * The _paused flag is checked by function handleEvent(), while if the flag is set * handleEvent() will be no-op. 
Other event handling logic keeps the same when the flag * is set. @@ -134,7 +142,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC List<ZNRecord> dummy = new ArrayList<ZNRecord>(); event.addAttribute("eventData", dummy); // Should be able to process - handleEvent(event); + _eventQueue.put(event); } } @@ -226,6 +234,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC _registry = registry; _lastSeenInstances = new AtomicReference<Map<String, LiveInstance>>(); _lastSeenSessions = new AtomicReference<Map<String, LiveInstance>>(); + _eventQueue = new ClusterEventBlockingQueue(); + _eventThread = new ClusterEventProcessor(); + _eventThread.start(); } /** @@ -277,6 +288,8 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC return; } + logger.info("START: Invoking controller pipeline for event: " + event.getName()); + long startTime = System.currentTimeMillis(); for (Pipeline pipeline : pipelines) { try { pipeline.handle(event); @@ -287,6 +300,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC break; } } + long endTime = System.currentTimeMillis(); + logger.info("END: Invoking controller pipeline for event: " + event.getName() + ", took " + + (endTime - startTime) + " ms"); } // TODO since we read data in pipeline, we can get rid of reading from zookeeper in @@ -300,7 +316,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC // event.addAttribute("helixmanager", changeContext.getManager()); // event.addAttribute("changeContext", changeContext); // event.addAttribute("eventData", externalViewList); - // // handleEvent(event); + // _eventQueue.put(event); // logger.info("END: GenericClusterController.onExternalViewChange()"); } @@ -313,7 +329,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC event.addAttribute("instanceName", instanceName); 
event.addAttribute("changeContext", changeContext); event.addAttribute("eventData", statesInfo); - handleEvent(event); + _eventQueue.put(event); logger.info("END: GenericClusterController.onStateChange()"); } @@ -337,7 +353,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC event.addAttribute("instanceName", instanceName); event.addAttribute("changeContext", changeContext); event.addAttribute("eventData", messages); - handleEvent(event); + _eventQueue.put(event); if (_clusterStatusMonitor != null && messages != null) { _clusterStatusMonitor.addMessageQueueSize(instanceName, messages.size()); @@ -371,7 +387,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC event.addAttribute("helixmanager", changeContext.getManager()); event.addAttribute("changeContext", changeContext); event.addAttribute("eventData", liveInstances); - handleEvent(event); + _eventQueue.put(event); logger.info("END: Generic GenericClusterController.onLiveInstanceChange()"); } @@ -397,13 +413,13 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC event.addAttribute("helixmanager", changeContext.getManager()); event.addAttribute("changeContext", changeContext); event.addAttribute("eventData", idealStates); - handleEvent(event); + _eventQueue.put(event); if (changeContext.getType() != Type.FINALIZE) { checkRebalancingTimer(changeContext.getManager(), idealStates); } - logger.info("END: Generic GenericClusterController.onIdealStateChange()"); + logger.info("END: GenericClusterController.onIdealStateChange()"); } @Override @@ -413,7 +429,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC event.addAttribute("changeContext", changeContext); event.addAttribute("helixmanager", changeContext.getManager()); event.addAttribute("eventData", configs); - handleEvent(event); + _eventQueue.put(event); logger.info("END: GenericClusterController.onConfigChange()"); } @@ -456,7 +472,7 
@@ public class GenericHelixController implements ConfigChangeListener, IdealStateC event.addAttribute("changeContext", changeContext); event.addAttribute("helixmanager", changeContext.getManager()); event.addAttribute("eventData", pauseSignal); - handleEvent(event); + _eventQueue.put(event); } else { _paused = false; } @@ -537,11 +553,34 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC _lastSeenSessions.set(curSessions); } + public void shutdownClusterStatusMonitor(String clusterName) { - if (_clusterStatusMonitor != null) { + if (_clusterStatusMonitor != null) { logger.info("Shut down _clusterStatusMonitor for cluster " + clusterName); - _clusterStatusMonitor.reset(); - _clusterStatusMonitor = null; + _clusterStatusMonitor.reset(); + _clusterStatusMonitor = null; + } + } + + private class ClusterEventProcessor extends Thread { + @Override + public void run() { + logger.info("START ClusterEventProcessor thread"); + while (!isInterrupted()) { + try { + ClusterEvent event = _eventQueue.take(); + handleEvent(event); + } catch (InterruptedException e) { + logger.warn("ClusterEventProcessor interrupted", e); + interrupt(); + } catch (ZkInterruptedException e) { + logger.warn("ClusterEventProcessor caught a ZK connection interrupt", e); + interrupt(); + } catch (Throwable t) { + logger.error("ClusterEventProcessor failed while running the controller pipeline", t); + } + } + logger.info("END ClusterEventProcessor thread"); } } } http://git-wip-us.apache.org/repos/asf/helix/blob/c9242802/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java new file mode 100644 index 0000000..54c9ca2 --- /dev/null +++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java @@ -0,0 +1,123 @@ +package org.apache.helix.controller.stages; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; + +import org.apache.log4j.Logger; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * A blocking queue of ClusterEvent objects to be used by the controller pipeline. This prevents + * multiple events of the same type from flooding the controller and preventing progress from being + * made. This queue has no capacity limit. This class is meant to be a limited implementation of the + * {@link BlockingQueue} interface. 
+ */ +public class ClusterEventBlockingQueue { + private static final Logger LOG = Logger.getLogger(ClusterEventBlockingQueue.class); + private final Map<String, ClusterEvent> _eventMap; + private final Queue<ClusterEvent> _eventQueue; + + /** + * Instantiate the queue + */ + public ClusterEventBlockingQueue() { + _eventMap = Maps.newHashMap(); + _eventQueue = Lists.newLinkedList(); + } + + /** + * Remove all events from the queue + */ + public synchronized void clear() { + _eventMap.clear(); + _eventQueue.clear(); + } + + /** + * Add a single event to the queue, overwriting events with the same name + * @param event ClusterEvent event to add + */ + public synchronized void put(ClusterEvent event) { + if (!_eventMap.containsKey(event.getName())) { + // only insert if there isn't a same-named event already present + boolean result = _eventQueue.offer(event); + if (!result) { + return; + } + } + // always overwrite in case this is a FINALIZE + _eventMap.put(event.getName(), event); + LOG.debug("Putting event " + event.getName()); + LOG.debug("Event queue size: " + _eventQueue.size()); + notify(); + } + + /** + * Remove an element from the front of the queue, blocking if none is available. This method + * will return the most recent event seen with the oldest enqueued event name. 
+ * @return ClusterEvent at the front of the queue + * @throws InterruptedException if the wait for elements was interrupted + */ + public synchronized ClusterEvent take() throws InterruptedException { + while (_eventQueue.isEmpty()) { + wait(); + } + ClusterEvent queuedEvent = _eventQueue.poll(); + if (queuedEvent != null) { + LOG.debug("Taking event " + queuedEvent.getName()); + LOG.debug("Event queue size: " + _eventQueue.size()); + return _eventMap.remove(queuedEvent.getName()); + } + return null; + } + + /** + * Get the event at the head of the queue without removing it + * @return ClusterEvent at the front of the queue, or null if none available + */ + public synchronized ClusterEvent peek() { + ClusterEvent queuedEvent = _eventQueue.peek(); + if (queuedEvent != null) { + return _eventMap.get(queuedEvent.getName()); + } + return queuedEvent; + } + + /** + * Get the queue size + * @return integer size of the queue + */ + public int size() { + return _eventQueue.size(); + } + + /** + * Check if the queue is empty + * @return true if events are not present, false otherwise + */ + public boolean isEmpty() { + return _eventQueue.isEmpty(); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/c9242802/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java new file mode 100644 index 0000000..2dba7b6 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java @@ -0,0 +1,95 @@ +package org.apache.helix.controller.stages; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +/** + * Test {@link ClusterEventBlockingQueue} to ensure that it coalesces events while keeping them in + * FIFO order. 
+ */ +public class TestClusterEventBlockingQueue { + @Test + public void testEventQueue() throws Exception { + // initialize the queue + ClusterEventBlockingQueue queue = new ClusterEventBlockingQueue(); + + // add an event + ClusterEvent event1 = new ClusterEvent("event1"); + queue.put(event1); + Assert.assertEquals(queue.size(), 1); + + // add an event with a different name + ClusterEvent event2 = new ClusterEvent("event2"); + queue.put(event2); + Assert.assertEquals(queue.size(), 2); + + // add an event with the same name as event1 (should not change queue size) + ClusterEvent newEvent1 = new ClusterEvent("event1"); + newEvent1.addAttribute("attr", 1); + queue.put(newEvent1); + Assert.assertEquals(queue.size(), 2); + + // test peek + ClusterEvent peeked = queue.peek(); + Assert.assertEquals(peeked.getName(), "event1"); + Assert.assertEquals(peeked.getAttribute("attr"), 1); + Assert.assertEquals(queue.size(), 2); + + // test take the head + ListeningExecutorService service = + MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); + ClusterEvent takenEvent1 = safeTake(queue, service); + Assert.assertEquals(takenEvent1.getName(), "event1"); + Assert.assertEquals(takenEvent1.getAttribute("attr"), 1); + Assert.assertEquals(queue.size(), 1); + + // test take the tail + ClusterEvent takenEvent2 = safeTake(queue, service); + Assert.assertEquals(takenEvent2.getName(), "event2"); + Assert.assertEquals(queue.size(), 0); + } + + private ClusterEvent safeTake(final ClusterEventBlockingQueue queue, + final ListeningExecutorService service) throws InterruptedException, ExecutionException, + TimeoutException { + // the take() in ClusterEventBlockingQueue will wait indefinitely + // for this test, stop waiting after 30 seconds + ListenableFuture<ClusterEvent> future = service.submit(new Callable<ClusterEvent>() { + @Override + public ClusterEvent call() throws InterruptedException { + return queue.take(); + } + }); + ClusterEvent event = future.get(30, 
TimeUnit.SECONDS); + return event; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/c9242802/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java index bf851cc..49fd98c 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java @@ -37,9 +37,9 @@ import org.apache.helix.HelixManager; import org.apache.helix.InstanceType; import org.apache.helix.NotificationContext; import org.apache.helix.PropertyKey; +import org.apache.helix.PropertyKey.Builder; import org.apache.helix.PropertyType; import org.apache.helix.ZNRecord; -import org.apache.helix.PropertyKey.Builder; import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory; import org.apache.helix.messaging.AsyncCallback; import org.apache.helix.messaging.handling.HelixTaskResult; @@ -192,6 +192,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ public void TestSchedulerMsgUsingQueue() throws Exception { Logger.getRootLogger().setLevel(Level.INFO); _factory._results.clear(); + Thread.sleep(2000); HelixManager manager = null; for (int i = 0; i < NODE_NR; i++) { _participants[i].getMessagingService().registerMessageHandlerFactory( @@ -341,6 +342,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ int messageResultCount = 0; for (int i = 0; i < 10; i++) { + Thread.sleep(1000); ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord(); Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount") .equals("" + (_PARTITIONS * 3))); @@ -418,6 +420,7 @@ public class TestSchedulerMessage extends 
ZkStandAloneCMTestBaseWithPropertyServ @Test() public void TestSchedulerMsg2() throws Exception { _factory._results.clear(); + Thread.sleep(2000); HelixManager manager = null; for (int i = 0; i < NODE_NR; i++) { _participants[i].getMessagingService().registerMessageHandlerFactory( @@ -579,6 +582,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ @Test() public void TestSchedulerMsg3() throws Exception { _factory._results.clear(); + Thread.sleep(2000); HelixManager manager = null; for (int i = 0; i < NODE_NR; i++) { _participants[i].getMessagingService().registerMessageHandlerFactory(
