Updated Branches: refs/heads/master 300d27edf -> 7fca871c1
[HELIX-281] Prevent message callbacks from indefinitely starving other callbacks Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d5a4caff Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d5a4caff Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d5a4caff Branch: refs/heads/master Commit: d5a4caffdf6ed267237309cac9f70fb7ebc14be1 Parents: 180e5b4 Author: Kanak Biscuitwala <[email protected]> Authored: Wed Jan 8 17:18:35 2014 -0800 Committer: Kanak Biscuitwala <[email protected]> Committed: Wed Jan 22 14:45:22 2014 -0800 ---------------------------------------------------------------------- .../controller/GenericHelixController.java | 58 +++++++-- .../stages/ClusterEventBlockingQueue.java | 123 +++++++++++++++++++ .../stages/TestClusterEventBlockingQueue.java | 95 ++++++++++++++ .../helix/integration/TestHelixConnection.java | 1 + .../helix/integration/TestSchedulerMessage.java | 12 +- 5 files changed, 275 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/d5a4caff/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 9fef2da..7b19574 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -28,6 +28,7 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.atomic.AtomicReference; +import org.I0Itec.zkclient.exception.ZkInterruptedException; import org.apache.helix.ConfigChangeListener; import org.apache.helix.ControllerChangeListener; import 
org.apache.helix.CurrentStateChangeListener; @@ -46,6 +47,7 @@ import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.controller.pipeline.PipelineRegistry; import org.apache.helix.controller.stages.BestPossibleStateCalcStage; import org.apache.helix.controller.stages.ClusterEvent; +import org.apache.helix.controller.stages.ClusterEventBlockingQueue; import org.apache.helix.controller.stages.CompatibilityCheckStage; import org.apache.helix.controller.stages.CurrentStateComputationStage; import org.apache.helix.controller.stages.ExternalViewComputeStage; @@ -56,8 +58,8 @@ import org.apache.helix.controller.stages.PersistAssignmentStage; import org.apache.helix.controller.stages.PersistContextStage; import org.apache.helix.controller.stages.ReadClusterDataStage; import org.apache.helix.controller.stages.ResourceComputationStage; -import org.apache.helix.controller.stages.TaskAssignmentStage; import org.apache.helix.controller.stages.ResourceValidationStage; +import org.apache.helix.controller.stages.TaskAssignmentStage; import org.apache.helix.model.CurrentState; import org.apache.helix.model.ExternalView; import org.apache.helix.model.HealthStat; @@ -96,6 +98,12 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC ClusterStatusMonitor _clusterStatusMonitor; /** + * Event queue (coalesced by event name) and the thread that feeds its events to handleEvent() + */ + private final ClusterEventBlockingQueue _eventQueue; + private final ClusterEventProcessor _eventThread; + + /** * The _paused flag is checked by function handleEvent(), while if the flag is set * handleEvent() will be no-op. Other event handling logic keeps the same when the flag * is set.
@@ -135,7 +143,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC List<ZNRecord> dummy = new ArrayList<ZNRecord>(); event.addAttribute("eventData", dummy); // Should be able to process - handleEvent(event); + _eventQueue.put(event); } } @@ -217,6 +225,10 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC _registry = registry; _lastSeenInstances = new AtomicReference<Map<String, LiveInstance>>(); _lastSeenSessions = new AtomicReference<Map<String, LiveInstance>>(); + _eventQueue = new ClusterEventBlockingQueue(); + _eventThread = new ClusterEventProcessor(); + _eventThread.setDaemon(true); // daemon so an event thread that is never stopped cannot block JVM exit + _eventThread.start(); } /** @@ -273,6 +285,8 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC return; } + logger.info("START: Invoking controller pipeline for event: " + event.getName()); + long startTime = System.currentTimeMillis(); for (Pipeline pipeline : pipelines) { try { pipeline.handle(event); @@ -283,6 +297,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC break; } } + long endTime = System.currentTimeMillis(); + logger.info("END: Invoking controller pipeline for event: " + event.getName() + ", took " + + (endTime - startTime) + " ms"); } // TODO since we read data in pipeline, we can get rid of reading from zookeeper in @@ -296,7 +313,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC // event.addAttribute("helixmanager", changeContext.getManager()); // event.addAttribute("changeContext", changeContext); // event.addAttribute("eventData", externalViewList); - // // handleEvent(event); + // _eventQueue.put(event); // logger.info("END: GenericClusterController.onExternalViewChange()"); } @@ -309,7 +326,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC event.addAttribute("instanceName", instanceName); 
event.addAttribute("changeContext", changeContext); event.addAttribute("eventData", statesInfo); -
handleEvent(event); + _eventQueue.put(event); logger.info("END: GenericClusterController.onStateChange()"); } @@ -333,7 +350,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC event.addAttribute("instanceName", instanceName); event.addAttribute("changeContext", changeContext); event.addAttribute("eventData", messages); - handleEvent(event); + _eventQueue.put(event); if (_clusterStatusMonitor != null && messages != null) { _clusterStatusMonitor.addMessageQueueSize(instanceName, messages.size()); @@ -367,7 +384,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC event.addAttribute("helixmanager", changeContext.getManager()); event.addAttribute("changeContext", changeContext); event.addAttribute("eventData", liveInstances); - handleEvent(event); + _eventQueue.put(event); logger.info("END: Generic GenericClusterController.onLiveInstanceChange()"); } @@ -393,13 +410,13 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC event.addAttribute("helixmanager", changeContext.getManager()); event.addAttribute("changeContext", changeContext); event.addAttribute("eventData", idealStates); - handleEvent(event); + _eventQueue.put(event); if (changeContext.getType() != Type.FINALIZE) { checkRebalancingTimer(changeContext.getManager(), idealStates); } - logger.info("END: Generic GenericClusterController.onIdealStateChange()"); + logger.info("END: GenericClusterController.onIdealStateChange()"); } @Override @@ -409,7 +426,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC event.addAttribute("changeContext", changeContext); event.addAttribute("helixmanager", changeContext.getManager()); event.addAttribute("eventData", configs); - handleEvent(event); + _eventQueue.put(event); logger.info("END: GenericClusterController.onConfigChange()"); } @@ -452,7 +469,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
event.addAttribute("changeContext", changeContext); event.addAttribute("helixmanager", changeContext.getManager()); event.addAttribute("eventData", pauseSignal); - handleEvent(event); + _eventQueue.put(event); } else { _paused = false; } @@ -534,4 +551,25 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC } + private class ClusterEventProcessor extends Thread { + @Override + public void run() { + logger.info("START ClusterEventProcessor thread"); + while (!isInterrupted()) { + try { + ClusterEvent event = _eventQueue.take(); + handleEvent(event); + } catch (InterruptedException e) { + logger.warn("ClusterEventProcessor interrupted", e); + interrupt(); // the InterruptedException cleared the flag; restore it so the while loop exits + } catch (ZkInterruptedException e) { + logger.warn("ClusterEventProcessor caught a ZK connection interrupt", e); + interrupt(); + } catch (Throwable t) { + logger.error("ClusterEventProcessor failed while running the controller pipeline", t); + } + } + logger.info("END ClusterEventProcessor thread"); + } + } } http://git-wip-us.apache.org/repos/asf/helix/blob/d5a4caff/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java new file mode 100644 index 0000000..54c9ca2 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java @@ -0,0 +1,123 @@ +package org.apache.helix.controller.stages; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; + +import org.apache.log4j.Logger; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * A blocking queue of ClusterEvent objects to be used by the controller pipeline. This prevents + * multiple events of the same type from flooding the controller and preventing progress from being + * made. The queue is unbounded. This class is meant to be a limited implementation of the + * {@link BlockingQueue} interface; every public method synchronizes on this instance.
+ */ +public class ClusterEventBlockingQueue { + private static final Logger LOG = Logger.getLogger(ClusterEventBlockingQueue.class); + private final Map<String, ClusterEvent> _eventMap; + private final Queue<ClusterEvent> _eventQueue; + + /** + * Instantiate the queue + */ + public ClusterEventBlockingQueue() { + _eventMap = Maps.newHashMap(); + _eventQueue = Lists.newLinkedList(); + } + + /** + * Remove all events from the queue + */ + public synchronized void clear() { + _eventMap.clear(); + _eventQueue.clear(); + } + + /** + * Add a single event to the queue; a queued event with the same name is replaced but keeps its place + * @param event ClusterEvent event to add + */ + public synchronized void put(ClusterEvent event) { + if (!_eventMap.containsKey(event.getName())) { + // only insert if there isn't a same-named event already present + boolean result = _eventQueue.offer(event); + if (!result) { + return; + } + } + // always overwrite in case this is a FINALIZE + _eventMap.put(event.getName(), event); + LOG.debug("Putting event " + event.getName()); + LOG.debug("Event queue size: " + _eventQueue.size()); + notify(); + } + + /** + * Remove an element from the front of the queue, blocking if none is available. This method + * will return the most recent event seen with the oldest enqueued event name.
+ * @return ClusterEvent at the front of the queue + * @throws InterruptedException if the wait for elements was interrupted + */ + public synchronized ClusterEvent take() throws InterruptedException { + while (_eventQueue.isEmpty()) { + wait(); + } + ClusterEvent queuedEvent = _eventQueue.poll(); + if (queuedEvent != null) { + LOG.debug("Taking event " + queuedEvent.getName()); + LOG.debug("Event queue size: " + _eventQueue.size()); + return _eventMap.remove(queuedEvent.getName()); + } + return null; // not expected: the wait loop above only exits with a non-empty queue + } + + /** + * Get the event at the head of the queue without removing it + * @return ClusterEvent at the front of the queue, or null if none available + */ + public synchronized ClusterEvent peek() { + ClusterEvent queuedEvent = _eventQueue.peek(); + if (queuedEvent != null) { + return _eventMap.get(queuedEvent.getName()); + } + return queuedEvent; + } + + /** + * Get the number of distinct event names currently queued + * @return integer size of the queue + */ + public synchronized int size() { + return _eventQueue.size(); + } + + /** + * Check if the queue is empty + * @return true if events are not present, false otherwise + */ + public synchronized boolean isEmpty() { + return _eventQueue.isEmpty(); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/d5a4caff/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java new file mode 100644 index 0000000..2dba7b6 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java @@ -0,0 +1,95 @@ +package org.apache.helix.controller.stages; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +/** + * Test {@link ClusterEventBlockingQueue} to ensure that it coalesces events while keeping them in + * FIFO order.
+ */ +public class TestClusterEventBlockingQueue { + @Test + public void testEventQueue() throws Exception { + // initialize the queue + ClusterEventBlockingQueue queue = new ClusterEventBlockingQueue(); + + // add an event + ClusterEvent event1 = new ClusterEvent("event1"); + queue.put(event1); + Assert.assertEquals(queue.size(), 1); + + // add an event with a different name + ClusterEvent event2 = new ClusterEvent("event2"); + queue.put(event2); + Assert.assertEquals(queue.size(), 2); + + // add an event with the same name as event1 (should not change queue size) + ClusterEvent newEvent1 = new ClusterEvent("event1"); + newEvent1.addAttribute("attr", 1); + queue.put(newEvent1); + Assert.assertEquals(queue.size(), 2); + + // test peek + ClusterEvent peeked = queue.peek(); + Assert.assertEquals(peeked.getName(), "event1"); + Assert.assertEquals(peeked.getAttribute("attr"), 1); + Assert.assertEquals(queue.size(), 2); + + // test take the head + ListeningExecutorService service = + MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); // NOTE(review): this executor is never shut down; consider service.shutdownNow() when the test ends + ClusterEvent takenEvent1 = safeTake(queue, service); + Assert.assertEquals(takenEvent1.getName(), "event1"); + Assert.assertEquals(takenEvent1.getAttribute("attr"), 1); + Assert.assertEquals(queue.size(), 1); + + // test take the tail + ClusterEvent takenEvent2 = safeTake(queue, service); + Assert.assertEquals(takenEvent2.getName(), "event2"); + Assert.assertEquals(queue.size(), 0); + } + + private ClusterEvent safeTake(final ClusterEventBlockingQueue queue, + final ListeningExecutorService service) throws InterruptedException, ExecutionException, + TimeoutException { + // the take() in ClusterEventBlockingQueue will wait indefinitely + // for this test, stop waiting after 30 seconds + ListenableFuture<ClusterEvent> future = service.submit(new Callable<ClusterEvent>() { + @Override + public ClusterEvent call() throws InterruptedException { + return queue.take(); + } + }); + ClusterEvent event = future.get(30,
TimeUnit.SECONDS); + return event; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/d5a4caff/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java index b415393..318ab66 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java @@ -142,6 +142,7 @@ public class TestHelixConnection extends ZkUnitTestBase { StateModelDefId.from("MasterSlave"), new MockStateModelFactory()); participant.startAsync(); + Thread.sleep(1000); // NOTE(review): fixed sleep, presumably to let the controller's event thread catch up; polling would be less flaky // verify final HelixDataAccessor accessor = connection.createDataAccessor(clusterId); http://git-wip-us.apache.org/repos/asf/helix/blob/d5a4caff/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java index 30f5807..6066859 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java @@ -38,10 +38,10 @@ import org.apache.helix.HelixManager; import org.apache.helix.InstanceType; import org.apache.helix.NotificationContext; import org.apache.helix.PropertyKey; +import org.apache.helix.PropertyKey.Builder; import org.apache.helix.PropertyType; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; -import org.apache.helix.PropertyKey.Builder; import 
org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory; import org.apache.helix.messaging.AsyncCallback; import 
org.apache.helix.messaging.handling.HelixTaskResult; @@ -196,6 +196,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ public void testSchedulerMsgUsingQueue() throws Exception { Logger.getRootLogger().setLevel(Level.INFO); _factory._results.clear(); + Thread.sleep(2000); // NOTE(review): fixed delay, presumably so asynchronous controller event processing settles; polling would be less flaky HelixManager manager = null; for (int i = 0; i < NODE_NR; i++) { _participants[i].getMessagingService().registerMessageHandlerFactory( @@ -345,6 +346,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ int messageResultCount = 0; for (int i = 0; i < 10; i++) { + Thread.sleep(1000); // NOTE(review): fixed wait before each read of the controller task status ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord(); Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount") .equals("" + (_PARTITIONS * 3))); @@ -422,6 +424,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ @Test() public void testSchedulerMsg2() throws Exception { _factory._results.clear(); + Thread.sleep(2000); HelixManager manager = null; for (int i = 0; i < NODE_NR; i++) { _participants[i].getMessagingService().registerMessageHandlerFactory( @@ -583,6 +586,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ @Test() public void testSchedulerMsg3() throws Exception { _factory._results.clear(); + Thread.sleep(2000); HelixManager manager = null; for (int i = 0; i < NODE_NR; i++) { _participants[i].getMessagingService().registerMessageHandlerFactory( @@ -690,7 +694,6 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ } Assert.assertEquals(messageResultCount, _PARTITIONS * 3 / 5); - boolean success = false; for (int j = 0; j < 6; j++) { int count = 0; @@ -763,7 +766,8 @@ public class TestSchedulerMessage extends 
ZkStandAloneCMTestBaseWithPropertyServ String crString = sw.toString(); schedulerMessage.getRecord().setSimpleField("Criteria", crString); -
schedulerMessage.getRecord().setMapField("MessageTemplate", msgTemplate.getRecord().getSimpleFields()); + schedulerMessage.getRecord().setMapField("MessageTemplate", + msgTemplate.getRecord().getSimpleFields()); schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1"); schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true"); @@ -1029,4 +1033,4 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ ConstraintType.MESSAGE_CONSTRAINT, "constraint1"); } -} \ No newline at end of file +}
