Rename eventReceiverInitiated to eventReceiverInitialized

Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/1ed041dd
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/1ed041dd
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/1ed041dd

Branch: refs/heads/stratos-4.1.x
Commit: 1ed041dd102fdadb3839b7c01b5d554a69813b31
Parents: 49488b6
Author: Akila Perera <[email protected]>
Authored: Sat Nov 21 12:05:37 2015 +0530
Committer: Akila Perera <[email protected]>
Committed: Sat Nov 21 12:35:40 2015 +0530

----------------------------------------------------------------------
 .../tests/ADCMTAppTenantUserTestCase.java       |   4 +-
 .../integration/tests/ADCMTAppTestCase.java     |   5 +-
 .../agent/integration/tests/ADCTestCase.java    |   2 +-
 .../integration/tests/AgentStartupTestCase.java |   2 +-
 .../tests/MessageBrokerHATestCase.java          | 210 +++++++++----------
 .../tests/PythonAgentIntegrationTest.java       |  89 ++++----
 6 files changed, 148 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/1ed041dd/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java
----------------------------------------------------------------------
diff --git 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java
 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java
index 26c52d1..6e40dd6 100644
--- 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java
+++ 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java
@@ -120,7 +120,7 @@ public class ADCMTAppTenantUserTestCase extends 
PythonAgentIntegrationTest {
         Thread startupTestThread = new Thread(new Runnable() {
             @Override
             public void run() {
-                while (!eventReceiverInitiated) {
+                while (!eventReceiverInitialized) {
                     sleep(1000);
                 }
                 List<String> outputLines = new ArrayList<String>();
@@ -214,4 +214,4 @@ public class ADCMTAppTenantUserTestCase extends 
PythonAgentIntegrationTest {
 
         return topology;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/1ed041dd/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java
----------------------------------------------------------------------
diff --git 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java
 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java
index f677629..6f0b070 100644
--- 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java
+++ 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java
@@ -122,7 +122,7 @@ public class ADCMTAppTestCase extends 
PythonAgentIntegrationTest {
         Thread startupTestThread = new Thread(new Runnable() {
             @Override
             public void run() {
-                while (!eventReceiverInitiated) {
+                while (!eventReceiverInitialized) {
                     sleep(1000);
                 }
                 List<String> outputLines = new ArrayList<String>();
@@ -165,6 +165,7 @@ public class ADCMTAppTestCase extends 
PythonAgentIntegrationTest {
         while (!instanceStarted || !instanceActivated) {
             // wait until the instance activated event is received.
             // this will assert whether instance got activated within timeout 
period; no need for explicit assertions
+            log.info("Waiting for agent activation...");
             sleep(2000);
         }
     }
@@ -215,4 +216,4 @@ public class ADCMTAppTestCase extends 
PythonAgentIntegrationTest {
         cluster.addMember(member);
         return topology;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/1ed041dd/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java
----------------------------------------------------------------------
diff --git 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java
 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java
index 14797e4..0dc92be 100755
--- 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java
+++ 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java
@@ -179,7 +179,7 @@ public class ADCTestCase extends PythonAgentIntegrationTest 
{
         Thread startupTestThread = new Thread(new Runnable() {
             @Override
             public void run() {
-                while (!eventReceiverInitiated) {
+                while (!eventReceiverInitialized) {
                     sleep(1000);
                 }
                 List<String> outputLines = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/stratos/blob/1ed041dd/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
----------------------------------------------------------------------
diff --git 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
index f0a70d4..ea156b6 100755
--- 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
+++ 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
@@ -96,7 +96,7 @@ public class AgentStartupTestCase extends 
PythonAgentIntegrationTest {
         Thread startupTestThread = new Thread(new Runnable() {
             @Override
             public void run() {
-                while (!eventReceiverInitiated) {
+                while (!eventReceiverInitialized) {
                     sleep(2000);
                 }
                 List<String> outputLines = new ArrayList<String>();

http://git-wip-us.apache.org/repos/asf/stratos/blob/1ed041dd/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
----------------------------------------------------------------------
diff --git 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
index e6203a7..b1f4d8b 100644
--- 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
+++ 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
@@ -23,61 +23,60 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.common.domain.LoadBalancingIPType;
 import org.apache.stratos.messaging.domain.topology.*;
-import 
org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent;
+import 
org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupMemberEvent;
 import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
 import org.apache.stratos.messaging.event.topology.MemberInitializedEvent;
-import org.testng.annotations.Test;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
 
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
 /**
- * Test case to test the messsage broker connection resilience in the Python 
Cartridge Agent
+ * Test case to test the message broker connection resilience in the Python 
Cartridge Agent
  */
 public class MessageBrokerHATestCase extends PythonAgentIntegrationTest {
     public MessageBrokerHATestCase() throws IOException {
     }
 
     private static final Log log = 
LogFactory.getLog(MessageBrokerHATestCase.class);
-    private static final int ADC_TIMEOUT = 300000;
-    private static final String CLUSTER_ID = "tomcat.domain";
-    private static final String DEPLOYMENT_POLICY_NAME = "deployment-policy-6";
-    private static final String AUTOSCALING_POLICY_NAME = 
"autoscaling-policy-6";
-    private static final String APP_ID = "application-6";
-    private static final String MEMBER_ID = "tomcat.member-1";
+    private static final int HA_TEST_TIMEOUT = 300000;
+    private static final String CLUSTER_ID = "php.php.domain";
+    private static final String DEPLOYMENT_POLICY_NAME = "deployment-policy-1";
+    private static final String AUTOSCALING_POLICY_NAME = 
"autoscaling-policy-1";
+    private static final String APP_ID = "application-1";
+    private static final String MEMBER_ID = "php.member-1";
     private static final String INSTANCE_ID = "instance-1";
     private static final String CLUSTER_INSTANCE_ID = "cluster-1-instance-1";
     private static final String NETWORK_PARTITION_ID = "network-partition-1";
     private static final String PARTITION_ID = "partition-1";
-    private static final String TENANT_ID = "6";
-    private static final String SERVICE_NAME = "tomcat";
-
+    private static final String TENANT_ID = "-1234";
+    private static final String SERVICE_NAME = "php";
+    boolean pcaActivated = false;
 
     @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception {
         System.setProperty("jndi.properties.dir", getTestCaseResourcesPath());
-//        integrationTestPropertiesPath = new FileInputStream(new 
File(getTestCaseResourcesPath() + PATH_SEP + "integration-test.properties"));
-
-        super.setup(ADC_TIMEOUT);
+        super.setup(HA_TEST_TIMEOUT);
         startServerSocket(8080);
     }
-    
+
     @AfterMethod(alwaysRun = true)
-    public void tearDownBrokerHATest(){
+    public void tearDownBrokerHATest() {
         tearDown();
     }
 
-    @Test(groups = {"test"})
-    public void testBrokerFailoverHeartbeat(){
+    @Test(timeOut = HA_TEST_TIMEOUT,
+          groups = { "ha" },
+          priority = 1)
+    public void testBrokerFailoverHeartbeat() {
+        log.info("Running MessageBrokerHATestCase subscriber failover 
test...");
         startCommunicatorThread();
+        assertAgentActivation();
         sleep(10000);
-//        assertAgentActivation();
 
         // take down the default broker
         log.info("Stopping subscribed message broker: DEFAULT");
@@ -89,12 +88,15 @@ public class MessageBrokerHATestCase extends 
PythonAgentIntegrationTest {
             List<String> newLines = getNewLines(outputLines, 
outputStream.toString());
             if (newLines.size() > 0) {
                 for (String line : newLines) {
-                    if (line.contains("Message broker localhost:" + 
mqttBindPorts[0] + " cannot be reached. Disconnecting client...")) {
-                        log.info("Message Broker Heartbeat checker has 
detected message broker node termination and is trying the next option.");
+                    if (line.contains("Message broker localhost:" + 
mqttBindPorts[0]
+                            + " cannot be reached. Disconnecting client...")) {
+                        log.info("Message Broker Heartbeat checker has 
detected message broker node termination and is"
+                                + " trying the next option.");
                         exit = true;
                     }
                 }
             }
+            log.info("Waiting for message broker subscriber failover detection 
for the 1st time.");
             sleep(1000);
         }
 
@@ -107,16 +109,18 @@ public class MessageBrokerHATestCase extends 
PythonAgentIntegrationTest {
             List<String> newLines = getNewLines(outputLines, 
outputStream.toString());
             if (newLines.size() > 0) {
                 for (String line : newLines) {
-                    if (line.contains("Message broker localhost:" + 
mqttBindPorts[1] + " cannot be reached. Disconnecting client...")) {
-                        log.info("Message Broker Heartbeat checker has 
detected message broker node termination and is trying the next option.");
+                    if (line.contains("Message broker localhost:" + 
mqttBindPorts[1]
+                            + " cannot be reached. Disconnecting client...")) {
+                        log.info("Message Broker Heartbeat checker has 
detected message broker node termination and is"
+                                + " trying the next option.");
                         exit = true;
                     }
                 }
             }
+            log.info("Waiting for message broker subscriber failover detection 
for the 2nd time.");
             sleep(1000);
         }
 
-        sleep(20000);
         log.info("Stopping subscribed message broker");
         stopActiveMQInstance("testBroker-" + amqpBindPorts[2] + "-" + 
mqttBindPorts[2]);
 
@@ -125,99 +129,100 @@ public class MessageBrokerHATestCase extends 
PythonAgentIntegrationTest {
             List<String> newLines = getNewLines(outputLines, 
outputStream.toString());
             if (newLines.size() > 0) {
                 for (String line : newLines) {
-                    if (line.contains("Message broker localhost:" + 
mqttBindPorts[2] + " cannot be reached. Disconnecting client...")) {
-                        log.info("Message Broker Heartbeat checker has 
detected message broker node termination and is trying the next option.");
+                    if (line.contains("Message broker localhost:" + 
mqttBindPorts[2]
+                            + " cannot be reached. Disconnecting client...")) {
+                        log.info("Message Broker Heartbeat checker has 
detected message broker node termination and is"
+                                + " trying the next option.");
                     }
-                    if (line.contains("Could not connect to any of the message 
brokers provided. Retrying in 2 seconds")) {
+                    if (line.contains(
+                            "Could not connect to any of the message brokers 
provided. Retrying in 2 seconds")) {
                         log.info("Failover went through all the options and 
will be retrying.");
                         exit = true;
                     }
                 }
             }
+            log.info("Waiting for message broker subscriber failover detection 
for the 3rd time.");
             sleep(1000);
         }
+        log.info("MessageBrokerHATestCase subscriber test completed 
successfully.");
     }
-    
-    @Test(groups = {"smoke"})
-    public void testBrokerFailoverForPublisher(){
-        startCommunicatorThread();
-
 
+    @Test(timeOut = HA_TEST_TIMEOUT,
+          groups = { "ha" },
+          priority = 2)
+    public void testBrokerFailoverForPublisher() {
+        log.info("Running MessageBrokerHATestCase publisher failover test...");
+        startCommunicatorThread();
+        assertAgentActivation();
         List<String> outputLines = new ArrayList<>();
         boolean exit = false;
+        boolean publishCleanupEvent = false;
         while (!exit) {
             List<String> newLines = getNewLines(outputLines, 
outputStream.toString());
             if (newLines.size() > 0) {
                 for (String line : newLines) {
-                    if (line.contains("Subscribed to 'topology/#'")) {
-                        // take down the default broker
-                        stopActiveMQInstance("testBroker-" + amqpBindPorts[0] 
+ "-" + mqttBindPorts[0]);
-                    }
-
-                    if (line.contains("Waiting for complete topology event")) {
-
-                        sleep(4000);
-
-//                        stopActiveMQInstance("testBroker2");
-//                        stopActiveMQInstance("testBroker3");
-                        // Send complete topology event
-                        log.info("Publishing complete topology event...");
-                        Topology topology = createTestTopology();
-                        CompleteTopologyEvent completeTopologyEvent = new 
CompleteTopologyEvent(topology);
-                        publishEvent(completeTopologyEvent);
-                        log.info("Complete topology event published");
-                    }
-
-                    if (line.contains("Waiting for cartridge agent to be 
initialized")) {
-                        // Publish member initialized event
-                        log.info("Publishing member initialized event...");
-                        MemberInitializedEvent memberInitializedEvent = new 
MemberInitializedEvent(
-                                SERVICE_NAME, CLUSTER_ID, CLUSTER_INSTANCE_ID, 
MEMBER_ID, NETWORK_PARTITION_ID,
-                                PARTITION_ID, INSTANCE_ID
-                        );
-                        publishEvent(memberInitializedEvent);
-                        log.info("Member initialized event published");
-                    }
-
-
-                    // Send artifact updated event to activate the instance 
first
-                    if (line.contains("Artifact repository found")) {
-                        publishEvent(getArtifactUpdatedEventForPublicRepo());
-                        log.info("Artifact updated event published");
+                    if (!publishCleanupEvent) {
+                        log.info("Publishing instance cleanup member event and 
shutting down first MB instance...");
+
+                        // publish instance cleanup event to trigger an ready 
to shutdown event being published from PCA
+                        InstanceCleanupMemberEvent instanceCleanupMemberEvent 
= new InstanceCleanupMemberEvent(
+                                MEMBER_ID);
+                        publishEvent(instanceCleanupMemberEvent);
+                        publishCleanupEvent = true;
+                        waitUntilCleanupEventIsReceivedAndStopDefaultMB();
                     }
 
                     if (line.contains("Could not publish event to message 
broker localhost:1885.")) {
                         log.info("Event publishing to default message broker 
failed and the next option is tried.");
                         exit = true;
                     }
-
-//                    if (line.contains("The event will be dropped.")) {
-//                        log.info("Event publishing failed after timeout 
exceeded and the event was dropped.");
-//                        exit = true;
-//                    }
                 }
             }
+            log.info("Waiting for message broker publisher failover 
detection.");
             sleep(1000);
         }
 
-//        assertAgentActivation();
+        //        assertAgentActivation();
+        log.info("MessageBrokerHATestCase publisher test completed 
successfully.");
+    }
+
+    private void waitUntilCleanupEventIsReceivedAndStopDefaultMB() {
+        boolean eventReceived = false;
+        List<String> outputLines = new ArrayList<>();
+
+        while (!eventReceived) {
+            List<String> newLines = getNewLines(outputLines, 
outputStream.toString());
+            if (newLines.size() > 0) {
+                for (String line : newLines) {
+                    if (line.contains("Message received: 
instance/notifier/InstanceCleanupMemberEvent")) {
+                        // take down the default broker
+                        stopActiveMQInstance("testBroker-" + amqpBindPorts[0] 
+ "-" + mqttBindPorts[0]);
+                        eventReceived = true;
+                    }
+                }
+            }
+            log.info("Waiting until cleanup event is received by PCA...");
+        }
+        log.info("Cleanup event is received by PCA.");
     }
 
     private void assertAgentActivation() {
+        pcaActivated = false;
+        instanceActivated = false;
+        instanceStarted = false;
         Thread startupTestThread = new Thread(new Runnable() {
             @Override
             public void run() {
-                while (!eventReceiverInitiated) {
+                while (!eventReceiverInitialized) {
+                    log.info("Waiting until event receiver is initialized...");
                     sleep(1000);
                 }
-                List<String> outputLines = new ArrayList<>();
-                boolean completeTopologyPublished = false;
-                boolean memberInitPublished = false;
-                while (!outputStream.isClosed()) {
+                List<String> outputLines = new ArrayList<String>();
+                while (!outputStream.isClosed() && !pcaActivated) {
                     List<String> newLines = getNewLines(outputLines, 
outputStream.toString());
                     if (newLines.size() > 0) {
                         for (String line : newLines) {
-                            if (line.contains("Waiting for complete topology 
event") && !completeTopologyPublished) {
+                            if (line.contains("Subscribed to 'topology/#'")) {
                                 sleep(2000);
                                 // Send complete topology event
                                 log.info("Publishing complete topology 
event...");
@@ -225,30 +230,21 @@ public class MessageBrokerHATestCase extends 
PythonAgentIntegrationTest {
                                 CompleteTopologyEvent completeTopologyEvent = 
new CompleteTopologyEvent(topology);
                                 publishEvent(completeTopologyEvent);
                                 log.info("Complete topology event published");
-                                completeTopologyPublished = true;
-                            }
+                                sleep(2000);
 
-                            if (line.contains("Waiting for cartridge agent to 
be initialized") && !memberInitPublished) {
                                 // Publish member initialized event
                                 log.info("Publishing member initialized 
event...");
-                                MemberInitializedEvent memberInitializedEvent 
= new MemberInitializedEvent(
-                                        SERVICE_NAME, CLUSTER_ID, 
CLUSTER_INSTANCE_ID, MEMBER_ID, NETWORK_PARTITION_ID,
-                                        PARTITION_ID, INSTANCE_ID
-                                );
+                                MemberInitializedEvent memberInitializedEvent 
= new MemberInitializedEvent(SERVICE_NAME,
+                                        CLUSTER_ID, CLUSTER_INSTANCE_ID, 
MEMBER_ID, NETWORK_PARTITION_ID, PARTITION_ID,
+                                        INSTANCE_ID);
                                 publishEvent(memberInitializedEvent);
                                 log.info("Member initialized event published");
-                                memberInitPublished = true;
-                            }
-
-                            // Send artifact updated event to activate the 
instance first
-                            if (line.contains("Artifact repository found")) {
-                                
publishEvent(getArtifactUpdatedEventForPublicRepo());
-                                log.info("Artifact updated event published");
                             }
                         }
                     }
                     sleep(1000);
                 }
+                log.info("Startup test thread finished.");
             }
         });
         startupTestThread.start();
@@ -256,21 +252,11 @@ public class MessageBrokerHATestCase extends 
PythonAgentIntegrationTest {
         while (!instanceStarted || !instanceActivated) {
             // wait until the instance activated event is received.
             // this will assert whether instance got activated within timeout 
period; no need for explicit assertions
+            log.info("Waiting for agent activation...");
             sleep(2000);
         }
-    }
-
-    private ArtifactUpdatedEvent getArtifactUpdatedEventForPublicRepo() {
-        ArtifactUpdatedEvent publicRepoEvent = 
createTestArtifactUpdatedEvent();
-        
publicRepoEvent.setRepoURL("https://bitbucket.org/testapache2211/opentestrepo1.git";);
-        return publicRepoEvent;
-    }
-
-    private static ArtifactUpdatedEvent createTestArtifactUpdatedEvent() {
-        ArtifactUpdatedEvent artifactUpdatedEvent = new ArtifactUpdatedEvent();
-        artifactUpdatedEvent.setClusterId(CLUSTER_ID);
-        artifactUpdatedEvent.setTenantId(TENANT_ID);
-        return artifactUpdatedEvent;
+        pcaActivated = true;
+        log.info("PCA activation assertion passed.");
     }
 
     /**
@@ -287,9 +273,8 @@ public class MessageBrokerHATestCase extends 
PythonAgentIntegrationTest {
                 AUTOSCALING_POLICY_NAME, APP_ID);
         service.addCluster(cluster);
 
-        Member member = new Member(service.getServiceName(), 
cluster.getClusterId(), MEMBER_ID,
-                CLUSTER_INSTANCE_ID, NETWORK_PARTITION_ID, PARTITION_ID, 
LoadBalancingIPType.Private,
-                System.currentTimeMillis());
+        Member member = new Member(service.getServiceName(), 
cluster.getClusterId(), MEMBER_ID, CLUSTER_INSTANCE_ID,
+                NETWORK_PARTITION_ID, PARTITION_ID, 
LoadBalancingIPType.Private, System.currentTimeMillis());
 
         member.setDefaultPrivateIP("10.0.0.1");
         member.setDefaultPublicIP("20.0.0.1");
@@ -298,7 +283,6 @@ public class MessageBrokerHATestCase extends 
PythonAgentIntegrationTest {
         member.setProperties(properties);
         member.setStatus(MemberStatus.Created);
         cluster.addMember(member);
-
         return topology;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/1ed041dd/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
index fb12db6..6e25b6b 100644
--- 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
+++ 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
@@ -24,6 +24,7 @@ import org.apache.activemq.security.AuthenticationUser;
 import org.apache.activemq.security.SimpleAuthenticationPlugin;
 import org.apache.commons.exec.*;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,7 +39,6 @@ import 
org.apache.stratos.messaging.message.receiver.instance.status.InstanceSta
 import 
org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
 import org.apache.stratos.messaging.util.MessagingUtil;
 import 
org.apache.stratos.python.cartridge.agent.integration.common.ThriftTestServer;
-import org.apache.commons.io.IOUtils;
 
 import java.io.*;
 import java.net.ServerSocket;
@@ -63,7 +63,7 @@ public class PythonAgentIntegrationTest {
 
     public static final String TEST_THREAD_POOL_SIZE = "test.thread.pool.size";
     protected final UUID PYTHON_AGENT_DIR_NAME = UUID.randomUUID();
-//    protected final String defaultBrokerName = "testBrokerDefault";
+    //    protected final String defaultBrokerName = "testBrokerDefault";
     protected final Properties integrationProperties = new Properties();
 
     protected Map<Integer, ServerSocket> serverSocketMap = new HashMap<>();
@@ -76,7 +76,7 @@ public class PythonAgentIntegrationTest {
     protected String distributionName;
     protected int testThreadPoolSize;
 
-    protected boolean eventReceiverInitiated = false;
+    protected boolean eventReceiverInitialized = false;
     protected TopologyEventReceiver topologyEventReceiver;
     protected InstanceStatusEventReceiver instanceStatusEventReceiver;
     protected InitializerEventReceiver initializerEventReceiver;
@@ -87,7 +87,6 @@ public class PythonAgentIntegrationTest {
 
     private Map<String, BrokerService> messageBrokers;
 
-
     /**
      * Setup method for test method testPythonCartridgeAgent
      */
@@ -100,58 +99,58 @@ public class PythonAgentIntegrationTest {
         cepSSLPort = 
Integer.parseInt(integrationProperties.getProperty(CEP_SSL_PORT));
 
         Properties jndiProperties = new Properties();
-        jndiProperties.load(new FileInputStream(new 
File(System.getProperty("jndi.properties.dir") + PATH_SEP + 
"jndi.properties")));
-        if (!jndiProperties.containsKey(ACTIVEMQ_AMQP_BIND_PORTS) || 
!jndiProperties.containsKey(ACTIVEMQ_MQTT_BIND_PORTS)) {
+        jndiProperties.load(new FileInputStream(
+                new File(System.getProperty("jndi.properties.dir") + PATH_SEP 
+ "jndi.properties")));
+        if (!jndiProperties.containsKey(ACTIVEMQ_AMQP_BIND_PORTS) || 
!jndiProperties
+                .containsKey(ACTIVEMQ_MQTT_BIND_PORTS)) {
             amqpBindPorts = 
integrationProperties.getProperty(ACTIVEMQ_AMQP_BIND_PORTS).split(",");
             mqttBindPorts = 
integrationProperties.getProperty(ACTIVEMQ_MQTT_BIND_PORTS).split(",");
-        }else{
+        } else {
             amqpBindPorts = 
jndiProperties.getProperty(ACTIVEMQ_AMQP_BIND_PORTS).split(",");
             mqttBindPorts = 
jndiProperties.getProperty(ACTIVEMQ_MQTT_BIND_PORTS).split(",");
         }
 
         if (amqpBindPorts.length != mqttBindPorts.length) {
-            throw new RuntimeException("The number of AMQP ports and MQTT 
ports should be equal in integration-test.properties.");
+            throw new RuntimeException(
+                    "The number of AMQP ports and MQTT ports should be equal 
in integration-test.properties.");
         }
 
         // start ActiveMQ test server
-        for (int i = 0; i < amqpBindPorts.length; i++){
+        for (int i = 0; i < amqpBindPorts.length; i++) {
+            log.info("Starting ActiveMQ instance with AMQP: " + 
amqpBindPorts[i] + ", MQTT: " + mqttBindPorts[i]);
             startActiveMQInstance(Integer.parseInt(amqpBindPorts[i]), 
Integer.parseInt(mqttBindPorts[i]), true);
         }
 
-        if (!this.eventReceiverInitiated) {
-            ExecutorService executorService = 
StratosThreadPool.getExecutorService("TEST_THREAD_POOL", testThreadPoolSize);
-            topologyEventReceiver = new TopologyEventReceiver();
-            topologyEventReceiver.setExecutorService(executorService);
-            topologyEventReceiver.execute();
+        ExecutorService executorService = 
StratosThreadPool.getExecutorService("TEST_THREAD_POOL", testThreadPoolSize);
+        topologyEventReceiver = new TopologyEventReceiver();
+        topologyEventReceiver.setExecutorService(executorService);
+        topologyEventReceiver.execute();
 
-            instanceStatusEventReceiver = new InstanceStatusEventReceiver();
-            instanceStatusEventReceiver.setExecutorService(executorService);
-            instanceStatusEventReceiver.execute();
+        instanceStatusEventReceiver = new InstanceStatusEventReceiver();
+        instanceStatusEventReceiver.setExecutorService(executorService);
+        instanceStatusEventReceiver.execute();
 
-            this.instanceStarted = false;
-            instanceStatusEventReceiver.addEventListener(new 
InstanceStartedEventListener() {
-                @Override
-                protected void onEvent(Event event) {
-                    log.info("Instance started event received");
-                    instanceStarted = true;
-                }
-            });
+        instanceStatusEventReceiver.addEventListener(new 
InstanceStartedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                log.info("Instance started event received");
+                instanceStarted = true;
+            }
+        });
 
-            this.instanceActivated = false;
-            instanceStatusEventReceiver.addEventListener(new 
InstanceActivatedEventListener() {
-                @Override
-                protected void onEvent(Event event) {
-                    log.info("Instance activated event received");
-                    instanceActivated = true;
-                }
-            });
+        instanceStatusEventReceiver.addEventListener(new 
InstanceActivatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                log.info("Instance activated event received");
+                instanceActivated = true;
+            }
+        });
 
-            initializerEventReceiver = new InitializerEventReceiver();
-            initializerEventReceiver.setExecutorService(executorService);
-            initializerEventReceiver.execute();
+        initializerEventReceiver = new InitializerEventReceiver();
+        initializerEventReceiver.setExecutorService(executorService);
+        initializerEventReceiver.execute();
 
-            this.eventReceiverInitiated = true;
-        }
+        this.eventReceiverInitialized = true;
 
         // Start CEP Thrift test server
         thriftTestServer = new ThriftTestServer();
@@ -226,7 +225,7 @@ public class PythonAgentIntegrationTest {
         // stop the broker services
         for (Map.Entry<String, BrokerService> entry : 
this.messageBrokers.entrySet()) {
             try {
-                    log.debug("Stopping broker service [" + entry.getKey() + 
"]");
+                log.debug("Stopping broker service [" + entry.getKey() + "]");
                 entry.getValue().stop();
             } catch (Exception ignore) {
             }
@@ -240,8 +239,8 @@ public class PythonAgentIntegrationTest {
     }
 
     public PythonAgentIntegrationTest() throws IOException {
-        integrationProperties.load(
-                PythonAgentIntegrationTest.class.getResourceAsStream(PATH_SEP 
+ "integration-test.properties"));
+        integrationProperties
+                
.load(PythonAgentIntegrationTest.class.getResourceAsStream(PATH_SEP + 
"integration-test.properties"));
         distributionName = 
integrationProperties.getProperty(DISTRIBUTION_NAME);
         cepPort = 
Integer.parseInt(integrationProperties.getProperty(CEP_PORT));
         cepSSLPort = 
Integer.parseInt(integrationProperties.getProperty(CEP_SSL_PORT));
@@ -279,7 +278,7 @@ public class PythonAgentIntegrationTest {
             AuthenticationUser authenticationUser = new 
AuthenticationUser("system", "manager", "users,admins");
             List<AuthenticationUser> authUserList = new ArrayList<>();
             authUserList.add(authenticationUser);
-            broker.setPlugins(new BrokerPlugin[]{new 
SimpleAuthenticationPlugin(authUserList)});
+            broker.setPlugins(new BrokerPlugin[] { new 
SimpleAuthenticationPlugin(authUserList) });
         }
 
         broker.setBrokerName(brokerName);
@@ -293,8 +292,8 @@ public class PythonAgentIntegrationTest {
         return brokerName;
     }
 
-    protected void stopActiveMQInstance(String brokerName){
-        if (this.messageBrokers.containsKey(brokerName)){
+    protected void stopActiveMQInstance(String brokerName) {
+        if (this.messageBrokers.containsKey(brokerName)) {
             log.debug("Stopping broker service [" + brokerName + "]");
             BrokerService broker = this.messageBrokers.get(brokerName);
             try {
@@ -320,7 +319,7 @@ public class PythonAgentIntegrationTest {
                                     log.error("ERROR found in PCA log", e);
                                 }
                             }
-                            log.info("[PCA] " + line);
+                            log.debug("[PCA] " + line);
                         }
                     }
                     sleep(100);

Reply via email to