Dump based replication init When the replication logs on master no longer start from the beginning a database dump will be performed to initialize the slave
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/0de0a2a9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/0de0a2a9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/0de0a2a9 Branch: refs/heads/master Commit: 0de0a2a944b08437c4f0eda407265bd89a26147a Parents: 5fda6ed Author: Svetoslav Neykov <[email protected]> Authored: Thu Oct 15 17:36:31 2015 +0300 Committer: Svetoslav Neykov <[email protected]> Committed: Mon Oct 19 16:48:52 2015 +0300 ---------------------------------------------------------------------- .../brooklyn/util/core/task/DynamicTasks.java | 17 + .../system_service/SystemServiceEnricher.java | 12 +- .../entity/database/DatastoreMixins.java | 5 +- .../database/mysql/InitSlaveTaskBody.java | 385 +++++++++++++++++++ .../entity/database/mysql/MySqlCluster.java | 12 +- .../entity/database/mysql/MySqlClusterImpl.java | 230 ++++------- .../database/mysql/MySqlClusterUtils.java | 52 +++ .../entity/database/mysql/MySqlDriver.java | 7 +- .../entity/database/mysql/MySqlNode.java | 43 ++- .../database/mysql/MySqlNodeEffectors.java | 87 +++++ .../entity/database/mysql/MySqlNodeImpl.java | 7 +- .../entity/database/mysql/MySqlSshDriver.java | 41 +- .../database/mysql/ReplicationSnapshot.java | 58 +++ .../entity/database/VogellaExampleAccess.java | 16 +- .../mysql/MySqlClusterIntegrationTest.java | 138 ++++++- .../database/mysql/MySqlClusterTestHelper.java | 35 +- 16 files changed, 922 insertions(+), 223 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java index 52ec88d..5574709 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java @@ -28,6 +28,7 @@ import org.apache.brooklyn.api.mgmt.TaskAdaptable; import org.apache.brooklyn.api.mgmt.TaskFactory; import org.apache.brooklyn.api.mgmt.TaskQueueingContext; import org.apache.brooklyn.api.mgmt.TaskWrapper; +import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.time.Duration; @@ -333,4 +334,20 @@ public class DynamicTasks { return queueIfPossible(task).orSubmitAsync(entity).asTask(); } + /** Breaks the parent-child relation between Tasks.current() and the task passed, + * making the new task a top-level one at the target entity. + * To make it visible in the UI, also tag the task with: + * .tag(BrooklynTaskTags.tagForContextEntity(entity)) + * .tag(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG) + */ + public static <T> Task<T> submitTopLevelTask(TaskAdaptable<T> task, Entity entity) { + Task<?> currentTask = BasicExecutionManager.getPerThreadCurrentTask().get(); + BasicExecutionManager.getPerThreadCurrentTask().set(null); + try { + return Entities.submit(entity, task).asTask(); + } finally { + BasicExecutionManager.getPerThreadCurrentTask().set(currentTask); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java b/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java index fd9c32e..26c0fdb 100644 --- a/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java +++ b/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java @@ -114,17 +114,7 @@ public class SystemServiceEnricher extends AbstractEnricher implements Enricher .tag(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG) .build(); - submitTopLevel(updateService); - } - - private void submitTopLevel(Task<Void> updateService) { - Task<?> currentTask = BasicExecutionManager.getPerThreadCurrentTask().get(); - BasicExecutionManager.getPerThreadCurrentTask().set(null); - try { - Entities.submit(entity, updateService); - } finally { - BasicExecutionManager.getPerThreadCurrentTask().set(currentTask); - } + DynamicTasks.submitTopLevelTask(updateService, entity); } private String getLaunchScript(String stdin, String env) { http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/DatastoreMixins.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/DatastoreMixins.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/DatastoreMixins.java index 534c0eb..67eda16 100644 --- a/software/database/src/main/java/org/apache/brooklyn/entity/database/DatastoreMixins.java +++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/DatastoreMixins.java @@ -50,9 +50,10 @@ public class DatastoreMixins { public static final Effector<String> EXECUTE_SCRIPT = CanExecuteScript.EXECUTE_SCRIPT; public static interface CanExecuteScript { - public static final Effector<String> EXECUTE_SCRIPT = Effectors.effector(String.class, "executeScript") + ConfigKey<String> COMMANDS = ConfigKeys.newStringConfigKey("commands"); + Effector<String> EXECUTE_SCRIPT = Effectors.effector(String.class, "executeScript") .description("executes the given script contents") - .parameter(String.class, "commands") + .parameter(COMMANDS) .buildAbstract(); } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/InitSlaveTaskBody.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/InitSlaveTaskBody.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/InitSlaveTaskBody.java new file mode 100644 index 0000000..f125024 --- /dev/null +++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/InitSlaveTaskBody.java @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.database.mysql; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.core.effector.EffectorTasks; +import org.apache.brooklyn.core.effector.Effectors; +import org.apache.brooklyn.core.effector.ssh.SshEffectorTasks; +import org.apache.brooklyn.core.entity.EntityPredicates; +import org.apache.brooklyn.core.sensor.DependentConfiguration; +import org.apache.brooklyn.entity.database.mysql.MySqlNode.ExportDumpEffector; +import org.apache.brooklyn.entity.software.base.SoftwareProcess; +import org.apache.brooklyn.location.ssh.SshMachineLocation; +import org.apache.brooklyn.util.core.task.DynamicTasks; +import org.apache.brooklyn.util.core.task.TaskTags; +import org.apache.brooklyn.util.core.task.ssh.SshTasks; +import org.apache.brooklyn.util.core.task.system.ProcessTaskFactory; +import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.os.Os; +import org.apache.brooklyn.util.ssh.BashCommands; +import org.apache.brooklyn.util.text.Identifiers; +import org.apache.commons.io.FilenameUtils; +import org.apache.commons.lang3.concurrent.ConcurrentUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; +import com.google.common.base.Predicates; +import com.google.common.base.Strings; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; + +public class InitSlaveTaskBody implements Runnable { + private static final String SNAPSHOT_DUMP_OPTIONS = "--skip-lock-tables --single-transaction --flush-logs --hex-blob -A"; + + private static final Logger log = LoggerFactory.getLogger(InitSlaveTaskBody.class); + + private final MySqlCluster cluster; + private final MySqlNode slave; + private Semaphore lock; + + public InitSlaveTaskBody(MySqlCluster cluster, MySqlNode slave, Semaphore lock) { + this.cluster = cluster; + this.slave = slave; + this.lock = lock; + } + + @Override + public void run() { + // Replication init state consists of: + // * initial dump (optional) + // * location of initial dump (could be on any of the members, optional) + // * bin log file name + // * bin log position + // 1. Check replication state: + // * Does the dump exist (and the machine where it is located) + // * Does the bin log exist on the master + // 2. If the replication state is not valid create a new one + // * Select a slave to dump, master if no slaves + // * If it's a slave do 'STOP SLAVE SQL_THREAD;' + // * Call mysqldump to create the snapshot + // * When done if a slave do 'START SLAVE SQL_THREAD;' + // * Get master state from the dump - grep "MASTER_LOG_POS" dump.sql. + // If slave get state from 'SHOW SLAVE STATUS' + // * Save new init info in cluster - bin log name, position, dump + // 3. Init Slave + // * transfer dump to new slave (if dump exists) + // * import - mysql < ~/dump.sql + // * change master to and start slave + //!!! Caveat if dumping from master and MyISAM tables are used dump may be inconsistent. + // * Only way around it is to lock the database while dumping (or taking a snapshot through LVM which is quicker) + bootstrapSlaveAsync(getValidReplicationInfo(), slave); + cluster.getAttribute(MySqlClusterImpl.SLAVE_ID_ADDRESS_MAPPING).put(slave.getId(), slave.getAttribute(MySqlNode.SUBNET_ADDRESS)); + } + + private MySqlNode getMaster() { + return (MySqlNode) Iterables.find(cluster.getMembers(), MySqlClusterUtils.IS_MASTER); + } + + private void bootstrapSlaveAsync(final Future<ReplicationSnapshot> replicationInfoFuture, final MySqlNode slave) { + DynamicTasks.queue("bootstrap slave replication", new Runnable() { + @Override + public void run() { + ReplicationSnapshot replicationSnapshot; + try { + replicationSnapshot = replicationInfoFuture.get(); + } catch (InterruptedException | ExecutionException e) { + throw Exceptions.propagate(e); + } + + MySqlNode master = getMaster(); + String masterAddress = MySqlClusterUtils.validateSqlParam(master.getAttribute(MySqlNode.SUBNET_ADDRESS)); + Integer masterPort = master.getAttribute(MySqlNode.MYSQL_PORT); + String slaveAddress = MySqlClusterUtils.validateSqlParam(slave.getAttribute(MySqlNode.SUBNET_ADDRESS)); + String username = MySqlClusterUtils.validateSqlParam(cluster.getConfig(MySqlCluster.SLAVE_USERNAME)); + String password = MySqlClusterUtils.validateSqlParam(cluster.getAttribute(MySqlCluster.SLAVE_PASSWORD)); + + if (replicationSnapshot.getEntityId() != null) { + Entity sourceEntity = Iterables.find(cluster.getMembers(), EntityPredicates.idEqualTo(replicationSnapshot.getEntityId())); + String dumpId = FilenameUtils.removeExtension(replicationSnapshot.getSnapshotPath()); + copyDumpAsync(sourceEntity, slave, replicationSnapshot.getSnapshotPath(), dumpId); + DynamicTasks.queue(Effectors.invocation(slave, MySqlNode.IMPORT_DUMP, ImmutableMap.of("path", replicationSnapshot.getSnapshotPath()))); + //The dump resets the password to whatever is on the source instance, reset it back. + //We are able to still login because privileges are not flushed, so we just set the password to the same value. + DynamicTasks.queue(Effectors.invocation(slave, MySqlNode.CHANGE_PASSWORD, ImmutableMap.of("password", slave.getAttribute(MySqlNode.PASSWORD)))); // + //Flush privileges to load new users coming from the dump + MySqlClusterUtils.executeSqlOnNodeAsync(slave, "FLUSH PRIVILEGES;"); + } + + MySqlClusterUtils.executeSqlOnNodeAsync(master, String.format( + "CREATE USER '%s'@'%s' IDENTIFIED BY '%s';\n" + + "GRANT REPLICATION SLAVE ON *.* TO '%s'@'%s';\n", + username, slaveAddress, password, username, slaveAddress)); + + // Executing this will unblock SERVICE_UP wait in the start effector + String slaveCmd = String.format( + "CHANGE MASTER TO " + + "MASTER_HOST='%s', " + + "MASTER_PORT=%d, " + + "MASTER_USER='%s', " + + "MASTER_PASSWORD='%s', " + + "MASTER_LOG_FILE='%s', " + + "MASTER_LOG_POS=%d;\n" + + "START SLAVE;\n", + masterAddress, masterPort, + username, password, + replicationSnapshot.getBinLogName(), + replicationSnapshot.getBinLogPosition()); + MySqlClusterUtils.executeSqlOnNodeAsync(slave, slaveCmd); + } + }); + } + + private void copyDumpAsync(Entity source, Entity dest, String sourceDumpPath, String dumpId) { + final SshMachineLocation sourceMachine = EffectorTasks.getSshMachine(source); + final SshMachineLocation destMachine = EffectorTasks.getSshMachine(dest); + + String sourceRunDir = source.getAttribute(MySqlNode.RUN_DIR); + String privateKeyFile = dumpId + ".id_rsa"; + final Task<String> tempKeyTask = DynamicTasks.queue(SshEffectorTasks.ssh( + "cd $RUN_DIR", + "PRIVATE_KEY=" + privateKeyFile, + "ssh-keygen -t rsa -N '' -f $PRIVATE_KEY -C " + dumpId + " > /dev/null", + "cat $PRIVATE_KEY.pub") + .environmentVariable("RUN_DIR", sourceRunDir) + .machine(sourceMachine) + .summary("generate private key for slave access") + .requiringZeroAndReturningStdout()) + .asTask(); + + DynamicTasks.queue("add key to authorized_keys", new Runnable() { + @Override + public void run() { + String publicKey = tempKeyTask.getUnchecked(); + DynamicTasks.queue(SshEffectorTasks.ssh(String.format( + "cat >> ~/.ssh/authorized_keys <<EOF\n%s\nEOF", + publicKey)) + .machine(destMachine) + .summary("Add key to authorized_keys") + .requiringExitCodeZero()); + } + }); + + final ProcessTaskWrapper<Integer> copyTask = SshEffectorTasks.ssh( + "cd $RUN_DIR", + String.format( + "scp -o 'BatchMode yes' -o 'StrictHostKeyChecking no' -i '%s' '%s' '%s@%s:%s/%s.sql'", + privateKeyFile, + sourceDumpPath, + destMachine.getUser(), + dest.getAttribute(MySqlNode.SUBNET_ADDRESS), + dest.getAttribute(MySqlNode.RUN_DIR), + dumpId)) + .environmentVariable("RUN_DIR", sourceRunDir) + .machine(sourceMachine) + .summary("copy database dump to slave") + .newTask(); + // Let next couple of tasks complete even if this one fails so that we can clean up. + TaskTags.markInessential(copyTask); + DynamicTasks.queue(copyTask); + + // Delete private key + DynamicTasks.queue(SshEffectorTasks.ssh( + "cd $RUN_DIR", + "rm " + privateKeyFile) + .environmentVariable("RUN_DIR", sourceRunDir) + .machine(sourceMachine) + .summary("remove private key")); + + DynamicTasks.queue(SshEffectorTasks.ssh(String.format( + "sed -i'' -e '/%s/d' ~/.ssh/authorized_keys", + dumpId)) + .machine(destMachine) + .summary("remove private key from authorized_keys")).asTask(); + + // The task will fail if copyTask fails, but only after the private key is deleted. + DynamicTasks.queue("check for successful copy", new Runnable() { + @Override + public void run() { + copyTask.asTask().getUnchecked(); + } + }); + } + + private Future<ReplicationSnapshot> getValidReplicationInfo() { + try { + try { + lock.acquire(); + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } + ReplicationSnapshot replicationSnapshot = getAttributeBlocking(cluster, MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT); + if (!isReplicationInfoValid(replicationSnapshot)) { + final MySqlNode snapshotNode = getSnapshotNode(); + final String dumpName = getDumpUniqueId() + ".sql"; + if (MySqlClusterUtils.IS_MASTER.apply(snapshotNode)) { + return createMasterReplicationSnapshot(snapshotNode, dumpName); + } else { + return createSlaveReplicationSnapshot(snapshotNode, dumpName); + } + } + return ConcurrentUtils.constantFuture(replicationSnapshot); + } finally { + lock.release(); + } + } + + private Future<ReplicationSnapshot> createMasterReplicationSnapshot(final MySqlNode master, final String dumpName) { + log.info("MySql cluster " + cluster + ": generating new replication snapshot on master node " + master + " with name " + dumpName); + String dumpOptions = SNAPSHOT_DUMP_OPTIONS + " --master-data=2"; + ImmutableMap<String, String> params = ImmutableMap.of( + ExportDumpEffector.PATH.getName(), dumpName, + ExportDumpEffector.ADDITIONAL_OPTIONS.getName(), dumpOptions); + DynamicTasks.queue(Effectors.invocation(master, MySqlNode.EXPORT_DUMP, params)); + return DynamicTasks.queue("get master log info from dump", new Callable<ReplicationSnapshot>() { + @Override + public ReplicationSnapshot call() throws Exception { + Pattern masterInfoPattern = Pattern.compile("CHANGE MASTER TO.*MASTER_LOG_FILE\\s*=\\s*'([^']+)'.*MASTER_LOG_POS\\s*=\\s*(\\d+)"); + String masterInfo = DynamicTasks.queue(execSshTask(master, "grep -m1 'CHANGE MASTER TO' " + dumpName, "Extract master replication status from dump") + .requiringZeroAndReturningStdout()).asTask().getUnchecked(); + Matcher masterInfoMatcher = masterInfoPattern.matcher(masterInfo); + if (!masterInfoMatcher.find() || masterInfoMatcher.groupCount() != 2) { + throw new IllegalStateException("Master dump doesn't contain replication info: " + masterInfo); + } + String masterLogFile = masterInfoMatcher.group(1); + int masterLogPosition = Integer.parseInt(masterInfoMatcher.group(2)); + ReplicationSnapshot replicationSnapshot = new ReplicationSnapshot(master.getId(), dumpName, masterLogFile, masterLogPosition); + cluster.sensors().set(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT, replicationSnapshot); + return replicationSnapshot; + } + }); + } + + private Future<ReplicationSnapshot> createSlaveReplicationSnapshot(final MySqlNode slave, final String dumpName) { + MySqlClusterUtils.executeSqlOnNodeAsync(slave, "STOP SLAVE SQL_THREAD;"); + try { + log.info("MySql cluster " + cluster + ": generating new replication snapshot on slave node " + slave + " with name " + dumpName); + String dumpOptions = SNAPSHOT_DUMP_OPTIONS; + ImmutableMap<String, String> params = ImmutableMap.of( + ExportDumpEffector.PATH.getName(), dumpName, + ExportDumpEffector.ADDITIONAL_OPTIONS.getName(), dumpOptions); + DynamicTasks.queue(Effectors.invocation(slave, MySqlNode.EXPORT_DUMP, params)); + return DynamicTasks.queue("get master log info from slave", new Callable<ReplicationSnapshot>() { + @Override + public ReplicationSnapshot call() throws Exception { + String slaveStatusRow = slave.executeScript("SHOW SLAVE STATUS \\G"); + Map<String, String> slaveStatus = MySqlRowParser.parseSingle(slaveStatusRow); + String masterLogFile = slaveStatus.get("Relay_Master_Log_File"); + int masterLogPosition = Integer.parseInt(slaveStatus.get("Exec_Master_Log_Pos")); + ReplicationSnapshot replicationSnapshot = new ReplicationSnapshot(slave.getId(), dumpName, masterLogFile, masterLogPosition); + cluster.sensors().set(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT, replicationSnapshot); + return replicationSnapshot; + } + }); + } finally { + MySqlClusterUtils.executeSqlOnNodeAsync(slave, "START SLAVE SQL_THREAD;"); + } + } + + private MySqlNode getSnapshotNode() { + String snapshotNodeId = cluster.getConfig(MySqlCluster.REPLICATION_PREFERRED_SOURCE); + if (snapshotNodeId != null) { + Optional<Entity> preferredNode = Iterables.tryFind(cluster.getMembers(), EntityPredicates.idEqualTo(snapshotNodeId)); + if (preferredNode.isPresent()) { + return (MySqlNode) preferredNode.get(); + } else { + log.warn("MySql cluster " + this + " configured with preferred snapshot node " + snapshotNodeId + " but it's not a member. Defaulting to a random slave."); + } + } + return getRandomSlave(); + } + + private MySqlNode getRandomSlave() { + List<MySqlNode> slaves = getHealhtySlaves(); + if (slaves.size() > 0) { + return slaves.get(new Random().nextInt(slaves.size())); + } else { + return getMaster(); + } + } + + private ImmutableList<MySqlNode> getHealhtySlaves() { + return FluentIterable.from(cluster.getMembers()) + .filter(Predicates.not(MySqlClusterUtils.IS_MASTER)) + .filter(EntityPredicates.attributeEqualTo(MySqlNode.SERVICE_UP, Boolean.TRUE)) + .filter(MySqlNode.class) + .toList(); + } + + private boolean isReplicationInfoValid(ReplicationSnapshot replicationSnapshot) { + MySqlNode master = getMaster(); + String dataDir = Strings.nullToEmpty(master.getConfig(MySqlNode.DATA_DIR)); + if (!checkFileExistsOnEntity(master, Os.mergePathsUnix(dataDir, replicationSnapshot.getBinLogName()))) { + return false; + } + if (replicationSnapshot.getEntityId() != null) { + Optional<Entity> snapshotSlave = Iterables.tryFind(cluster.getChildren(), EntityPredicates.idEqualTo(replicationSnapshot.getEntityId())); + if (!snapshotSlave.isPresent()) { + log.info("MySql cluster " + cluster + " missing node " + replicationSnapshot.getEntityId() + " with last snapshot " + replicationSnapshot.getSnapshotPath() + ". Will generate new snapshot."); + return false; + } + if (!checkFileExistsOnEntity(snapshotSlave.get(), replicationSnapshot.getSnapshotPath())) { + log.info("MySql cluster " + cluster + ", node " + snapshotSlave.get() + " missing replication snapshot " + replicationSnapshot.getSnapshotPath() + ". Will generate new snapshot."); + return false; + } + } + return true; + } + + private boolean checkFileExistsOnEntity(Entity entity, String path) { + String cmd = BashCommands.chain( + BashCommands.requireTest(String.format("-f \"%s\"", path), "File " + path + " doesn't exist.")); + String summary = "Check if file " + path + " exists"; + return DynamicTasks.queue(execSshTask(entity, cmd, summary).allowingNonZeroExitCode()).asTask().getUnchecked() == 0; + } + + private ProcessTaskFactory<Integer> execSshTask(Entity entity, String cmd, String summary) { + SshMachineLocation machine = EffectorTasks.getSshMachine(entity); + return SshTasks.newSshExecTaskFactory(machine, "cd $RUN_DIR\n" + cmd) + .allowingNonZeroExitCode() + .environmentVariable("RUN_DIR", entity.getAttribute(SoftwareProcess.RUN_DIR)) + .summary(summary); + } + + private <T> T getAttributeBlocking(Entity masterNode, AttributeSensor<T> att) { + return DynamicTasks.queue(DependentConfiguration.attributeWhenReady(masterNode, att)).getUnchecked(); + } + + private String getDumpUniqueId() { + return "replication-dump-" + Identifiers.makeRandomId(8) + "-" + new SimpleDateFormat("yyyy-MM-dd--HH-mm-ss").format(new Date()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlCluster.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlCluster.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlCluster.java index de43951..9ea5ffe 100644 --- a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlCluster.java +++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlCluster.java @@ -18,7 +18,7 @@ */ package org.apache.brooklyn.entity.database.mysql; -import java.util.Collection; +import java.util.List; import org.apache.brooklyn.api.catalog.Catalog; import org.apache.brooklyn.api.entity.ImplementedBy; @@ -36,11 +36,6 @@ import com.google.common.reflect.TypeToken; @Catalog(name="MySql Master-Slave cluster", description="Sets up a cluster of MySQL nodes using master-slave relation and binary logging", iconUrl="classpath:///mysql-logo-110x57.png") public interface MySqlCluster extends DynamicCluster, HasDatastoreUrl { interface MySqlMaster { - AttributeSensor<String> MASTER_LOG_FILE = Sensors.newStringSensor( - "mysql.master.log_file", "The binary log file master is writing to"); - AttributeSensor<Integer> MASTER_LOG_POSITION = Sensors.newIntegerSensor( - "mysql.master.log_position", "The position in the log file to start replication"); - ConfigKey<String> MASTER_CREATION_SCRIPT_CONTENTS = ConfigKeys.newStringConfigKey( "datastore.master.creation.script.contents", "Contents of creation script to initialize the master node after initializing replication"); @@ -52,6 +47,9 @@ public interface MySqlCluster extends DynamicCluster, HasDatastoreUrl { AttributeSensor<Integer> SLAVE_SECONDS_BEHIND_MASTER = Sensors.newIntegerSensor("mysql.slave.seconds_behind_master", "How many seconds behind master is the replication state on the slave"); } + AttributeSensor<ReplicationSnapshot> REPLICATION_LAST_SLAVE_SNAPSHOT = Sensors.newSensor(ReplicationSnapshot.class, "mysql.replication.last_slave_snapshot", "Last valid state to init slaves with"); + ConfigKey<String> REPLICATION_PREFERRED_SOURCE = ConfigKeys.newStringConfigKey("mysql.replication.preferred_source", "ID of node to get the replication snapshot from. If not set a random slave is used, falling back to master if no slaves."); + ConfigKey<String> SLAVE_USERNAME = ConfigKeys.newStringConfigKey( "mysql.slave.username", "The user name slaves will use to connect to the master", "slave"); ConfigKey<String> SLAVE_REPLICATE_DO_DB = ConfigKeys.newStringConfigKey( @@ -69,7 +67,7 @@ public interface MySqlCluster extends DynamicCluster, HasDatastoreUrl { StringAttributeSensorAndConfigKey SLAVE_PASSWORD = new StringAttributeSensorAndConfigKey( "mysql.slave.password", "The password slaves will use to connect to the master. Will be auto-generated by default."); @SuppressWarnings("serial") - AttributeSensor<Collection<String>> SLAVE_DATASTORE_URL_LIST = Sensors.newSensor(new TypeToken<Collection<String>>() {}, + AttributeSensor<List<String>> SLAVE_DATASTORE_URL_LIST = Sensors.newSensor(new TypeToken<List<String>>() {}, "mysql.slave.datastore.url", "List of all slave's DATASTORE_URL sensors"); AttributeSensor<Double> QUERIES_PER_SECOND_FROM_MYSQL_PER_NODE = Sensors.newDoubleSensor("mysql.queries.perSec.fromMysql.perNode"); } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterImpl.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterImpl.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterImpl.java index 9cebb21..e6afc18 100644 --- a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterImpl.java +++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterImpl.java @@ -18,10 +18,10 @@ */ package org.apache.brooklyn.entity.database.mysql; -import java.util.Collection; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; @@ -35,10 +35,8 @@ import org.apache.brooklyn.api.sensor.SensorEvent; import org.apache.brooklyn.api.sensor.SensorEventListener; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.entity.Attributes; -import org.apache.brooklyn.core.entity.EntityInternal; -import org.apache.brooklyn.core.entity.EntityPredicates; import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ServiceNotUpLogic; -import org.apache.brooklyn.core.sensor.DependentConfiguration; +import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.sensor.Sensors; import org.apache.brooklyn.enricher.stock.Enrichers; import org.apache.brooklyn.entity.group.DynamicClusterImpl; @@ -57,7 +55,6 @@ import org.apache.brooklyn.util.time.Duration; import com.google.common.base.Function; import com.google.common.base.Functions; -import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; @@ -67,23 +64,21 @@ import com.google.common.reflect.TypeToken; // https://dev.mysql.com/doc/refman/5.7/en/replication-howto.html -// TODO Bootstrap slave from dump for the case where the binary log is purged -// TODO Promote slave to master +// TODO Filter dump by database/table, currently all tables are replicated // TODO SSL connection between master and slave -// TODO DB credentials littered all over the place in file system +// TODO Promote slave to master public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster { private static final AttributeSensor<Boolean> NODE_REPLICATION_INITIALIZED = Sensors.newBooleanSensor("mysql.replication_initialized"); private static final String MASTER_CONFIG_URL = "classpath:///org/apache/brooklyn/entity/database/mysql/mysql_master.conf"; private static final String SLAVE_CONFIG_URL = "classpath:///org/apache/brooklyn/entity/database/mysql/mysql_slave.conf"; - private static final int MASTER_SERVER_ID = 1; - private static final Predicate<Entity> IS_MASTER = EntityPredicates.configEqualTo(MySqlNode.MYSQL_SERVER_ID, MASTER_SERVER_ID); + protected static final int MASTER_SERVER_ID = 1; @SuppressWarnings("serial") private static final AttributeSensor<Supplier<Integer>> SLAVE_NEXT_SERVER_ID = Sensors.newSensor(new TypeToken<Supplier<Integer>>() {}, "mysql.slave.next_server_id", "Returns the ID of the next slave server"); @SuppressWarnings("serial") - private static final AttributeSensor<Map<String, String>> SLAVE_ID_ADDRESS_MAPPING = Sensors.newSensor(new TypeToken<Map<String, String>>() {}, + protected static final AttributeSensor<Map<String, String>> SLAVE_ID_ADDRESS_MAPPING = Sensors.newSensor(new TypeToken<Map<String, String>>() {}, "mysql.slave.id_address_mapping", "Maps slave entity IDs to SUBNET_ADDRESS, so the address is known at member remove time."); @Override @@ -111,6 +106,7 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster subscriptions().subscribe(this, MEMBER_REMOVED, new MemberRemovedListener()); } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Override protected void initEnrichers() { super.initEnrichers(); @@ -124,8 +120,8 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster enrichers().add(Enrichers.builder() .aggregating(MySqlNode.DATASTORE_URL) .publishing(SLAVE_DATASTORE_URL_LIST) - .computing(Functions.<Collection<String>>identity()) - .entityFilter(Predicates.not(IS_MASTER)) + .computing((Function)Functions.identity()) + .entityFilter(Predicates.not(MySqlClusterUtils.IS_MASTER)) .fromMembers() .build()); @@ -145,7 +141,7 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster .computing(IfFunctions.ifPredicate(CollectionFunctionals.notEmpty()) .apply(CollectionFunctionals.firstElement()) .defaultValue(null)) - .entityFilter(IS_MASTER) + .entityFilter(MySqlClusterUtils.IS_MASTER) .build()); } @@ -158,13 +154,7 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster final EntitySpec<?> memberSpec = super.getMemberSpec(); if (memberSpec != null) { - if (!isKeyConfigured(memberSpec, MySqlNode.TEMPLATE_CONFIGURATION_URL.getConfigKey())) { - return EntitySpec.create(memberSpec) - .configure(MySqlNode.MYSQL_SERVER_ID, MASTER_SERVER_ID) - .configure(MySqlNode.TEMPLATE_CONFIGURATION_URL, MASTER_CONFIG_URL); - } else { - return memberSpec; - } + return applyDefaults(memberSpec, Suppliers.ofInstance(MASTER_SERVER_ID), MASTER_CONFIG_URL); } return EntitySpec.create(MySqlNode.class) @@ -184,6 +174,7 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster return EntitySpec.create(MySqlNode.class) .displayName("MySql Slave") + // Slave server IDs will not be linear because getMemberSpec not always results in createNode (result discarded) .configure(MySqlNode.MYSQL_SERVER_ID, serverIdSupplier.get()) .configure(MySqlNode.TEMPLATE_CONFIGURATION_URL, SLAVE_CONFIG_URL); } @@ -211,9 +202,10 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster @Override protected Entity createNode(Location loc, Map<?, ?> flags) { - Entity node = super.createNode(loc, flags); - if (!IS_MASTER.apply(node)) { - ServiceNotUpLogic.updateNotUpIndicator((EntityLocal)node, MySqlSlave.SLAVE_HEALTHY, "Replication not started"); + MySqlNode node = (MySqlNode) super.createNode(loc, flags); + if (!MySqlClusterUtils.IS_MASTER.apply(node)) { + EntityLocal localNode = (EntityLocal) node; + ServiceNotUpLogic.updateNotUpIndicator(localNode, MySqlSlave.SLAVE_HEALTHY, "Replication not started"); addFeed(FunctionFeed.builder() .entity((EntityLocal)node) @@ -221,7 +213,7 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster .poll(FunctionPollConfig.forSensor(MySqlSlave.SLAVE_HEALTHY) .callable(new SlaveStateCallable(node)) .checkSuccess(StringPredicates.isNonBlank()) - .onSuccess(new SlaveStateParser(node)) + .onSuccess(new SlaveStateParser(localNode)) .setOnFailure(false) .description("Polls SHOW SLAVE STATUS")) .build()); @@ -235,15 +227,15 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster } public static class SlaveStateCallable implements Callable<String> { - private Entity slave; - public SlaveStateCallable(Entity slave) { + private MySqlNode slave; + public SlaveStateCallable(MySqlNode slave) { this.slave = slave; } @Override public String call() throws Exception { if (Boolean.TRUE.equals(slave.getAttribute(MySqlNode.SERVICE_PROCESS_IS_RUNNING))) { - return slave.invoke(MySqlNode.EXECUTE_SCRIPT, ImmutableMap.of("commands", "SHOW SLAVE STATUS \\G")).asTask().getUnchecked(); + return MySqlClusterUtils.executeSqlOnNode(slave, "SHOW SLAVE STATUS \\G"); } else { return null; } @@ -252,9 +244,9 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster } public static class SlaveStateParser implements Function<String, Boolean> { - private Entity slave; + private EntityLocal slave; - public SlaveStateParser(Entity slave) { + public SlaveStateParser(EntityLocal slave) { this.slave = slave; } @@ -281,38 +273,67 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster // ============= Member Init ============= - // The task is executed in inessential context (event handler) so - // not visible in tasks UI. Better make it visible so the user can - // see failures, currently accessible only from logs. - private static final class InitReplicationTask implements Runnable { - private final MySqlCluster cluster; - private final MySqlNode node; + // The task is executed separately from the start effector, so failing here + // will not fail the start effector as well, but it will eventually time out + // because replication is not started. + // Would be nice to be able to plug in to the entity lifecycle! - private InitReplicationTask(MySqlCluster cluster, MySqlNode node) { + private static final class NodeRunningListener implements SensorEventListener<Boolean> { + private MySqlCluster cluster; + private Semaphore lock = new Semaphore(1); + + public NodeRunningListener(MySqlCluster cluster) { this.cluster = cluster; - this.node = node; } @Override - public void run() { - Integer serverId = node.getConfig(MySqlNode.MYSQL_SERVER_ID); - if (serverId == MASTER_SERVER_ID) { - initMaster(node); - } else if (serverId > MASTER_SERVER_ID) { - initSlave(node); + public void onEvent(SensorEvent<Boolean> event) { + final MySqlNode node = (MySqlNode) event.getSource(); + if (Boolean.TRUE.equals(event.getValue()) && + // We are interested in SERVICE_PROCESS_IS_RUNNING only while haven't come online yet. + // Probably will get several updates while replication is initialized so an additional + // check is needed whether we have already seen this. + Boolean.FALSE.equals(node.getAttribute(MySqlNode.SERVICE_UP)) && + !Boolean.TRUE.equals(node.getAttribute(NODE_REPLICATION_INITIALIZED))) { + + // Events executed sequentially so no need to synchronize here. + ((EntityLocal)node).setAttribute(NODE_REPLICATION_INITIALIZED, Boolean.TRUE); + + final Runnable nodeInitTaskBody; + if (MySqlClusterUtils.IS_MASTER.apply(node)) { + nodeInitTaskBody = new InitMasterTaskBody(cluster, node); + } else { + nodeInitTaskBody = new InitSlaveTaskBody(cluster, node, lock); + } + + DynamicTasks.submitTopLevelTask(TaskBuilder.builder() + .displayName("setup master-slave replication") + .body(nodeInitTaskBody) + .tag(BrooklynTaskTags.tagForContextEntity(node)) + .tag(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG) + .build(), + node); } } - private void initMaster(MySqlNode master) { - String binLogInfo = executeScriptOnNode(master, "FLUSH TABLES WITH READ LOCK;SHOW MASTER STATUS \\G UNLOCK TABLES;"); + } + + private static class InitMasterTaskBody implements Runnable { + private MySqlNode master; + private MySqlCluster cluster; + public InitMasterTaskBody(MySqlCluster cluster, MySqlNode master) { + this.cluster = cluster; + this.master = master; + } + + @Override + public void run() { + String binLogInfo = MySqlClusterUtils.executeSqlOnNode(master, "FLUSH TABLES WITH READ LOCK;SHOW MASTER STATUS \\G UNLOCK TABLES;"); Map<String, String> status = MySqlRowParser.parseSingle(binLogInfo); String file = status.get("File"); - if (file != null) { - ((EntityInternal)master).sensors().set(MySqlMaster.MASTER_LOG_FILE, file); - } String position = status.get("Position"); - if (position != null) { - ((EntityInternal)master).sensors().set(MySqlMaster.MASTER_LOG_POSITION, new Integer(position)); + if (file != null && position != null) { + cluster.sensors().set(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT, new ReplicationSnapshot(null, null, file, Integer.parseInt(position))); } //NOTE: Will be executed on each start, analogously to the standard CREATION_SCRIPT config @@ -331,71 +352,6 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster return contents; return null; } - - private void initSlave(MySqlNode slave) { - MySqlNode master = (MySqlNode) Iterables.find(cluster.getMembers(), IS_MASTER); - String masterLogFile = validateSqlParam(getAttributeBlocking(master, MySqlMaster.MASTER_LOG_FILE)); - Integer masterLogPos = getAttributeBlocking(master, MySqlMaster.MASTER_LOG_POSITION); - String masterAddress = validateSqlParam(master.getAttribute(MySqlNode.SUBNET_ADDRESS)); - Integer masterPort = master.getAttribute(MySqlNode.MYSQL_PORT); - String slaveAddress = validateSqlParam(slave.getAttribute(MySqlNode.SUBNET_ADDRESS)); - String username = validateSqlParam(cluster.getConfig(SLAVE_USERNAME)); - String password = validateSqlParam(cluster.getAttribute(SLAVE_PASSWORD)); - - executeScriptOnNode(master, String.format( - "CREATE USER '%s'@'%s' IDENTIFIED BY '%s';\n" + - "GRANT REPLICATION SLAVE ON *.* TO '%s'@'%s';\n", - username, slaveAddress, password, username, slaveAddress)); - - String slaveCmd = String.format( - "CHANGE MASTER TO " + - "MASTER_HOST='%s', " + - "MASTER_PORT=%d, " + - "MASTER_USER='%s', " + - "MASTER_PASSWORD='%s', " + - "MASTER_LOG_FILE='%s', " + - "MASTER_LOG_POS=%d;\n" + - "START SLAVE;\n", - masterAddress, masterPort, username, password, masterLogFile, masterLogPos); - executeScriptOnNode(slave, slaveCmd); - - cluster.getAttribute(SLAVE_ID_ADDRESS_MAPPING).put(slave.getId(), slave.getAttribute(MySqlNode.SUBNET_ADDRESS)); - } - - private <T> T getAttributeBlocking(Entity masterNode, AttributeSensor<T> att) { - return DynamicTasks.queue(DependentConfiguration.attributeWhenReady(masterNode, att)).getUnchecked(); - } - - } - - private static final class NodeRunningListener implements SensorEventListener<Boolean> { - private MySqlCluster cluster; - - public NodeRunningListener(MySqlCluster cluster) { - this.cluster = cluster; - } - - @Override - public void onEvent(SensorEvent<Boolean> event) { - final MySqlNode node = (MySqlNode) event.getSource(); - if (Boolean.TRUE.equals(event.getValue()) && - // We are interested in SERVICE_PROCESS_IS_RUNNING only while haven't come online yet. - // Probably will get several updates while replication is initialized so an additional - // check is needed whether we have already seen this. - Boolean.FALSE.equals(node.getAttribute(MySqlNode.SERVICE_UP)) && - !Boolean.TRUE.equals(node.getAttribute(NODE_REPLICATION_INITIALIZED))) { - - // Events executed sequentially so no need to synchronize here. - node.sensors().set(NODE_REPLICATION_INITIALIZED, Boolean.TRUE); - - DynamicTasks.queueIfPossible(TaskBuilder.builder() - .displayName("Configure master-slave replication on node") - .body(new InitReplicationTask(cluster, node)) - .build()) - .orSubmitAsync(node); - } - } - } // ============= Member Remove ============= @@ -407,46 +363,12 @@ public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster Entity node = event.getValue(); String slaveAddress = cluster.getAttribute(SLAVE_ID_ADDRESS_MAPPING).remove(node.getId()); if (slaveAddress != null) { - DynamicTasks.queueIfPossible(TaskBuilder.builder() - .displayName("Remove slave access") - .body(new RemoveSlaveConfigTask(cluster, slaveAddress)) - .build()) - .orSubmitAsync(cluster); + // Could already be gone if stopping the entire app - let it throw an exception + MySqlNode master = (MySqlNode) Iterables.find(cluster.getMembers(), MySqlClusterUtils.IS_MASTER); + String username = MySqlClusterUtils.validateSqlParam(cluster.getConfig(SLAVE_USERNAME)); + MySqlClusterUtils.executeSqlOnNodeAsync(master, String.format("DROP USER '%s'@'%s';", username, slaveAddress)); } } } - public class RemoveSlaveConfigTask implements Runnable { - private MySqlCluster cluster; - private String slaveAddress; - - public RemoveSlaveConfigTask(MySqlCluster cluster, String slaveAddress) { - this.cluster = cluster; - this.slaveAddress = validateSqlParam(slaveAddress); - } - - @Override - public void run() { - // Could already be gone if stopping the entire app - let it throw an exception - MySqlNode master = (MySqlNode) Iterables.find(cluster.getMembers(), IS_MASTER); - String username = validateSqlParam(cluster.getConfig(SLAVE_USERNAME)); - executeScriptOnNode(master, String.format("DROP USER '%s'@'%s';", username, slaveAddress)); - } - - } - - // Can't call node.executeScript directly, need to change execution context, so use an effector task - private static String executeScriptOnNode(MySqlNode node, String commands) { - return node.invoke(MySqlNode.EXECUTE_SCRIPT, ImmutableMap.of(MySqlNode.EXECUTE_SCRIPT_COMMANDS, commands)).getUnchecked(); - } - - private static String validateSqlParam(String config) { - // Don't go into escape madness, just deny any suspicious strings. - // Would be nice to use prepared statements, but not worth pulling in the extra dependencies. - if (config.contains("'") && config.contains("\\")) { - throw new IllegalStateException("User provided string contains illegal SQL characters: " + config); - } - return config; - } - } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterUtils.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterUtils.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterUtils.java new file mode 100644 index 0000000..9f8dc6d --- /dev/null +++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterUtils.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.database.mysql; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.core.effector.Effectors; +import org.apache.brooklyn.core.entity.EntityPredicates; +import org.apache.brooklyn.entity.database.DatastoreMixins.CanExecuteScript; +import org.apache.brooklyn.util.core.task.DynamicTasks; + +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableMap; + +public class MySqlClusterUtils { + protected static final Predicate<Entity> IS_MASTER = EntityPredicates.configEqualTo(MySqlNode.MYSQL_SERVER_ID, MySqlClusterImpl.MASTER_SERVER_ID); + + protected static String executeSqlOnNode(MySqlNode node, String commands) { + return executeSqlOnNodeAsync(node, commands).getUnchecked(); + } + + // Can't call node.executeScript directly, need to change execution context, so use an effector task + protected static Task<String> executeSqlOnNodeAsync(MySqlNode node, String commands) { + return DynamicTasks.queue(Effectors.invocation(node, MySqlNode.EXECUTE_SCRIPT, ImmutableMap.of(CanExecuteScript.COMMANDS.getName(), commands))).asTask(); + } + + protected static String validateSqlParam(String config) { + // Don't go into escape madness, just deny any suspicious strings. + // Would be nice to use prepared statements, but not worth pulling in the extra dependencies. + if (config.contains("'") && config.contains("\\")) { + throw new IllegalStateException("User provided string contains illegal SQL characters: " + config); + } + return config; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlDriver.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlDriver.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlDriver.java index 461369b..b4da5f9 100644 --- a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlDriver.java +++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlDriver.java @@ -25,6 +25,9 @@ import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper; * The {@link SoftwareProcessDriver} for MySQL. */ public interface MySqlDriver extends SoftwareProcessDriver { - public String getStatusCmd(); - public ProcessTaskWrapper<Integer> executeScriptAsync(String commands); + String getStatusCmd(); + ProcessTaskWrapper<Integer> executeScriptAsync(String commands); + ProcessTaskWrapper<Integer> executeScriptFromInstalledFileAsync(String filenameAlreadyInstalledAtServer); + ProcessTaskWrapper<Integer> dumpDatabase(String additionalOptions, String dumpDestination); + void changePassword(String oldPass, String newPass); } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNode.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNode.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNode.java index 484606e..7f9e508 100644 --- a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNode.java +++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNode.java @@ -19,21 +19,21 @@ package org.apache.brooklyn.entity.database.mysql; import org.apache.brooklyn.api.catalog.Catalog; +import org.apache.brooklyn.api.effector.Effector; import org.apache.brooklyn.api.entity.ImplementedBy; import org.apache.brooklyn.api.objs.HasShortName; import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.annotation.Effector; import org.apache.brooklyn.core.annotation.EffectorParam; import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.config.MapConfigKey; -import org.apache.brooklyn.core.effector.MethodEffector; +import org.apache.brooklyn.core.effector.Effectors; import org.apache.brooklyn.core.entity.Attributes; import org.apache.brooklyn.core.location.PortRanges; import org.apache.brooklyn.core.sensor.BasicAttributeSensorAndConfigKey; +import org.apache.brooklyn.core.sensor.BasicAttributeSensorAndConfigKey.StringAttributeSensorAndConfigKey; import org.apache.brooklyn.core.sensor.PortAttributeSensorAndConfigKey; import org.apache.brooklyn.core.sensor.Sensors; -import org.apache.brooklyn.core.sensor.BasicAttributeSensorAndConfigKey.StringAttributeSensorAndConfigKey; import org.apache.brooklyn.entity.database.DatastoreMixins.DatastoreCommon; import org.apache.brooklyn.entity.software.base.SoftwareProcess; import org.apache.brooklyn.util.core.flags.SetFromFlag; @@ -86,10 +86,39 @@ public interface MySqlNode extends SoftwareProcess, HasShortName, DatastoreCommo AttributeSensor<Double> QUERIES_PER_SECOND_FROM_MYSQL = Sensors.newDoubleSensor("mysql.queries.perSec.fromMysql"); - MethodEffector<String> EXECUTE_SCRIPT = new MethodEffector<String>(MySqlNode.class, "executeScript"); - String EXECUTE_SCRIPT_COMMANDS = "commands"; + interface ExportDumpEffector { + ConfigKey<String> PATH = ConfigKeys.newStringConfigKey("path", "Where to export the dump to. Resolved against runtime directory if relative.", "dump.sql"); + ConfigKey<String> ADDITIONAL_OPTIONS = ConfigKeys.newStringConfigKey("additionalOptions", "Additional command line options to pass to mysqldump"); + + Effector<Void> EXPORT_DUMP = Effectors.effector(Void.class, "export_dump") + .description("Invokes mysqldump against the node") + .parameter(PATH) + .parameter(ADDITIONAL_OPTIONS) + .buildAbstract(); + } + Effector<Void> EXPORT_DUMP = ExportDumpEffector.EXPORT_DUMP; + + interface ImportDumpEffector { + ConfigKey<String> PATH = ConfigKeys.newStringConfigKey("path", "Path to a file with SQL statements to import as the root user"); + + Effector<Void> IMPORT_DUMP = Effectors.effector(Void.class, "import_dump") + .description("Runs the sql statements in the file as the root user") + .parameter(PATH) + .buildAbstract(); + } + Effector<Void> IMPORT_DUMP = ImportDumpEffector.IMPORT_DUMP; + + interface ChangePasswordEffector { + ConfigKey<String> PASSWORD = ConfigKeys.newStringConfigKey("password", "New password to set"); + + Effector<Void> CHANGE_PASSWORD = Effectors.effector(Void.class, "change_password") + .description("Change the mysql root password") + .parameter(PASSWORD) + .buildAbstract(); + } + Effector<Void> CHANGE_PASSWORD = ChangePasswordEffector.CHANGE_PASSWORD; - @Effector(description = "Execute SQL script on the node as the root user") - String executeScript(@EffectorParam(name=EXECUTE_SCRIPT_COMMANDS) String commands); + @org.apache.brooklyn.core.annotation.Effector(description = "Execute SQL script on the node as the root user") + public String executeScript(@EffectorParam(name="commands") String commands); } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeEffectors.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeEffectors.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeEffectors.java new file mode 100644 index 0000000..af68959 --- /dev/null +++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeEffectors.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.database.mysql; + +import org.apache.brooklyn.api.effector.Effector; +import org.apache.brooklyn.core.effector.EffectorBody; +import org.apache.brooklyn.core.effector.EffectorTasks; +import org.apache.brooklyn.core.effector.Effectors; +import org.apache.brooklyn.entity.database.mysql.MySqlNode.ChangePasswordEffector; +import org.apache.brooklyn.entity.database.mysql.MySqlNode.ExportDumpEffector; +import org.apache.brooklyn.entity.database.mysql.MySqlNode.ImportDumpEffector; +import org.apache.brooklyn.location.ssh.SshMachineLocation; +import org.apache.brooklyn.util.core.config.ConfigBag; +import org.apache.brooklyn.util.core.task.DynamicTasks; +import org.apache.brooklyn.util.core.task.ssh.SshTasks; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; + +public class MySqlNodeEffectors { + public static class ExportDumpEffectoryBody extends EffectorBody<Void> implements ExportDumpEffector { + @Override + public Void call(ConfigBag parameters) { + String path = parameters.get(PATH); + String additionalOptions = Strings.nullToEmpty(parameters.get(ADDITIONAL_OPTIONS)); + //TODO additionalOptions, path are not sanitized and are coming from the user. + //Should we try to sanitize (potentially limiting the range of possible inputs), + //or just assume the user has full machine access anyway? + ((MySqlNodeImpl)entity()).getDriver().dumpDatabase(additionalOptions, path); + return null; + } + } + public static Effector<Void> EXPORT_DUMP = Effectors.effector(ExportDumpEffector.EXPORT_DUMP) + .impl(new ExportDumpEffectoryBody()) + .build(); + + public static class ImportDumpEffectorBody extends EffectorBody<Void> implements ImportDumpEffector { + @Override + public Void call(ConfigBag parameters) { + String path = Preconditions.checkNotNull(parameters.get(PATH), "path is required"); + // TODO sanitize path? + ((MySqlNodeImpl)entity()).getDriver().executeScriptFromInstalledFileAsync(path); + return null; + } + } + public static Effector<Void> IMPORT_DUMP = Effectors.effector(ImportDumpEffector.IMPORT_DUMP) + .impl(new ImportDumpEffectorBody()) + .build(); + + public static class ChangePasswordEffectorBody extends EffectorBody<Void> implements ChangePasswordEffector { + @Override + public Void call(ConfigBag parameters) { + String newPass = Preconditions.checkNotNull(parameters.get(PASSWORD), "password is required"); + String oldPass = entity().getAttribute(MySqlNode.PASSWORD); + entity().sensors().set(MySqlNode.PASSWORD, newPass); + MySqlDriver driver = ((MySqlNodeImpl)entity()).getDriver(); + driver.changePassword(oldPass, newPass); + SshMachineLocation machine = EffectorTasks.getSshMachine(entity()); + DynamicTasks.queue( + SshTasks.newSshExecTaskFactory(machine, + "cd "+entity().getAttribute(MySqlNode.RUN_DIR), + "sed -i'' -e 's@^\\(\\s*password\\s*=\\s*\\).*$@\\1" + newPass.replace("\\", "\\\\") + "@g' mymysql.cnf") + .requiringExitCodeZero() + .summary("Change root password")); + return null; + } + } + public static Effector<Void> CHANGE_PASSWORD = Effectors.effector(ChangePasswordEffector.CHANGE_PASSWORD) + .impl(new ChangePasswordEffectorBody()) + .build(); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeImpl.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeImpl.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeImpl.java index dd0aac7..f470390 100644 --- a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeImpl.java +++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeImpl.java @@ -27,8 +27,6 @@ import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl; import org.apache.brooklyn.feed.ssh.SshFeed; import org.apache.brooklyn.feed.ssh.SshPollConfig; import org.apache.brooklyn.feed.ssh.SshPollValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.brooklyn.location.ssh.SshMachineLocation; import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.core.config.ConfigBag; @@ -36,6 +34,8 @@ import org.apache.brooklyn.util.guava.Maybe; import org.apache.brooklyn.util.text.Identifiers; import org.apache.brooklyn.util.text.Strings; import org.apache.brooklyn.util.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Function; @@ -79,6 +79,9 @@ public class MySqlNodeImpl extends SoftwareProcessImpl implements MySqlNode { return executeScript((String)parameters.getStringKey("commands")); } }); + getMutableEntityType().addEffector(MySqlNodeEffectors.EXPORT_DUMP); + getMutableEntityType().addEffector(MySqlNodeEffectors.IMPORT_DUMP); + getMutableEntityType().addEffector(MySqlNodeEffectors.CHANGE_PASSWORD); } @Override http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlSshDriver.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlSshDriver.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlSshDriver.java index 30d9abd..01ee983 100644 --- a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlSshDriver.java +++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlSshDriver.java @@ -33,18 +33,18 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.brooklyn.entity.database.DatastoreMixins; -import org.apache.brooklyn.entity.software.base.AbstractSoftwareProcessSshDriver; import org.apache.brooklyn.api.location.OsDetails; +import org.apache.brooklyn.core.effector.EffectorTasks; import org.apache.brooklyn.core.effector.ssh.SshEffectorTasks; import org.apache.brooklyn.core.entity.Attributes; import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.location.BasicOsDetails.OsVersions; +import org.apache.brooklyn.entity.database.DatastoreMixins; +import org.apache.brooklyn.entity.software.base.AbstractSoftwareProcessSshDriver; import org.apache.brooklyn.location.ssh.SshMachineLocation; import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.core.task.DynamicTasks; +import org.apache.brooklyn.util.core.task.ssh.SshTasks; import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.io.FileUtil; @@ -57,6 +57,8 @@ import org.apache.brooklyn.util.text.Identifiers; import org.apache.brooklyn.util.text.Strings; import org.apache.brooklyn.util.time.CountdownTimer; import org.apache.brooklyn.util.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.ImmutableMap; @@ -165,11 +167,7 @@ public class MySqlSshDriver extends AbstractSoftwareProcessSshDriver implements boolean hasCreationScript = copyDatabaseCreationScript(); timer.waitForExpiryUnchecked(); - DynamicTasks.queue( - SshEffectorTasks.ssh( - "cd "+getRunDir(), - getBaseDir()+"/bin/mysqladmin --defaults-file="+getConfigFile()+" --password= password "+getPassword() - ).summary("setting password")); + changePassword("", getPassword()); if (hasCreationScript) executeScriptFromInstalledFileAsync("creation-script.sql").asTask().getUnchecked(); @@ -179,6 +177,16 @@ public class MySqlSshDriver extends AbstractSoftwareProcessSshDriver implements stop(); } + @Override + public void changePassword(String oldPass, String newPass) { + DynamicTasks.queue( + SshEffectorTasks.ssh( + "cd "+getRunDir(), + getBaseDir()+"/bin/mysqladmin --defaults-file="+getConfigFile()+" --password=" + oldPass + " password "+newPass) + .summary("setting password") + .requiringExitCodeZero()); + } + protected void copyDatabaseConfigScript() { newScript(CUSTOMIZING).execute(); //create the directory @@ -264,13 +272,26 @@ public class MySqlSshDriver extends AbstractSoftwareProcessSshDriver implements return executeScriptFromInstalledFileAsync(filename); } + @Override public ProcessTaskWrapper<Integer> executeScriptFromInstalledFileAsync(String filenameAlreadyInstalledAtServer) { + SshMachineLocation machine = EffectorTasks.getSshMachine(entity); return DynamicTasks.queue( - SshEffectorTasks.ssh( + SshTasks.newSshExecTaskFactory(machine, "cd "+getRunDir(), getBaseDir()+"/bin/mysql --defaults-file="+getConfigFile()+" < "+filenameAlreadyInstalledAtServer) .requiringExitCodeZero() .summary("executing datastore script "+filenameAlreadyInstalledAtServer)); } + @Override + public ProcessTaskWrapper<Integer> dumpDatabase(String additionalOptions, String dumpDestination) { + SshMachineLocation machine = EffectorTasks.getSshMachine(entity); + return DynamicTasks.queue( + SshTasks.newSshExecTaskFactory(machine, + "cd "+getRunDir(), + getBaseDir()+"/bin/mysqldump --defaults-file="+getConfigFile()+" "+additionalOptions+" > "+dumpDestination) + .requiringExitCodeZero() + .summary("Dumping database to " + dumpDestination)); + } + } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/ReplicationSnapshot.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/ReplicationSnapshot.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/ReplicationSnapshot.java new file mode 100644 index 0000000..48af15a --- /dev/null +++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/ReplicationSnapshot.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.database.mysql; + +public class ReplicationSnapshot { + private String entityId; + private String snapshotPath; + private String binLogName; + private int binLogPosition; + + public ReplicationSnapshot(String entityId, String snapshotPath, String binLogName, int binLogPosition) { + this.entityId = entityId; + this.snapshotPath = snapshotPath; + this.binLogName = binLogName; + this.binLogPosition = binLogPosition; + } + + public String getEntityId() { + return entityId; + } + public void setEntityId(String entityId) { + this.entityId = entityId; + } + public String getSnapshotPath() { + return snapshotPath; + } + public void setSnapshotPath(String snapshotPath) { + this.snapshotPath = snapshotPath; + } + public String getBinLogName() { + return binLogName; + } + public void setBinLogName(String binLogName) { + this.binLogName = binLogName; + } + public int getBinLogPosition() { + return binLogPosition; + } + public void setBinLogPosition(int binLogPosition) { + this.binLogPosition = binLogPosition; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/test/java/org/apache/brooklyn/entity/database/VogellaExampleAccess.java ---------------------------------------------------------------------- diff --git a/software/database/src/test/java/org/apache/brooklyn/entity/database/VogellaExampleAccess.java b/software/database/src/test/java/org/apache/brooklyn/entity/database/VogellaExampleAccess.java index 6b5a6cb..e1874b9 100644 --- a/software/database/src/test/java/org/apache/brooklyn/entity/database/VogellaExampleAccess.java +++ b/software/database/src/test/java/org/apache/brooklyn/entity/database/VogellaExampleAccess.java @@ -123,11 +123,11 @@ public class VogellaExampleAccess { private void writeMetaData(ResultSet resultSet) throws SQLException { // Get some metadata from the database - log.info("The columns in the table are: "); + log.debug("The columns in the table are: "); - log.info("Table: " + resultSet.getMetaData().getTableName(1)); + log.debug("Table: " + resultSet.getMetaData().getTableName(1)); for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { - log.info("Column " + i + " " + resultSet.getMetaData().getColumnName(i)); + log.debug("Column " + i + " " + resultSet.getMetaData().getColumnName(i)); } } @@ -138,11 +138,11 @@ public class VogellaExampleAccess { String date = row.get(2); String summary = row.get(3); String comment = row.get(4); - log.info("User: " + user); - log.info("Website: " + website); - log.info("Summary: " + summary); - log.info("Date: " + date); - log.info("Comment: " + comment); + log.debug("User: " + user); + log.debug("Website: " + website); + log.debug("Summary: " + summary); + log.debug("Date: " + date); + log.debug("Comment: " + comment); } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0de0a2a9/software/database/src/test/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterIntegrationTest.java ---------------------------------------------------------------------- diff --git a/software/database/src/test/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterIntegrationTest.java b/software/database/src/test/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterIntegrationTest.java index c5e12d5..c250843 100644 --- a/software/database/src/test/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterIntegrationTest.java +++ b/software/database/src/test/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterIntegrationTest.java @@ -18,28 +18,43 @@ */ package org.apache.brooklyn.entity.database.mysql; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; + +import java.util.Map; + import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.EntitySpec; import org.apache.brooklyn.api.location.Location; +import org.apache.brooklyn.core.effector.EffectorTasks; +import org.apache.brooklyn.core.effector.ssh.SshEffectorTasks; +import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.test.BrooklynAppLiveTestSupport; +import org.apache.brooklyn.entity.database.mysql.MySqlCluster.MySqlMaster; +import org.apache.brooklyn.entity.software.base.SoftwareProcess; +import org.apache.brooklyn.location.ssh.SshMachineLocation; +import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.core.task.ssh.SshTasks; import org.apache.brooklyn.util.os.Os; +import org.apache.brooklyn.util.ssh.BashCommands; import org.testng.annotations.Test; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; public class MySqlClusterIntegrationTest extends BrooklynAppLiveTestSupport { - @Test(groups = {"Integration"}) + private static final String TEST_LOCATION = "localhost"; + + @Test(groups="Integration") public void testAllNodesInit() throws Exception { try { MySqlClusterTestHelper.test(app, getLocation()); } finally { - for (Entity member : Iterables.getOnlyElement(app.getChildren()).getChildren()) { - String runDir = member.getAttribute(MySqlNode.RUN_DIR); - if (runDir != null) { - Os.deleteRecursively(runDir); - } - } + cleanData(); } } @@ -48,16 +63,115 @@ public class MySqlClusterIntegrationTest extends BrooklynAppLiveTestSupport { try { MySqlClusterTestHelper.testMasterInit(app, getLocation()); } finally { - for (Entity member : Iterables.getOnlyElement(app.getChildren()).getChildren()) { - String runDir = member.getAttribute(MySqlNode.RUN_DIR); - if (runDir != null) { - Os.deleteRecursively(runDir); + cleanData(); + } + } + + @Test(groups="Integration") + public void testDumpReplication() throws Exception { + try { + Location loc = getLocation(); + EntitySpec<MySqlCluster> clusterSpec = EntitySpec.create(MySqlCluster.class) + .configure(MySqlMaster.MASTER_CREATION_SCRIPT_CONTENTS, MySqlClusterTestHelper.CREATION_SCRIPT) + .configure(MySqlNode.MYSQL_SERVER_CONF, MutableMap.<String, Object>of("skip-name-resolve","")); + MySqlCluster cluster = MySqlClusterTestHelper.initCluster(app, loc, clusterSpec); + MySqlNode master = (MySqlNode) cluster.getAttribute(MySqlCluster.FIRST); + purgeLogs(cluster, master); + + // test dump replication from master + MySqlNode slave = (MySqlNode) Iterables.getOnlyElement(cluster.invoke(MySqlCluster.RESIZE_BY_DELTA, ImmutableMap.of("delta", 1)).getUnchecked()); + assertEquals(cluster.getAttribute(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT).getEntityId(), master.getId()); + MySqlClusterTestHelper.assertReplication(master, slave); + + // test dump replication from slave, missing dump on node + deleteSnapshot(cluster); + cluster.config().set(MySqlCluster.REPLICATION_PREFERRED_SOURCE, slave.getId()); + MySqlNode secondSlave = (MySqlNode) Iterables.getOnlyElement(cluster.invoke(MySqlCluster.RESIZE_BY_DELTA, ImmutableMap.of("delta", 1)).getUnchecked()); + assertEquals(cluster.getAttribute(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT).getEntityId(), slave.getId()); + MySqlClusterTestHelper.assertReplication(master, secondSlave); + + // test dump replication from slave, missing snapshot entity + Entities.destroy(slave); + cluster.config().set(MySqlCluster.REPLICATION_PREFERRED_SOURCE, secondSlave.getId()); + MySqlNode thirdSlave = (MySqlNode) Iterables.getOnlyElement(cluster.invoke(MySqlCluster.RESIZE_BY_DELTA, ImmutableMap.of("delta", 1)).getUnchecked()); + assertEquals(cluster.getAttribute(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT).getEntityId(), secondSlave.getId()); + MySqlClusterTestHelper.assertReplication(master, thirdSlave); + } finally { + cleanData(); + } + } + + private void deleteSnapshot(MySqlCluster cluster) { + ReplicationSnapshot replicationSnapshot = cluster.getAttribute(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT); + Entity snapshotEntity = mgmt.getEntityManager().getEntity(replicationSnapshot.getEntityId()); + SshMachineLocation machine = EffectorTasks.getSshMachine(snapshotEntity); + Entities.submit(snapshotEntity, SshEffectorTasks.ssh( + "cd $RUN_DIR", + "rm " + replicationSnapshot.getSnapshotPath()) + .summary("clear snapshot") + .machine(machine) + .environmentVariable("RUN_DIR", snapshotEntity.getAttribute(MySqlNode.RUN_DIR)) + .requiringExitCodeZero()) + .asTask() + .getUnchecked(); + } + + private void purgeLogs(MySqlCluster cluster, MySqlNode master) { + String preFlushBinaryLogFile = getBinaryLogFile(master); + ReplicationSnapshot replicationSnapshot = master.getParent().getAttribute(MySqlCluster.REPLICATION_LAST_SLAVE_SNAPSHOT); + assertEquals(preFlushBinaryLogFile, replicationSnapshot.getBinLogName()); + MySqlClusterTestHelper.execSql(master, "FLUSH LOGS"); + String postFlushBinaryLogFile = getBinaryLogFile(master); + waitSlavesCatchUp(cluster, postFlushBinaryLogFile); + assertNotEquals(postFlushBinaryLogFile, preFlushBinaryLogFile); + MySqlClusterTestHelper.execSql(master, "PURGE BINARY LOGS TO '" + postFlushBinaryLogFile + "';"); + assertFalse(fileExists(master, preFlushBinaryLogFile)); + } + + private void waitSlavesCatchUp(final MySqlCluster cluster, final String binLog) { + Asserts.succeedsEventually(new Runnable() { + @Override + public void run() { + MySqlNode master = (MySqlNode) cluster.getAttribute(MySqlCluster.FIRST); + for (Entity node : cluster.getMembers()) { + if (node == master) continue; + String status = MySqlClusterTestHelper.execSql((MySqlNode) node, "SHOW SLAVE STATUS \\G"); + Map<String, String> map = MySqlRowParser.parseSingle(status); + assertEquals(map.get("Relay_Master_Log_File"), binLog); } } + }); + } + private String getBinaryLogFile(MySqlNode master) { + String status = MySqlClusterTestHelper.execSql(master, "SHOW MASTER STATUS \\G"); + Map<String, String> map = MySqlRowParser.parseSingle(status); + return map.get("File"); + } + private boolean fileExists(MySqlNode node, String binLogName) { + String dataDir = Strings.nullToEmpty(node.getConfig(MySqlNode.DATA_DIR)); + String path = Os.mergePathsUnix(dataDir, binLogName); + String cmd = BashCommands.chain( + "cd $RUN_DIR", + BashCommands.requireTest(String.format("-f \"%s\"", path), "File " + path + " doesn't exist.")); + String summary = "Check if file " + path + " exists"; + SshMachineLocation machine = EffectorTasks.getSshMachine(node); + return Entities.submit(node, SshTasks.newSshExecTaskFactory(machine, cmd) + .allowingNonZeroExitCode() + .environmentVariable("RUN_DIR", node.getAttribute(SoftwareProcess.RUN_DIR)) + .summary(summary) + .allowingNonZeroExitCode()).asTask().getUnchecked() == 0; + } + private void cleanData() { + if (app.getChildren().isEmpty()) return; + for (Entity member : Iterables.getOnlyElement(app.getChildren()).getChildren()) { + String runDir = member.getAttribute(MySqlNode.RUN_DIR); + if (runDir != null) { + Os.deleteRecursively(runDir); + } } } private Location getLocation() { - return mgmt.getLocationRegistry().resolve("localhost"); + return mgmt.getLocationRegistry().resolve(TEST_LOCATION); } }
