Cluster Enabled Simple Scheduler-1
Project: http://git-wip-us.apache.org/repos/asf/ode/repo Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/15f1883c Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/15f1883c Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/15f1883c Branch: refs/heads/ODE-563 Commit: 15f1883c40e845b430c5c40418333ccccfa48b6a Parents: 9ffe0c7 Author: suba <[email protected]> Authored: Sun Jul 19 00:41:15 2015 +0530 Committer: suba <[email protected]> Committed: Sun Jul 19 00:41:15 2015 +0530 ---------------------------------------------------------------------- .../java/org/apache/ode/axis2/ODEServer.java | 12 +- .../org/apache/ode/test/BPELTestAbstract.java | 53 ++++---- .../hazelcast/HazelcastClusterImpl.java | 29 ++++- .../java/org/apache/ode/jbi/OdeLifeCycle.java | 25 ++-- .../ode/scheduler/simple/SchedulerListener.java | 27 ++++ .../ode/scheduler/simple/SimpleScheduler.java | 123 +++++++++++++++---- 6 files changed, 191 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode/blob/15f1883c/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java ---------------------------------------------------------------------- diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java index da62139..6803350 100644 --- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java +++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java @@ -50,6 +50,7 @@ import org.apache.ode.store.ClusterProcessStoreImpl; import org.apache.ode.store.ProcessStoreImpl; import org.apache.ode.utils.GUID; import org.apache.ode.utils.fs.TempFileManager; +import org.omg.CORBA.StringHolder; import javax.servlet.ServletConfig; import javax.servlet.ServletException; @@ -195,7 +196,10 @@ public class ODEServer { registerExternalVariableModules(); _store.loadAll(); - if (_clusterManager != null) _clusterManager.registerClusterProcessStoreMessageListener(); + if (_clusterManager != null) { + _clusterManager.registerClusterProcessStoreMessageListener(); + _clusterManager.setScheduler(_scheduler); + } try { _bpelServer.start(); @@ -524,8 +528,10 @@ public class ODEServer { } protected Scheduler createScheduler() { - SimpleScheduler scheduler = new SimpleScheduler(new GUID().toString(), - new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties()); + String nodeId; + if (isClusteringEnabled) nodeId = _clusterManager.getUuid(); + else nodeId = new GUID().toString(); + SimpleScheduler scheduler = new SimpleScheduler(nodeId, new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties(), isClusteringEnabled); scheduler.setExecutorService(_executorService); scheduler.setTransactionManager(_txMgr); return scheduler; http://git-wip-us.apache.org/repos/asf/ode/blob/15f1883c/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java ---------------------------------------------------------------------- diff --git a/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java b/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java index d24b59b..cdda50e 100644 --- a/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java +++ b/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java @@ -18,44 +18,14 @@ */ package org.apache.ode.test; -import java.io.File; -import java.io.FileInputStream; -import java.io.InputStream; -import java.net.URISyntaxException; -import java.net.URL; -import java.sql.Connection; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.Callable; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import javax.persistence.EntityManager; -import javax.persistence.EntityManagerFactory; -import javax.sql.DataSource; -import javax.transaction.TransactionManager; -import javax.xml.namespace.QName; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.ode.bpel.common.evt.DebugBpelEventListener; import org.apache.ode.bpel.dao.BpelDAOConnectionFactory; import org.apache.ode.bpel.engine.BpelServerImpl; -import org.apache.ode.bpel.iapi.Message; -import org.apache.ode.bpel.iapi.MessageExchange; +import org.apache.ode.bpel.iapi.*; import org.apache.ode.bpel.iapi.MessageExchange.Status; -import org.apache.ode.bpel.iapi.MyRoleMessageExchange; import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus; -import org.apache.ode.bpel.iapi.ProcessStore; -import org.apache.ode.bpel.iapi.ProcessStoreEvent; -import org.apache.ode.bpel.iapi.ProcessStoreListener; import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl; import org.apache.ode.il.EmbeddedGeronimoFactory; import org.apache.ode.il.config.OdeConfigProperties; @@ -71,6 +41,25 @@ import org.junit.Assert; import org.junit.Before; import org.w3c.dom.Element; +import javax.persistence.EntityManager; +import javax.persistence.EntityManagerFactory; +import javax.sql.DataSource; +import javax.transaction.TransactionManager; +import javax.xml.namespace.QName; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.net.URISyntaxException; +import java.net.URL; +import java.sql.Connection; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + public abstract class BPELTestAbstract { private static final Log log = LogFactory.getLog(BPELTestAbstract.class); public static final long WAIT_BEFORE_INVOKE_TIMEOUT = 2000; @@ -139,7 +128,7 @@ public abstract class BPELTestAbstract { { JdbcDelegate del = new JdbcDelegate(_dataSource); - scheduler = new SimpleScheduler("node", del, props); + scheduler = new SimpleScheduler("node", del, props,false); scheduler.setTransactionManager(_txManager); _cf = new BpelDAOConnectionFactoryImpl(scheduler); _server.setDaoConnectionFactory(_cf); http://git-wip-us.apache.org/repos/asf/ode/blob/15f1883c/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java ---------------------------------------------------------------------- diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java index 5f2b8f5..63a889a 100644 --- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java +++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java @@ -30,6 +30,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.ode.bpel.clapi.*; +import org.apache.ode.bpel.iapi.Scheduler; /** * This class implements necessary methods to build the cluster using hazelcast @@ -46,6 +47,7 @@ public class HazelcastClusterImpl implements ClusterManager { private IMap<Long, Long> instance_lock_map; private ITopic<ProcessStoreClusterEvent> clusterMessageTopic; private ClusterProcessStore _clusterProcessStore; + private Scheduler _scheduler; private ClusterLock<String> _hazelcastDeploymentLock; private ClusterLock<Long> _hazelcastInstanceLock; @@ -89,13 +91,17 @@ public class HazelcastClusterImpl implements ClusterManager { class ClusterMemberShipListener implements MembershipListener { @Override public void memberAdded(MembershipEvent membershipEvent) { - __log.info("Member Added " +membershipEvent.getMember().getUuid()); + String nodeId = membershipEvent.getMember().getUuid(); + __log.info("Member Added " +nodeId); + if(isMaster) _simpleScheduler.memberAdded(nodeId); } @Override public void memberRemoved(MembershipEvent membershipEvent) { - __log.info("Member Removed " +membershipEvent.getMember().getUuid()); + String nodeId = membershipEvent.getMember().getUuid(); + __log.info("Member Removed " +nodeId); markAsMaster(); + if(isMaster) _simpleScheduler.memberRemoved(nodeId, uuid); } @Override @@ -145,6 +151,7 @@ public class HazelcastClusterImpl implements ClusterManager { leader = _hazelcastInstance.getCluster().getMembers().iterator().next(); if (leader.localMember()) { isMaster = true; + _simpleScheduler.setIsMasterNode(true); } __log.info(isMaster); } @@ -153,8 +160,17 @@ public class HazelcastClusterImpl implements ClusterManager { return isMaster; } + public String getUuid() { + return uuid; + } + public void setClusterProcessStore(ClusterProcessStore store) { - _clusterProcessStore = store; + _clusterProcessStore = store; + } + + public void setScheduler(Scheduler scheduler) { + _scheduler = scheduler; + _scheduler.setClusterManager(this); } public void registerClusterProcessStoreMessageListener() { @@ -172,5 +188,12 @@ public class HazelcastClusterImpl implements ClusterManager { public ClusterLock<Long> getInstanceLock(){ return _hazelcastInstanceLock; } + + public List<String> getKnownNodes() { + List<String> nodesList = new ArrayList<String>(); + for(Member m : _hazelcastInstance.getCluster().getMembers()) + nodesList.add(m.getUuid()) ; + return nodesList; + } } http://git-wip-us.apache.org/repos/asf/ode/blob/15f1883c/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java ---------------------------------------------------------------------- diff --git a/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java b/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java index 40fb044..0c1b296 100644 --- a/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java +++ b/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java @@ -19,18 +19,6 @@ package org.apache.ode.jbi; -import java.io.File; -import java.io.FileNotFoundException; -import java.util.concurrent.Executors; - -import javax.jbi.JBIException; -import javax.jbi.component.ComponentContext; -import javax.jbi.component.ComponentLifeCycle; -import javax.jbi.component.ServiceUnitManager; -import javax.management.MBeanServer; -import javax.management.ObjectName; -import javax.transaction.TransactionManager; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.geronimo.transaction.manager.GeronimoTransactionManager; @@ -50,6 +38,17 @@ import org.apache.ode.store.ProcessStoreImpl; import org.apache.ode.utils.GUID; import org.apache.ode.utils.fs.TempFileManager; +import javax.jbi.JBIException; +import javax.jbi.component.ComponentContext; +import javax.jbi.component.ComponentLifeCycle; +import javax.jbi.component.ServiceUnitManager; +import javax.management.MBeanServer; +import javax.management.ObjectName; +import javax.transaction.TransactionManager; +import java.io.File; +import java.io.FileNotFoundException; +import java.util.concurrent.Executors; + /** * This class implements ComponentLifeCycle. The JBI framework will start this engine class automatically when JBI framework starts * up. @@ -243,7 +242,7 @@ public class OdeLifeCycle implements ComponentLifeCycle { _ode._executorService = Executors.newCachedThreadPool(); else _ode._executorService = Executors.newFixedThreadPool(_ode._config.getThreadPoolMaxSize()); - _ode._scheduler = new SimpleScheduler(new GUID().toString(),new JdbcDelegate(_ode._dataSource), _ode._config.getProperties()); + _ode._scheduler = new SimpleScheduler(new GUID().toString(),new JdbcDelegate(_ode._dataSource), _ode._config.getProperties(),false); _ode._scheduler.setJobProcessor(_ode._server); _ode._scheduler.setExecutorService(_ode._executorService); _ode._scheduler.setTransactionManager((TransactionManager) _ode.getContext().getTransactionManager()); http://git-wip-us.apache.org/repos/asf/ode/blob/15f1883c/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerListener.java ---------------------------------------------------------------------- diff --git a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerListener.java b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerListener.java new file mode 100644 index 0000000..3786912 --- /dev/null +++ b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerListener.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.scheduler.simple; + +public interface SchedulerListener { + + void memberAdded(String nodeId); + + void memberRemoved(String nodeId,String masterId); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ode/blob/15f1883c/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java ---------------------------------------------------------------------- diff --git a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java index a56b86e..3b6ec4d 100644 --- a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java +++ b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java @@ -19,33 +19,19 @@ package org.apache.ode.scheduler.simple; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.Properties; -import java.util.Random; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicLong; - -import javax.transaction.Status; -import javax.transaction.Synchronization; -import javax.transaction.SystemException; -import javax.transaction.Transaction; -import javax.transaction.TransactionManager; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.ode.bpel.clapi.ClusterManager; import org.apache.ode.bpel.iapi.ContextException; import org.apache.ode.bpel.iapi.Scheduler; +import javax.transaction.*; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + /** * A reliable and relatively simple scheduler that uses a database to persist information about * scheduled tasks. @@ -66,7 +52,7 @@ import org.apache.ode.bpel.iapi.Scheduler; * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m ) * */ -public class SimpleScheduler implements Scheduler, TaskRunner { +public class SimpleScheduler implements Scheduler, TaskRunner, SchedulerListener { private static final Log __log = LogFactory.getLog(SimpleScheduler.class); private static final int DEFAULT_TRANSACTION_TIMEOUT = 60 * 1000; @@ -114,6 +100,10 @@ public class SimpleScheduler implements Scheduler, TaskRunner { private DatabaseDelegate _db; + private boolean _isClusterEnabled; + + private ClusterManager _clusterManager; + /** All the nodes we know about */ private CopyOnWriteArraySet<String> _knownNodes = new CopyOnWriteArraySet<String>(); @@ -147,9 +137,10 @@ public class SimpleScheduler implements Scheduler, TaskRunner { private DateFormat debugDateFormatter = new SimpleDateFormat("HH:mm:ss,SSS"); - public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf) { + public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf, boolean clusterState) { _nodeId = nodeId; _db = del; + _isClusterEnabled = clusterState; _todoLimit = getIntProperty(conf, "ode.scheduler.queueLength", _todoLimit); _immediateInterval = getLongProperty(conf, "ode.scheduler.immediateInterval", _immediateInterval); _nearFutureInterval = getLongProperty(conf, "ode.scheduler.nearFutureInterval", _nearFutureInterval); @@ -183,6 +174,10 @@ public class SimpleScheduler implements Scheduler, TaskRunner { _nodeId = nodeId; } + public void setClusterManager(ClusterManager cm) { + _clusterManager = cm; + } + public void setStaleInterval(long staleInterval) { _staleInterval = staleInterval; } @@ -490,10 +485,11 @@ public class SimpleScheduler implements Scheduler, TaskRunner { _todo.enqueue(new LoadImmediateTask(now)); // schedule check for stale nodes, make it random so that the nodes don't overlap. - _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval))); + if (!_isClusterEnabled) + _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval))); // do the upgrade sometime (random) in the immediate interval. - _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval))); + enqueUpgradeJobsTask(now); _todo.start(); _running = true; @@ -521,6 +517,18 @@ public class SimpleScheduler implements Scheduler, TaskRunner { _running = false; } + public void memberAdded(final String nodeId) { + _todo.enqueue(new UpgradeJobsTask(System.currentTimeMillis()+ randomMean(_immediateInterval))); + } + + public void memberRemoved(final String nodeId, final String masterId) { + recoverClusterStaleNodes(nodeId, masterId); + } + + public void enqueUpgradeJobsTask(long now) { + _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval))); + } + class RunJob implements Callable<Void> { final Job job; final JobProcessor processor; @@ -814,6 +822,41 @@ public class SimpleScheduler implements Scheduler, TaskRunner { } + boolean doClusterJobsUpgrade() { + __log.debug("UPGRADE started for Cluster Mode"); + final ArrayList<String> knownNodes = _clusterManager.getKnownNodes(); + Collections.sort(knownNodes); + + // We're going to try to upgrade near future jobs using the db only. + // We assume that the distribution of the trailing digits in the + // scheduled time are uniformly distributed, and use modular division + // of the time by the number of nodes to create the node assignment. + // This can be done in a single update statement. + final long maxtime = System.currentTimeMillis() + _nearFutureInterval; + try { + return execTransaction(new Callable<Boolean>() { + + public Boolean call() throws Exception { + int numNodes = knownNodes.size(); + for (int i = 0; i < numNodes; ++i) { + String node = knownNodes.get(i); + _db.updateAssignToNode(node, i, numNodes, maxtime); + } + return true; + } + + }); + + } catch (Exception ex) { + __log.error("Database error upgrading jobs.", ex); + return false; + } finally { + __log.debug("UPGRADE complete"); + } + + } + + /** * Re-assign stale node's jobs to self. * @param nodeId @@ -857,6 +900,31 @@ public class SimpleScheduler implements Scheduler, TaskRunner { // return delay; // } + void recoverClusterStaleNodes(final String nodeId, final String masterId) { + if (__log.isDebugEnabled()) { + __log.debug("recovering stale nodes for Cluster Mode " + nodeId); + } + try { + int numrows = execTransaction(new Callable<Integer>() { + public Integer call() throws Exception { + return _db.updateReassign(nodeId, masterId); + } + }); + + if (__log.isDebugEnabled()) { + __log.debug("reassigned " + numrows + " jobs to master node. "); + } + + // Force a load-immediate to catch anything new from the recovered node. + doLoadImmediate(); + + } catch (Exception ex) { + __log.error("Database error reassigning node.", ex); + } finally { + __log.debug("node recovery complete"); + } + } + private abstract class SchedulerTask extends Task implements Runnable { SchedulerTask(long schedDate) { super(schedDate); @@ -911,7 +979,8 @@ public class SimpleScheduler implements Scheduler, TaskRunner { boolean success = false; try { - success = doUpgrade(); + if (_isClusterEnabled && _clusterManager.getIsMaster()) success = doClusterJobsUpgrade(); + else success = doUpgrade(); } finally { long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .50) : 1000); _nextUpgrade.set(future);
