Repository: helix Updated Branches: refs/heads/master 6e047915d -> 7a2b9693d
[HELIX-706] process tev and persist assignment asynchronously Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7a2b9693 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7a2b9693 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7a2b9693 Branch: refs/heads/master Commit: 7a2b9693d49d578ac9121a944f1b469c2f2316d9 Parents: 6e04791 Author: Harry Zhang <[email protected]> Authored: Tue Jun 26 16:05:50 2018 -0700 Committer: Harry Zhang <[email protected]> Committed: Thu Jun 28 14:18:24 2018 -0700 ---------------------------------------------------------------------- .../helix/common/DedupEventProcessor.java | 3 + .../controller/GenericHelixController.java | 64 +++++++++++++++++--- .../controller/pipeline/AbstractBaseStage.java | 23 ++++++- .../controller/stages/AsyncWorkerType.java | 32 ++++++++++ .../helix/controller/stages/AttributeName.java | 3 +- .../controller/stages/ClusterDataCache.java | 2 +- .../stages/PersistAssignmentStage.java | 26 +++++++- .../stages/TargetExteralViewCalcStage.java | 53 +++++++++++++--- 8 files changed, 182 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/7a2b9693/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java b/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java index 7f3525b..942b021 100644 --- a/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java +++ b/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java @@ -58,6 +58,9 @@ public abstract class DedupEventProcessor<T, E> extends Thread { protected abstract void handleEvent(E event); public void queueEvent(T eventType, E event) { + if (isInterrupted()) { + return; 
+ } _eventQueue.put(eventType, event); } http://git-wip-us.apache.org/repos/asf/helix/blob/7a2b9693/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 6e2df23..0e0817d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -19,8 +19,18 @@ package org.apache.helix.controller; * under the License. */ +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.I0Itec.zkclient.exception.ZkInterruptedException; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; @@ -30,23 +40,26 @@ import org.apache.helix.PropertyKey.Builder; import org.apache.helix.api.exceptions.HelixMetaDataAccessException; import org.apache.helix.api.listeners.*; import org.apache.helix.common.ClusterEventBlockingQueue; +import org.apache.helix.common.DedupEventProcessor; import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.controller.pipeline.PipelineRegistry; import org.apache.helix.controller.stages.*; -import org.apache.helix.model.*; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import 
org.apache.helix.model.LiveInstance; +import org.apache.helix.model.MaintenanceSignal; +import org.apache.helix.model.Message; +import org.apache.helix.model.PauseSignal; +import org.apache.helix.model.ResourceConfig; import org.apache.helix.monitoring.mbeans.ClusterEventMonitor; import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; import org.apache.helix.task.TaskDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import static org.apache.helix.HelixConstants.ChangeType; +import static org.apache.helix.HelixConstants.*; /** * Cluster Controllers main goal is to keep the cluster state as close as possible to Ideal State. @@ -90,6 +103,8 @@ public class GenericHelixController implements IdealStateChangeListener, private final ClusterEventBlockingQueue _taskEventQueue; private final ClusterEventProcessor _taskEventThread; + private final Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> _asyncFIFOWorkerPool; + private long _continousRebalanceFailureCount = 0; /** @@ -290,6 +305,9 @@ public class GenericHelixController implements IdealStateChangeListener, _eventQueue = new ClusterEventBlockingQueue(); _taskEventQueue = new ClusterEventBlockingQueue(); + + _asyncFIFOWorkerPool = new HashMap<>(); + _cache = new ClusterDataCache(clusterName); _taskCache = new ClusterDataCache(clusterName); @@ -298,12 +316,36 @@ public class GenericHelixController implements IdealStateChangeListener, _forceRebalanceTimer = new Timer(); + initializeAsyncFIFOWorkers(); initPipelines(_eventThread, _cache, false); initPipelines(_taskEventThread, _taskCache, true); _clusterStatusMonitor = new ClusterStatusMonitor(_clusterName); } + private void initializeAsyncFIFOWorkers() { + for (AsyncWorkerType type : AsyncWorkerType.values()) { + DedupEventProcessor<String, 
Runnable> worker = + new DedupEventProcessor<String, Runnable>(_clusterName, type.name()) { + @Override + protected void handleEvent(Runnable event) { + // TODO: retry when queue is empty and event.run() failed? + event.run(); + } + }; + worker.start(); + _asyncFIFOWorkerPool.put(type, worker); + logger.info("Started async worker {}", worker.getName()); + } + } + + private void shutdownAsyncFIFOWorkers() { + for (DedupEventProcessor processor : _asyncFIFOWorkerPool.values()) { + processor.shutdown(); + logger.info("Shutdown async worker {}", processor.getName()); + } + } + private boolean isEventQueueEmpty(boolean taskQueue) { if (taskQueue) { return _taskEventQueue.isEmpty(); @@ -642,6 +684,7 @@ public class GenericHelixController implements IdealStateChangeListener, ClusterEvent event = new ClusterEvent(_clusterName, eventType); event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager()); event.addAttribute(AttributeName.changeContext.name(), changeContext); + event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), _asyncFIFOWorkerPool); for (Map.Entry<String, Object> attr : eventAttributes.entrySet()) { event.addAttribute(attr.getKey(), attr.getValue()); } @@ -780,6 +823,9 @@ public class GenericHelixController implements IdealStateChangeListener, logger.warn("Timeout when terminating async tasks. Some async tasks are still executing."); } + // shutdown async workers + shutdownAsyncFIFOWorkers(); + enableClusterStatusMonitor(false); // TODO controller shouldn't be used in anyway after shutdown. 
http://git-wip-us.apache.org/repos/asf/helix/blob/7a2b9693/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java index df37010..d12833f 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java @@ -18,9 +18,13 @@ package org.apache.helix.controller.pipeline; * specific language governing permissions and limitations * under the License. */ + +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; - +import org.apache.helix.common.DedupEventProcessor; +import org.apache.helix.controller.stages.AsyncWorkerType; +import org.apache.helix.controller.stages.AttributeName; import org.apache.helix.controller.stages.ClusterEvent; public class AbstractBaseStage implements Stage { @@ -62,4 +66,21 @@ public class AbstractBaseStage implements Stage { service.submit(task); } } + + protected DedupEventProcessor<String, Runnable> getAsyncWorkerFromClusterEvent(ClusterEvent event, + AsyncWorkerType worker) { + Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> workerPool = + event.getAttribute(AttributeName.AsyncFIFOWorkerPool.name()); + if (workerPool != null) { + if (workerPool.containsKey(worker)) { + return workerPool.get(worker); + } + } + return null; + } + + protected String getAsyncTaskDedupType(boolean isTaskPipeline) { + return String + .format("%s::%s", isTaskPipeline ? 
"TASK" : "RESOURCE", getClass().getSimpleName()); + } } http://git-wip-us.apache.org/repos/asf/helix/blob/7a2b9693/helix-core/src/main/java/org/apache/helix/controller/stages/AsyncWorkerType.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AsyncWorkerType.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AsyncWorkerType.java new file mode 100644 index 0000000..995705f --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AsyncWorkerType.java @@ -0,0 +1,32 @@ +package org.apache.helix.controller.stages; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Several stages, e.g. TargetExternalViewCalc, PersistAssignment, etc., can choose to + * submit their tasks to corresponding workers so that the job is done asynchronously.
+ * + * This enum lists the async worker types that those stages can use. + */ + +public enum AsyncWorkerType { + TargetExternalViewCalcWorker, + PersistAssignmentWorker +} http://git-wip-us.apache.org/repos/asf/helix/blob/7a2b9693/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java index f7a6da2..b98dc9e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java @@ -35,5 +35,6 @@ public enum AttributeName { clusterStatusMonitor, changeContext, instanceName, - eventData + eventData, + AsyncFIFOWorkerPool } http://git-wip-us.apache.org/repos/asf/helix/blob/7a2b9693/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index 5f87317..16b8633 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -154,7 +154,7 @@ public class ClusterDataCache { _liveInstanceCacheMap = accessor.getChildValuesMap(keyBuilder.liveInstances(), true); _updateInstanceOfflineTime = true; LOG.info("Refresh LiveInstances for cluster " + _clusterName + ", took " + ( - System.currentTimeMillis() - startTime) + " ms"); + System.currentTimeMillis() - start) + " ms"); } if (_propertyDataChangedMap.get(ChangeType.INSTANCE_CONFIG)) {
http://git-wip-us.apache.org/repos/asf/helix/blob/7a2b9693/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java index 8d188ec..7463f24 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java @@ -24,13 +24,13 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; - import java.util.Set; import org.I0Itec.zkclient.DataUpdater; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.PropertyKey; import org.apache.helix.ZNRecord; +import org.apache.helix.common.DedupEventProcessor; import org.apache.helix.controller.common.PartitionStateMap; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.model.BuiltInStateModelDefinitions; @@ -48,7 +48,29 @@ import org.slf4j.LoggerFactory; public class PersistAssignmentStage extends AbstractBaseStage { private static final Logger LOG = LoggerFactory.getLogger(PersistAssignmentStage.class); - @Override public void process(ClusterEvent event) throws Exception { + @Override + public void process(final ClusterEvent event) throws Exception { + DedupEventProcessor<String, Runnable> asyncWorker = + getAsyncWorkerFromClusterEvent(event, AsyncWorkerType.PersistAssignmentWorker); + ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); + + if (asyncWorker != null) { + LOG.info("Sending PersistAssignmentStage task for cluster {}, {} pipeline to worker", + cache.getClusterName(), cache.isTaskCache() ? 
"TASK" : "RESOURCE"); + asyncWorker.queueEvent(getAsyncTaskDedupType(cache.isTaskCache()), new Runnable() { + @Override + public void run() { + doPersistAssignment(event); + } + }); + } else { + LOG.info("Starting PersistAssignmentStage synchronously for cluster {}, {} pipeline", + cache.getClusterName(), cache.isTaskCache() ? "TASK" : "RESOURCE"); + doPersistAssignment(event); + } + } + + private void doPersistAssignment(final ClusterEvent event) { ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); ClusterConfig clusterConfig = cache.getClusterConfig(); http://git-wip-us.apache.org/repos/asf/helix/blob/7a2b9693/helix-core/src/main/java/org/apache/helix/controller/stages/TargetExteralViewCalcStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TargetExteralViewCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TargetExteralViewCalcStage.java index b7e4ebd..95b3988 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/TargetExteralViewCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TargetExteralViewCalcStage.java @@ -23,12 +23,11 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.apache.helix.AccessOption; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; -import org.apache.helix.HelixProperty; import org.apache.helix.PropertyKey; +import org.apache.helix.common.DedupEventProcessor; import org.apache.helix.controller.common.PartitionStateMap; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.model.ClusterConfig; @@ -44,24 +43,40 @@ public class TargetExteralViewCalcStage extends AbstractBaseStage { private static final Logger LOG = LoggerFactory.getLogger(TargetExteralViewCalcStage.class); @Override - public void 
process(ClusterEvent event) throws Exception { + public void process(final ClusterEvent event) throws Exception { ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); + DedupEventProcessor<String, Runnable> tevWorker = + getAsyncWorkerFromClusterEvent(event, AsyncWorkerType.TargetExternalViewCalcWorker); + ClusterConfig clusterConfig = cache.getClusterConfig(); if (cache.isTaskCache() || !clusterConfig.isTargetExternalViewEnabled()) { return; } + if (tevWorker == null) { + LOG.info("Generating target external view synchronously for cluster {}, {} pipeline", + cache.getClusterName(), cache.isTaskCache() ? "TASK" : "RESOURCE"); + doCalculateTargetExternalView(event); + } else { + // We have an async worker so update external view asynchronously + LOG.info("Sending target external view generating task for cluster {}, {} pipeline to worker", + cache.getClusterName(), cache.isTaskCache() ? "TASK" : "RESOURCE"); + tevWorker.queueEvent(getAsyncTaskDedupType(cache.isTaskCache()), new Runnable() { + @Override + public void run() { + doCalculateTargetExternalView(event); + } + }); + } + + } + + private void doCalculateTargetExternalView(final ClusterEvent event) { HelixManager helixManager = event.getAttribute(AttributeName.helixmanager.name()); + ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); HelixDataAccessor accessor = helixManager.getHelixDataAccessor(); - if (!accessor.getBaseDataAccessor() - .exists(accessor.keyBuilder().targetExternalViews().getPath(), AccessOption.PERSISTENT)) { - accessor.getBaseDataAccessor() - .create(accessor.keyBuilder().targetExternalViews().getPath(), null, - AccessOption.PERSISTENT); - } - BestPossibleStateOutput bestPossibleAssignments = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name()); @@ -69,6 +84,17 @@ public class TargetExteralViewCalcStage extends AbstractBaseStage { event.getAttribute(AttributeName.INTERMEDIATE_STATE.name()); Map<String, Resource> 
resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); + long startTimeStamp = System.currentTimeMillis(); + LOG.info("START: computing target external view for cluster {}, {} pipeline", + cache.getClusterName(), cache.isTaskCache() ? "TASK" : "RESOURCE"); + + if (!accessor.getBaseDataAccessor() + .exists(accessor.keyBuilder().targetExternalViews().getPath(), AccessOption.PERSISTENT)) { + accessor.getBaseDataAccessor() + .create(accessor.keyBuilder().targetExternalViews().getPath(), null, + AccessOption.PERSISTENT); + } + List<PropertyKey> keys = new ArrayList<>(); List<ExternalView> targetExternalViews = new ArrayList<>(); @@ -115,7 +141,13 @@ public class TargetExteralViewCalcStage extends AbstractBaseStage { } } } + // TODO (HELIX-964): remove TEV when idealstate is removed accessor.setChildren(keys, targetExternalViews); + + long endTimeStamp = System.currentTimeMillis(); + LOG.info("END: computing target external view for cluster {}, {} pipeline. Took: {} ms", + cache.getClusterName(), cache.isTaskCache() ? "TASK" : "RESOURCE", + endTimeStamp - startTimeStamp); } private Map<String, Map<String, String>> convertToMapFields( @@ -126,4 +158,5 @@ public class TargetExteralViewCalcStage extends AbstractBaseStage { } return mapFields; } + }
