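Redesigning phase: replace the Hazelcast-specific HazelcastCluster API with a pluggable ClusterManager interface (implementation class selected via the clustering.impl.class property)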
Project: http://git-wip-us.apache.org/repos/asf/ode/repo Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/afa36ee6 Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/afa36ee6 Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/afa36ee6 Branch: refs/heads/master Commit: afa36ee682af1ec4cf4441c44007f2ef6b564b04 Parents: b4cd9a4 Author: suba <[email protected]> Authored: Mon Jun 15 22:53:53 2015 +0530 Committer: suba <[email protected]> Committed: Mon Jun 15 22:53:53 2015 +0530 ---------------------------------------------------------------------- .../java/org/apache/ode/axis2/ODEServer.java | 31 ++++------ .../apache/ode/bpel/clapi/ClusterManager.java | 59 ++++++++++++++++++++ .../apache/ode/bpel/hzapi/HazelcastCluster.java | 56 ------------------- .../ode/il/config/OdeConfigProperties.java | 12 +++- .../apache/ode/bpel/engine/BpelServerImpl.java | 6 +- .../org/apache/ode/bpel/engine/Contexts.java | 4 +- .../ode/store/ClusterProcessStoreImpl.java | 25 +++++++-- .../hazelcast/HazelcastClusterImpl.java | 56 ++++++++----------- 8 files changed, 129 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode/blob/afa36ee6/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java ---------------------------------------------------------------------- diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java index d1f6c36..26489d2 100644 --- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java +++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java @@ -48,7 +48,6 @@ import javax.transaction.xa.XAResource; import org.apache.axis2.AxisFault; import org.apache.axis2.context.ConfigurationContext; -import org.apache.axis2.engine.AxisConfiguration; import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; import org.apache.commons.httpclient.util.IdleConnectionTimeoutThread; import 
org.apache.commons.httpclient.params.HttpConnectionManagerParams; @@ -83,8 +82,7 @@ import org.apache.ode.store.ClusterProcessStoreImpl; import org.apache.ode.utils.GUID; import org.apache.ode.utils.fs.TempFileManager; -import org.apache.ode.clustering.hazelcast.*; -import com.hazelcast.core.*; +import org.apache.ode.bpel.clapi.ClusterManager; /** * Server class called by our Axis hooks to handle all ODE lifecycle management. @@ -137,9 +135,7 @@ public class ODEServer { public Runnable txMgrCreatedCallback; - private HazelcastInstanceConfig hazelcastInstanceConfig; - - private HazelcastClusterImpl hazelcastClusterImpl; + private ClusterManager _clusterManager; private String clusteringState = ""; @@ -484,7 +480,7 @@ public class ODEServer { isClusteringEnabled = state; } - public boolean getClusteringState() { + public boolean getIsCluteringEnabled() { return isClusteringEnabled; } @@ -492,17 +488,14 @@ public class ODEServer { * Initialize the clustering if it is enabled */ private void initClustering() { - String hzConfig = System.getProperty("hazelcast.config"); - if (hzConfig != null) hazelcastInstanceConfig = new HazelcastInstanceConfig(); - else { - File hzXml = new File(_configRoot, "hazelcast.xml"); - if (!hzXml.isFile()) - __log.error("hazelcast.xml does not exist or is not a file"); - else hazelcastInstanceConfig = new HazelcastInstanceConfig(hzXml); - } - if (hazelcastInstanceConfig != null) { - hazelcastClusterImpl = new HazelcastClusterImpl(hazelcastInstanceConfig.getHazelcastInstance()); + String clusterImplName = _odeConfig.getClusteringImplClass(); + try { + Class<?> clustering_class = this.getClass().getClassLoader().loadClass(clusterImplName); + _clusterManager = (ClusterManager) clustering_class.newInstance(); + } catch (Exception ex) { + __log.error(ex); } + _clusterManager.init(_configRoot); } /** @@ -534,7 +527,7 @@ public class ODEServer { protected ProcessStoreImpl createProcessStore(EndpointReferenceContext eprContext, DataSource ds) { if 
(isClusteringEnabled) - return new ClusterProcessStoreImpl(eprContext, ds, _odeConfig.getDAOConnectionFactory(), _odeConfig, false, hazelcastClusterImpl); + return new ClusterProcessStoreImpl(eprContext, ds, _odeConfig.getDAOConnectionFactory(), _odeConfig, false, _clusterManager); else return new ProcessStoreImpl(eprContext, ds, _odeConfig.getDAOConnectionFactory(), _odeConfig, false); } @@ -585,7 +578,7 @@ public class ODEServer { _bpelServer.setCronScheduler(_cronScheduler); _bpelServer.setDaoConnectionFactory(_daoCF); - _bpelServer.setHazelcastCluster(hazelcastClusterImpl); + _bpelServer.setClusterManagerImpl(_clusterManager); _bpelServer.setInMemDaoConnectionFactory(new BpelDAOConnectionFactoryImpl(_scheduler, _odeConfig.getInMemMexTtl())); _bpelServer.setEndpointReferenceContext(eprContext); _bpelServer.setMessageExchangeContext(new MessageExchangeContextImpl(this)); http://git-wip-us.apache.org/repos/asf/ode/blob/afa36ee6/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java ---------------------------------------------------------------------- diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java new file mode 100644 index 0000000..4a0aded --- /dev/null +++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.bpel.clapi; + +import java.io.File; +import java.util.List; + +public interface ClusterManager { + + /** + * Initialization of the cluster + * @param file + */ + void init(File file); + + /** + * Check whether current node is the leader or not. + */ + void markAsMaster(); + + /** + * Return isMaster + * @return + */ + boolean getIsMaster(); + + /** + * Acquire the lock for each file in the file system + * @param key + * @return + */ + boolean lock(String key); + + /** + * Release the lock acquired by each file + * @param key + * @return + */ + boolean unlock(String key); + + + +} http://git-wip-us.apache.org/repos/asf/ode/blob/afa36ee6/bpel-api/src/main/java/org/apache/ode/bpel/hzapi/HazelcastCluster.java ---------------------------------------------------------------------- diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/hzapi/HazelcastCluster.java b/bpel-api/src/main/java/org/apache/ode/bpel/hzapi/HazelcastCluster.java deleted file mode 100644 index adca32c..0000000 --- a/bpel-api/src/main/java/org/apache/ode/bpel/hzapi/HazelcastCluster.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.ode.bpel.hzapi; - -import com.hazelcast.core.Member; - -import java.util.List; - -public interface HazelcastCluster { - - /** - * Initialization of the cluster - */ - void init(); - - /** - * Get hostName + port nu of Member - * @param member - * @return - */ - String getHazelCastNodeID(Member member); - - /** - * Check whether current node is the leader or not. - */ - void markAsMaster(); - - /** - * returns Current Nodes in the cluster. - * @return - */ - List<String> getKnownNodes(); - - /** - * Return isMaster - * @return - */ - boolean getIsMaster(); - -} http://git-wip-us.apache.org/repos/asf/ode/blob/afa36ee6/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java ---------------------------------------------------------------------- diff --git a/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java b/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java index cef3b74..5c0ed13 100644 --- a/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java +++ b/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java @@ -106,7 +106,11 @@ public class OdeConfigProperties { public static final String DEFAULT_TX_FACTORY_CLASS_NAME = "org.apache.ode.il.EmbeddedGeronimoFactory"; - public static final String PROP_HAZELCAST_CLUSTERING = "hazelcast.clustering.enabled"; + public static final String PROP_CLUSTERING_ENABLED = "clustering.enabled"; + + public static final String PROP_CLUSTERING_IMPL_CLASS = "clustering.impl.class"; + + public static 
final String DEFAULT_CLUSTERING_IMPL_CLASS_NAME = "org.apache.ode.clustering.hazelcast.HazelcastClusterImpl"; private File _cfgFile; @@ -292,7 +296,11 @@ public class OdeConfigProperties { } public String getClusteringState() { - return getProperty(OdeConfigProperties.PROP_HAZELCAST_CLUSTERING); + return getProperty(OdeConfigProperties.PROP_CLUSTERING_ENABLED); + } + + public String getClusteringImplClass() { + return getProperty(OdeConfigProperties.PROP_CLUSTERING_IMPL_CLASS, DEFAULT_CLUSTERING_IMPL_CLASS_NAME); } public String getTxFactoryClass() { http://git-wip-us.apache.org/repos/asf/ode/blob/afa36ee6/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java index 92e9784..01ef6eb 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java @@ -35,6 +35,7 @@ import javax.xml.namespace.QName; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.ode.bpel.clapi.ClusterManager; import org.apache.ode.bpel.dao.BpelDAOConnection; import org.apache.ode.bpel.dao.BpelDAOConnectionFactory; import org.apache.ode.bpel.dao.DeferredProcessInstanceCleanable; @@ -59,7 +60,6 @@ import org.apache.ode.bpel.iapi.Scheduler.MapSerializableRunnable; import org.apache.ode.bpel.iapi.Scheduler.Synchronizer; import org.apache.ode.bpel.intercept.MessageExchangeInterceptor; import org.apache.ode.bpel.o.OProcess; -import org.apache.ode.clustering.hazelcast.HazelcastClusterImpl; import org.apache.ode.utils.msg.MessageBundle; import org.apache.ode.utils.stl.CollectionsX; import org.apache.ode.utils.stl.MemberOfFunction; @@ -535,8 +535,8 @@ public class BpelServerImpl implements BpelServer, 
Scheduler.JobProcessor { _contexts.bindingContext = bc; } - public void setHazelcastCluster(HazelcastClusterImpl hzCImpl) { - _contexts.hazelcastClusterImpl = hzCImpl; + public void setClusterManagerImpl(ClusterManager cm) { + _contexts.clusterManager = cm; } public DebuggerContext getDebugger(QName pid) throws BpelEngineException { http://git-wip-us.apache.org/repos/asf/ode/blob/afa36ee6/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java index 115f4f7..a965d58 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java @@ -19,6 +19,7 @@ package org.apache.ode.bpel.engine; +import org.apache.ode.bpel.clapi.ClusterManager; import org.apache.ode.bpel.dao.BpelDAOConnectionFactory; import org.apache.ode.bpel.iapi.BindingContext; import org.apache.ode.bpel.iapi.BpelEventListener; @@ -28,7 +29,6 @@ import org.apache.ode.bpel.iapi.Scheduler; import org.apache.ode.bpel.intercept.MessageExchangeInterceptor; import org.apache.ode.bpel.engine.cron.CronScheduler; import org.apache.ode.bpel.evar.ExternalVariableModule; -import org.apache.ode.clustering.hazelcast.HazelcastClusterImpl; import java.util.HashMap; import java.util.List; @@ -47,7 +47,7 @@ public class Contexts { public CronScheduler cronScheduler; - public HazelcastClusterImpl hazelcastClusterImpl; + public ClusterManager clusterManager; EndpointReferenceContext eprContext; http://git-wip-us.apache.org/repos/asf/ode/blob/afa36ee6/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java ---------------------------------------------------------------------- diff --git a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java 
b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java index 7f79a8b..22ba2cd 100644 --- a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java +++ b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java @@ -30,6 +30,8 @@ import javax.sql.DataSource; import javax.xml.namespace.QName; import java.io.File; import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import com.hazelcast.core.*; @@ -66,15 +68,16 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{ clusterMessageTopic.publish("Deployed " +duName); } - //have to write code for retire previous versions public void publishService(final String duName) { - final ArrayList<ProcessConfImpl> confs = new ArrayList<ProcessConfImpl>(); - String namePart = duName.split("-")[0]; + final ArrayList<ProcessConfImpl> confs = new ArrayList<ProcessConfImpl>();; ProcessState state = ProcessState.ACTIVE; + Pattern duNamePattern = getPreviousPackageVersionPattern(duName); + for (Iterator<ProcessConfImpl> iterator = loaded.iterator(); iterator.hasNext();) { ProcessConfImpl pconf = iterator.next(); - if (pconf.getPackage().contains(namePart) && pconf.getState().equals(state)) { + Matcher matcher = duNamePattern.matcher(pconf.getPackage()); + if (matcher.matches() && pconf.getState().equals(state)) { pconf.setState(ProcessState.RETIRED); confs.add(pconf); } @@ -122,4 +125,18 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{ } } } + + private Pattern getPreviousPackageVersionPattern(String duName) { + String[] nameParts = duName.split("/"); + /* Replace the version number (if any) with regexp to match any version number */ + nameParts[0] = nameParts[0].replaceAll("([-\\Q.\\E](\\d)+)?\\z", ""); + nameParts[0] += "([-\\Q.\\E](\\d)+)?"; + StringBuilder duNameRegExp = new StringBuilder(duName.length() * 2); + for (int i = 0, n = nameParts.length; i < n; i++) { + if (i > 0) duNameRegExp.append("/"); + 
duNameRegExp.append(nameParts[i]); + } + Pattern duNamePattern = Pattern.compile(duNameRegExp.toString()); + return duNamePattern; + } } http://git-wip-us.apache.org/repos/asf/ode/blob/afa36ee6/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java ---------------------------------------------------------------------- diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java index 4e6878e..6ae701b 100644 --- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java +++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java @@ -20,18 +20,19 @@ package org.apache.ode.clustering.hazelcast; import com.hazelcast.core.*; +import java.io.File; import java.util.ArrayList; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.ode.bpel.hzapi.HazelcastCluster; +import org.apache.ode.bpel.clapi.ClusterManager; /** * This class implements necessary methods to build the cluster using hazelcast */ -public class HazelcastClusterImpl implements HazelcastCluster{ +public class HazelcastClusterImpl implements ClusterManager{ private static final Log __log = LogFactory.getLog(HazelcastClusterImpl.class); private HazelcastInstance _hazelcastInstance; @@ -40,28 +41,26 @@ public class HazelcastClusterImpl implements HazelcastCluster{ private IMap<String, String> lock_map; - public HazelcastClusterImpl(HazelcastInstance hazelcastInstance) { - _hazelcastInstance = hazelcastInstance; - init(); - } - - public void init() { - // Registering this node in the cluster. 
- _hazelcastInstance.getCluster().addMembershipListener(new ClusterMemberShipListener()); - - Member localMember = _hazelcastInstance.getCluster().getLocalMember(); - String localMemberID = getHazelCastNodeID(localMember); - __log.info("Registering HZ localMember ID " + localMemberID); - _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_NODE_MAP) - .put(localMemberID, isMaster); - - lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_LOCK_MAP); - } + public void init(File configRoot) { + //First,looks for the hazelcast.config system property. If it is set, its value is used as the path. + //Else it will load the hazelcast.xml file using FileSystemXmlConfig() + String hzConfig = System.getProperty("hazelcast.config"); + if (hzConfig != null) _hazelcastInstance = Hazelcast.newHazelcastInstance(); + else { + File hzXml = new File(configRoot, "hazelcast.xml"); + if (!hzXml.isFile()) + __log.error("hazelcast.xml does not exist or is not a file"); + else _hazelcastInstance = Hazelcast.newHazelcastInstance(new FileSystemXmlConfig(hzXml)); + } - public String getHazelCastNodeID(Member member) { - String hostName = member.getSocketAddress().getHostName(); - int port = member.getSocketAddress().getPort(); - return hostName + ":" + port; + if (_hazelcastInstance != null) { + // Registering this node in the cluster. + _hazelcastInstance.getCluster().addMembershipListener(new ClusterMemberShipListener()); + Member localMember = _hazelcastInstance.getCluster().getLocalMember(); + __log.info("Registering HZ localMember ID " + localMember); + markAsMaster(); + lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_LOCK_MAP); + } } public boolean lock(String key) { @@ -83,7 +82,6 @@ public class HazelcastClusterImpl implements HazelcastCluster{ } class ClusterMemberShipListener implements MembershipListener { - @Override public void memberAdded(MembershipEvent membershipEvent) { // Noting to do here. 
@@ -95,8 +93,6 @@ public class HazelcastClusterImpl implements HazelcastCluster{ // Allow Leader to update distributed map. if (isMaster) { String leftMemberID = getHazelCastNodeID(membershipEvent.getMember()); - // _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_NODE_MAP).remove(leftMemberID); - // _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_NODE_MAP).replace(getHazelCastNodeID(leader), isMaster); } } @@ -114,14 +110,6 @@ public class HazelcastClusterImpl implements HazelcastCluster{ __log.info(isMaster); } - public List<String> getKnownNodes() { - List<String> nodeList = new ArrayList<String>(); - for (Object s : _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_NODE_MAP).keySet()) { - nodeList.add((String) _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_NODE_MAP).get(s)); - } - return nodeList; - } - public boolean getIsMaster() { return isMaster; }
