Rename eventReceiverInitiated to eventReceiverInitialized
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/1ed041dd Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/1ed041dd Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/1ed041dd Branch: refs/heads/stratos-4.1.x Commit: 1ed041dd102fdadb3839b7c01b5d554a69813b31 Parents: 49488b6 Author: Akila Perera <[email protected]> Authored: Sat Nov 21 12:05:37 2015 +0530 Committer: Akila Perera <[email protected]> Committed: Sat Nov 21 12:35:40 2015 +0530 ---------------------------------------------------------------------- .../tests/ADCMTAppTenantUserTestCase.java | 4 +- .../integration/tests/ADCMTAppTestCase.java | 5 +- .../agent/integration/tests/ADCTestCase.java | 2 +- .../integration/tests/AgentStartupTestCase.java | 2 +- .../tests/MessageBrokerHATestCase.java | 210 +++++++++---------- .../tests/PythonAgentIntegrationTest.java | 89 ++++---- 6 files changed, 148 insertions(+), 164 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/1ed041dd/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java index 26c52d1..6e40dd6 100644 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java @@ -120,7 +120,7 @@ public class ADCMTAppTenantUserTestCase extends PythonAgentIntegrationTest { Thread startupTestThread = new Thread(new Runnable() { @Override public void run() { - while (!eventReceiverInitiated) { + while (!eventReceiverInitialized) { sleep(1000); } List<String> outputLines = new ArrayList<String>(); @@ -214,4 +214,4 @@ public class ADCMTAppTenantUserTestCase extends PythonAgentIntegrationTest { return topology; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/stratos/blob/1ed041dd/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java index f677629..6f0b070 100644 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java @@ -122,7 +122,7 @@ public class ADCMTAppTestCase extends PythonAgentIntegrationTest { Thread startupTestThread = new Thread(new Runnable() { @Override public void run() { - while (!eventReceiverInitiated) { + while (!eventReceiverInitialized) { sleep(1000); } List<String> outputLines = new ArrayList<String>(); @@ -165,6 +165,7 @@ public class ADCMTAppTestCase extends PythonAgentIntegrationTest { while (!instanceStarted || !instanceActivated) { // wait until the instance activated event is received. // this will assert whether instance got activated within timeout period; no need for explicit assertions + log.info("Waiting for agent activation..."); sleep(2000); } } @@ -215,4 +216,4 @@ public class ADCMTAppTestCase extends PythonAgentIntegrationTest { cluster.addMember(member); return topology; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/stratos/blob/1ed041dd/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java index 14797e4..0dc92be 100755 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java @@ -179,7 +179,7 @@ public class ADCTestCase extends PythonAgentIntegrationTest { Thread startupTestThread = new Thread(new Runnable() { @Override public void run() { - while (!eventReceiverInitiated) { + while (!eventReceiverInitialized) { sleep(1000); } List<String> outputLines = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/stratos/blob/1ed041dd/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java index f0a70d4..ea156b6 100755 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java @@ -96,7 +96,7 @@ public class AgentStartupTestCase extends PythonAgentIntegrationTest { Thread startupTestThread = new Thread(new Runnable() { @Override public void run() { - while (!eventReceiverInitiated) { + while (!eventReceiverInitialized) { sleep(2000); } List<String> outputLines = new ArrayList<String>(); http://git-wip-us.apache.org/repos/asf/stratos/blob/1ed041dd/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java index e6203a7..b1f4d8b 100644 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java @@ -23,61 +23,60 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.common.domain.LoadBalancingIPType; import org.apache.stratos.messaging.domain.topology.*; -import org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent; +import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupMemberEvent; import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent; import org.apache.stratos.messaging.event.topology.MemberInitializedEvent; -import org.testng.annotations.Test; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; -import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Properties; /** - * Test case to test the messsage broker connection resilience in the Python Cartridge Agent + * Test case to test the message broker connection resilience in the Python Cartridge Agent */ public class MessageBrokerHATestCase extends PythonAgentIntegrationTest { public MessageBrokerHATestCase() throws IOException { } private static final Log log = LogFactory.getLog(MessageBrokerHATestCase.class); - private static final int ADC_TIMEOUT = 300000; - private static final String CLUSTER_ID = "tomcat.domain"; - private static final String DEPLOYMENT_POLICY_NAME = "deployment-policy-6"; - private static final String AUTOSCALING_POLICY_NAME = "autoscaling-policy-6"; - private static final String APP_ID = "application-6"; - private static final String MEMBER_ID = "tomcat.member-1"; + private static final int HA_TEST_TIMEOUT = 300000; + private static final String CLUSTER_ID = "php.php.domain"; + private static final String DEPLOYMENT_POLICY_NAME = "deployment-policy-1"; + private static final String AUTOSCALING_POLICY_NAME = "autoscaling-policy-1"; + private static final String APP_ID = "application-1"; + private static final String MEMBER_ID = "php.member-1"; private static final String INSTANCE_ID = "instance-1"; private static final String CLUSTER_INSTANCE_ID = "cluster-1-instance-1"; private static final String NETWORK_PARTITION_ID = "network-partition-1"; private static final String PARTITION_ID = "partition-1"; - private static final String TENANT_ID = "6"; - private static final String SERVICE_NAME = "tomcat"; - + private static final String TENANT_ID = "-1234"; + private static final String SERVICE_NAME = "php"; + boolean pcaActivated = false; @BeforeMethod(alwaysRun = true) public void setup() throws Exception { System.setProperty("jndi.properties.dir", getTestCaseResourcesPath()); -// integrationTestPropertiesPath = new FileInputStream(new File(getTestCaseResourcesPath() + PATH_SEP + "integration-test.properties")); - - super.setup(ADC_TIMEOUT); + super.setup(HA_TEST_TIMEOUT); startServerSocket(8080); } - + @AfterMethod(alwaysRun = true) - public void tearDownBrokerHATest(){ + public void tearDownBrokerHATest() { tearDown(); } - @Test(groups = {"test"}) - public void testBrokerFailoverHeartbeat(){ + @Test(timeOut = HA_TEST_TIMEOUT, + groups = { "ha" }, + priority = 1) + public void testBrokerFailoverHeartbeat() { + log.info("Running MessageBrokerHATestCase subscriber failover test..."); startCommunicatorThread(); + assertAgentActivation(); sleep(10000); -// assertAgentActivation(); // take down the default broker log.info("Stopping subscribed message broker: DEFAULT"); @@ -89,12 +88,15 @@ public class MessageBrokerHATestCase extends PythonAgentIntegrationTest { List<String> newLines = getNewLines(outputLines, outputStream.toString()); if (newLines.size() > 0) { for (String line : newLines) { - if (line.contains("Message broker localhost:" + mqttBindPorts[0] + " cannot be reached. Disconnecting client...")) { - log.info("Message Broker Heartbeat checker has detected message broker node termination and is trying the next option."); + if (line.contains("Message broker localhost:" + mqttBindPorts[0] + + " cannot be reached. Disconnecting client...")) { + log.info("Message Broker Heartbeat checker has detected message broker node termination and is" + + " trying the next option."); exit = true; } } } + log.info("Waiting for message broker subscriber failover detection for the 1st time."); sleep(1000); } @@ -107,16 +109,18 @@ public class MessageBrokerHATestCase extends PythonAgentIntegrationTest { List<String> newLines = getNewLines(outputLines, outputStream.toString()); if (newLines.size() > 0) { for (String line : newLines) { - if (line.contains("Message broker localhost:" + mqttBindPorts[1] + " cannot be reached. Disconnecting client...")) { - log.info("Message Broker Heartbeat checker has detected message broker node termination and is trying the next option."); + if (line.contains("Message broker localhost:" + mqttBindPorts[1] + + " cannot be reached. Disconnecting client...")) { + log.info("Message Broker Heartbeat checker has detected message broker node termination and is" + + " trying the next option."); exit = true; } } } + log.info("Waiting for message broker subscriber failover detection for the 2nd time."); sleep(1000); } - sleep(20000); log.info("Stopping subscribed message broker"); stopActiveMQInstance("testBroker-" + amqpBindPorts[2] + "-" + mqttBindPorts[2]); @@ -125,99 +129,100 @@ public class MessageBrokerHATestCase extends PythonAgentIntegrationTest { List<String> newLines = getNewLines(outputLines, outputStream.toString()); if (newLines.size() > 0) { for (String line : newLines) { - if (line.contains("Message broker localhost:" + mqttBindPorts[2] + " cannot be reached. Disconnecting client...")) { - log.info("Message Broker Heartbeat checker has detected message broker node termination and is trying the next option."); + if (line.contains("Message broker localhost:" + mqttBindPorts[2] + + " cannot be reached. Disconnecting client...")) { + log.info("Message Broker Heartbeat checker has detected message broker node termination and is" + + " trying the next option."); } - if (line.contains("Could not connect to any of the message brokers provided. Retrying in 2 seconds")) { + if (line.contains( + "Could not connect to any of the message brokers provided. Retrying in 2 seconds")) { log.info("Failover went through all the options and will be retrying."); exit = true; } } } + log.info("Waiting for message broker subscriber failover detection for the 3rd time."); sleep(1000); } + log.info("MessageBrokerHATestCase subscriber test completed successfully."); } - - @Test(groups = {"smoke"}) - public void testBrokerFailoverForPublisher(){ - startCommunicatorThread(); - + @Test(timeOut = HA_TEST_TIMEOUT, + groups = { "ha" }, + priority = 2) + public void testBrokerFailoverForPublisher() { + log.info("Running MessageBrokerHATestCase publisher failover test..."); + startCommunicatorThread(); + assertAgentActivation(); List<String> outputLines = new ArrayList<>(); boolean exit = false; + boolean publishCleanupEvent = false; while (!exit) { List<String> newLines = getNewLines(outputLines, outputStream.toString()); if (newLines.size() > 0) { for (String line : newLines) { - if (line.contains("Subscribed to 'topology/#'")) { - // take down the default broker - stopActiveMQInstance("testBroker-" + amqpBindPorts[0] + "-" + mqttBindPorts[0]); - } - - if (line.contains("Waiting for complete topology event")) { - - sleep(4000); - -// stopActiveMQInstance("testBroker2"); -// stopActiveMQInstance("testBroker3"); - // Send complete topology event - log.info("Publishing complete topology event..."); - Topology topology = createTestTopology(); - CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology); - publishEvent(completeTopologyEvent); - log.info("Complete topology event published"); - } - - if (line.contains("Waiting for cartridge agent to be initialized")) { - // Publish member initialized event - log.info("Publishing member initialized event..."); - MemberInitializedEvent memberInitializedEvent = new MemberInitializedEvent( - SERVICE_NAME, CLUSTER_ID, CLUSTER_INSTANCE_ID, MEMBER_ID, NETWORK_PARTITION_ID, - PARTITION_ID, INSTANCE_ID - ); - publishEvent(memberInitializedEvent); - log.info("Member initialized event published"); - } - - - // Send artifact updated event to activate the instance first - if (line.contains("Artifact repository found")) { - publishEvent(getArtifactUpdatedEventForPublicRepo()); - log.info("Artifact updated event published"); + if (!publishCleanupEvent) { + log.info("Publishing instance cleanup member event and shutting down first MB instance..."); + + // publish instance cleanup event to trigger an ready to shutdown event being published from PCA + InstanceCleanupMemberEvent instanceCleanupMemberEvent = new InstanceCleanupMemberEvent( + MEMBER_ID); + publishEvent(instanceCleanupMemberEvent); + publishCleanupEvent = true; + waitUntilCleanupEventIsReceivedAndStopDefaultMB(); } if (line.contains("Could not publish event to message broker localhost:1885.")) { log.info("Event publishing to default message broker failed and the next option is tried."); exit = true; } - -// if (line.contains("The event will be dropped.")) { -// log.info("Event publishing failed after timeout exceeded and the event was dropped."); -// exit = true; -// } } } + log.info("Waiting for message broker publisher failover detection."); sleep(1000); } -// assertAgentActivation(); + // assertAgentActivation(); + log.info("MessageBrokerHATestCase publisher test completed successfully."); + } + + private void waitUntilCleanupEventIsReceivedAndStopDefaultMB() { + boolean eventReceived = false; + List<String> outputLines = new ArrayList<>(); + + while (!eventReceived) { + List<String> newLines = getNewLines(outputLines, outputStream.toString()); + if (newLines.size() > 0) { + for (String line : newLines) { + if (line.contains("Message received: instance/notifier/InstanceCleanupMemberEvent")) { + // take down the default broker + stopActiveMQInstance("testBroker-" + amqpBindPorts[0] + "-" + mqttBindPorts[0]); + eventReceived = true; + } + } + } + log.info("Waiting until cleanup event is received by PCA..."); + } + log.info("Cleanup event is received by PCA."); } private void assertAgentActivation() { + pcaActivated = false; + instanceActivated = false; + instanceStarted = false; Thread startupTestThread = new Thread(new Runnable() { @Override public void run() { - while (!eventReceiverInitiated) { + while (!eventReceiverInitialized) { + log.info("Waiting until event receiver is initialized..."); sleep(1000); } - List<String> outputLines = new ArrayList<>(); - boolean completeTopologyPublished = false; - boolean memberInitPublished = false; - while (!outputStream.isClosed()) { + List<String> outputLines = new ArrayList<String>(); + while (!outputStream.isClosed() && !pcaActivated) { List<String> newLines = getNewLines(outputLines, outputStream.toString()); if (newLines.size() > 0) { for (String line : newLines) { - if (line.contains("Waiting for complete topology event") && !completeTopologyPublished) { + if (line.contains("Subscribed to 'topology/#'")) { sleep(2000); // Send complete topology event log.info("Publishing complete topology event..."); @@ -225,30 +230,21 @@ public class MessageBrokerHATestCase extends PythonAgentIntegrationTest { CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology); publishEvent(completeTopologyEvent); log.info("Complete topology event published"); - completeTopologyPublished = true; - } + sleep(2000); - if (line.contains("Waiting for cartridge agent to be initialized") && !memberInitPublished) { // Publish member initialized event log.info("Publishing member initialized event..."); - MemberInitializedEvent memberInitializedEvent = new MemberInitializedEvent( - SERVICE_NAME, CLUSTER_ID, CLUSTER_INSTANCE_ID, MEMBER_ID, NETWORK_PARTITION_ID, - PARTITION_ID, INSTANCE_ID - ); + MemberInitializedEvent memberInitializedEvent = new MemberInitializedEvent(SERVICE_NAME, + CLUSTER_ID, CLUSTER_INSTANCE_ID, MEMBER_ID, NETWORK_PARTITION_ID, PARTITION_ID, + INSTANCE_ID); publishEvent(memberInitializedEvent); log.info("Member initialized event published"); - memberInitPublished = true; - } - - // Send artifact updated event to activate the instance first - if (line.contains("Artifact repository found")) { - publishEvent(getArtifactUpdatedEventForPublicRepo()); - log.info("Artifact updated event published"); } } } sleep(1000); } + log.info("Startup test thread finished."); } }); startupTestThread.start(); @@ -256,21 +252,11 @@ public class MessageBrokerHATestCase extends PythonAgentIntegrationTest { while (!instanceStarted || !instanceActivated) { // wait until the instance activated event is received. // this will assert whether instance got activated within timeout period; no need for explicit assertions + log.info("Waiting for agent activation..."); sleep(2000); } - } - - private ArtifactUpdatedEvent getArtifactUpdatedEventForPublicRepo() { - ArtifactUpdatedEvent publicRepoEvent = createTestArtifactUpdatedEvent(); - publicRepoEvent.setRepoURL("https://bitbucket.org/testapache2211/opentestrepo1.git"); - return publicRepoEvent; - } - - private static ArtifactUpdatedEvent createTestArtifactUpdatedEvent() { - ArtifactUpdatedEvent artifactUpdatedEvent = new ArtifactUpdatedEvent(); - artifactUpdatedEvent.setClusterId(CLUSTER_ID); - artifactUpdatedEvent.setTenantId(TENANT_ID); - return artifactUpdatedEvent; + pcaActivated = true; + log.info("PCA activation assertion passed."); } /** @@ -287,9 +273,8 @@ public class MessageBrokerHATestCase extends PythonAgentIntegrationTest { AUTOSCALING_POLICY_NAME, APP_ID); service.addCluster(cluster); - Member member = new Member(service.getServiceName(), cluster.getClusterId(), MEMBER_ID, - CLUSTER_INSTANCE_ID, NETWORK_PARTITION_ID, PARTITION_ID, LoadBalancingIPType.Private, - System.currentTimeMillis()); + Member member = new Member(service.getServiceName(), cluster.getClusterId(), MEMBER_ID, CLUSTER_INSTANCE_ID, + NETWORK_PARTITION_ID, PARTITION_ID, LoadBalancingIPType.Private, System.currentTimeMillis()); member.setDefaultPrivateIP("10.0.0.1"); member.setDefaultPublicIP("20.0.0.1"); @@ -298,7 +283,6 @@ public class MessageBrokerHATestCase extends PythonAgentIntegrationTest { member.setProperties(properties); member.setStatus(MemberStatus.Created); cluster.addMember(member); - return topology; } } http://git-wip-us.apache.org/repos/asf/stratos/blob/1ed041dd/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java index fb12db6..6e25b6b 100644 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java @@ -24,6 +24,7 @@ import org.apache.activemq.security.AuthenticationUser; import org.apache.activemq.security.SimpleAuthenticationPlugin; import org.apache.commons.exec.*; import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -38,7 +39,6 @@ import org.apache.stratos.messaging.message.receiver.instance.status.InstanceSta import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; import org.apache.stratos.messaging.util.MessagingUtil; import org.apache.stratos.python.cartridge.agent.integration.common.ThriftTestServer; -import org.apache.commons.io.IOUtils; import java.io.*; import java.net.ServerSocket; @@ -63,7 +63,7 @@ public class PythonAgentIntegrationTest { public static final String TEST_THREAD_POOL_SIZE = "test.thread.pool.size"; protected final UUID PYTHON_AGENT_DIR_NAME = UUID.randomUUID(); -// protected final String defaultBrokerName = "testBrokerDefault"; + // protected final String defaultBrokerName = "testBrokerDefault"; protected final Properties integrationProperties = new Properties(); protected Map<Integer, ServerSocket> serverSocketMap = new HashMap<>(); @@ -76,7 +76,7 @@ public class PythonAgentIntegrationTest { protected String distributionName; protected int testThreadPoolSize; - protected boolean eventReceiverInitiated = false; + protected boolean eventReceiverInitialized = false; protected TopologyEventReceiver topologyEventReceiver; protected InstanceStatusEventReceiver instanceStatusEventReceiver; protected InitializerEventReceiver initializerEventReceiver; @@ -87,7 +87,6 @@ public class PythonAgentIntegrationTest { private Map<String, BrokerService> messageBrokers; - /** * Setup method for test method testPythonCartridgeAgent */ @@ -100,58 +99,58 @@ public class PythonAgentIntegrationTest { cepSSLPort = Integer.parseInt(integrationProperties.getProperty(CEP_SSL_PORT)); Properties jndiProperties = new Properties(); - jndiProperties.load(new FileInputStream(new File(System.getProperty("jndi.properties.dir") + PATH_SEP + "jndi.properties"))); - if (!jndiProperties.containsKey(ACTIVEMQ_AMQP_BIND_PORTS) || !jndiProperties.containsKey(ACTIVEMQ_MQTT_BIND_PORTS)) { + jndiProperties.load(new FileInputStream( + new File(System.getProperty("jndi.properties.dir") + PATH_SEP + "jndi.properties"))); + if (!jndiProperties.containsKey(ACTIVEMQ_AMQP_BIND_PORTS) || !jndiProperties + .containsKey(ACTIVEMQ_MQTT_BIND_PORTS)) { amqpBindPorts = integrationProperties.getProperty(ACTIVEMQ_AMQP_BIND_PORTS).split(","); mqttBindPorts = integrationProperties.getProperty(ACTIVEMQ_MQTT_BIND_PORTS).split(","); - }else{ + } else { amqpBindPorts = jndiProperties.getProperty(ACTIVEMQ_AMQP_BIND_PORTS).split(","); mqttBindPorts = jndiProperties.getProperty(ACTIVEMQ_MQTT_BIND_PORTS).split(","); } if (amqpBindPorts.length != mqttBindPorts.length) { - throw new RuntimeException("The number of AMQP ports and MQTT ports should be equal in integration-test.properties."); + throw new RuntimeException( + "The number of AMQP ports and MQTT ports should be equal in integration-test.properties."); } // start ActiveMQ test server - for (int i = 0; i < amqpBindPorts.length; i++){ + for (int i = 0; i < amqpBindPorts.length; i++) { + log.info("Starting ActiveMQ instance with AMQP: " + amqpBindPorts[i] + ", MQTT: " + mqttBindPorts[i]); startActiveMQInstance(Integer.parseInt(amqpBindPorts[i]), Integer.parseInt(mqttBindPorts[i]), true); } - if (!this.eventReceiverInitiated) { - ExecutorService executorService = StratosThreadPool.getExecutorService("TEST_THREAD_POOL", testThreadPoolSize); - topologyEventReceiver = new TopologyEventReceiver(); - topologyEventReceiver.setExecutorService(executorService); - topologyEventReceiver.execute(); + ExecutorService executorService = StratosThreadPool.getExecutorService("TEST_THREAD_POOL", testThreadPoolSize); + topologyEventReceiver = new TopologyEventReceiver(); + topologyEventReceiver.setExecutorService(executorService); + topologyEventReceiver.execute(); - instanceStatusEventReceiver = new InstanceStatusEventReceiver(); - instanceStatusEventReceiver.setExecutorService(executorService); - instanceStatusEventReceiver.execute(); + instanceStatusEventReceiver = new InstanceStatusEventReceiver(); + instanceStatusEventReceiver.setExecutorService(executorService); + instanceStatusEventReceiver.execute(); - this.instanceStarted = false; - instanceStatusEventReceiver.addEventListener(new InstanceStartedEventListener() { - @Override - protected void onEvent(Event event) { - log.info("Instance started event received"); - instanceStarted = true; - } - }); + instanceStatusEventReceiver.addEventListener(new InstanceStartedEventListener() { + @Override + protected void onEvent(Event event) { + log.info("Instance started event received"); + instanceStarted = true; + } + }); - this.instanceActivated = false; - instanceStatusEventReceiver.addEventListener(new InstanceActivatedEventListener() { - @Override - protected void onEvent(Event event) { - log.info("Instance activated event received"); - instanceActivated = true; - } - }); + instanceStatusEventReceiver.addEventListener(new InstanceActivatedEventListener() { + @Override + protected void onEvent(Event event) { + log.info("Instance activated event received"); + instanceActivated = true; + } + }); - initializerEventReceiver = new InitializerEventReceiver(); - initializerEventReceiver.setExecutorService(executorService); - initializerEventReceiver.execute(); + initializerEventReceiver = new InitializerEventReceiver(); + initializerEventReceiver.setExecutorService(executorService); + initializerEventReceiver.execute(); - this.eventReceiverInitiated = true; - } + this.eventReceiverInitialized = true; // Start CEP Thrift test server thriftTestServer = new ThriftTestServer(); @@ -226,7 +225,7 @@ public class PythonAgentIntegrationTest { // stop the broker services for (Map.Entry<String, BrokerService> entry : this.messageBrokers.entrySet()) { try { - log.debug("Stopping broker service [" + entry.getKey() + "]"); + log.debug("Stopping broker service [" + entry.getKey() + "]"); entry.getValue().stop(); } catch (Exception ignore) { } @@ -240,8 +239,8 @@ public class PythonAgentIntegrationTest { } public PythonAgentIntegrationTest() throws IOException { - integrationProperties.load( - PythonAgentIntegrationTest.class.getResourceAsStream(PATH_SEP + "integration-test.properties")); + integrationProperties + .load(PythonAgentIntegrationTest.class.getResourceAsStream(PATH_SEP + "integration-test.properties")); distributionName = integrationProperties.getProperty(DISTRIBUTION_NAME); cepPort = Integer.parseInt(integrationProperties.getProperty(CEP_PORT)); cepSSLPort = Integer.parseInt(integrationProperties.getProperty(CEP_SSL_PORT)); @@ -279,7 +278,7 @@ public class PythonAgentIntegrationTest { AuthenticationUser authenticationUser = new AuthenticationUser("system", "manager", "users,admins"); List<AuthenticationUser> authUserList = new ArrayList<>(); authUserList.add(authenticationUser); - broker.setPlugins(new BrokerPlugin[]{new SimpleAuthenticationPlugin(authUserList)}); + broker.setPlugins(new BrokerPlugin[] { new SimpleAuthenticationPlugin(authUserList) }); } broker.setBrokerName(brokerName); @@ -293,8 +292,8 @@ public class PythonAgentIntegrationTest { return brokerName; } - protected void stopActiveMQInstance(String brokerName){ - if (this.messageBrokers.containsKey(brokerName)){ + protected void stopActiveMQInstance(String brokerName) { + if (this.messageBrokers.containsKey(brokerName)) { log.debug("Stopping broker service [" + brokerName + "]"); BrokerService broker = this.messageBrokers.get(brokerName); try { @@ -320,7 +319,7 @@ public class PythonAgentIntegrationTest { log.error("ERROR found in PCA log", e); } } - log.info("[PCA] " + line); + log.debug("[PCA] " + line); } } sleep(100);
