Repository: helix Updated Branches: refs/heads/master 4a99bc43c -> d22adbf97
[HELIX-709] Prepare controller stages for async execution Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d22adbf9 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d22adbf9 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d22adbf9 Branch: refs/heads/master Commit: d22adbf9760316118dd8e6eda5aba4219e399a60 Parents: 4a99bc4 Author: Harry Zhang <[email protected]> Authored: Thu Jun 28 14:25:21 2018 -0700 Committer: Harry Zhang <[email protected]> Committed: Thu Jun 28 15:27:29 2018 -0700 ---------------------------------------------------------------------- .../controller/GenericHelixController.java | 4 + .../pipeline/AbstractAsyncBaseStage.java | 77 ++++++++++++++++++++ .../controller/pipeline/AbstractBaseStage.java | 12 +-- .../controller/pipeline/AsyncWorkerType.java | 32 ++++++++ .../helix/controller/pipeline/Pipeline.java | 4 + .../controller/stages/AsyncWorkerType.java | 32 -------- .../helix/controller/stages/AttributeName.java | 3 +- .../stages/PersistAssignmentStage.java | 28 ++----- .../stages/TargetExteralViewCalcStage.java | 46 +++--------- .../common/ZkIntegrationTestBase.java | 10 ++- 10 files changed, 147 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/d22adbf9/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 0e0817d..7603975 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -41,6 +41,7 @@ import org.apache.helix.api.exceptions.HelixMetaDataAccessException; import org.apache.helix.api.listeners.*; import org.apache.helix.common.ClusterEventBlockingQueue; import org.apache.helix.common.DedupEventProcessor; +import org.apache.helix.controller.pipeline.AsyncWorkerType; import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.controller.pipeline.PipelineRegistry; import org.apache.helix.controller.stages.*; @@ -198,6 +199,7 @@ public class GenericHelixController implements IdealStateChangeListener, event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager()); event.addAttribute(AttributeName.changeContext.name(), changeContext); event.addAttribute(AttributeName.eventData.name(), new ArrayList<>()); + event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), _asyncFIFOWorkerPool); _taskEventQueue.put(event); _eventQueue.put(event); @@ -421,6 +423,7 @@ public class GenericHelixController implements IdealStateChangeListener, long startTime = System.currentTimeMillis(); boolean rebalanceFail = false; for (Pipeline pipeline : pipelines) { + event.addAttribute(AttributeName.PipelineType.name(), pipeline.getPipelineType()); try { pipeline.handle(event); pipeline.finish(); @@ -887,6 +890,7 @@ public class GenericHelixController implements IdealStateChangeListener, event.addAttribute(AttributeName.changeContext.name(), changeContext); event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager()); event.addAttribute(AttributeName.eventData.name(), signal); + event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), _asyncFIFOWorkerPool); _eventQueue.put(event); _taskEventQueue.put(event.clone()); } http://git-wip-us.apache.org/repos/asf/helix/blob/d22adbf9/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractAsyncBaseStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractAsyncBaseStage.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractAsyncBaseStage.java new file mode 100644 index 0000000..f305665 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractAsyncBaseStage.java @@ -0,0 +1,77 @@ +package org.apache.helix.controller.pipeline; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.common.DedupEventProcessor; +import org.apache.helix.controller.stages.AttributeName; +import org.apache.helix.controller.stages.ClusterEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractAsyncBaseStage extends AbstractBaseStage { + private static final Logger logger = LoggerFactory.getLogger(AbstractAsyncBaseStage.class); + + @Override + public void process(final ClusterEvent event) throws Exception { + String pipelineType = event.getAttribute(AttributeName.PipelineType.name()); + final String taskType = getAsyncTaskDedupType(pipelineType); + DedupEventProcessor<String, Runnable> worker = + getAsyncWorkerFromClusterEvent(event, getAsyncWorkerType()); + if (worker == null) { + throw new StageException("No async worker found for " + taskType); + } + + worker.queueEvent(taskType, new Runnable() { + @Override + public void run() { + long startTimestamp = System.currentTimeMillis(); + logger.info("START AsyncProcess: {}", taskType); + try { + execute(event); + } catch (Exception e) { + logger.error("Failed to process {} asynchronously", taskType, e); + } + long endTimestamp = System.currentTimeMillis(); + logger.info("END AsyncProcess: {}, took {} ms", taskType, endTimestamp - startTimestamp); + } + }); + logger.info("Submitted asynchronous {} task to worker", taskType); + } + + /** + * Stage that implements AbstractAsyncBaseStage should implement this method + * to get it's worker + * @return AsyncWorkerType + */ + public abstract AsyncWorkerType getAsyncWorkerType(); + + /** + * Implements stages main logic + * + * @param event ClusterEvent + * @throws Exception exception + */ + public abstract void execute(final ClusterEvent event) throws Exception; + + private String getAsyncTaskDedupType(String pipelineType) { + return String + .format("%s::%s", pipelineType, getClass().getSimpleName()); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/d22adbf9/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java index d12833f..324ed02 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java @@ -23,7 +23,6 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import org.apache.helix.common.DedupEventProcessor; -import org.apache.helix.controller.stages.AsyncWorkerType; import org.apache.helix.controller.stages.AttributeName; import org.apache.helix.controller.stages.ClusterEvent; @@ -68,19 +67,14 @@ public class AbstractBaseStage implements Stage { } protected DedupEventProcessor<String, Runnable> getAsyncWorkerFromClusterEvent(ClusterEvent event, - AsyncWorkerType worker) { + AsyncWorkerType workerType) { Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> workerPool = event.getAttribute(AttributeName.AsyncFIFOWorkerPool.name()); if (workerPool != null) { - if (workerPool.containsKey(worker)) { - return workerPool.get(worker); + if (workerPool.containsKey(workerType)) { + return workerPool.get(workerType); } } return null; } - - protected String getAsyncTaskDedupType(boolean isTaskPipeline) { - return String - .format("%s::%s", isTaskPipeline ? "TASK" : "RESOURCE", getClass().getSimpleName()); - } } http://git-wip-us.apache.org/repos/asf/helix/blob/d22adbf9/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java new file mode 100644 index 0000000..62e324c --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java @@ -0,0 +1,32 @@ +package org.apache.helix.controller.pipeline; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * There are bunch of stages, i.e. TargetExternalViewCalc, PersistAssignment, etc., that have + * the choice to submit its tasks to corresponding workers to do the job asynchronously. + * + * This class contains Async worker enums that corresponding stages can use + */ + +public enum AsyncWorkerType { + TargetExternalViewCalcWorker, + PersistAssignmentWorker +} http://git-wip-us.apache.org/repos/asf/helix/blob/d22adbf9/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java index ac483f4..2946129 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java +++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java @@ -48,6 +48,10 @@ public class Pipeline { stage.init(context); } + public String getPipelineType() { + return _pipelineType; + } + public void handle(ClusterEvent event) throws Exception { if (_stages == null) { return; http://git-wip-us.apache.org/repos/asf/helix/blob/d22adbf9/helix-core/src/main/java/org/apache/helix/controller/stages/AsyncWorkerType.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AsyncWorkerType.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AsyncWorkerType.java deleted file mode 100644 index 995705f..0000000 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/AsyncWorkerType.java +++ /dev/null @@ -1,32 +0,0 @@ -package org.apache.helix.controller.stages; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * There are bunch of stages, i.e. TargetExternalViewCalc, PersistAssignment, etc., that have - * the choice to submit its tasks to corresponding workers to do the job asynchronously. - * - * This class contains Async worker enums that corresponding stages can use - */ - -public enum AsyncWorkerType { - TargetExternalViewCalcWorker, - PersistAssignmentWorker -} http://git-wip-us.apache.org/repos/asf/helix/blob/d22adbf9/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java index b98dc9e..56bbb44 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java @@ -36,5 +36,6 @@ public enum AttributeName { changeContext, instanceName, eventData, - AsyncFIFOWorkerPool + AsyncFIFOWorkerPool, + PipelineType } http://git-wip-us.apache.org/repos/asf/helix/blob/d22adbf9/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java index 7463f24..ca83445 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java @@ -32,7 +32,9 @@ import org.apache.helix.PropertyKey; import org.apache.helix.ZNRecord; import org.apache.helix.common.DedupEventProcessor; import org.apache.helix.controller.common.PartitionStateMap; +import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage; import org.apache.helix.controller.pipeline.AbstractBaseStage; +import org.apache.helix.controller.pipeline.AsyncWorkerType; import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; @@ -45,32 +47,16 @@ import org.slf4j.LoggerFactory; /** * Persist the ResourceAssignment of each resource that went through rebalancing */ -public class PersistAssignmentStage extends AbstractBaseStage { +public class PersistAssignmentStage extends AbstractAsyncBaseStage { private static final Logger LOG = LoggerFactory.getLogger(PersistAssignmentStage.class); @Override - public void process(final ClusterEvent event) throws Exception { - DedupEventProcessor<String, Runnable> asyncWorker = - getAsyncWorkerFromClusterEvent(event, AsyncWorkerType.PersistAssignmentWorker); - ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); - - if (asyncWorker != null) { - LOG.info("Sending PersistAssignmentStage task for cluster {}, {} pipeline to worker", - cache.getClusterName(), cache.isTaskCache() ? "TASK" : "RESOURCE"); - asyncWorker.queueEvent(getAsyncTaskDedupType(cache.isTaskCache()), new Runnable() { - @Override - public void run() { - doPersistAssignment(event); - } - }); - } else { - LOG.info("Starting PersistAssignmentStage synchronously for cluster {}, {} pipeline", - cache.getClusterName(), cache.isTaskCache() ? "TASK" : "RESOURCE"); - doPersistAssignment(event); - } + public AsyncWorkerType getAsyncWorkerType() { + return AsyncWorkerType.PersistAssignmentWorker; } - private void doPersistAssignment(final ClusterEvent event) { + @Override + public void execute(final ClusterEvent event) throws Exception { ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); ClusterConfig clusterConfig = cache.getClusterConfig(); http://git-wip-us.apache.org/repos/asf/helix/blob/d22adbf9/helix-core/src/main/java/org/apache/helix/controller/stages/TargetExteralViewCalcStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TargetExteralViewCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TargetExteralViewCalcStage.java index 95b3988..aa2a8e9 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/TargetExteralViewCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TargetExteralViewCalcStage.java @@ -27,9 +27,9 @@ import org.apache.helix.AccessOption; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.PropertyKey; -import org.apache.helix.common.DedupEventProcessor; import org.apache.helix.controller.common.PartitionStateMap; -import org.apache.helix.controller.pipeline.AbstractBaseStage; +import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage; +import org.apache.helix.controller.pipeline.AsyncWorkerType; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ExternalView; import org.apache.helix.model.Partition; @@ -39,42 +39,23 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Maps; -public class TargetExteralViewCalcStage extends AbstractBaseStage { +public class TargetExteralViewCalcStage extends AbstractAsyncBaseStage { private static final Logger LOG = LoggerFactory.getLogger(TargetExteralViewCalcStage.class); @Override - public void process(final ClusterEvent event) throws Exception { - ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); - DedupEventProcessor<String, Runnable> tevWorker = - getAsyncWorkerFromClusterEvent(event, AsyncWorkerType.TargetExternalViewCalcWorker); + public AsyncWorkerType getAsyncWorkerType() { + return AsyncWorkerType.TargetExternalViewCalcWorker; + } + @Override + public void execute(final ClusterEvent event) throws Exception { + ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); ClusterConfig clusterConfig = cache.getClusterConfig(); - if (cache.isTaskCache() || !clusterConfig.isTargetExternalViewEnabled()) { return; } - if (tevWorker == null) { - LOG.info("Generating target external view synchronously for cluster {}, {} pipeline", - cache.getClusterName(), cache.isTaskCache() ? "TASK" : "RESOURCE"); - doCalculateTargetExternalView(event); - } else { - // We have an async worker so update external view asynchronously - LOG.info("Sending target external view generating task for cluster {}, {} pipeline to worker", - cache.getClusterName(), cache.isTaskCache() ? "TASK" : "RESOURCE"); - tevWorker.queueEvent(getAsyncTaskDedupType(cache.isTaskCache()), new Runnable() { - @Override - public void run() { - doCalculateTargetExternalView(event); - } - }); - } - - } - - private void doCalculateTargetExternalView(final ClusterEvent event) { HelixManager helixManager = event.getAttribute(AttributeName.helixmanager.name()); - ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); HelixDataAccessor accessor = helixManager.getHelixDataAccessor(); BestPossibleStateOutput bestPossibleAssignments = @@ -84,10 +65,6 @@ public class TargetExteralViewCalcStage extends AbstractBaseStage { event.getAttribute(AttributeName.INTERMEDIATE_STATE.name()); Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); - long startTimeStamp = System.currentTimeMillis(); - LOG.info("START: computing target external view for cluster {}, {} pipeline", - cache.getClusterName(), cache.isTaskCache() ? "TASK" : "RESOURCE"); - if (!accessor.getBaseDataAccessor() .exists(accessor.keyBuilder().targetExternalViews().getPath(), AccessOption.PERSISTENT)) { accessor.getBaseDataAccessor() @@ -143,11 +120,6 @@ public class TargetExteralViewCalcStage extends AbstractBaseStage { } // TODO (HELIX-964): remove TEV when idealstate is removed accessor.setChildren(keys, targetExternalViews); - - long endTimeStamp = System.currentTimeMillis(); - LOG.info("END: computing target external view for cluster {}, {} pipeline. Took: {} ms", - cache.getClusterName(), cache.isTaskCache() ? "TASK" : "RESOURCE", - endTimeStamp - startTimeStamp); } private Map<String, Map<String, String>> convertToMapFields( http://git-wip-us.apache.org/repos/asf/helix/blob/d22adbf9/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java index f1c59e1..2dca16b 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java @@ -32,6 +32,7 @@ import org.apache.helix.HelixManager; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; +import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage; import org.apache.helix.controller.pipeline.Stage; import org.apache.helix.controller.pipeline.StageContext; import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer; @@ -304,7 +305,14 @@ public class ZkIntegrationTestBase { StageContext context = new StageContext(); stage.init(context); stage.preProcess(); - stage.process(event); + + // AbstractAsyncBaseStage will run asynchronously, and it's main logics are implemented in + // execute() function call + if (stage instanceof AbstractAsyncBaseStage) { + ((AbstractAsyncBaseStage) stage).execute(event); + } else { + stage.process(event); + } stage.postProcess(); } }
