Repository: incubator-brooklyn Updated Branches: refs/heads/master cc4ffb9d7 -> 83acab217
MySqlCluster - provide slave status as an attribute Also tie it to service.notUp.indicators Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/51deebac Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/51deebac Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/51deebac Branch: refs/heads/master Commit: 51deebac6323404cc8148bb858da0f3fb0f2f872 Parents: bae9628 Author: Svetoslav Neykov <svetoslav.ney...@cloudsoftcorp.com> Authored: Thu Jul 30 21:56:49 2015 +0300 Committer: Svetoslav Neykov <svetoslav.ney...@cloudsoftcorp.com> Committed: Wed Aug 5 15:23:06 2015 +0300 ---------------------------------------------------------------------- .../entity/database/mysql/MySqlCluster.java | 4 + .../entity/database/mysql/MySqlClusterImpl.java | 92 +++++++++++++++----- .../entity/database/mysql/MySqlRowParser.java | 39 +++++++++ 3 files changed, 113 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/51deebac/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlCluster.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlCluster.java b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlCluster.java index 8b19ef7..322ada9 100644 --- a/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlCluster.java +++ b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlCluster.java @@ -39,6 +39,10 @@ public interface MySqlCluster extends DynamicCluster, HasDatastoreUrl { AttributeSensor<String> MASTER_LOG_FILE = Sensors.newStringSensor("mysql.master.log_file", "The binary log file master is writing to"); AttributeSensor<Integer> MASTER_LOG_POSITION = Sensors.newIntegerSensor("mysql.master.log_position", "The position in the log file to start replication"); } + interface MySqlSlave { + AttributeSensor<Boolean> SLAVE_HEALTHY = Sensors.newBooleanSensor("mysql.slave.healthy", "Indicates that the replication state of the slave is healthy"); + AttributeSensor<Integer> SLAVE_SECONDS_BEHIND_MASTER = Sensors.newIntegerSensor("mysql.slave.seconds_behind_master", "How many seconds behind master is the replication state on the slave"); + } ConfigKey<String> SLAVE_USERNAME = ConfigKeys.newStringConfigKey( "mysql.slave.username", "The user name slaves will use to connect to the master", "slave"); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/51deebac/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlClusterImpl.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlClusterImpl.java b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlClusterImpl.java index 3eaa335..190a0d7 100644 --- a/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlClusterImpl.java +++ b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlClusterImpl.java @@ -19,16 +19,15 @@ package brooklyn.entity.database.mysql; import java.util.Collection; -import java.util.Iterator; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; -import java.util.regex.Pattern; +import com.google.common.base.Function; import com.google.common.base.Functions; import com.google.common.base.Predicate; import com.google.common.base.Predicates; -import com.google.common.base.Splitter; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableMap; @@ -38,6 +37,7 @@ import com.google.common.reflect.TypeToken; import brooklyn.config.ConfigKey; import brooklyn.enricher.Enrichers; import brooklyn.entity.Entity; +import brooklyn.entity.basic.Attributes; import brooklyn.entity.basic.EntityInternal; import brooklyn.entity.basic.EntityLocal; import brooklyn.entity.basic.EntityPredicates; @@ -49,12 +49,17 @@ import brooklyn.event.SensorEvent; import brooklyn.event.SensorEventListener; import brooklyn.event.basic.DependentConfiguration; import brooklyn.event.basic.Sensors; +import brooklyn.event.feed.function.FunctionFeed; +import brooklyn.event.feed.function.FunctionPollConfig; import brooklyn.location.Location; import brooklyn.util.collections.CollectionFunctionals; +import brooklyn.util.guava.Functionals; import brooklyn.util.guava.IfFunctions; import brooklyn.util.task.DynamicTasks; import brooklyn.util.task.TaskBuilder; import brooklyn.util.text.Identifiers; +import brooklyn.util.text.StringPredicates; +import brooklyn.util.time.Duration; // https://dev.mysql.com/doc/refman/5.7/en/replication-howto.html @@ -68,7 +73,6 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster private static final String MASTER_CONFIG_URL = "classpath:///brooklyn/entity/database/mysql/mysql_master.conf"; private static final String SLAVE_CONFIG_URL = "classpath:///brooklyn/entity/database/mysql/mysql_slave.conf"; - private static final String NOT_UP_REPLICATION = "replication_not_configured"; private static final int MASTER_SERVER_ID = 1; private static final Predicate<Entity> IS_MASTER = EntityPredicates.configEqualTo(MySqlNode.MYSQL_SERVER_ID, MASTER_SERVER_ID); @@ -214,13 +218,64 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster @Override protected Entity createNode(Location loc, Map<?, ?> flags) { Entity node = super.createNode(loc, flags); - Integer serverId = node.getConfig(MySqlNode.MYSQL_SERVER_ID); - if (serverId > 0) { - ServiceNotUpLogic.updateNotUpIndicator((EntityLocal)node, NOT_UP_REPLICATION, "Replication not started"); + if (!IS_MASTER.apply(node)) { + ServiceNotUpLogic.updateNotUpIndicator((EntityLocal)node, MySqlSlave.SLAVE_HEALTHY, "Replication not started"); + + FunctionFeed.builder() + .entity((EntityLocal)node) + .period(Duration.FIVE_SECONDS) + .poll(FunctionPollConfig.forSensor(MySqlSlave.SLAVE_HEALTHY) + .callable(new SlaveStateCallable(node)) + .checkSuccess(StringPredicates.isNonBlank()) + .onSuccess(new SlaveStateParser(node)) + .setOnFailure(false) + .description("Polls SHOW SLAVE STATUS")) + .build(); + + node.addEnricher(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS) + .from(MySqlSlave.SLAVE_HEALTHY) + .computing(Functionals.ifNotEquals(true).value("Slave replication status is not healthy") ) + .build()); } return node; } + public class SlaveStateCallable implements Callable<String> { + private Entity slave; + public SlaveStateCallable(Entity slave) { + this.slave = slave; + } + + @Override + public String call() throws Exception { + if (Boolean.TRUE.equals(slave.getAttribute(MySqlNode.SERVICE_PROCESS_IS_RUNNING))) { + return slave.invoke(MySqlNode.EXECUTE_SCRIPT, ImmutableMap.of("commands", "SHOW SLAVE STATUS \\G")).asTask().getUnchecked(); + } else { + return null; + } + } + + } + + public class SlaveStateParser implements Function<String, Boolean> { + private Entity slave; + + public SlaveStateParser(Entity slave) { + this.slave = slave; + } + + @Override + public Boolean apply(String result) { + Map<String, String> status = MySqlRowParser.parseSingle(result); + String secondsBehindMaster = status.get("Seconds_Behind_Master"); + if (secondsBehindMaster != null && !"NULL".equals(secondsBehindMaster)) { + ((EntityLocal)slave).setAttribute(MySqlSlave.SLAVE_SECONDS_BEHIND_MASTER, new Integer(secondsBehindMaster)); + } + return "Yes".equals(status.get("Slave_IO_Running")) && "Yes".equals(status.get("Slave_SQL_Running")); + } + + } + private static class NextServerIdSupplier implements Supplier<Integer> { private AtomicInteger nextId = new AtomicInteger(MASTER_SERVER_ID+1); @@ -252,25 +307,18 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster } else if (serverId > MASTER_SERVER_ID) { initSlave(node); } - ServiceNotUpLogic.clearNotUpIndicator((EntityLocal)node, NOT_UP_REPLICATION); } private void initMaster(MySqlNode master) { String binLogInfo = executeScriptOnNode(master, "FLUSH TABLES WITH READ LOCK;SHOW MASTER STATUS \\G UNLOCK TABLES;"); - Iterator<String> splitIter = Splitter.on(Pattern.compile("\\n|:")) - .omitEmptyStrings() - .trimResults() - .split(binLogInfo) - .iterator(); - while (splitIter.hasNext()) { - String part = splitIter.next(); - if (part.equals("File")) { - String file = splitIter.next(); - ((EntityInternal)master).setAttribute(MySqlMaster.MASTER_LOG_FILE, file); - } else if (part.equals("Position")) { - Integer position = new Integer(splitIter.next()); - ((EntityInternal)master).setAttribute(MySqlMaster.MASTER_LOG_POSITION, position); - } + Map<String, String> status = MySqlRowParser.parseSingle(binLogInfo); + String file = status.get("File"); + if (file != null) { + ((EntityInternal)master).setAttribute(MySqlMaster.MASTER_LOG_FILE, file); + } + String position = status.get("Position"); + if (position != null) { + ((EntityInternal)master).setAttribute(MySqlMaster.MASTER_LOG_POSITION, new Integer(position)); } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/51deebac/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlRowParser.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlRowParser.java b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlRowParser.java new file mode 100644 index 0000000..cdd5149 --- /dev/null +++ b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlRowParser.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package brooklyn.entity.database.mysql; + +import java.util.Map; + +import brooklyn.util.collections.MutableMap; +import brooklyn.util.text.Strings; + +public class MySqlRowParser { + public static Map<String, String> parseSingle(String row) { + Map<String, String> values = MutableMap.of(); + String[] lines = row.split("\\n"); + for (String line : lines) { + if (line.startsWith("*")) continue; // row delimiter + String[] arr = line.split(":", 2); + String key = arr[0].trim(); + String value = Strings.emptyToNull(arr[1].trim()); + values.put(key, value); + } + return values; + }; +}