Tested with two nodes cluster successfully
Project: http://git-wip-us.apache.org/repos/asf/ode/repo Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/8fe5546d Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/8fe5546d Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/8fe5546d Branch: refs/heads/ODE-563 Commit: 8fe5546d6528b1b2e970971af6c077077b871561 Parents: 348ae9d Author: suba <[email protected]> Authored: Wed Aug 5 22:39:34 2015 +0530 Committer: suba <[email protected]> Committed: Wed Aug 5 22:39:34 2015 +0530 ---------------------------------------------------------------------- .../src/main/webapp/WEB-INF/conf/hazelcast.xml | 63 ++++++++++++++++++++ .../java/org/apache/ode/axis2/ODEServer.java | 2 + .../hazelcast/HazelcastClusterImpl.java | 18 +++--- .../hazelcast/HazelcastDeploymentLock.java | 2 +- .../hazelcast/HazelcastInstanceLock.java | 3 +- .../ode/scheduler/simple/SimpleScheduler.java | 43 ++++++------- .../scheduler/simple/SimpleSchedulerTest.java | 27 ++++----- 7 files changed, 113 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode/blob/8fe5546d/axis2-war/src/main/webapp/WEB-INF/conf/hazelcast.xml ---------------------------------------------------------------------- diff --git a/axis2-war/src/main/webapp/WEB-INF/conf/hazelcast.xml b/axis2-war/src/main/webapp/WEB-INF/conf/hazelcast.xml new file mode 100644 index 0000000..bf1e99e --- /dev/null +++ b/axis2-war/src/main/webapp/WEB-INF/conf/hazelcast.xml @@ -0,0 +1,63 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Copyright (c) 2008-2013, Hazelcast, Inc. All Rights Reserved. + ~ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.4.xsd" + xmlns="http://www.hazelcast.com/schema/config" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <network> + <port auto-increment="true" port-count="100">5701</port> + <outbound-ports> + <ports>0</ports> + </outbound-ports> + <reuse-address>false</reuse-address> + <join> + <multicast enabled="false"> + <multicast-group>224.2.2.3</multicast-group> + <multicast-port>54327</multicast-port> + </multicast> + <tcp-ip enabled="true"> + <member>127.0.0.1:5701</member> + <member>127.0.0.1:5702</member> + </tcp-ip> + <aws enabled="false"> + <access-key>my-access-key</access-key> + <secret-key>my-secret-key</secret-key> + <region>us-west-1</region> + <host-header>ec2.amazonaws.com</host-header> + <security-group-name>hazelcast-sg</security-group-name> + <tag-key>type</tag-key> + <tag-value>hz-nodes</tag-value> + <multicast enabled="false"> + <multicast-group>224.2.2.3</multicast-group> + <multicast-port>54327</multicast-port> + </multicast> + </aws> + </join> + <interfaces enabled="false"> + <interface>10.10.1.*</interface> + </interfaces> + <ssl enabled="false" /> + <socket-interceptor enabled="false" /> + </network> + <partition-group enabled="false"/> + <map name="ODE_DEPLOYMENT_LOCK"></map> + <map name="ODE_PROCESS_INSTANCE_LOCK"></map> + <topic name="ODE_DEPLOYMENT_TOPIC"></topic> +</hazelcast> + + + http://git-wip-us.apache.org/repos/asf/ode/blob/8fe5546d/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java ---------------------------------------------------------------------- diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java index 4860150..0a13c4a 100644 --- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java +++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java @@ -201,6 +201,7 @@ public class ODEServer { _clusterManager.registerClusterMemberListener((ClusterMemberListener) _scheduler); _clusterManager.setClusterProcessStore((ClusterProcessStore) _store); _clusterManager.init(_configRoot); + ((SimpleScheduler)_scheduler).setNodeId(_clusterManager.getNodeID()); } try { @@ -483,6 +484,7 @@ public class ODEServer { } } + /** * Initialize the DAO. * http://git-wip-us.apache.org/repos/asf/ode/blob/8fe5546d/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java ---------------------------------------------------------------------- diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java index 9d2a554..4c5cad5 100644 --- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java +++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java @@ -57,8 +57,8 @@ public class HazelcastClusterImpl implements ClusterManager, ProcessStoreCluster private IMap<Long, Long> instance_lock_map; private ITopic<ProcessStoreClusterEvent> clusterDeploymentMessageTopic; private ClusterProcessStore _clusterProcessStore; - private ClusterLock<String> _hazelcastDeploymentLock; - private ClusterLock<Long> _hazelcastInstanceLock; + private HazelcastDeploymentLock hazelcastDeploymentLock; + private HazelcastInstanceLock hazelcastInstanceLock; private ClusterDeploymentMessageListener clusterDeploymentMessageListener; private ClusterMemberShipListener clusterMemberShipListener; private List<ClusterMemberListener> clusterMemberListenerList = null; @@ -67,8 +67,11 @@ public class HazelcastClusterImpl implements ClusterManager, ProcessStoreCluster clusterMemberShipListener = new ClusterMemberShipListener(); clusterDeploymentMessageListener = new ClusterDeploymentMessageListener(); clusterDeploymentMessageListener.registerClusterProcessStoreListener((ProcessStoreClusterListener)this); + hazelcastDeploymentLock = new HazelcastDeploymentLock(); + hazelcastInstanceLock = new HazelcastInstanceLock(); } + public void init(File configRoot) { /*First,looks for the hazelcast.config system property. If it is set, its value is used as the path. @@ -101,9 +104,8 @@ public class HazelcastClusterImpl implements ClusterManager, ProcessStoreCluster instance_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_PROCESS_INSTANCE_LOCK); clusterDeploymentMessageTopic = _hazelcastInstance.getTopic(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_TOPIC); - _hazelcastDeploymentLock = (ClusterLock) new HazelcastDeploymentLock(deployment_lock_map); - _hazelcastInstanceLock = (ClusterLock) new HazelcastInstanceLock(instance_lock_map); - + hazelcastDeploymentLock.setLockMap(deployment_lock_map); + hazelcastInstanceLock.setLockMap(instance_lock_map); markAsMaster(); } } @@ -221,7 +223,7 @@ public class HazelcastClusterImpl implements ClusterManager, ProcessStoreCluster listener.memberElectedAsMaster(nodeID); } } - __log.info(isMaster); + __log.info("Master node: " +isMaster); } public boolean isMaster() { @@ -249,11 +251,11 @@ public class HazelcastClusterImpl implements ClusterManager, ProcessStoreCluster } public ClusterLock<String> getDeploymentLock(){ - return _hazelcastDeploymentLock; + return (ClusterLock)hazelcastDeploymentLock; } public ClusterLock<Long> getInstanceLock(){ - return _hazelcastInstanceLock; + return (ClusterLock)hazelcastInstanceLock; } public List<String> getActiveNodes() { http://git-wip-us.apache.org/repos/asf/ode/blob/8fe5546d/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java ---------------------------------------------------------------------- diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java index f36a1b4..b753305 100644 --- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java +++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java @@ -31,7 +31,7 @@ public class HazelcastDeploymentLock implements ClusterLock<String>{ private IMap<String, String> _lock_map; - HazelcastDeploymentLock(IMap<String, String> lock_map) { + public void setLockMap(IMap<String, String> lock_map) { _lock_map = lock_map; } http://git-wip-us.apache.org/repos/asf/ode/blob/8fe5546d/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java ---------------------------------------------------------------------- diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java index 1729bac..8ac11f8 100644 --- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java +++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java @@ -31,8 +31,7 @@ public class HazelcastInstanceLock implements ClusterLock<Long> { private IMap<Long, Long> _lock_map; - - HazelcastInstanceLock(IMap<Long, Long> lock_map) { + public void setLockMap(IMap<Long, Long> lock_map) { _lock_map = lock_map; } http://git-wip-us.apache.org/repos/asf/ode/blob/8fe5546d/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java ---------------------------------------------------------------------- diff --git a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java index 1da5571..517045d 100644 --- a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java +++ b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java @@ -479,10 +479,10 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList // schedule immediate job loading for now! _todo.enqueue(new LoadImmediateTask(now)); - if(!_isClusterEnabled) enqueueTasksReadnodeIds(); + if(!_isClusterEnabled) enqueueTasksReadnodeIds(now); else { - if (_clusterManager.isMaster()) enqueueTasksReadnodeIds(); + if (_clusterManager.isMaster()) enqueueTasksReadnodeIds(now); } _todo.start(); @@ -521,10 +521,11 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList // Do enqueue CheckStaleNodes and UpgradeJobsTask after a new master is identified. public void memberElectedAsMaster(String masterId) { - enqueueTasksReadnodeIds(); + long now = System.currentTimeMillis(); + enqueueTasksReadnodeIds(now); } - private void enqueueTasksReadnodeIds() { + private void enqueueTasksReadnodeIds(long now) { try { execTransaction(new Callable<Void>() { @@ -544,8 +545,6 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList else _knownNodes.add(_nodeId); - long now = System.currentTimeMillis(); - // schedule check for stale nodes, make it random so that the nodes don't overlap. _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval))); @@ -815,8 +814,10 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList final ArrayList<String> activeNodes; // for cluster mode - if (_isClusterEnabled && _clusterManager.isMaster()) { - activeNodes = (ArrayList) _clusterManager.getActiveNodes(); + if (_isClusterEnabled) { + if (_clusterManager.isMaster()) { + activeNodes = (ArrayList) _clusterManager.getActiveNodes(); + } else activeNodes = null; } //for standalone ODE deployments else { @@ -984,24 +985,26 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList ArrayList<String> knownNodes = new ArrayList<String>(_knownNodes); // for cluster mode - if (_isClusterEnabled && _clusterManager.isMaster()) { - ArrayList<String> memberList = (ArrayList) _clusterManager.getActiveNodes(); - - //find stale nodes - knownNodes.removeAll(memberList); - if (knownNodes.size() != 0) { - for (String nodeId : knownNodes) { - _staleNodes.add(nodeId); + if (_isClusterEnabled) { + if (_clusterManager.isMaster()) { + ArrayList<String> memberList = (ArrayList) _clusterManager.getActiveNodes(); + + //find stale nodes + knownNodes.removeAll(memberList); + if (knownNodes.size() != 0) { + for (String nodeId : knownNodes) { + _staleNodes.add(nodeId); + } + } + for (String nodeId : _staleNodes) { + recoverStaleNode(nodeId); } - } - for (String nodeId : _staleNodes) { - recoverStaleNode(nodeId); } } // for standalone ode node else { for (String nodeId : knownNodes) { - if (!nodeId.equals(_nodeId)) recoverStaleNode(nodeId); + if (!_nodeId.equals(nodeId)) recoverStaleNode(nodeId); } } /*for (String nodeId : _knownNodes) { http://git-wip-us.apache.org/repos/asf/ode/blob/8fe5546d/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java ---------------------------------------------------------------------- diff --git a/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java b/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java index 4c89ae9..10e86fc 100644 --- a/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java +++ b/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java @@ -19,27 +19,26 @@ package org.apache.ode.scheduler.simple; -import java.util.*; - -import javax.transaction.RollbackException; -import javax.transaction.Status; -import javax.transaction.Synchronization; -import javax.transaction.SystemException; -import javax.transaction.TransactionManager; - import junit.framework.Assert; -import junit.framework.TestCase; - import org.apache.geronimo.transaction.manager.GeronimoTransactionManager; import org.apache.ode.bpel.iapi.Scheduler; import org.apache.ode.bpel.iapi.Scheduler.JobInfo; import org.apache.ode.bpel.iapi.Scheduler.JobProcessor; import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException; -import org.apache.geronimo.transaction.manager.GeronimoTransactionManager; import org.junit.After; import org.junit.Before; import org.junit.Test; +import javax.transaction.RollbackException; +import javax.transaction.Status; +import javax.transaction.Synchronization; +import javax.transaction.SystemException; +import javax.transaction.TransactionManager; + +import java.util.ArrayList; +import java.util.Date; +import java.util.Properties; + public class SimpleSchedulerTest extends Assert implements JobProcessor { DelegateSupport _ds; @@ -210,10 +209,10 @@ public class SimpleSchedulerTest extends Assert implements JobProcessor { _scheduler.setImmediateInterval(1000); _scheduler.setStaleInterval(1000); _scheduler.start(); - for (int i = 0; i < 40; ++i) { - _scheduler.updateHeartBeat("n1"); + /*for (int i = 0; i < 40; ++i) { + _scheduler.updateHeartBeat("n1"); Thread.sleep(100); - } + }*/ _scheduler.stop(); Thread.sleep(1000);
