This is an automated email from the ASF dual-hosted git repository. saadurrahman pushed a commit to branch saadurrahman/3829-Deprecate-Apache-Aurora-dev in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
commit bfd151cf14034660ef7e5d767984d24f4f4a93a1 Author: Saad Ur Rahman <[email protected]> AuthorDate: Mon May 9 17:18:23 2022 -0400 [Sched] Removed Aurora from schedulers. Apache Aurora source and tests removed from scheduler. --- heron/schedulers/src/java/BUILD | 21 -- .../scheduler/aurora/AuroraCLIController.java | 214 ------------ .../heron/scheduler/aurora/AuroraContext.java | 53 --- .../heron/scheduler/aurora/AuroraController.java | 44 --- .../apache/heron/scheduler/aurora/AuroraField.java | 39 --- .../aurora/AuroraHeronShellController.java | 132 -------- .../heron/scheduler/aurora/AuroraLauncher.java | 58 ---- .../heron/scheduler/aurora/AuroraScheduler.java | 299 ----------------- heron/schedulers/tests/java/BUILD | 19 -- .../scheduler/aurora/AuroraCLIControllerTest.java | 200 ----------- .../heron/scheduler/aurora/AuroraContextTest.java | 49 --- .../heron/scheduler/aurora/AuroraLauncherTest.java | 80 ----- .../scheduler/aurora/AuroraSchedulerTest.java | 372 --------------------- 13 files changed, 1580 deletions(-) diff --git a/heron/schedulers/src/java/BUILD b/heron/schedulers/src/java/BUILD index b223c7bc440..caa66719e13 100644 --- a/heron/schedulers/src/java/BUILD +++ b/heron/schedulers/src/java/BUILD @@ -93,27 +93,6 @@ genrule( cmd = "cp $< $@", ) -java_library( - name = "aurora-scheduler-java", - srcs = glob(["**/aurora/*.java"]), - resources = glob(["**/aurora/*.aurora"]), - deps = scheduler_deps_files, -) - -java_binary( - name = "aurora-scheduler-unshaded", - srcs = glob(["**/aurora/*.java"]), - resources = glob(["**/aurora/*.aurora"]), - deps = scheduler_deps_files, -) - -genrule( - name = "heron-aurora-scheduler", - srcs = [":aurora-scheduler-unshaded_deploy.jar"], - outs = ["heron-aurora-scheduler.jar"], - cmd = "cp $< $@", -) - java_library( name = "null-scheduler-java", srcs = glob( diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraCLIController.java b/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraCLIController.java deleted file mode 100644 index 5cd710a7bc0..00000000000 --- a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraCLIController.java +++ /dev/null @@ -1,214 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.heron.scheduler.aurora; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.logging.Logger; -import java.util.stream.Collectors; - -import com.google.common.annotations.VisibleForTesting; - -import org.apache.heron.spi.packing.PackingPlan; -import org.apache.heron.spi.utils.ShellUtils; - -/** - * Implementation of AuroraController that shells out to the Aurora CLI to control the Aurora - * scheduler workflow of a topology. - */ -class AuroraCLIController implements AuroraController { - private static final Logger LOG = Logger.getLogger(AuroraCLIController.class.getName()); - - private final String jobSpec; - private final boolean isVerbose; - private String auroraFilename; - - AuroraCLIController( - String jobName, - String cluster, - String role, - String env, - String auroraFilename, - boolean isVerbose) { - this.auroraFilename = auroraFilename; - this.isVerbose = isVerbose; - this.jobSpec = String.format("%s/%s/%s/%s", cluster, role, env, jobName); - } - - @Override - public boolean createJob(Map<AuroraField, String> bindings, Map<String, String> extra) { - List<String> auroraCmd = - new ArrayList<>(Arrays.asList("aurora", "job", "create", "--wait-until", "RUNNING")); - - for (AuroraField field : bindings.keySet()) { - auroraCmd.add("--bind"); - auroraCmd.add(String.format("%s=%s", field, bindings.get(field))); - } - - if (!extra.isEmpty()) { - for (String field : extra.keySet()) { - if (field.equals(AuroraContext.JOB_TEMPLATE)) { - auroraFilename = auroraFilename.replace("heron.aurora", extra.get(field)); - } else { - auroraCmd.add("--bind"); - auroraCmd.add(String.format("%s=%s", field, extra.get(field))); - } - } - } - - auroraCmd.add(jobSpec); - auroraCmd.add(auroraFilename); - - if (isVerbose) { - auroraCmd.add("--verbose"); - } - - return runProcess(auroraCmd); - } - - // Kill an aurora job - @Override - public boolean killJob() { - List<String> auroraCmd = new ArrayList<>(Arrays.asList("aurora", "job", "killall")); - auroraCmd.add(jobSpec); - - appendAuroraCommandOptions(auroraCmd, isVerbose); - - return runProcess(auroraCmd); - } - - // Restart an aurora job - @Override - public boolean restart(Integer containerId) { - List<String> auroraCmd = new ArrayList<>(Arrays.asList("aurora", "job", "restart")); - if (containerId != null) { - auroraCmd.add(String.format("%s/%d", jobSpec, containerId)); - } else { - auroraCmd.add(jobSpec); - } - - appendAuroraCommandOptions(auroraCmd, isVerbose); - - return runProcess(auroraCmd); - } - - @Override - public void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove) { - String instancesToKill = getInstancesIdsToKill(containersToRemove); - //aurora job kill <cluster>/<role>/<env>/<name>/<instance_ids> - List<String> auroraCmd = new ArrayList<>(Arrays.asList( - "aurora", "job", "kill", jobSpec + "/" + instancesToKill)); - - appendAuroraCommandOptions(auroraCmd, isVerbose); - LOG.info(String.format( - "Killing %s aurora containers: %s", containersToRemove.size(), auroraCmd)); - if (!runProcess(auroraCmd)) { - throw new RuntimeException("Failed to kill freed aurora instances: " + instancesToKill); - } - } - - private static final String ERR_PROMPT = - "The topology can be in a strange stage. Please check carefully or redeploy the topology !!"; - - @Override - public Set<Integer> addContainers(Integer count) { - //aurora job add <cluster>/<role>/<env>/<name>/<instance_id> <count> - //clone instance 0 - List<String> auroraCmd = new ArrayList<>(Arrays.asList( - "aurora", "job", "add", "--wait-until", "RUNNING", - jobSpec + "/0", count.toString(), "--verbose")); - - LOG.info(String.format("Requesting %s new aurora containers %s", count, auroraCmd)); - StringBuilder stderr = new StringBuilder(); - if (!runProcess(auroraCmd, null, stderr)) { - throw new RuntimeException( - "Failed to create " + count + " new aurora instances. " + ERR_PROMPT); - } - - if (stderr.length() <= 0) { // no container was added - throw new RuntimeException("Empty output by Aurora. " + ERR_PROMPT); - } - return extractContainerIds(stderr.toString()); - } - - private Set<Integer> extractContainerIds(String auroraOutputStr) { - String pattern = "Querying instance statuses: ["; - int idx1 = auroraOutputStr.indexOf(pattern); - if (idx1 < 0) { // no container was added - LOG.info("stdout & stderr by Aurora " + auroraOutputStr); - return new HashSet<Integer>(); - } - idx1 += pattern.length(); - int idx2 = auroraOutputStr.indexOf("]", idx1); - String containerIdStr = auroraOutputStr.substring(idx1, idx2); - LOG.info("container IDs returned by Aurora " + containerIdStr); - return Arrays.asList(containerIdStr.split(", ")) - .stream().map(x->Integer.valueOf(x)).collect(Collectors.toSet()); - } - - // Utils method for unit tests - @VisibleForTesting - boolean runProcess(List<String> auroraCmd, StringBuilder stdout, StringBuilder stderr) { - int status = - ShellUtils.runProcess(auroraCmd.toArray(new String[auroraCmd.size()]), - stderr != null ? stderr : new StringBuilder()); - - if (status != 0) { - LOG.severe(String.format( - "Failed to run process. Command=%s, STDOUT=%s, STDERR=%s", auroraCmd, stdout, stderr)); - } - return status == 0; - } - - // Utils method for unit tests - @VisibleForTesting - boolean runProcess(List<String> auroraCmd) { - return runProcess(auroraCmd, null, null); - } - - private static String getInstancesIdsToKill(Set<PackingPlan.ContainerPlan> containersToRemove) { - StringBuilder ids = new StringBuilder(); - for (PackingPlan.ContainerPlan containerPlan : containersToRemove) { - if (ids.length() > 0) { - ids.append(","); - } - ids.append(containerPlan.getId()); - } - return ids.toString(); - } - - // Static method to append verbose and batching options if needed - private static void appendAuroraCommandOptions(List<String> auroraCmd, boolean isVerbose) { - // Append verbose if needed - if (isVerbose) { - auroraCmd.add("--verbose"); - } - - // Append batch size. - // Note that we can not use "--no-batching" since "restart" command does not accept it. - // So we play a small trick here by setting batch size Integer.MAX_VALUE. - auroraCmd.add("--batch-size"); - auroraCmd.add(Integer.toString(Integer.MAX_VALUE)); - } -} diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraContext.java b/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraContext.java deleted file mode 100644 index d48cf8d7580..00000000000 --- a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraContext.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.heron.scheduler.aurora; - -import java.io.File; - -import org.apache.heron.spi.common.Config; -import org.apache.heron.spi.common.Context; - -public final class AuroraContext extends Context { - public static final String JOB_LINK_TEMPLATE = "heron.scheduler.job.link.template"; - public static final String JOB_TEMPLATE = "heron.scheduler.job.template"; - public static final String JOB_MAX_KILL_ATTEMPTS = "heron.scheduler.job.max.kill.attempts"; - public static final String JOB_KILL_RETRY_INTERVAL_MS = - "heron.scheduler.job.kill.retry.interval.ms"; - - private AuroraContext() { - } - - public static String getJobLinkTemplate(Config config) { - return config.getStringValue(JOB_LINK_TEMPLATE); - } - - public static String getHeronAuroraPath(Config config) { - return config.getStringValue(JOB_TEMPLATE, - new File(Context.heronConf(config), "heron.aurora").getPath()); - } - - public static int getJobMaxKillAttempts(Config config) { - return config.getIntegerValue(JOB_MAX_KILL_ATTEMPTS, 5); - } - - public static long getJobKillRetryIntervalMs(Config config) { - return config.getLongValue(JOB_KILL_RETRY_INTERVAL_MS, 2000); - } -} diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraController.java b/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraController.java deleted file mode 100644 index a70333908a0..00000000000 --- a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraController.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.heron.scheduler.aurora; - -import java.util.Map; -import java.util.Set; - -import org.apache.heron.spi.packing.PackingPlan; - -/** - * Interface that defines how a client interacts with aurora to control the job lifecycle - */ -public interface AuroraController { - - boolean createJob(Map<AuroraField, String> auroraProperties, Map<String, String> extra); - boolean killJob(); - - /** - * Restarts a given container, or the entire job if containerId is null - * @param containerId ID of container to restart, or entire job if null - * @return the boolean return value - */ - boolean restart(Integer containerId); - - void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove); - Set<Integer> addContainers(Integer count); -} diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraField.java b/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraField.java deleted file mode 100644 index cc04ec10c98..00000000000 --- a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraField.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.heron.scheduler.aurora; - -/** - * Field names passed to aurora controllers during job creation - */ -public enum AuroraField { - CLUSTER, - CORE_PACKAGE_URI, - CPUS_PER_CONTAINER, - DISK_PER_CONTAINER, - ENVIRON, - EXECUTOR_BINARY, - NUM_CONTAINERS, - RAM_PER_CONTAINER, - ROLE, - TIER, - TOPOLOGY_ARGUMENTS, - TOPOLOGY_NAME, - TOPOLOGY_PACKAGE_URI -} diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraHeronShellController.java b/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraHeronShellController.java deleted file mode 100644 index aa6c708ca7a..00000000000 --- a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraHeronShellController.java +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.heron.scheduler.aurora; - -import java.net.HttpURLConnection; -import java.util.Map; -import java.util.Set; -import java.util.logging.Logger; - -import org.apache.heron.proto.system.PhysicalPlans.StMgr; -import org.apache.heron.spi.common.Config; -import org.apache.heron.spi.common.Context; -import org.apache.heron.spi.packing.PackingPlan; -import org.apache.heron.spi.statemgr.IStateManager; -import org.apache.heron.spi.statemgr.SchedulerStateManagerAdaptor; -import org.apache.heron.spi.utils.NetworkUtils; -import org.apache.heron.spi.utils.ReflectionUtils; - -/** - * Implementation of AuroraController that is a wrapper of AuroraCLIController. - * The difference is `restart` command: - * 1. restart whole topology: delegate to AuroraCLIController - * 2. restart container 0: delegate to AuroraCLIController - * 3. restart container x(x>0): call heron-shell endpoint `/killexecutor` - * For backpressure, only containers with heron-stmgr may send out backpressure. - * This class is to handle `restart backpressure containers inside container`, - * while delegating to AuroraCLIController for all the other scenarios. - */ -class AuroraHeronShellController implements AuroraController { - private static final Logger LOG = Logger.getLogger(AuroraHeronShellController.class.getName()); - - private final String topologyName; - private final AuroraCLIController cliController; - private final SchedulerStateManagerAdaptor stateMgrAdaptor; - - AuroraHeronShellController(String jobName, String cluster, String role, String env, - String auroraFilename, boolean isVerbose, Config localConfig) - throws ClassNotFoundException, InstantiationException, IllegalAccessException { - this.topologyName = jobName; - this.cliController = - new AuroraCLIController(jobName, cluster, role, env, auroraFilename, isVerbose); - - Config config = Config.toClusterMode(localConfig); - String stateMgrClass = Context.stateManagerClass(config); - IStateManager stateMgr = ReflectionUtils.newInstance(stateMgrClass); - stateMgr.initialize(config); - stateMgrAdaptor = new SchedulerStateManagerAdaptor(stateMgr, 5000); - } - - @Override - public boolean createJob(Map<AuroraField, String> bindings, Map<String, String> extra) { - return cliController.createJob(bindings, extra); - } - - @Override - public boolean killJob() { - return cliController.killJob(); - } - - private StMgr searchContainer(Integer id) { - String prefix = "stmgr-" + id; - for (StMgr sm : stateMgrAdaptor.getPhysicalPlan(topologyName).getStmgrsList()) { - if (sm.getId().equals(prefix)) { - return sm; - } - } - return null; - } - - // Restart an aurora container - @Override - public boolean restart(Integer containerId) { - // there is no backpressure for container 0, delegate to aurora client - if (containerId == null || containerId == 0) { - return cliController.restart(containerId); - } - - if (stateMgrAdaptor == null) { - LOG.warning("SchedulerStateManagerAdaptor not initialized"); - return false; - } - - StMgr sm = searchContainer(containerId); - if (sm == null) { - LOG.warning("container not found in pplan " + containerId); - return false; - } - - String url = "http://" + sm.getHostName() + ":" + sm.getShellPort() + "/killexecutor"; - String payload = "secret=" + stateMgrAdaptor.getExecutionState(topologyName).getTopologyId(); - LOG.info("sending `kill container` to " + url + "; payload: " + payload); - - HttpURLConnection con = NetworkUtils.getHttpConnection(url); - try { - if (NetworkUtils.sendHttpPostRequest(con, "X", payload.getBytes())) { - return NetworkUtils.checkHttpResponseCode(con, 200); - } else { // if heron-shell command fails, delegate to aurora client - LOG.info("heron-shell killexecutor failed; try aurora client .."); - return cliController.restart(containerId); - } - } finally { - con.disconnect(); - } - } - - @Override - public void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove) { - cliController.removeContainers(containersToRemove); - } - - @Override - public Set<Integer> addContainers(Integer count) { - return cliController.addContainers(count); - } -} diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraLauncher.java b/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraLauncher.java deleted file mode 100644 index 023fcd98626..00000000000 --- a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraLauncher.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.heron.scheduler.aurora; - -import org.apache.heron.scheduler.utils.LauncherUtils; -import org.apache.heron.spi.common.Config; -import org.apache.heron.spi.packing.PackingPlan; -import org.apache.heron.spi.scheduler.ILauncher; -import org.apache.heron.spi.scheduler.IScheduler; - -/** - * Launch topology locally to Aurora. - */ - -public class AuroraLauncher implements ILauncher { - private Config config; - private Config runtime; - - @Override - public void initialize(Config mConfig, Config mRuntime) { - this.config = mConfig; - this.runtime = mRuntime; - } - - @Override - public void close() { - - } - - @Override - public boolean launch(PackingPlan packing) { - LauncherUtils launcherUtils = LauncherUtils.getInstance(); - Config ytruntime = launcherUtils.createConfigWithPackingDetails(runtime, packing); - return launcherUtils.onScheduleAsLibrary(config, ytruntime, getScheduler(), packing); - } - - // Get AuroraScheduler - protected IScheduler getScheduler() { - return new AuroraScheduler(); - } -} diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraScheduler.java b/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraScheduler.java deleted file mode 100644 index 9c386f22a95..00000000000 --- a/heron/schedulers/src/java/org/apache/heron/scheduler/aurora/AuroraScheduler.java +++ /dev/null @@ -1,299 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.heron.scheduler.aurora; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Scanner; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.logging.Level; -import java.util.logging.Logger; - -import com.google.common.base.Optional; - -import org.apache.heron.api.generated.TopologyAPI; -import org.apache.heron.api.utils.TopologyUtils; -import org.apache.heron.api.utils.Utils; -import org.apache.heron.proto.scheduler.Scheduler; -import org.apache.heron.scheduler.UpdateTopologyManager; -import org.apache.heron.scheduler.utils.Runtime; -import org.apache.heron.scheduler.utils.SchedulerUtils; -import org.apache.heron.spi.common.Config; -import org.apache.heron.spi.common.Context; -import org.apache.heron.spi.common.Key; -import org.apache.heron.spi.common.TokenSub; -import org.apache.heron.spi.packing.PackingPlan; -import org.apache.heron.spi.packing.Resource; -import org.apache.heron.spi.scheduler.IScalable; -import org.apache.heron.spi.scheduler.IScheduler; - -public class AuroraScheduler implements IScheduler, IScalable { - private static final Logger LOG = Logger.getLogger(AuroraLauncher.class.getName()); - - private Config config; - private Config runtime; - private AuroraController controller; - private UpdateTopologyManager updateTopologyManager; - - @Override - public void initialize(Config mConfig, Config mRuntime) { - this.config = Config.toClusterMode(mConfig); - this.runtime = mRuntime; - try { - this.controller = getController(); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { - LOG.severe("AuroraController initialization failed " + e.getMessage()); - } - this.updateTopologyManager = - new UpdateTopologyManager(config, runtime, Optional.<IScalable>of(this)); - } - - /** - * Get an AuroraController based on the config and runtime - * - * @return AuroraController - */ - protected AuroraController getController() - throws ClassNotFoundException, InstantiationException, IllegalAccessException { - Boolean cliController = config.getBooleanValue(Key.AURORA_CONTROLLER_CLASS); - Config localConfig = Config.toLocalMode(this.config); - if (cliController) { - return new AuroraCLIController( - Runtime.topologyName(runtime), - Context.cluster(localConfig), - Context.role(localConfig), - Context.environ(localConfig), - AuroraContext.getHeronAuroraPath(localConfig), - Context.verbose(localConfig)); - } else { - return new AuroraHeronShellController( - Runtime.topologyName(runtime), - Context.cluster(localConfig), - Context.role(localConfig), - Context.environ(localConfig), - AuroraContext.getHeronAuroraPath(localConfig), - Context.verbose(localConfig), - localConfig); - } - } - - @Override - public void close() { - if (updateTopologyManager != null) { - updateTopologyManager.close(); - } - } - - @Override - public boolean onSchedule(PackingPlan packing) { - if (packing == null || packing.getContainers().isEmpty()) { - LOG.severe("No container requested. Can't schedule"); - return false; - } - - LOG.info("Launching topology in aurora"); - - // Align the cpu, RAM, disk to the maximal one, and set them to ScheduledResource - PackingPlan updatedPackingPlan = packing.cloneWithHomogeneousScheduledResource(); - SchedulerUtils.persistUpdatedPackingPlan(Runtime.topologyName(runtime), updatedPackingPlan, - Runtime.schedulerStateManagerAdaptor(runtime)); - - // Use the ScheduledResource to create aurora properties - // the ScheduledResource is guaranteed to be set after calling - // cloneWithHomogeneousScheduledResource in the above code - Resource containerResource = - updatedPackingPlan.getContainers().iterator().next().getScheduledResource().get(); - Map<AuroraField, String> auroraProperties = createAuroraProperties(containerResource); - Map<String, String> extraProperties = createExtraProperties(containerResource); - - return controller.createJob(auroraProperties, extraProperties); - } - - @Override - public List<String> getJobLinks() { - List<String> jobLinks = new ArrayList<>(); - - //Only the aurora job page is returned - String jobLinkFormat = AuroraContext.getJobLinkTemplate(config); - if (jobLinkFormat != null && !jobLinkFormat.isEmpty()) { - String jobLink = TokenSub.substitute(config, jobLinkFormat); - jobLinks.add(jobLink); - } - - return jobLinks; - } - - @Override - public boolean onKill(Scheduler.KillTopologyRequest request) { - // The aurora service can be unavailable or unstable for a while, - // we will try to kill the job with multiple attempts - int attempts = AuroraContext.getJobMaxKillAttempts(config); - long retryIntervalMs = AuroraContext.getJobKillRetryIntervalMs(config); - LOG.info("Will try " + attempts + " attempts at interval: " + retryIntervalMs + " ms"); - - // First attempt - boolean res = controller.killJob(); - attempts--; - - // Failure retry - while (!res && attempts > 0) { - LOG.warning("Failed to kill the topology. Will retry in " + retryIntervalMs + " ms..."); - Utils.sleep(retryIntervalMs); - - // Retry the killJob() - res = controller.killJob(); - attempts--; - } - - return res; - } - - @Override - public boolean onRestart(Scheduler.RestartTopologyRequest request) { - Integer containerId = null; - if (request.getContainerIndex() != -1) { - containerId = request.getContainerIndex(); - } - return controller.restart(containerId); - } - - @Override - public boolean onUpdate(Scheduler.UpdateTopologyRequest request) { - try { - updateTopologyManager.updateTopology( - request.getCurrentPackingPlan(), request.getProposedPackingPlan()); - } catch (ExecutionException | InterruptedException e) { - LOG.log(Level.SEVERE, "Could not update topology for request: " + request, e); - return false; - } - return true; - } - - private static final String CONFIRMED_YES = "y"; - boolean hasConfirmedWithUser(int newContainerCount) { - LOG.info(String.format("After update there will be %d more containers. " - + "Please make sure there are sufficient resources to update this job. " - + "Continue update? [y/N]: ", newContainerCount)); - Scanner scanner = new Scanner(System.in); - String userInput = scanner.nextLine(); - return CONFIRMED_YES.equalsIgnoreCase(userInput); - } - - @Override - public Set<PackingPlan.ContainerPlan> addContainers( - Set<PackingPlan.ContainerPlan> containersToAdd) { - Set<PackingPlan.ContainerPlan> remapping = new HashSet<>(); - if ("prompt".equalsIgnoreCase(Context.updatePrompt(config)) - && !hasConfirmedWithUser(containersToAdd.size())) { - LOG.warning("Scheduler updated topology canceled."); - return remapping; - } - - // Do the actual containers adding - LinkedList<Integer> newAddedContainerIds = new LinkedList<>( - controller.addContainers(containersToAdd.size())); - if (newAddedContainerIds.size() != containersToAdd.size()) { - throw new RuntimeException( - "Aurora returned different container count " + newAddedContainerIds.size() - + "; input count was " + containersToAdd.size()); - } - // Do the remapping: - // use the `newAddedContainerIds` to replace the container id in the `containersToAdd` - for (PackingPlan.ContainerPlan cp : containersToAdd) { - PackingPlan.ContainerPlan newContainerPlan = - new PackingPlan.ContainerPlan( - newAddedContainerIds.pop(), cp.getInstances(), - cp.getRequiredResource(), cp.getScheduledResource().orNull()); - remapping.add(newContainerPlan); - } - LOG.info("The remapping structure: " + remapping); - return remapping; - } - - @Override - public void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove) { - controller.removeContainers(containersToRemove); - } - - protected Map<AuroraField, String> createAuroraProperties(Resource containerResource) { - Map<AuroraField, String> auroraProperties = new HashMap<>(); - - TopologyAPI.Topology topology = Runtime.topology(runtime); - - auroraProperties.put(AuroraField.EXECUTOR_BINARY, - Context.executorBinary(config)); - - List<String> topologyArgs = new ArrayList<>(); - SchedulerUtils.addExecutorTopologyArgs(topologyArgs, config, runtime); - String args = String.join(" ", topologyArgs); - auroraProperties.put(AuroraField.TOPOLOGY_ARGUMENTS, args); - - auroraProperties.put(AuroraField.CLUSTER, Context.cluster(config)); - auroraProperties.put(AuroraField.ENVIRON, Context.environ(config)); - auroraProperties.put(AuroraField.ROLE, Context.role(config)); - auroraProperties.put(AuroraField.TOPOLOGY_NAME, topology.getName()); - - auroraProperties.put(AuroraField.CPUS_PER_CONTAINER, - Double.toString(containerResource.getCpu())); - auroraProperties.put(AuroraField.DISK_PER_CONTAINER, - Long.toString(containerResource.getDisk().asBytes())); - auroraProperties.put(AuroraField.RAM_PER_CONTAINER, - Long.toString(containerResource.getRam().asBytes())); - - auroraProperties.put(AuroraField.NUM_CONTAINERS, - Integer.toString(1 + TopologyUtils.getNumContainers(topology))); - - // Job configuration attribute 'production' is deprecated. - // Use 'tier' attribute instead - // See: http://aurora.apache.org/documentation/latest/reference/configuration/#job-objects - if ("prod".equals(Context.environ(config))) { - auroraProperties.put(AuroraField.TIER, "preferred"); - } else { - auroraProperties.put(AuroraField.TIER, "preemptible"); - } - - String heronCoreReleasePkgURI = Context.corePackageUri(config); - String topologyPkgURI = Runtime.topologyPackageUri(runtime).toString(); - - auroraProperties.put(AuroraField.CORE_PACKAGE_URI, heronCoreReleasePkgURI); - auroraProperties.put(AuroraField.TOPOLOGY_PACKAGE_URI, topologyPkgURI); - - return auroraProperties; - } - - protected Map<String, String> createExtraProperties(Resource containerResource) { - Map<String, String> extraProperties = new HashMap<>(); - - if (config.containsKey(Key.SCHEDULER_PROPERTIES)) { - String[] meta = config.getStringValue(Key.SCHEDULER_PROPERTIES).split(","); - extraProperties.put(AuroraContext.JOB_TEMPLATE, meta[0]); - for (int idx = 1; idx < meta.length; idx++) { - extraProperties.put("AURORA_METADATA_" + idx, meta[idx]); - } - } - - return extraProperties; - } -} diff --git a/heron/schedulers/tests/java/BUILD b/heron/schedulers/tests/java/BUILD index 3ad9ffedf36..f3122052f0c 100644 --- a/heron/schedulers/tests/java/BUILD +++ b/heron/schedulers/tests/java/BUILD @@ -25,9 +25,6 @@ scheduler_deps_files = \ common_deps_files + \ spi_deps_files -aurora_deps_files = [ - "//heron/schedulers/src/java:aurora-scheduler-java", -] yarn_deps_files = [ "//heron/packing/src/java:roundrobin-packing", @@ -89,22 +86,6 @@ nomad_deps_files = \ "//heron/schedulers/src/java:scheduler-utils-java", ] -java_library( - name = "aurora-tests", - srcs = glob(["**/aurora/*.java"]), - deps = scheduler_deps_files + aurora_deps_files + ["@maven//:commons_cli_commons_cli"], -) - -java_tests( - size = "small", - test_classes = [ - "org.apache.heron.scheduler.aurora.AuroraSchedulerTest", - "org.apache.heron.scheduler.aurora.AuroraLauncherTest", - "org.apache.heron.scheduler.aurora.AuroraCLIControllerTest", - "org.apache.heron.scheduler.aurora.AuroraContextTest", - ], - runtime_deps = [":aurora-tests"], -) java_library( name = "yarn-tests", diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraCLIControllerTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraCLIControllerTest.java deleted file mode 100644 index 4fad3998915..00000000000 --- a/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraCLIControllerTest.java +++ /dev/null @@ -1,200 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.heron.scheduler.aurora; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; - -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.mockito.Matchers; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import org.apache.heron.spi.packing.PackingPlan; -import org.apache.heron.spi.utils.PackingTestUtils; - -public class AuroraCLIControllerTest { - private static final String JOB_NAME = "jobName"; - private static final String CLUSTER = "cluster"; - private static final String ROLE = "role"; - private static final String ENV = "gz"; - private static final String AURORA_FILENAME = "file.aurora"; - private static final String VERBOSE_CONFIG = "--verbose"; - private static final String BATCH_CONFIG = "--batch-size"; - private static final String JOB_SPEC = String.format("%s/%s/%s/%s", CLUSTER, ROLE, ENV, JOB_NAME); - private static final boolean IS_VERBOSE = true; - - private AuroraCLIController controller; - - @BeforeClass - public static void beforeClass() throws Exception { - - } - - @AfterClass - public static void afterClass() throws Exception { - } - - @Before - public void setUp() throws Exception { - controller = Mockito.spy( - new AuroraCLIController(JOB_NAME, CLUSTER, ROLE, ENV, AURORA_FILENAME, IS_VERBOSE)); - } - - @After - public void after() throws Exception { - } - - @Test - public void testCreateJob() throws Exception { - Map<AuroraField, String> bindings = new HashMap<>(); - Map<String, String> bindings2 = new HashMap<>(); - List<String> expectedCommand = asList("aurora job create --wait-until RUNNING %s %s %s", - JOB_SPEC, AURORA_FILENAME, VERBOSE_CONFIG); - - // Failed - Mockito.doReturn(false).when(controller).runProcess(Matchers.anyListOf(String.class)); - Assert.assertFalse(controller.createJob(bindings, bindings2)); - Mockito.verify(controller).runProcess(Mockito.eq(expectedCommand)); - - // Happy path - Mockito.doReturn(true).when(controller).runProcess(Matchers.anyListOf(String.class)); - Assert.assertTrue(controller.createJob(bindings, bindings2)); - Mockito.verify(controller, Mockito.times(2)).runProcess(expectedCommand); - } - - @Test - public void testKillJob() throws Exception { - List<String> expectedCommand = asList("aurora job killall %s %s %s %d", - JOB_SPEC, VERBOSE_CONFIG, BATCH_CONFIG, Integer.MAX_VALUE); - - // Failed - Mockito.doReturn(false).when(controller).runProcess(Matchers.anyListOf(String.class)); - Assert.assertFalse(controller.killJob()); - Mockito.verify(controller).runProcess(Mockito.eq(expectedCommand)); - - // Happy path - Mockito.doReturn(true).when(controller).runProcess(Matchers.anyListOf(String.class)); - Assert.assertTrue(controller.killJob()); - Mockito.verify(controller, Mockito.times(2)).runProcess(expectedCommand); - } - - @Test - public void testRestartJob() throws Exception { - int containerId = 1; - List<String> expectedCommand = asList("aurora job restart %s/%s %s %s %d", - JOB_SPEC, containerId, VERBOSE_CONFIG, BATCH_CONFIG, Integer.MAX_VALUE); - - // Failed - Mockito.doReturn(false).when(controller).runProcess(Matchers.anyListOf(String.class)); - Assert.assertFalse(controller.restart(containerId)); - Mockito.verify(controller).runProcess(Mockito.eq(expectedCommand)); - - // Happy path - Mockito.doReturn(true).when(controller).runProcess(Matchers.anyListOf(String.class)); - Assert.assertTrue(controller.restart(containerId)); - Mockito.verify(controller, Mockito.times(2)).runProcess(expectedCommand); - } - - @Test - public void testRemoveContainers() { - class ContainerPlanComparator implements Comparator<PackingPlan.ContainerPlan> { - @Override - public int compare(PackingPlan.ContainerPlan o1, PackingPlan.ContainerPlan o2) { - return ((Integer) o1.getId()).compareTo(o2.getId()); - } - } - SortedSet<PackingPlan.ContainerPlan> containers = new TreeSet<>(new ContainerPlanComparator()); - containers.add(PackingTestUtils.testContainerPlan(3)); - containers.add(PackingTestUtils.testContainerPlan(5)); - - List<String> expectedCommand = asList("aurora job kill %s/3,5 %s %s %d", - JOB_SPEC, VERBOSE_CONFIG, BATCH_CONFIG, Integer.MAX_VALUE); - - Mockito.doReturn(true).when(controller).runProcess(Matchers.anyListOf(String.class)); - controller.removeContainers(containers); - Mockito.verify(controller).runProcess(Mockito.eq(expectedCommand)); - } - - @Test - public void testAddContainers() { - Integer containersToAdd = 3; - List<String> expectedCommand = asList( - "aurora job add --wait-until RUNNING %s/0 %s %s", - JOB_SPEC, containersToAdd.toString(), VERBOSE_CONFIG); - - Mockito.doAnswer(new Answer<Boolean>() { - @Override - public Boolean answer(InvocationOnMock arg0) throws Throwable { - final StringBuilder originalArgument = (StringBuilder) (arg0.getArguments())[2]; - originalArgument.append("Querying instance statuses: [1, 2, 3]"); - return true; - } - }).when(controller).runProcess( - Matchers.anyListOf(String.class), - Matchers.any(StringBuilder.class), - Matchers.any(StringBuilder.class)); - Set<Integer> ret = controller.addContainers(containersToAdd); - Assert.assertEquals(containersToAdd.intValue(), ret.size()); - Mockito.verify(controller) - .runProcess(Matchers.eq(expectedCommand), Matchers.any(), Matchers.any()); - } - - @Test - public void testAddContainersFailure() { - Integer containersToAdd = 3; - List<String> expectedCommand = asList( - "aurora job add --wait-until RUNNING %s/0 %s %s", - JOB_SPEC, containersToAdd.toString(), VERBOSE_CONFIG); - - Mockito.doAnswer(new Answer<Boolean>() { - @Override - public Boolean answer(InvocationOnMock arg0) throws Throwable { - final StringBuilder originalArgument = (StringBuilder) (arg0.getArguments())[2]; - originalArgument.append("Querying instance statuses: x"); - return true; - } - }).when(controller).runProcess( - Matchers.anyListOf(String.class), - Matchers.any(StringBuilder.class), - Matchers.any(StringBuilder.class)); - Set<Integer> ret = controller.addContainers(containersToAdd); - Assert.assertEquals(0, ret.size()); - Mockito.verify(controller) - .runProcess(Matchers.eq(expectedCommand), Matchers.any(), Matchers.any()); - } - - private static List<String> asList(String command, Object... values) { - return new ArrayList<>(Arrays.asList(String.format(command, values).split(" "))); - } -} diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraContextTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraContextTest.java deleted file mode 100644 index 4f4017f41a7..00000000000 --- a/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraContextTest.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.heron.scheduler.aurora; - -import org.junit.Assert; -import org.junit.Test; - -import org.apache.heron.spi.common.Config; -import org.apache.heron.spi.common.Key; - -public class AuroraContextTest { - - @Test - public void testUsesConfigString() { - final String auroraTemplate = "/dir/test.aurora"; - Config config = Config.newBuilder() - .put(AuroraContext.JOB_TEMPLATE, auroraTemplate) - .put(Key.HERON_CONF, "/test") - .build(); - Assert.assertEquals("Expected to use value from JOB_TEMPLATE config", - auroraTemplate, AuroraContext.getHeronAuroraPath(config)); - } - - @Test - public void testFallback() { - Config config = Config.newBuilder() - .put(Key.HERON_CONF, "/test") - .build(); - Assert.assertEquals("Expected to use heron_conf/heron.aurora", "/test/heron.aurora", - AuroraContext.getHeronAuroraPath(config)); - } -} diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraLauncherTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraLauncherTest.java deleted file mode 100644 index 76e94a5d9e9..00000000000 --- a/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraLauncherTest.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.heron.scheduler.aurora; - -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mockito; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import org.apache.heron.scheduler.utils.LauncherUtils; -import org.apache.heron.spi.common.Config; -import org.apache.heron.spi.packing.PackingPlan; -import org.apache.heron.spi.scheduler.IScheduler; - -@RunWith(PowerMockRunner.class) -@PowerMockIgnore("jdk.internal.reflect.*") -@PrepareForTest(LauncherUtils.class) -public class AuroraLauncherTest { - @Test - public void testLaunch() throws Exception { - Config config = Config.newBuilder().build(); - AuroraLauncher launcher = Mockito.spy(AuroraLauncher.class); - launcher.initialize(config, config); - - LauncherUtils mockLauncherUtils = Mockito.mock(LauncherUtils.class); - PowerMockito.spy(LauncherUtils.class); - PowerMockito.doReturn(mockLauncherUtils).when(LauncherUtils.class, "getInstance"); - - // Failed to schedule - Mockito.when(mockLauncherUtils.onScheduleAsLibrary( - Mockito.any(Config.class), - Mockito.any(Config.class), - Mockito.any(IScheduler.class), - Mockito.any(PackingPlan.class))).thenReturn(false); - - Assert.assertFalse(launcher.launch(Mockito.mock(PackingPlan.class))); - Mockito.verify(mockLauncherUtils).onScheduleAsLibrary( - Mockito.any(Config.class), - Mockito.any(Config.class), - Mockito.any(IScheduler.class), - Mockito.any(PackingPlan.class)); - - // Happy path - Mockito.when(mockLauncherUtils.onScheduleAsLibrary( - Mockito.any(Config.class), - Mockito.any(Config.class), - Mockito.any(IScheduler.class), - Mockito.any(PackingPlan.class))).thenReturn(true); - - Assert.assertTrue(launcher.launch(Mockito.mock(PackingPlan.class))); - Mockito.verify(mockLauncherUtils, Mockito.times(2)).onScheduleAsLibrary( - Mockito.any(Config.class), - Mockito.any(Config.class), - Mockito.any(IScheduler.class), - Mockito.any(PackingPlan.class)); - - launcher.close(); - } -} diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraSchedulerTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraSchedulerTest.java deleted file mode 100644 index a73a7e0676d..00000000000 --- a/heron/schedulers/tests/java/org/apache/heron/scheduler/aurora/AuroraSchedulerTest.java +++ /dev/null @@ -1,372 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.heron.scheduler.aurora; - -import java.net.URI; -import java.net.URISyntaxException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Matchers; -import org.mockito.Mockito; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import org.apache.commons.cli.CommandLine; -import org.apache.heron.api.generated.TopologyAPI; -import org.apache.heron.common.basics.ByteAmount; -import org.apache.heron.common.utils.topology.TopologyTests; -import org.apache.heron.proto.scheduler.Scheduler; -import org.apache.heron.proto.system.PackingPlans; -import org.apache.heron.scheduler.SubmitterMain; -import org.apache.heron.spi.common.Config; -import org.apache.heron.spi.common.Key; -import org.apache.heron.spi.common.TokenSub; -import org.apache.heron.spi.packing.PackingPlan; -import org.apache.heron.spi.packing.Resource; -import org.apache.heron.spi.statemgr.SchedulerStateManagerAdaptor; -import org.apache.heron.spi.utils.PackingTestUtils; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -@RunWith(PowerMockRunner.class) -@PowerMockIgnore("jdk.internal.reflect.*") -@PrepareForTest({TokenSub.class, Config.class}) -public class AuroraSchedulerTest { - private static final String AURORA_PATH = "path.aurora"; - private static final String PACKING_PLAN_ID = "packing.plan.id"; - private static final String TOPOLOGY_NAME = "topologyName"; - private static final int CONTAINER_ID = 7; - - private static AuroraScheduler scheduler; - - @Before - public void setUp() throws Exception { - } - - @After - public void after() throws Exception { - } - - @BeforeClass - public static void beforeClass() throws Exception { - scheduler = Mockito.spy(AuroraScheduler.class); - doReturn(new HashMap<String, String>()) - .when(scheduler).createAuroraProperties(Mockito.any(Resource.class)); - } - - @AfterClass - public static void afterClass() throws Exception { - scheduler.close(); - } - - /** - * Tests that we can schedule - */ - @Test - public void testOnSchedule() throws Exception { - AuroraController controller = Mockito.mock(AuroraController.class); - doReturn(controller).when(scheduler).getController(); - - SchedulerStateManagerAdaptor stateManager = mock(SchedulerStateManagerAdaptor.class); - Config runtime = Mockito.mock(Config.class); - when(runtime.get(Key.SCHEDULER_STATE_MANAGER_ADAPTOR)).thenReturn(stateManager); - when(runtime.getStringValue(Key.TOPOLOGY_NAME)).thenReturn(TOPOLOGY_NAME); - - Config mConfig = Mockito.mock(Config.class); - PowerMockito.mockStatic(Config.class); - when(Config.toClusterMode(mConfig)).thenReturn(mConfig); - - when(mConfig.getStringValue(eq(AuroraContext.JOB_TEMPLATE), - anyString())).thenReturn(AURORA_PATH); - - scheduler.initialize(mConfig, runtime); - - // Fail to schedule due to null PackingPlan - Assert.assertFalse(scheduler.onSchedule(null)); - - PackingPlan plan = new PackingPlan(PACKING_PLAN_ID, new HashSet<PackingPlan.ContainerPlan>()); - assertTrue(plan.getContainers().isEmpty()); - - // Fail to schedule due to PackingPlan is empty - Assert.assertFalse(scheduler.onSchedule(plan)); - - // Construct valid PackingPlan - Set<PackingPlan.ContainerPlan> containers = new HashSet<>(); - containers.add(PackingTestUtils.testContainerPlan(CONTAINER_ID)); - PackingPlan validPlan = new PackingPlan(PACKING_PLAN_ID, containers); - - // Failed to create job via controller - doReturn(false).when(controller) - .createJob(Matchers.anyMapOf(AuroraField.class, String.class), - Matchers.anyMapOf(String.class, String.class)); - doReturn(true).when(stateManager) - .updatePackingPlan(any(PackingPlans.PackingPlan.class), eq(TOPOLOGY_NAME)); - - Assert.assertFalse(scheduler.onSchedule(validPlan)); - - Mockito.verify(controller) - .createJob(Matchers.anyMapOf(AuroraField.class, String.class), - Matchers.anyMapOf(String.class, String.class)); - Mockito.verify(stateManager) - .updatePackingPlan(any(PackingPlans.PackingPlan.class), eq(TOPOLOGY_NAME)); - - // Happy path - doReturn(true).when(controller) - .createJob(Matchers.anyMapOf(AuroraField.class, String.class), - Matchers.anyMapOf(String.class, String.class)); - assertTrue(scheduler.onSchedule(validPlan)); - - Mockito.verify(controller, Mockito.times(2)) - .createJob(Matchers.anyMapOf(AuroraField.class, String.class), - Matchers.anyMapOf(String.class, String.class)); - Mockito.verify(stateManager, Mockito.times(2)) - .updatePackingPlan(any(PackingPlans.PackingPlan.class), eq(TOPOLOGY_NAME)); - } - - @Test - public void testOnKill() throws Exception { - Config mockConfig = Mockito.mock(Config.class); - PowerMockito.mockStatic(Config.class); - when(Config.toClusterMode(mockConfig)).thenReturn(mockConfig); - - AuroraController controller = Mockito.mock(AuroraController.class); - doReturn(controller).when(scheduler).getController(); - scheduler.initialize(mockConfig, Mockito.mock(Config.class)); - - // Failed to kill job via controller - doReturn(false).when(controller).killJob(); - Assert.assertFalse(scheduler.onKill(Scheduler.KillTopologyRequest.getDefaultInstance())); - Mockito.verify(controller).killJob(); - - // Happy path - doReturn(true).when(controller).killJob(); - assertTrue(scheduler.onKill(Scheduler.KillTopologyRequest.getDefaultInstance())); - Mockito.verify(controller, Mockito.times(2)).killJob(); - } - - @Test - public void testOnRestart() throws Exception { - Config mockConfig = Mockito.mock(Config.class); - PowerMockito.mockStatic(Config.class); - when(Config.toClusterMode(mockConfig)).thenReturn(mockConfig); - - AuroraController controller = Mockito.mock(AuroraController.class); - doReturn(controller).when(scheduler).getController(); - scheduler.initialize(mockConfig, Mockito.mock(Config.class)); - - // Construct the RestartTopologyRequest - int containerToRestart = 1; - Scheduler.RestartTopologyRequest restartTopologyRequest = - Scheduler.RestartTopologyRequest.newBuilder(). - setTopologyName(TOPOLOGY_NAME).setContainerIndex(containerToRestart). - build(); - - // Failed to kill job via controller - doReturn(false).when( - controller).restart(containerToRestart); - Assert.assertFalse(scheduler.onRestart(restartTopologyRequest)); - Mockito.verify(controller).restart(containerToRestart); - - // Happy path - doReturn(true).when( - controller).restart(containerToRestart); - assertTrue(scheduler.onRestart(restartTopologyRequest)); - Mockito.verify(controller, Mockito.times(2)).restart(containerToRestart); - } - - @Test - public void testGetJobLinks() throws Exception { - final String JOB_LINK_FORMAT = "http://go/${CLUSTER}/${ROLE}/${ENVIRON}/${TOPOLOGY}"; - final String SUBSTITUTED_JOB_LINK = "http://go/local/heron/test/test_topology"; - - Config mockConfig = Mockito.mock(Config.class); - when(mockConfig.getStringValue(AuroraContext.JOB_LINK_TEMPLATE)) - .thenReturn(JOB_LINK_FORMAT); - - PowerMockito.mockStatic(Config.class); - when(Config.toClusterMode(mockConfig)).thenReturn(mockConfig); - - AuroraController controller = Mockito.mock(AuroraController.class); - doReturn(controller).when(scheduler).getController(); - scheduler.initialize(mockConfig, Mockito.mock(Config.class)); - - PowerMockito.spy(TokenSub.class); - PowerMockito.doReturn(SUBSTITUTED_JOB_LINK) - .when(TokenSub.class, "substitute", mockConfig, JOB_LINK_FORMAT); - - List<String> result = scheduler.getJobLinks(); - - assertEquals(1, result.size()); - assertTrue(result.get(0).equals(SUBSTITUTED_JOB_LINK)); - } - - - @Test - public void testProperties() throws URISyntaxException { - TopologyAPI.Topology topology = TopologyTests.createTopology( - TOPOLOGY_NAME, new org.apache.heron.api.Config(), - "spoutName", "boltName", 1, 1); - - Config runtime = mock(Config.class); - when(runtime.get(Key.TOPOLOGY_DEFINITION)).thenReturn(topology); - when(runtime.get(Key.TOPOLOGY_PACKAGE_URI)).thenReturn(new URI("http://foo/bar")); - - // This must mimic how SubmitterMain loads configs - CommandLine commandLine = mock(CommandLine.class); - when(commandLine.getOptionValue("cluster")).thenReturn("some_cluster"); - when(commandLine.getOptionValue("role")).thenReturn("some_role"); - when(commandLine.getOptionValue("environment")).thenReturn("some_env"); - when(commandLine.getOptionValue("heron_home")).thenReturn("/some/heron/home"); - when(commandLine.getOptionValue("config_path")).thenReturn("/some/config/path"); - when(commandLine.getOptionValue("topology_package")).thenReturn("jar"); - when(commandLine.getOptionValue("topology_defn")).thenReturn("/mock/defnFile.defn"); - when(commandLine.getOptionValue("topology_bin")).thenReturn("binaryFile.jar"); - Config config = Mockito.spy(SubmitterMain.loadConfig(commandLine, topology)); - - AuroraScheduler testScheduler = new AuroraScheduler(); - testScheduler.initialize(config, runtime); - Resource containerResource = - new Resource(2.3, ByteAmount.fromGigabytes(2), ByteAmount.fromGigabytes(3)); - Map<AuroraField, String> properties = testScheduler.createAuroraProperties(containerResource); - - // this part is key, the conf path in the config is absolute to the install dir, but what - // aurora properties get below is the relative ./heron-conf path to be used when run remotely - assertEquals("Invalid value for key " + Key.HERON_CONF, - "/some/config/path", config.getStringValue(Key.HERON_CONF)); - - String expectedConf = "./heron-conf"; - String expectedBin = "./heron-core/bin"; - String expectedLib = "./heron-core/lib"; - String expectedDist = "./heron-core/dist"; - for (AuroraField field : AuroraField.values()) { - boolean asserted = false; - Object expected = null; - Object found = properties.get(field); - switch (field) { - case CORE_PACKAGE_URI: - expected = expectedDist + "/heron-core.tar.gz"; - break; - case CPUS_PER_CONTAINER: - expected = Double.valueOf(containerResource.getCpu()).toString(); - break; - case DISK_PER_CONTAINER: - expected = Long.valueOf(containerResource.getDisk().asBytes()).toString(); - break; - case RAM_PER_CONTAINER: - expected = Long.valueOf(containerResource.getRam().asBytes()).toString(); - break; - case TIER: - expected = "preemptible"; - break; - case NUM_CONTAINERS: - expected = "2"; - break; - case EXECUTOR_BINARY: - expected = expectedBin + "/heron-executor"; - break; - case TOPOLOGY_PACKAGE_URI: - expected = "http://foo/bar"; - break; - case TOPOLOGY_ARGUMENTS: - expected = "--topology-name=topologyName" - + " --topology-id=" + topology.getId() - + " --topology-defn-file=defnFile.defn" - + " --state-manager-connection=null" - + " --state-manager-root=null" - + " --state-manager-config-file=" + expectedConf + "/statemgr.yaml" - + " --tmanager-binary=" + expectedBin + "/heron-tmanager" - + " --stmgr-binary=" + expectedBin + "/heron-stmgr" - + " --metrics-manager-classpath=" + expectedLib + "/metricsmgr/*" - + " --instance-jvm-opts=\"\"" - + " --classpath=binaryFile.jar" - + " --heron-internals-config-file=" + expectedConf + "/heron_internals.yaml" - + " --override-config-file=" + expectedConf + "/override.yaml" - + " --component-ram-map=null" - + " --component-jvm-opts=\"\"" - + " --pkg-type=jar" - + " --topology-binary-file=binaryFile.jar" - + " --heron-java-home=/usr/lib/jvm/default-java" - + " --heron-shell-binary=" + expectedBin + "/heron-shell" - + " --cluster=some_cluster" - + " --role=some_role" - + " --environment=some_env" - + " --instance-classpath=" + expectedLib + "/instance/*" - + " --metrics-sinks-config-file=" + expectedConf + "/metrics_sinks.yaml" - + " --scheduler-classpath=" + expectedLib + "/scheduler/*:./heron-core" - + "/lib/packing/*:" + expectedLib + "/statemgr/*" - + " --python-instance-binary=" + expectedBin + "/heron-python-instance" - + " --cpp-instance-binary=" + expectedBin + "/heron-cpp-instance" - + " --metricscache-manager-classpath=" + expectedLib + "/metricscachemgr/*" - + " --metricscache-manager-mode=disabled" - + " --is-stateful=false" - + " --checkpoint-manager-classpath=" + expectedLib + "/ckptmgr/*:" - + expectedLib + "/statefulstorage/*:" - + " --stateful-config-file=" + expectedConf + "/stateful.yaml" - + " --checkpoint-manager-ram=1073741824" - + " --health-manager-mode=disabled" - + " --health-manager-classpath=" + expectedLib + "/healthmgr/*"; - break; - case CLUSTER: - expected = "some_cluster"; - break; - case ENVIRON: - expected = "some_env"; - break; - case ROLE: - expected = "some_role"; - break; - case TOPOLOGY_NAME: - expected = "topologyName"; - break; - default: - fail(String.format( - "Expected value for Aurora field %s not found in test (found=%s)", field, found)); - } - if (!asserted) { - assertEquals("Incorrect value found for field " + field, expected, found); - } - properties.remove(field); - } - - assertTrue("The following aurora fields were not set by the scheduler: " + properties, - properties.isEmpty()); - } -}
