http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/test/java/org/apache/brooklyn/core/util/osgi/OsgisTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/util/osgi/OsgisTest.java b/core/src/test/java/org/apache/brooklyn/core/util/osgi/OsgisTest.java new file mode 100644 index 0000000..cdaf433 --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/core/util/osgi/OsgisTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.util.osgi; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; + +import org.apache.brooklyn.core.util.osgi.Osgis; +import org.apache.brooklyn.core.util.osgi.Osgis.VersionedName; +import org.osgi.framework.Version; +import org.testng.annotations.Test; + +public class OsgisTest { + + @Test + public void testParseOsgiIdentifier() throws Exception { + assertEquals(Osgis.parseOsgiIdentifier("a.b").get(), new VersionedName("a.b", null)); + assertEquals(Osgis.parseOsgiIdentifier("a.b:0.1.2").get(), new VersionedName("a.b", Version.parseVersion("0.1.2"))); + assertEquals(Osgis.parseOsgiIdentifier("a.b:0.0.0.SNAPSHOT").get(), new VersionedName("a.b", Version.parseVersion("0.0.0.SNAPSHOT"))); + assertFalse(Osgis.parseOsgiIdentifier("a.b:0.notanumber.2").isPresent()); // invalid version + assertFalse(Osgis.parseOsgiIdentifier("a.b:0.1.2:3.4.5").isPresent()); // too many colons + assertFalse(Osgis.parseOsgiIdentifier("a.b:0.0.0_SNAPSHOT").isPresent()); // invalid version + assertFalse(Osgis.parseOsgiIdentifier("").isPresent()); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/test/java/org/apache/brooklyn/core/util/ssh/BashCommandsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/util/ssh/BashCommandsIntegrationTest.java b/core/src/test/java/org/apache/brooklyn/core/util/ssh/BashCommandsIntegrationTest.java new file mode 100644 index 0000000..77f91f6 --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/core/util/ssh/BashCommandsIntegrationTest.java @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.util.ssh; + +import static brooklyn.util.ssh.BashCommands.sudo; +import static java.lang.String.format; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.net.ServerSocket; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import org.apache.brooklyn.api.management.ManagementContext; +import org.apache.brooklyn.core.util.task.BasicExecutionContext; +import org.apache.brooklyn.core.util.task.ssh.SshTasks; +import org.apache.brooklyn.core.util.task.system.ProcessTaskWrapper; +import org.apache.brooklyn.test.entity.LocalManagementContextForTests; +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.basic.Entities; + +import org.apache.brooklyn.location.basic.LocalhostMachineProvisioningLocation; +import org.apache.brooklyn.location.basic.SshMachineLocation; + +import brooklyn.util.javalang.JavaClassNames; +import brooklyn.util.net.Networking; +import brooklyn.util.os.Os; +import brooklyn.util.ssh.BashCommands; +import brooklyn.util.text.Identifiers; +import brooklyn.util.text.Strings; +import brooklyn.util.time.Duration; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.Files; + +public class BashCommandsIntegrationTest { + + private static final Logger log = LoggerFactory.getLogger(BashCommandsIntegrationTest.class); + + private ManagementContext mgmt; + private BasicExecutionContext exec; + + private File destFile; + 
private File sourceNonExistantFile; + private File sourceFile1; + private File sourceFile2; + private String sourceNonExistantFileUrl; + private String sourceFileUrl1; + private String sourceFileUrl2; + private SshMachineLocation loc; + + private String localRepoFilename = "localrepofile.txt"; + private File localRepoBasePath; + private File localRepoEntityBasePath; + private String localRepoEntityVersionPath; + private File localRepoEntityFile; + + @BeforeMethod(alwaysRun=true) + public void setUp() throws Exception { + mgmt = new LocalManagementContextForTests(); + exec = new BasicExecutionContext(mgmt.getExecutionManager()); + + destFile = Os.newTempFile(getClass(), "commoncommands-test-dest.txt"); + + sourceNonExistantFile = new File("/this/does/not/exist/ERQBETJJIG1234"); + sourceNonExistantFileUrl = sourceNonExistantFile.toURI().toString(); + + sourceFile1 = Os.newTempFile(getClass(), "commoncommands-test.txt"); + sourceFileUrl1 = sourceFile1.toURI().toString(); + Files.write("mysource1".getBytes(), sourceFile1); + + sourceFile2 = Os.newTempFile(getClass(), "commoncommands-test2.txt"); + sourceFileUrl2 = sourceFile2.toURI().toString(); + Files.write("mysource2".getBytes(), sourceFile2); + + localRepoEntityVersionPath = JavaClassNames.simpleClassName(this)+"-test-dest-"+Identifiers.makeRandomId(8); + localRepoBasePath = new File(format("%s/.brooklyn/repository", System.getProperty("user.home"))); + localRepoEntityBasePath = new File(localRepoBasePath, localRepoEntityVersionPath); + localRepoEntityFile = new File(localRepoEntityBasePath, localRepoFilename); + localRepoEntityBasePath.mkdirs(); + Files.write("mylocal1".getBytes(), localRepoEntityFile); + + loc = mgmt.getLocationManager().createLocation(LocalhostMachineProvisioningLocation.spec()).obtain(); + } + + @AfterMethod(alwaysRun=true) + public void tearDown() throws Exception { + if (sourceFile1 != null) sourceFile1.delete(); + if (sourceFile2 != null) sourceFile2.delete(); + if (destFile != null) 
destFile.delete(); + if (localRepoEntityFile != null) localRepoEntityFile.delete(); + if (localRepoEntityBasePath != null) FileUtils.deleteDirectory(localRepoEntityBasePath); + if (loc != null) loc.close(); + if (mgmt != null) Entities.destroyAll(mgmt); + } + + @Test(groups="Integration") + public void testSudo() throws Exception { + ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + ByteArrayOutputStream errStream = new ByteArrayOutputStream(); + String cmd = sudo("whoami"); + int exitcode = loc.execCommands(ImmutableMap.of("out", outStream, "err", errStream), "test", ImmutableList.of(cmd)); + String outstr = new String(outStream.toByteArray()); + String errstr = new String(errStream.toByteArray()); + + assertEquals(exitcode, 0, "out="+outstr+"; err="+errstr); + assertTrue(outstr.contains("root"), "out="+outstr+"; err="+errstr); + } + + @Test(groups="Integration") + public void testDownloadUrl() throws Exception { + List<String> cmds = BashCommands.commandsToDownloadUrlsAs( + ImmutableList.of(sourceFileUrl1), + destFile.getAbsolutePath()); + int exitcode = loc.execCommands("test", cmds); + + assertEquals(exitcode, 0); // TestNG order is (actual, expected) + assertEquals(Files.readLines(destFile, Charsets.UTF_8), ImmutableList.of("mysource1")); + } + + @Test(groups="Integration") + public void testDownloadFirstSuccessfulFile() throws Exception { + List<String> cmds = BashCommands.commandsToDownloadUrlsAs( + ImmutableList.of(sourceNonExistantFileUrl, sourceFileUrl1, sourceFileUrl2), + destFile.getAbsolutePath()); + int exitcode = loc.execCommands("test", cmds); + + assertEquals(exitcode, 0); // TestNG order is (actual, expected) + assertEquals(Files.readLines(destFile, Charsets.UTF_8), ImmutableList.of("mysource1")); + } + + @Test(groups="Integration") + public void testDownloadToStdout() throws Exception { + ProcessTaskWrapper<String> t = SshTasks.newSshExecTaskFactory(loc, + "cd "+destFile.getParentFile().getAbsolutePath(), + BashCommands.downloadToStdout(Arrays.asList(sourceFileUrl1))+" | sed s/my/your/") + .requiringZeroAndReturningStdout().newTask(); 
+ + String result = exec.submit(t).get(); + assertTrue(result.trim().equals("yoursource1"), "Wrong contents of stdout download: "+result); + } + + @Test(groups="Integration") + public void testAlternativesWhereFirstSucceeds() throws Exception { + ProcessTaskWrapper<Integer> t = SshTasks.newSshExecTaskFactory(loc) + .add(BashCommands.alternatives(Arrays.asList("echo first", "exit 88"))) + .newTask(); + + Integer returnCode = exec.submit(t).get(); + String stdout = t.getStdout(); + String stderr = t.getStderr(); + log.info("alternatives for good first command gave: "+returnCode+"; err="+stderr+"; out="+stdout); + assertTrue(stdout.contains("first"), "errcode="+returnCode+"; stdout="+stdout+"; stderr="+stderr); + assertEquals(returnCode, (Integer)0); + } + + @Test(groups="Integration") + public void testAlternatives() throws Exception { + ProcessTaskWrapper<Integer> t = SshTasks.newSshExecTaskFactory(loc) + .add(BashCommands.alternatives(Arrays.asList("asdfj_no_such_command_1", "exit 88"))) + .newTask(); + + Integer returnCode = exec.submit(t).get(); + log.info("alternatives for bad commands gave: "+returnCode+"; err="+new String(t.getStderr())+"; out="+new String(t.getStdout())); + assertEquals(returnCode, (Integer)88); + } + + @Test(groups="Integration") + public void testRequireTestHandlesFailure() throws Exception { + ProcessTaskWrapper<?> t = SshTasks.newSshExecTaskFactory(loc) + .add(BashCommands.requireTest("-f "+sourceNonExistantFile.getPath(), + "The requested file does not exist")).newTask(); + + exec.submit(t).get(); + assertNotEquals(t.getExitCode(), (Integer)0); + assertTrue(t.getStderr().contains("The requested file"), "Expected message in: "+t.getStderr()); + assertTrue(t.getStdout().contains("The requested file"), "Expected message in: "+t.getStdout()); + } + + @Test(groups="Integration") + public void testRequireTestHandlesSuccess() throws Exception { + ProcessTaskWrapper<?> t = SshTasks.newSshExecTaskFactory(loc) + .add(BashCommands.requireTest("-f 
"+sourceFile1.getPath(), + "The requested file does not exist")).newTask(); + + exec.submit(t).get(); + assertEquals(t.getExitCode(), (Integer)0); + assertTrue(t.getStderr().equals(""), "Expected no stderr messages, but got: "+t.getStderr()); + } + + @Test(groups="Integration") + public void testRequireFileHandlesFailure() throws Exception { + ProcessTaskWrapper<?> t = SshTasks.newSshExecTaskFactory(loc) + .add(BashCommands.requireFile(sourceNonExistantFile.getPath())).newTask(); + + exec.submit(t).get(); + assertNotEquals(t.getExitCode(), (Integer)0); + assertTrue(t.getStderr().contains("required file"), "Expected message in: "+t.getStderr()); + assertTrue(t.getStderr().contains(sourceNonExistantFile.getPath()), "Expected message in: "+t.getStderr()); + assertTrue(t.getStdout().contains("required file"), "Expected message in: "+t.getStdout()); + assertTrue(t.getStdout().contains(sourceNonExistantFile.getPath()), "Expected message in: "+t.getStdout()); + } + + @Test(groups="Integration") + public void testRequireFileHandlesSuccess() throws Exception { + ProcessTaskWrapper<?> t = SshTasks.newSshExecTaskFactory(loc) + .add(BashCommands.requireFile(sourceFile1.getPath())).newTask(); + + exec.submit(t).get(); + assertEquals(t.getExitCode(), (Integer)0); + assertTrue(t.getStderr().equals(""), "Expected no stderr messages, but got: "+t.getStderr()); + } + + @Test(groups="Integration") + public void testRequireFailureExitsImmediately() throws Exception { + ProcessTaskWrapper<?> t = SshTasks.newSshExecTaskFactory(loc) + .add(BashCommands.requireTest("-f "+sourceNonExistantFile.getPath(), + "The requested file does not exist")) + .add("echo shouldnae come here").newTask(); + + exec.submit(t).get(); + assertNotEquals(t.getExitCode(), (Integer)0); + assertTrue(t.getStderr().contains("The requested file"), "Expected message in: "+t.getStderr()); + assertTrue(t.getStdout().contains("The requested file"), "Expected message in: "+t.getStdout()); + 
Assert.assertFalse(t.getStdout().contains("shouldnae"), "Expected message in: "+t.getStdout()); + } + + @Test(groups="Integration") + public void testPipeMultiline() throws Exception { + String output = execRequiringZeroAndReturningStdout(loc, + BashCommands.pipeTextTo("hello world\n"+"and goodbye\n", "wc")).get(); + + assertEquals(Strings.replaceAllRegex(output, "\\s+", " ").trim(), "3 4 25"); + } + + @Test(groups="Integration") + public void testWaitForFileContentsWhenAbortingOnFail() throws Exception { + String fileContent = "mycontents"; + String cmd = BashCommands.waitForFileContents(destFile.getAbsolutePath(), fileContent, Duration.ONE_SECOND, true); + + int exitcode = loc.execCommands("test", ImmutableList.of(cmd)); + assertEquals(exitcode, 1); + + Files.write(fileContent, destFile, Charsets.UTF_8); + int exitcode2 = loc.execCommands("test", ImmutableList.of(cmd)); + assertEquals(exitcode2, 0); + } + + @Test(groups="Integration") + public void testWaitForFileContentsWhenNotAbortingOnFail() throws Exception { + String fileContent = "mycontents"; + String cmd = BashCommands.waitForFileContents(destFile.getAbsolutePath(), fileContent, Duration.ONE_SECOND, false); + + String output = execRequiringZeroAndReturningStdout(loc, cmd).get(); + assertTrue(output.contains("Couldn't find"), "output="+output); + + Files.write(fileContent, destFile, Charsets.UTF_8); + String output2 = execRequiringZeroAndReturningStdout(loc, cmd).get(); + assertFalse(output2.contains("Couldn't find"), "output="+output2); + } + + @Test(groups="Integration") + public void testWaitForFileContentsWhenContentsAppearAfterStart() throws Exception { + String fileContent = "mycontents"; + + String cmd = BashCommands.waitForFileContents(destFile.getAbsolutePath(), fileContent, Duration.THIRTY_SECONDS, false); + ProcessTaskWrapper<String> t = execRequiringZeroAndReturningStdout(loc, cmd); + exec.submit(t); + + // sleep for long enough to ensure the ssh command is definitely executing + 
Thread.sleep(5*1000); + assertFalse(t.isDone()); + + Files.write(fileContent, destFile, Charsets.UTF_8); + String output = t.get(); + assertFalse(output.contains("Couldn't find"), "output="+output); + } + + @Test(groups="Integration", dependsOnMethods="testSudo") + public void testWaitForPortFreeWhenAbortingOnTimeout() throws Exception { + ServerSocket serverSocket = openServerSocket(); + try { + int port = serverSocket.getLocalPort(); + String cmd = BashCommands.waitForPortFree(port, Duration.ONE_SECOND, true); + + int exitcode = loc.execCommands("test", ImmutableList.of(cmd)); + assertEquals(exitcode, 1); + + serverSocket.close(); + assertTrue(Networking.isPortAvailable(port)); + int exitcode2 = loc.execCommands("test", ImmutableList.of(cmd)); + assertEquals(exitcode2, 0); + } finally { + serverSocket.close(); + } + } + + @Test(groups="Integration", dependsOnMethods="testSudo") + public void testWaitForPortFreeWhenNotAbortingOnTimeout() throws Exception { + ServerSocket serverSocket = openServerSocket(); + try { + int port = serverSocket.getLocalPort(); + String cmd = BashCommands.waitForPortFree(port, Duration.ONE_SECOND, false); + + String output = execRequiringZeroAndReturningStdout(loc, cmd).get(); + assertTrue(output.contains(port+" still in use"), "output="+output); + + serverSocket.close(); + assertTrue(Networking.isPortAvailable(port)); + String output2 = execRequiringZeroAndReturningStdout(loc, cmd).get(); + assertFalse(output2.contains("still in use"), "output="+output2); + } finally { + serverSocket.close(); + } + } + + @Test(groups="Integration", dependsOnMethods="testSudo") + public void testWaitForPortFreeWhenFreedAfterStart() throws Exception { + ServerSocket serverSocket = openServerSocket(); + try { + int port = serverSocket.getLocalPort(); + + String cmd = BashCommands.waitForPortFree(port, Duration.THIRTY_SECONDS, false); + ProcessTaskWrapper<String> t = execRequiringZeroAndReturningStdout(loc, cmd); + exec.submit(t); + + // sleep for long 
enough to ensure the ssh command is definitely executing + Thread.sleep(5*1000); + assertFalse(t.isDone()); + + serverSocket.close(); + assertTrue(Networking.isPortAvailable(port)); + String output = t.get(); + assertFalse(output.contains("still in use"), "output="+output); + } finally { + serverSocket.close(); + } + } + + + // Disabled by default because of risk of overriding /etc/hosts in really bad way if doesn't work properly! + // As a manual visual inspection test, consider first manually creating /etc/hostname and /etc/sysconfig/network + // so that it looks like debian+ubuntu / CentOS/RHEL. + @Test(groups={"Integration"}, enabled=false) + public void testSetHostnameUnqualified() throws Exception { + runSetHostname("br-"+Identifiers.makeRandomId(8).toLowerCase(), null, false); + } + + @Test(groups={"Integration"}, enabled=false) + public void testSetHostnameQualified() throws Exception { + runSetHostname("br-"+Identifiers.makeRandomId(8).toLowerCase()+".brooklyn.incubator.apache.org", null, false); + } + + @Test(groups={"Integration"}, enabled=false) + public void testSetHostnameNullDomain() throws Exception { + runSetHostname("br-"+Identifiers.makeRandomId(8).toLowerCase(), null, true); + } + + @Test(groups={"Integration"}, enabled=false) + public void testSetHostnameNonNullDomain() throws Exception { + runSetHostname("br-"+Identifiers.makeRandomId(8).toLowerCase(), "brooklyn.incubator.apache.org", true); + } + + protected void runSetHostname(String newHostname, String newDomain, boolean includeDomain) throws Exception { + String fqdn = (includeDomain && Strings.isNonBlank(newDomain)) ? newHostname + "." 
+ newDomain : newHostname; + + LocalManagementContextForTests mgmt = new LocalManagementContextForTests(); + SshMachineLocation loc = mgmt.getLocationManager().createLocation(LocalhostMachineProvisioningLocation.spec()).obtain(); + + execRequiringZeroAndReturningStdout(loc, sudo("cp /etc/hosts /etc/hosts-orig-testSetHostname")).get(); + execRequiringZeroAndReturningStdout(loc, BashCommands.ifFileExistsElse0("/etc/hostname", sudo("cp /etc/hostname /etc/hostname-orig-testSetHostname"))).get(); + execRequiringZeroAndReturningStdout(loc, BashCommands.ifFileExistsElse0("/etc/sysconfig/network", sudo("cp /etc/sysconfig/network /etc/sysconfig/network-orig-testSetHostname"))).get(); + + String origHostname = getHostnameNoArgs(loc); + assertTrue(Strings.isNonBlank(origHostname)); + + try { + List<String> cmd = (includeDomain) ? BashCommands.setHostname(newHostname, newDomain) : BashCommands.setHostname(newHostname); + execRequiringZeroAndReturningStdout(loc, cmd).get(); + + String actualHostnameUnqualified = getHostnameUnqualified(loc); + String actualHostnameFullyQualified = getHostnameFullyQualified(loc); + + // TODO On OS X at least, we aren't actually setting the domain name; we're just letting + // the user pass in what the domain name is. We do add this properly to /etc/hosts + // (e.g. first line is "127.0.0.1 br-g4x5wgx8.brooklyn.incubator.apache.org br-g4x5wgx8 localhost") + // but subsequent calls to `hostname -f` returns the unqualified. Similarly, `domainname` + // returns blank. Therefore we can't assert that it equals our expected val (because we just made + // it up - "brooklyn.incubator.apache.org"). 
+ // assertEquals(actualHostnameFullyQualified, fqdn); + assertEquals(actualHostnameUnqualified, Strings.getFragmentBetween(newHostname, null, ".")); + execRequiringZeroAndReturningStdout(loc, "ping -c1 -n -q "+actualHostnameUnqualified).get(); + execRequiringZeroAndReturningStdout(loc, "ping -c1 -n -q "+actualHostnameFullyQualified).get(); + + String result = execRequiringZeroAndReturningStdout(loc, "grep -n "+fqdn+" /etc/hosts").get(); + assertTrue(result.contains("localhost"), "line="+result); + log.info("result="+result); + + } finally { + execRequiringZeroAndReturningStdout(loc, sudo("cp /etc/hosts-orig-testSetHostname /etc/hosts")).get(); + execRequiringZeroAndReturningStdout(loc, BashCommands.ifFileExistsElse0("/etc/hostname-orig-testSetHostname", sudo("cp /etc/hostname-orig-testSetHostname /etc/hostname"))).get(); + execRequiringZeroAndReturningStdout(loc, BashCommands.ifFileExistsElse0("/etc/sysconfig/network-orig-testSetHostname", sudo("cp /etc/sysconfig/network-orig-testSetHostname /etc/sysconfig/network"))).get(); + execRequiringZeroAndReturningStdout(loc, sudo("hostname "+origHostname)).get(); + } + } + + // Marked disabled because not safe to run on your normal machine! It modifies /etc/hosts, which is dangerous if things go wrong! 
+ @Test(groups={"Integration"}, enabled=false) + public void testModifyEtcHosts() throws Exception { + LocalManagementContextForTests mgmt = new LocalManagementContextForTests(); + SshMachineLocation loc = mgmt.getLocationManager().createLocation(LocalhostMachineProvisioningLocation.spec()).obtain(); + + execRequiringZeroAndReturningStdout(loc, sudo("cp /etc/hosts /etc/hosts-orig-testModifyEtcHosts")).get(); + int numLinesOrig = Integer.parseInt(execRequiringZeroAndReturningStdout(loc, "wc -l /etc/hosts").get().trim().split("\\s")[0]); + + try { + String cmd = BashCommands.prependToEtcHosts("1.2.3.4", "myhostnamefor1234.at.start", "myhostnamefor1234b"); + execRequiringZeroAndReturningStdout(loc, cmd).get(); + + String cmd2 = BashCommands.appendToEtcHosts("5.6.7.8", "myhostnamefor5678.at.end", "myhostnamefor5678"); + execRequiringZeroAndReturningStdout(loc, cmd2).get(); + + String grepFirst = execRequiringZeroAndReturningStdout(loc, "grep -n myhostnamefor1234 /etc/hosts").get(); + String grepLast = execRequiringZeroAndReturningStdout(loc, "grep -n myhostnamefor5678 /etc/hosts").get(); + int numLinesAfter = Integer.parseInt(execRequiringZeroAndReturningStdout(loc, "wc -l /etc/hosts").get().trim().split("\\s")[0]); + log.info("result: numLinesBefore="+numLinesOrig+"; numLinesAfter="+numLinesAfter+"; first="+grepFirst+"; last="+grepLast); + + assertTrue(grepFirst.startsWith("1:") && grepFirst.contains("1.2.3.4 myhostnamefor1234.at.start myhostnamefor1234"), "first="+grepFirst); + assertTrue(grepLast.startsWith((numLinesOrig+2)+":") && grepLast.contains("5.6.7.8 myhostnamefor5678.at.end myhostnamefor5678"), "last="+grepLast); + assertEquals(numLinesOrig + 2, numLinesAfter, "lines orig="+numLinesOrig+", after="+numLinesAfter); + } finally { + execRequiringZeroAndReturningStdout(loc, sudo("cp /etc/hosts-orig-testModifyEtcHosts /etc/hosts")).get(); + } + } + + private String getHostnameNoArgs(SshMachineLocation machine) { + String hostnameStdout = 
execRequiringZeroAndReturningStdout(machine, "echo FOREMARKER; hostname; echo AFTMARKER").get(); + return Strings.getFragmentBetween(hostnameStdout, "FOREMARKER", "AFTMARKER").trim(); + } + + private String getHostnameUnqualified(SshMachineLocation machine) { + String hostnameStdout = execRequiringZeroAndReturningStdout(machine, "echo FOREMARKER; hostname -s 2> /dev/null || hostname; echo AFTMARKER").get(); + return Strings.getFragmentBetween(hostnameStdout, "FOREMARKER", "AFTMARKER").trim(); + } + + private String getHostnameFullyQualified(SshMachineLocation machine) { + String hostnameStdout = execRequiringZeroAndReturningStdout(machine, "echo FOREMARKER; hostname --fqdn 2> /dev/null || hostname -f; echo AFTMARKER").get(); + return Strings.getFragmentBetween(hostnameStdout, "FOREMARKER", "AFTMARKER").trim(); + } + + private ProcessTaskWrapper<String> execRequiringZeroAndReturningStdout(SshMachineLocation loc, Collection<String> cmds) { + return execRequiringZeroAndReturningStdout(loc, cmds.toArray(new String[cmds.size()])); + } + + private ProcessTaskWrapper<String> execRequiringZeroAndReturningStdout(SshMachineLocation loc, String... 
cmds) { + ProcessTaskWrapper<String> t = SshTasks.newSshExecTaskFactory(loc, cmds) + .requiringZeroAndReturningStdout().newTask(); + exec.submit(t); + return t; + } + + private ServerSocket openServerSocket() { + int lowerBound = 40000; + int upperBound = 40100; + for (int i = lowerBound; i < upperBound; i++) { + try { + return new ServerSocket(i); + } catch (IOException e) { + // try next number + } + } + throw new IllegalStateException("No ports available in range "+lowerBound+" to "+upperBound); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTaskExecutionPerformanceTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTaskExecutionPerformanceTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTaskExecutionPerformanceTest.java new file mode 100644 index 0000000..71d2586 --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTaskExecutionPerformanceTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.util.task; + +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.brooklyn.api.management.Task; +import org.apache.brooklyn.core.util.task.BasicExecutionManager; +import org.apache.brooklyn.core.util.task.BasicTask; +import org.apache.brooklyn.core.util.task.ScheduledTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.util.collections.MutableMap; + +import com.google.common.base.Predicate; +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Callables; + +/** + * Test the operation of the {@link BasicTask} class. + * + * TODO clarify test purpose + */ +public class BasicTaskExecutionPerformanceTest { + private static final Logger log = LoggerFactory.getLogger(BasicTaskExecutionPerformanceTest.class); + + private static final int TIMEOUT_MS = 10*1000; + + private BasicExecutionManager em; + + public static final int MAX_OVERHEAD_MS = 1500; // was 750ms but saw 1.3s on buildhive + public static final int EARLY_RETURN_GRACE = 25; // saw 13ms early return on jenkins! 
+ + @BeforeMethod(alwaysRun=true) + public void setUp() throws Exception { + em = new BasicExecutionManager("mycontext"); + } + + @AfterMethod(alwaysRun=true) + public void tearDown() throws Exception { + if (em != null) em.shutdownNow(); + } + + @SuppressWarnings("unchecked") + @Test + public void testScheduledTaskExecutedAfterDelay() throws Exception { + int delay = 100; + final CountDownLatch latch = new CountDownLatch(1); + + Callable<Task<?>> taskFactory = new Callable<Task<?>>() { + @Override public Task<?> call() { + return new BasicTask<Void>(new Runnable() { + @Override public void run() { + latch.countDown(); + }}); + }}; + ScheduledTask t = new ScheduledTask(taskFactory).delay(delay); + + Stopwatch stopwatch = Stopwatch.createStarted(); + em.submit(t); + + assertTrue(latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)); + long actualDelay = stopwatch.elapsed(TimeUnit.MILLISECONDS); + + assertTrue(actualDelay > (delay-EARLY_RETURN_GRACE), "actualDelay="+actualDelay+"; delay="+delay); + assertTrue(actualDelay < (delay+MAX_OVERHEAD_MS), "actualDelay="+actualDelay+"; delay="+delay); + } + + @SuppressWarnings("unchecked") + @Test + public void testScheduledTaskExecutedAtRegularPeriod() throws Exception { + final int period = 100; + final int numTimestamps = 4; + final CountDownLatch latch = new CountDownLatch(1); + final List<Long> timestamps = Collections.synchronizedList(Lists.<Long>newArrayList()); + final Stopwatch stopwatch = Stopwatch.createStarted(); + + Callable<Task<?>> taskFactory = new Callable<Task<?>>() { + @Override public Task<?> call() { + return new BasicTask<Void>(new Runnable() { + @Override public void run() { + timestamps.add(stopwatch.elapsed(TimeUnit.MILLISECONDS)); + if (timestamps.size() >= numTimestamps) latch.countDown(); + }}); + }}; + ScheduledTask t = new ScheduledTask(taskFactory).delay(1).period(period); + em.submit(t); + + assertTrue(latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)); + + synchronized (timestamps) { + long prev = 
timestamps.get(0); + for (long timestamp : timestamps.subList(1, timestamps.size())) { + assertTrue(timestamp > prev+period-EARLY_RETURN_GRACE, "timestamps="+timestamps); + assertTrue(timestamp < prev+period+MAX_OVERHEAD_MS, "timestamps="+timestamps); + prev = timestamp; + } + } + } + + @SuppressWarnings("unchecked") + @Test + public void testCanCancelScheduledTask() throws Exception { + final int period = 1; + final long checkPeriod = 250; + final List<Long> timestamps = Collections.synchronizedList(Lists.<Long>newArrayList()); + + Callable<Task<?>> taskFactory = new Callable<Task<?>>() { + @Override public Task<?> call() { + return new BasicTask<Void>(new Runnable() { + @Override public void run() { + timestamps.add(System.currentTimeMillis()); + }}); + }}; + ScheduledTask t = new ScheduledTask(taskFactory).period(period); + em.submit(t); + + t.cancel(); + long cancelTime = System.currentTimeMillis(); + int countImmediatelyAfterCancel = timestamps.size(); + Thread.sleep(checkPeriod); + int countWellAfterCancel = timestamps.size(); + + // should have at most 1 more execution after cancel (one run may already be in flight when cancel() is called) + log.info("testCanCancelScheduledTask saw "+countImmediatelyAfterCancel+" then cancel then "+countWellAfterCancel+" total"); + assertTrue(countWellAfterCancel - countImmediatelyAfterCancel <= 1, "timestamps="+timestamps+"; cancelTime="+cancelTime); + } + + // Previously, when we used a CopyOnWriteArraySet, performance for submitting new tasks was + // terrible, and it degraded significantly as the number of previously executed tasks increased + // (e.g. 9s for first 1000; 26s for next 1000; 42s for next 1000). + @Test + public void testExecutionManagerPerformance() throws Exception { + // Was fixed at 1000 tasks, but was running out of virtual memory due to excessive thread creation + // on machines which were not able to execute the threads quickly. 
+ final int NUM_TASKS = Math.min(500 * Runtime.getRuntime().availableProcessors(), 1000); + final int NUM_TIMES = 10; + final int MAX_ACCEPTABLE_TIME = 7500; // saw 5601ms on buildhive + + long tWarmup = execTasksAndWaitForDone(NUM_TASKS, ImmutableList.of("A")); + + List<Long> times = Lists.newArrayList(); + for (int i = 1; i <= NUM_TIMES; i++) { + times.add(execTasksAndWaitForDone(NUM_TASKS, ImmutableList.of("A"))); + } + + Long toobig = Iterables.find( + times, + new Predicate<Long>() { + public boolean apply(Long input) { + return input > MAX_ACCEPTABLE_TIME; + }}, + null); + assertNull(toobig, "warmup="+tWarmup+"; times="+times); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private long execTasksAndWaitForDone(int numTasks, List<?> tags) throws Exception { + List<Task<?>> tasks = Lists.newArrayList(); + long startTimestamp = System.currentTimeMillis(); + for (int i = 1; i <= numTasks; i++) { + Task<?> t = new BasicTask(Callables.returning(null)); // no-op + em.submit(MutableMap.of("tags", tags), t); + tasks.add(t); + } + long submittedTimestamp = System.currentTimeMillis(); + + for (Task t : tasks) { + t.get(); + } + long endTimestamp = System.currentTimeMillis(); + long submitTime = submittedTimestamp - startTimestamp; + long totalTime = endTimestamp - startTimestamp; + + log.info("Executed {} tasks; {}ms total; {}ms to submit", new Object[] {numTasks, totalTime, submitTime}); + + return totalTime; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTaskExecutionTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTaskExecutionTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTaskExecutionTest.java new file mode 100644 index 0000000..c730738 --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTaskExecutionTest.java @@
-0,0 +1,462 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.task; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.brooklyn.api.management.Task; +import org.apache.brooklyn.core.util.task.BasicExecutionManager; +import org.apache.brooklyn.core.util.task.BasicTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.test.Asserts; +import brooklyn.util.collections.MutableMap; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import 
com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Callables; + +/** + * Test the operation of the {@link BasicTask} class. + * + * TODO clarify test purpose + */ +public class BasicTaskExecutionTest { + private static final Logger log = LoggerFactory.getLogger(BasicTaskExecutionTest.class); + + private static final int TIMEOUT_MS = 10*1000; + + private BasicExecutionManager em; + private Map<Object, Object> data; + + @BeforeMethod(alwaysRun=true) + public void setUp() { + em = new BasicExecutionManager("mycontext"); + data = Collections.synchronizedMap(new HashMap<Object, Object>()); + data.clear(); + } + + @AfterMethod(alwaysRun=true) + public void tearDown() throws Exception { + if (em != null) em.shutdownNow(); + if (data != null) data.clear(); + } + + @Test + public void runSimpleBasicTask() throws Exception { + BasicTask<Object> t = new BasicTask<Object>(newPutCallable(1, "b")); + data.put(1, "a"); + Task<Object> t2 = em.submit(MutableMap.of("tag", "A"), t); + assertEquals("a", t.get()); + assertEquals("a", t2.get()); + assertEquals("b", data.get(1)); + } + + @Test + public void runSimpleRunnable() throws Exception { + data.put(1, "a"); + Task<?> t = em.submit(MutableMap.of("tag", "A"), newPutRunnable(1, "b")); + assertEquals(null, t.get()); + assertEquals("b", data.get(1)); + } + + @Test + public void runSimpleCallable() throws Exception { + data.put(1, "a"); + Task<?> t = em.submit(MutableMap.of("tag", "A"), newPutCallable(1, "b")); + assertEquals("a", t.get()); + assertEquals("b", data.get(1)); + } + + @Test + public void runBasicTaskWithWaits() throws Exception { + final CountDownLatch signalStarted = new CountDownLatch(1); + final CountDownLatch allowCompletion = new CountDownLatch(1); + final BasicTask<Object> t = new BasicTask<Object>(new Callable<Object>() { + public Object call() throws Exception { + Object result = data.put(1, "b"); + signalStarted.countDown(); + 
assertTrue(allowCompletion.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)); + return result; + }}); + data.put(1, "a"); + + Task<?> t2 = em.submit(MutableMap.of("tag", "A"), t); + assertEquals(t, t2); + assertFalse(t.isDone()); + + assertTrue(signalStarted.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)); + assertEquals("b", data.get(1)); + assertFalse(t.isDone()); + + log.debug("runBasicTaskWithWaits, BasicTask status: {}", t.getStatusDetail(false)); + + Asserts.succeedsEventually(new Runnable() { + public void run() { + String status = t.getStatusDetail(false); + assertTrue(status != null && status.toLowerCase().contains("waiting"), "status="+status); + }}); + + allowCompletion.countDown(); + assertEquals("a", t.get()); + } + + @Test + public void runMultipleBasicTasks() throws Exception { + data.put(1, 1); + // uses the per-test 'em' field created in setUp(), so tearDown() shuts its threads down + for (int i = 0; i < 2; i++) { + em.submit(MutableMap.of("tag", "A"), new BasicTask<Integer>(newIncrementCallable(1))); + em.submit(MutableMap.of("tag", "B"), new BasicTask<Integer>(newIncrementCallable((1)))); + } + int total = 0; + for (Object tag : em.getTaskTags()) { + log.debug("tag {}", tag); + for (Task<?> task : em.getTasksWithTag(tag)) { + log.debug("BasicTask {}, has {}", task, task.get()); + total += (Integer)task.get(); + } + } + assertEquals(10, total); + //now that all have completed: + assertEquals(5, data.get(1)); + } + + @Test + public void runMultipleBasicTasksMultipleTags() throws Exception { + data.put(1, 1); + Collection<Task<Integer>> tasks = Lists.newArrayList(); + tasks.add(em.submit(MutableMap.of("tag", "A"), new BasicTask<Integer>(newIncrementCallable(1)))); + tasks.add(em.submit(MutableMap.of("tags", ImmutableList.of("A","B")), new BasicTask<Integer>(newIncrementCallable(1)))); + tasks.add(em.submit(MutableMap.of("tags", ImmutableList.of("B","C")), new BasicTask<Integer>(newIncrementCallable(1)))); + tasks.add(em.submit(MutableMap.of("tags", ImmutableList.of("D")), new
BasicTask<Integer>(newIncrementCallable(1)))); + int total = 0; + + for (Task<Integer> t : tasks) { + log.debug("BasicTask {}, has {}", t, t.get()); + total += t.get(); + } + assertEquals(10, total); + + //now that all have completed: + assertEquals(data.get(1), 5); + assertEquals(em.getTasksWithTag("A").size(), 2); + assertEquals(em.getTasksWithAnyTag(ImmutableList.of("A")).size(), 2); + assertEquals(em.getTasksWithAllTags(ImmutableList.of("A")).size(), 2); + + assertEquals(em.getTasksWithAnyTag(ImmutableList.of("A", "B")).size(), 3); + assertEquals(em.getTasksWithAllTags(ImmutableList.of("A", "B")).size(), 1); + assertEquals(em.getTasksWithAllTags(ImmutableList.of("B", "C")).size(), 1); + assertEquals(em.getTasksWithAnyTag(ImmutableList.of("A", "D")).size(), 3); + } + + @Test + public void testGetTaskById() throws Exception { + Task<?> t = new BasicTask<Void>(newNoop()); + em.submit(MutableMap.of("tag", "A"), t); + assertEquals(em.getTask(t.getId()), t); + } + + @Test + public void testRetrievingTasksWithTagsReturnsExpectedTask() throws Exception { + Task<?> t = new BasicTask<Void>(newNoop()); + em.submit(MutableMap.of("tag", "A"), t); + t.get(); + + assertEquals(em.getTasksWithTag("A"), ImmutableList.of(t)); + assertEquals(em.getTasksWithAnyTag(ImmutableList.of("A")), ImmutableList.of(t)); + assertEquals(em.getTasksWithAnyTag(ImmutableList.of("A", "B")), ImmutableList.of(t)); + assertEquals(em.getTasksWithAllTags(ImmutableList.of("A")), ImmutableList.of(t)); + } + + @Test + public void testRetrievingTasksWithTagsExcludesNonMatchingTasks() throws Exception { + Task<?> t = new BasicTask<Void>(newNoop()); + em.submit(MutableMap.of("tag", "A"), t); + t.get(); + + assertEquals(em.getTasksWithTag("B"), ImmutableSet.of()); + assertEquals(em.getTasksWithAnyTag(ImmutableList.of("B")), ImmutableSet.of()); + assertEquals(em.getTasksWithAllTags(ImmutableList.of("A", "B")), ImmutableSet.of()); + } + + @Test + public void testRetrievingTasksWithMultipleTags() throws Exception 
{ + Task<?> t = new BasicTask<Void>(newNoop()); + em.submit(MutableMap.of("tags", ImmutableList.of("A", "B")), t); + t.get(); + + assertEquals(em.getTasksWithTag("A"), ImmutableList.of(t)); + assertEquals(em.getTasksWithTag("B"), ImmutableList.of(t)); + assertEquals(em.getTasksWithAnyTag(ImmutableList.of("A")), ImmutableList.of(t)); + assertEquals(em.getTasksWithAnyTag(ImmutableList.of("B")), ImmutableList.of(t)); + assertEquals(em.getTasksWithAnyTag(ImmutableList.of("A", "B")), ImmutableList.of(t)); + assertEquals(em.getTasksWithAllTags(ImmutableList.of("A", "B")), ImmutableList.of(t)); + assertEquals(em.getTasksWithAllTags(ImmutableList.of("A")), ImmutableList.of(t)); + assertEquals(em.getTasksWithAllTags(ImmutableList.of("B")), ImmutableList.of(t)); + } + + // ENGR-1796: if nothing matched first tag, then returned whatever matched second tag! + @Test + public void testRetrievingTasksWithAllTagsWhenFirstNotMatched() throws Exception { + Task<?> t = new BasicTask<Void>(newNoop()); + em.submit(MutableMap.of("tags", ImmutableList.of("A")), t); + t.get(); + + assertEquals(em.getTasksWithAllTags(ImmutableList.of("not_there","A")), ImmutableSet.of()); + } + + @Test + public void testRetrievedTasksIncludesTasksInProgress() throws Exception { + final CountDownLatch runningLatch = new CountDownLatch(1); + final CountDownLatch finishLatch = new CountDownLatch(1); + Task<Void> t = new BasicTask<Void>(new Callable<Void>() { + public Void call() throws Exception { + runningLatch.countDown(); + finishLatch.await(); + return null; + }}); + em.submit(MutableMap.of("tags", ImmutableList.of("A")), t); + + try { + assertTrue(runningLatch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)); + + assertEquals(em.getTasksWithTag("A"), ImmutableList.of(t)); + } finally { + finishLatch.countDown(); + } + } + + @Test + public void cancelBeforeRun() throws Exception { + final CountDownLatch blockForever = new CountDownLatch(1); + + BasicTask<Integer> t = new BasicTask<Integer>(new Callable<Integer>() { + public
Integer call() throws Exception { + blockForever.await(); return 42; + }}); + t.cancel(true); + assertTrue(t.isCancelled()); + assertTrue(t.isDone()); + assertTrue(t.isError()); + em.submit(MutableMap.of("tag", "A"), t); + try { + t.get(); + fail("get should have failed due to cancel"); + } catch (CancellationException e) { + // expected + } + assertTrue(t.isCancelled()); + assertTrue(t.isDone()); + assertTrue(t.isError()); + + log.debug("cancelBeforeRun status: {}", t.getStatusDetail(false)); + assertTrue(t.getStatusDetail(false).toLowerCase().contains("cancel")); + } + + @Test + public void cancelDuringRun() throws Exception { + final CountDownLatch signalStarted = new CountDownLatch(1); + final CountDownLatch blockForever = new CountDownLatch(1); + + BasicTask<Integer> t = new BasicTask<Integer>(new Callable<Integer>() { + public Integer call() throws Exception { + synchronized (data) { + signalStarted.countDown(); + blockForever.await(); + } + return 42; + }}); + em.submit(MutableMap.of("tag", "A"), t); + assertFalse(t.isCancelled()); + assertFalse(t.isDone()); + assertFalse(t.isError()); + + assertTrue(signalStarted.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)); + t.cancel(true); + + assertTrue(t.isCancelled()); + assertTrue(t.isError()); + try { + t.get(); + fail("get should have failed due to cancel"); + } catch (CancellationException e) { + // expected + } + assertTrue(t.isCancelled()); + assertTrue(t.isDone()); + assertTrue(t.isError()); + } + + @Test + public void cancelAfterRun() throws Exception { + BasicTask<Integer> t = new BasicTask<Integer>(Callables.returning(42)); + em.submit(MutableMap.of("tag", "A"), t); + + assertEquals(t.get(), (Integer)42); + t.cancel(true); + assertFalse(t.isCancelled()); + assertFalse(t.isError()); + assertTrue(t.isDone()); + } + + @Test + public void errorDuringRun() throws Exception { + BasicTask<Void> t = new BasicTask<Void>(new Callable<Void>() { + public Void call() throws Exception { + throw new 
IllegalStateException("Simulating failure in errorDuringRun"); + }}); + + em.submit(MutableMap.of("tag", "A"), t); + + try { + t.get(); + fail("get should have failed due to error"); + } catch (Exception eo) { + Throwable e = Throwables.getRootCause(eo); + assertEquals("Simulating failure in errorDuringRun", e.getMessage()); + } + + assertFalse(t.isCancelled()); + assertTrue(t.isError()); + assertTrue(t.isDone()); + + log.debug("errorDuringRun status: {}", t.getStatusDetail(false)); + assertTrue(t.getStatusDetail(false).contains("Simulating failure in errorDuringRun"), "details="+t.getStatusDetail(false)); + } + + @Test + public void fieldsSetForSimpleBasicTask() throws Exception { + final CountDownLatch signalStarted = new CountDownLatch(1); + final CountDownLatch allowCompletion = new CountDownLatch(1); + + BasicTask<Integer> t = new BasicTask<Integer>(new Callable<Integer>() { + public Integer call() throws Exception { + signalStarted.countDown(); + allowCompletion.await(); + return 42; + }}); + assertEquals(null, t.getSubmittedByTask()); + assertEquals(-1, t.submitTimeUtc); + assertNull(t.getInternalFuture()); + + em.submit(MutableMap.of("tag", "A"), t); + assertTrue(signalStarted.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)); + + assertTrue(t.submitTimeUtc > 0); + assertTrue(t.startTimeUtc >= t.submitTimeUtc); + assertNotNull(t.getInternalFuture()); + assertEquals(-1, t.endTimeUtc); + assertEquals(false, t.isCancelled()); + + allowCompletion.countDown(); + assertEquals(t.get(), (Integer)42); + assertTrue(t.endTimeUtc >= t.startTimeUtc); + + log.debug("BasicTask duration (millis): {}", (t.endTimeUtc - t.submitTimeUtc)); + } + + @Test + public void fieldsSetForBasicTaskSubmittedBasicTask() throws Exception { + //submitted BasicTask B is started by A, and waits for A to complete + BasicTask<Integer> t = new BasicTask<Integer>(MutableMap.of("displayName", "sample", "description", "some descr"), new Callable<Integer>() { + public Integer call() throws Exception { + 
em.submit(MutableMap.of("tag", "B"), new Callable<Integer>() { + public Integer call() throws Exception { + assertEquals(45, em.getTasksWithTag("A").iterator().next().get()); + return 46; + }}); + return 45; + }}); + em.submit(MutableMap.of("tag", "A"), t); + + t.blockUntilEnded(); + +// assertEquals(em.getAllTasks().size(), 2 + + BasicTask<?> tb = (BasicTask<?>) em.getTasksWithTag("B").iterator().next(); + assertEquals( 46, tb.get() ); + assertEquals( t, em.getTasksWithTag("A").iterator().next() ); + assertNull( t.getSubmittedByTask() ); + + BasicTask<?> submitter = (BasicTask<?>) tb.getSubmittedByTask(); + assertNotNull(submitter); + assertEquals("sample", submitter.displayName); + assertEquals("some descr", submitter.description); + assertEquals(t, submitter); + + assertTrue(submitter.submitTimeUtc <= tb.submitTimeUtc); + assertTrue(submitter.endTimeUtc <= tb.endTimeUtc); + + log.debug("BasicTask {} was submitted by {}", tb, submitter); + } + + private Callable<Object> newPutCallable(final Object key, final Object val) { + return new Callable<Object>() { + public Object call() { + return data.put(key, val); + } + }; + } + + private Callable<Integer> newIncrementCallable(final Object key) { + return new Callable<Integer>() { + public Integer call() { + synchronized (data) { + return (Integer) data.put(key, (Integer)data.get(key) + 1); + } + } + }; + } + + private Runnable newPutRunnable(final Object key, final Object val) { + return new Runnable() { + public void run() { + data.put(key, val); + } + }; + } + + private Runnable newNoop() { + return new Runnable() { + public void run() { + } + }; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTasksFutureTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTasksFutureTest.java 
b/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTasksFutureTest.java new file mode 100644 index 0000000..020a98c --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTasksFutureTest.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.util.task; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +import org.apache.brooklyn.api.management.Task; +import org.apache.brooklyn.core.util.task.BasicExecutionContext; +import org.apache.brooklyn.core.util.task.BasicExecutionManager; +import org.apache.brooklyn.core.util.task.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; + +import com.google.common.base.Stopwatch; + +public class BasicTasksFutureTest { + + private static final Logger log = LoggerFactory.getLogger(BasicTasksFutureTest.class); + + private BasicExecutionManager em; + private BasicExecutionContext ec; + private Map<Object,Object> data; + private ExecutorService ex; + private Semaphore started; + private Semaphore waitInTask; + private Semaphore cancelledWhileSleeping; + + @BeforeMethod(alwaysRun=true) + public void setUp() { + em = new BasicExecutionManager("mycontext"); + ec = new BasicExecutionContext(em); + ex = Executors.newCachedThreadPool(); + data = Collections.synchronizedMap(new LinkedHashMap<Object,Object>()); + started = new Semaphore(0); + waitInTask = new Semaphore(0); + cancelledWhileSleeping = new Semaphore(0); + } + + @AfterMethod(alwaysRun=true) + public void tearDown() throws Exception { + if (em != null) em.shutdownNow(); + if (ex != null) ex.shutdownNow(); + } + + @Test + public void testBlockAndGetWithTimeoutsAndListenableFuture() throws InterruptedException { 
+ Task<String> t = waitForSemaphore(Duration.FIVE_SECONDS, true, "x"); + + Assert.assertFalse(t.blockUntilEnded(Duration.millis(1))); + Assert.assertFalse(t.blockUntilEnded(Duration.ZERO)); + boolean didNotThrow = false; + + try { t.getUnchecked(Duration.millis(1)); didNotThrow = true; } + catch (Exception e) { /* expected */ } + Assert.assertFalse(didNotThrow); + + try { t.getUnchecked(Duration.ZERO); didNotThrow = true; } + catch (Exception e) { /* expected */ } + Assert.assertFalse(didNotThrow); + + addFutureListener(t, "before"); + ec.submit(t); + + Assert.assertFalse(t.blockUntilEnded(Duration.millis(1))); + Assert.assertFalse(t.blockUntilEnded(Duration.ZERO)); + + try { t.getUnchecked(Duration.millis(1)); didNotThrow = true; } + catch (Exception e) { /* expected */ } + Assert.assertFalse(didNotThrow); + + try { t.getUnchecked(Duration.ZERO); didNotThrow = true; } + catch (Exception e) { /* expected */ } + Assert.assertFalse(didNotThrow); + + addFutureListener(t, "during"); + + synchronized (data) { + // now let it finish + waitInTask.release(); + Assert.assertTrue(t.blockUntilEnded(Duration.TEN_SECONDS)); + + Assert.assertEquals(t.getUnchecked(Duration.millis(1)), "x"); + Assert.assertEquals(t.getUnchecked(Duration.ZERO), "x"); + + Assert.assertNull(data.get("before")); + Assert.assertNull(data.get("during")); + // can't set the data(above) until we release the lock (in assert call below) + assertSoonGetsData("before"); + assertSoonGetsData("during"); + } + + // and see that a listener added late also runs + synchronized (data) { + addFutureListener(t, "after"); + Assert.assertNull(data.get("after")); + assertSoonGetsData("after"); + } + } + + private void addFutureListener(Task<String> t, final String key) { + t.addListener(new Runnable() { public void run() { + synchronized (data) { + log.info("notifying for "+key); + data.notifyAll(); + data.put(key, true); + } + }}, ex); + } + + private void assertSoonGetsData(String key) throws InterruptedException { + 
for (int i=0; i<10; i++) { + if (Boolean.TRUE.equals(data.get(key))) { + log.info("got data for "+key); + return; + } + data.wait(Duration.ONE_SECOND.toMilliseconds()); + } + Assert.fail("did not get data for '"+key+"' in time"); + } + + private <T> Task<T> waitForSemaphore(final Duration time, final boolean requireSemaphore, final T result) { + return Tasks.<T>builder().body(new Callable<T>() { + public T call() { + try { + started.release(); + log.info("waiting up to "+time+" to acquire before returning "+result); + if (!waitInTask.tryAcquire(time.toMilliseconds(), TimeUnit.MILLISECONDS)) { + log.info("did not get semaphore"); + if (requireSemaphore) Assert.fail("task did not get semaphore"); + } else { + log.info("got semaphore"); + } + } catch (Exception e) { + log.info("cancelled before returning "+result); + cancelledWhileSleeping.release(); + throw Exceptions.propagate(e); + } + log.info("task returning "+result); + return result; + } + }).build(); + } + + @Test + public void testCancelAfterStartTriggersListenableFuture() throws Exception { + doTestCancelTriggersListenableFuture(Duration.millis(50)); + } + @Test + public void testCancelImmediateTriggersListenableFuture() throws Exception { + // if cancel fires after submit but before it passes to the executor, + // that needs handling separately; this doesn't guarantee this code path, + // but it happens sometimes (and it should be handled) + doTestCancelTriggersListenableFuture(Duration.ZERO); + } + public void doTestCancelTriggersListenableFuture(Duration delay) throws Exception { + Task<String> t = waitForSemaphore(Duration.TEN_SECONDS, true, "x"); + addFutureListener(t, "before"); + + Stopwatch watch = Stopwatch.createStarted(); + ec.submit(t); + + addFutureListener(t, "during"); + + log.info("test cancelling "+t+" ("+t.getClass()+") after "+delay); + // NB: two different code paths (callers to this method) for notifying futures + // depending whether task is started + Time.sleep(delay); + + synchronized 
(data) { + t.cancel(true); + + assertSoonGetsData("before"); + assertSoonGetsData("during"); + + addFutureListener(t, "after"); + Assert.assertNull(data.get("after")); + assertSoonGetsData("after"); + } + + Assert.assertTrue(t.isDone()); + Assert.assertTrue(t.isCancelled()); + try { + t.get(); + Assert.fail("should have thrown CancellationException"); + } catch (CancellationException e) { /* expected */ } + + Assert.assertTrue(watch.elapsed(TimeUnit.MILLISECONDS) < Duration.FIVE_SECONDS.toMilliseconds(), + Time.makeTimeStringRounded(watch.elapsed(TimeUnit.MILLISECONDS))+" is too long; should have cancelled very quickly"); + + if (started.tryAcquire()) + // if the task is begun, this should get released + Assert.assertTrue(cancelledWhileSleeping.tryAcquire(5, TimeUnit.SECONDS)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/test/java/org/apache/brooklyn/core/util/task/CompoundTaskExecutionTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/CompoundTaskExecutionTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/CompoundTaskExecutionTest.java new file mode 100644 index 0000000..89bde95 --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/core/util/task/CompoundTaskExecutionTest.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.task; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Semaphore; + +import org.apache.brooklyn.api.management.Task; +import org.apache.brooklyn.core.util.task.BasicExecutionContext; +import org.apache.brooklyn.core.util.task.BasicExecutionManager; +import org.apache.brooklyn.core.util.task.BasicTask; +import org.apache.brooklyn.core.util.task.CompoundTask; +import org.apache.brooklyn.core.util.task.ParallelTask; +import org.apache.brooklyn.core.util.task.SequentialTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import org.testng.collections.Lists; + +import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +/** + * Test the operation of the {@link CompoundTask} class. 
+ */ +public class CompoundTaskExecutionTest { + + private static final Logger LOG = LoggerFactory.getLogger(CompoundTaskExecutionTest.class); + + BasicExecutionManager em; + BasicExecutionContext ec; + + @BeforeClass + public void setup() { + em = new BasicExecutionManager("mycontext"); + ec = new BasicExecutionContext(em); + } + + @AfterClass + public void teardown() { + if (em != null) em.shutdownNow(); + em = null; + } + + private BasicTask<String> taskReturning(final String val) { + return new BasicTask<String>(new Callable<String>() { + @Override public String call() { + return val; + } + }); + } + + private BasicTask<String> slowTaskReturning(final String val, final Duration pauseTime) { + return new BasicTask<String>(new Callable<String>() { + @Override public String call() { + Time.sleep(pauseTime); + return val; + } + }); + } + + + @Test + public void runSequenceTask() throws Exception { + BasicTask<String> t1 = taskReturning("a"); + BasicTask<String> t2 = taskReturning("b"); + BasicTask<String> t3 = taskReturning("c"); + BasicTask<String> t4 = taskReturning("d"); + Task<List<String>> tSequence = ec.submit(new SequentialTask<String>(t1, t2, t3, t4)); + assertEquals(tSequence.get(), ImmutableList.of("a", "b", "c", "d")); + } + + @Test + public void testSequentialTaskFailsWhenIntermediateTaskThrowsException() throws Exception { + BasicTask<String> t1 = taskReturning("a"); + BasicTask<String> t2 = new BasicTask<String>(new Callable<String>() { + @Override public String call() throws Exception { + throw new IllegalArgumentException("forced exception"); + } + }); + BasicTask<String> t3 = taskReturning("c"); + SequentialTask<String> task = new SequentialTask<String>(t1, t2, t3); + Task<List<String>> tSequence = ec.submit(task); + + try { + tSequence.get(); + fail("t2 should have thrown an exception"); + } catch (Exception e) {} + + assertTrue(task.isDone()); + assertTrue(task.isError()); + assertTrue(t1.isDone()); + assertFalse(t1.isError()); + 
assertTrue(t2.isDone()); + assertTrue(t2.isError()); + // t3 not run because of t2 exception + assertFalse(t3.isDone()); + assertFalse(t3.isBegun()); + } + + @Test + public void testParallelTaskFailsWhenIntermediateTaskThrowsException() throws Exception { + // differs from test above of SequentialTask in that expect t3 to be executed, + // despite t2 failing. + // TODO Do we expect tSequence.get() to block for everything to either fail or complete, + // and then to throw exception? Currently it does *not* do that so test was previously failing. + + BasicTask<String> t1 = taskReturning("a"); + BasicTask<String> t2 = new BasicTask<String>(new Callable<String>() { + @Override public String call() throws Exception { + throw new IllegalArgumentException("forced exception"); + } + }); + BasicTask<String> t3 = slowTaskReturning("c", Duration.millis(100)); + ParallelTask<String> task = new ParallelTask<String>(t1, t2, t3); + Task<List<String>> tSequence = ec.submit(task); + + try { + tSequence.get(); + fail("t2 should have thrown an exception"); + } catch (Exception e) {} + + assertTrue(task.isDone()); + assertTrue(task.isError()); + assertTrue(t1.isDone()); + assertFalse(t1.isError()); + assertTrue(t2.isDone()); + assertTrue(t2.isError()); + assertTrue(t3.isBegun()); + assertTrue(t3.isDone()); + assertFalse(t3.isError()); + } + + @Test + public void runParallelTask() throws Exception { + BasicTask<String> t1 = taskReturning("a"); + BasicTask<String> t2 = taskReturning("b"); + BasicTask<String> t3 = taskReturning("c"); + BasicTask<String> t4 = taskReturning("d"); + Task<List<String>> tSequence = ec.submit(new ParallelTask<String>(t4, t2, t1, t3)); + assertEquals(new HashSet<String>(tSequence.get()), ImmutableSet.of("a", "b", "c", "d")); + } + + @Test + public void runParallelTaskWithDelay() throws Exception { + final Semaphore locker = new Semaphore(0); + BasicTask<String> t1 = new BasicTask<String>(new Callable<String>() { + @Override public String call() { + try { + 
locker.acquire(); + } catch (InterruptedException e) { + throw Throwables.propagate(e); + } + return "a"; + } + }); + BasicTask<String> t2 = taskReturning("b"); + BasicTask<String> t3 = taskReturning("c"); + BasicTask<String> t4 = taskReturning("d"); + final Task<List<String>> tSequence = ec.submit(new ParallelTask<String>(t4, t2, t1, t3)); + + assertEquals(ImmutableSet.of(t2.get(), t3.get(), t4.get()), ImmutableSet.of("b", "c", "d")); + assertFalse(t1.isDone()); + assertFalse(tSequence.isDone()); + + // get blocks until tasks have completed + Thread t = new Thread() { + @Override public void run() { + try { + tSequence.get(); + } catch (Exception e) { + throw Throwables.propagate(e); + } + locker.release(); + } + }; + t.start(); + Thread.sleep(30); + assertTrue(t.isAlive()); + + locker.release(); + + assertEquals(new HashSet<String>(tSequence.get()), ImmutableSet.of("a", "b", "c", "d")); + assertTrue(t1.isDone()); + assertTrue(tSequence.isDone()); + + locker.acquire(); + } + + @Test + public void testComplexOrdering() throws Exception { + List<String> data = new CopyOnWriteArrayList<String>(); + SequentialTask<String> taskA = new SequentialTask<String>( + appendAfterDelay(data, "a1"), appendAfterDelay(data, "a2"), appendAfterDelay(data, "a3"), appendAfterDelay(data, "a4")); + SequentialTask<String> taskB = new SequentialTask<String>( + appendAfterDelay(data, "b1"), appendAfterDelay(data, "b2"), appendAfterDelay(data, "b3"), appendAfterDelay(data, "b4")); + Task<List<String>> t = ec.submit(new ParallelTask<String>(taskA, taskB)); + t.get(); + + LOG.debug("Tasks happened in order: {}", data); + assertEquals(data.size(), 8); + assertEquals(new HashSet<String>(data), ImmutableSet.of("a1", "a2", "a3", "a4", "b1", "b2", "b3", "b4")); + + // a1, ..., a4 should be in order + List<String> as = Lists.newArrayList(), bs = Lists.newArrayList(); + for (String value : data) { + ((value.charAt(0) == 'a') ? 
as : bs).add(value); + } + assertEquals(as, ImmutableList.of("a1", "a2", "a3", "a4")); + assertEquals(bs, ImmutableList.of("b1", "b2", "b3", "b4")); + } + + private BasicTask<String> appendAfterDelay(final List<String> list, final String value) { + return new BasicTask<String>(new Callable<String>() { + @Override public String call() { + try { + Thread.sleep((int) (100 * Math.random())); + } catch (InterruptedException e) { + throw Throwables.propagate(e); + } + LOG.debug("running {}", value); + list.add(value); + return value; + } + }); + } + +}
