Author: rajdavies
Date: Thu Aug 6 06:15:52 2009
New Revision: 801515
URL: http://svn.apache.org/viewvc?rev=801515&view=rev
Log:
tidied up a little
Modified:
activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/Group.java
Modified:
activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/Group.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/Group.java?rev=801515&r1=801514&r2=801515&view=diff
==============================================================================
--- activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/Group.java (original)
+++ activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/Group.java Thu Aug 6 06:15:52 2009
@@ -66,38 +66,31 @@
/**
* <P>
- * A <CODE>Group</CODE> is a distributed collaboration implementation that is
- * used to shared state and process messages amongst a distributed group of
- * other <CODE>Group</CODE> instances. Membership of a group is handled
+ * A <CODE>Group</CODE> is a distributed collaboration implementation that is used to shared state and process
+ * messages amongst a distributed group of other <CODE>Group</CODE> instances. Membership of a group is handled
* automatically using discovery.
* <P>
- * The underlying transport is JMS and there are some optimizations that occur
- * for membership if used with ActiveMQ - but <CODE>Group</CODE> can be used
- * with any JMS implementation.
+ * The underlying transport is JMS and there are some optimizations that occur for membership if used with ActiveMQ -
+ * but <CODE>Group</CODE> can be used with any JMS implementation.
*
* <P>
- * Updates to the group shared map are controlled by a coordinator. The
- * coordinator is elected by the member with the lowest lexicographical id -
- * based on the bully algorithm [Silberschatz et al. 1993]
+ * Updates to the group shared map are controlled by a coordinator. The coordinator is elected by the member with the
+ * lowest lexicographical id - based on the bully algorithm [Silberschatz et al. 1993]
* <P>
- * The {@link #selectCordinator(Collection<Member> members)} method may be
- * overridden to implement a custom mechanism for choosing how the coordinator
- * is elected for the map.
+ * The {@link #selectCordinator(Collection<Member> members)} method may be overridden to implement a custom mechanism
+ * for choosing how the coordinator is elected for the map.
* <P>
- * New <CODE>Group</CODE> instances have their state updated by the
- * coordinator, and coordinator failure is handled automatically within the
- * group.
+ * New <CODE>Group</CODE> instances have their state updated by the coordinator, and coordinator failure is handled
+ * automatically within the group.
* <P>
- * All map updates are totally ordered through the coordinator, whilst read
- * operations happen locally.
+ * All map updates are totally ordered through the coordinator, whilst read operations happen locally.
* <P>
- * A <CODE>Group</CODE> supports the concept of owner only updates(write
- * locks), shared updates, entry expiration times and removal on owner exit -
- * all of which are optional. In addition, you can grab and release locks for
- * values in the map, independently of who created them.
+ * A <CODE>Group</CODE> supports the concept of owner only updates(write locks), shared updates, entry expiration
+ * times and removal on owner exit - all of which are optional. In addition, you can grab and release locks for values
+ * in the map, independently of who created them.
* <P>
- * In addition, members of a group can broadcast messages and implement
- * request/response with other <CODE>Group</CODE> instances.
+ * In addition, members of a group can broadcast messages and implement request/response with other <CODE>Group</CODE>
+ * instances.
*
* <P>
*
@@ -114,10 +107,8 @@
public static final long DEFAULT_HEART_BEAT_INTERVAL = 1000;
private static final long EXPIRATION_SWEEP_INTERVAL = 500;
private static final Log LOG = LogFactory.getLog(Group.class);
- private static final String STATE_PREFIX = "STATE." + Group.class.getName()
- + ".";
- private static final String GROUP_MESSAGE_PREFIX = "MESSAGE."
- + Group.class.getName() + ".";
+ private static final String STATE_PREFIX = "STATE." +
Group.class.getName() + ".";
+ private static final String GROUP_MESSAGE_PREFIX = "MESSAGE." +
Group.class.getName() + ".";
private static final String STATE_TYPE = "state";
private static final String MESSAGE_TYPE = "message";
private static final String MEMBER_ID_PROPERTY = "memberId";
@@ -184,8 +175,7 @@
}
/**
- * Set the local map implementation to be used By default its a HashMap -
- * but you could use a Cache for example
+ * Set the local map implementation to be used By default its a HashMap -
but you could use a Cache for example
*
* @param map
*/
@@ -209,31 +199,22 @@
}
}
this.connection.start();
- this.stateSession = this.connection.createSession(false,
- Session.AUTO_ACKNOWLEDGE);
- this.messageSession = this.connection.createSession(false,
- Session.AUTO_ACKNOWLEDGE);
+ this.stateSession = this.connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ this.messageSession = this.connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
this.stateProducer = this.stateSession.createProducer(null);
this.stateProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
this.inboxTopic = this.stateSession.createTemporaryTopic();
String stateTopicName = STATE_PREFIX + this.groupName;
this.stateTopic = this.stateSession.createTopic(stateTopicName);
- this.heartBeatTopic = this.stateSession.createTopic(stateTopicName
- + ".heartbeat");
- String messageDestinationName = GROUP_MESSAGE_PREFIX
- + this.groupName;
- this.messageTopic = this.messageSession
- .createTopic(messageDestinationName);
- this.messageQueue = this.messageSession
- .createQueue(messageDestinationName);
- MessageConsumer privateInbox = this.messageSession
- .createConsumer(this.inboxTopic);
- MessageConsumer memberChangeConsumer = this.stateSession
- .createConsumer(this.stateTopic);
+ this.heartBeatTopic = this.stateSession.createTopic(stateTopicName
+ ".heartbeat");
+ String messageDestinationName = GROUP_MESSAGE_PREFIX +
this.groupName;
+ this.messageTopic =
this.messageSession.createTopic(messageDestinationName);
+ this.messageQueue =
this.messageSession.createQueue(messageDestinationName);
+ MessageConsumer privateInbox =
this.messageSession.createConsumer(this.inboxTopic);
+ MessageConsumer memberChangeConsumer =
this.stateSession.createConsumer(this.stateTopic);
String memberId = null;
if (memberChangeConsumer instanceof ActiveMQMessageConsumer) {
- memberId = ((ActiveMQMessageConsumer) memberChangeConsumer)
- .getConsumerId().toString();
+ memberId = ((ActiveMQMessageConsumer)
memberChangeConsumer).getConsumerId().toString();
} else {
memberId = this.idGenerator.generateId();
}
@@ -252,63 +233,53 @@
});
this.messageProducer = this.messageSession.createProducer(null);
this.messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- MessageConsumer topicMessageConsumer = this.messageSession
- .createConsumer(this.messageTopic);
+ MessageConsumer topicMessageConsumer =
this.messageSession.createConsumer(this.messageTopic);
topicMessageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
processJMSMessage(message);
}
});
- MessageConsumer queueMessageConsumer = this.messageSession
- .createConsumer(this.messageQueue);
+ MessageConsumer queueMessageConsumer =
this.messageSession.createConsumer(this.messageQueue);
queueMessageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
processJMSMessage(message);
}
});
- MessageConsumer heartBeatConsumer = this.stateSession
- .createConsumer(this.heartBeatTopic);
+ MessageConsumer heartBeatConsumer =
this.stateSession.createConsumer(this.heartBeatTopic);
heartBeatConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
handleHeartbeats(message);
}
});
- this.consumerEvents = new ConsumerEventSource(this.connection,
- this.stateTopic);
+ this.consumerEvents = new ConsumerEventSource(this.connection,
this.stateTopic);
this.consumerEvents.setConsumerListener(new ConsumerListener() {
public void onConsumerEvent(ConsumerEvent event) {
handleConsumerEvents(event);
}
});
this.consumerEvents.start();
- this.electionExecutor = new ThreadPoolExecutor(1, 1, 0L,
- TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
- new ThreadFactory() {
+ this.electionExecutor = new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(runnable, "Election{"
- + Group.this.local + "}");
- thread.setDaemon(true);
- return thread;
- }
- });
- this.stateExecutor = Executors
- .newSingleThreadExecutor(new ThreadFactory() {
- public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(runnable, "Group State{"
- + Group.this.local + "}");
- thread.setDaemon(true);
- return thread;
- }
- });
- this.messageExecutor = Executors
- .newSingleThreadExecutor(new ThreadFactory() {
- public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(runnable,
- "Group Messages{" + Group.this.local +
"}");
+ Thread thread = new Thread(runnable, "Election{" +
Group.this.local + "}");
thread.setDaemon(true);
return thread;
}
});
+ this.stateExecutor = Executors.newSingleThreadExecutor(new
ThreadFactory() {
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable, "Group State{" +
Group.this.local + "}");
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
+ this.messageExecutor = Executors.newSingleThreadExecutor(new
ThreadFactory() {
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable, "Group Messages{" +
Group.this.local + "}");
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
sendHeartBeat();
this.heartBeatTask = new SchedulerTimerTask(new Runnable() {
public void run() {
@@ -326,19 +297,13 @@
}
});
this.timer = new Timer("Distributed heart beat", true);
- this.timer.scheduleAtFixedRate(this.heartBeatTask,
- getHeartBeatInterval() / 3, getHeartBeatInterval() / 2);
- this.timer.scheduleAtFixedRate(this.checkMembershipTask,
- getHeartBeatInterval(), getHeartBeatInterval());
- this.timer.scheduleAtFixedRate(this.expirationTask,
- EXPIRATION_SWEEP_INTERVAL, EXPIRATION_SWEEP_INTERVAL);
+ this.timer.scheduleAtFixedRate(this.heartBeatTask,
getHeartBeatInterval() / 3, getHeartBeatInterval() / 2);
+ this.timer.scheduleAtFixedRate(this.checkMembershipTask,
getHeartBeatInterval(), getHeartBeatInterval());
+ this.timer.scheduleAtFixedRate(this.expirationTask,
EXPIRATION_SWEEP_INTERVAL, EXPIRATION_SWEEP_INTERVAL);
// await for members to join
- long timeout = (long) (this.heartBeatInterval
- * this.minimumGroupSize *1.5);
+ long timeout = (long) (this.heartBeatInterval *
this.minimumGroupSize * 1.5);
long deadline = System.currentTimeMillis() + timeout;
- while ((this.members.size() < this.minimumGroupSize || !this.electionFinished
- .get())
- && timeout > 0) {
+ while ((this.members.size() < this.minimumGroupSize || !this.electionFinished.get()) && timeout > 0) {
synchronized (this.electionFinished) {
this.electionFinished.wait(timeout);
}
@@ -376,7 +341,6 @@
} catch (Exception e) {
LOG.debug("Caught exception stopping", e);
}
-
}
}
@@ -421,8 +385,7 @@
/**
* @param alwaysLock -
- * set true if objects inserted will always be locked (default
is
- * false)
+ * set true if objects inserted will always be locked (default
is false)
*/
public void setAlwaysLock(boolean alwaysLock) {
this.alwaysLock = alwaysLock;
@@ -520,8 +483,7 @@
}
/**
- * Sets the policy for owned objects in the group If set to true, when this
- * <code>GroupMap<code> stops,
+ * Sets the policy for owned objects in the group If set to true, when
this <code>GroupMap<code> stops,
* any objects it owns will be removed from the group map
* @param removeOwnedObjectsOnExit the removeOwnedObjectsOnExit to set
*/
@@ -612,16 +574,14 @@
public boolean containsKey(Object key) {
synchronized (this.mapMutex) {
- return this.localMap != null ? this.localMap.containsKey(key)
- : false;
+ return this.localMap != null ? this.localMap.containsKey(key) :
false;
}
}
public boolean containsValue(Object value) {
EntryValue entryValue = new EntryValue(null, value);
synchronized (this.mapMutex) {
- return this.localMap != null ? this.localMap
- .containsValue(entryValue) : false;
+ return this.localMap != null ?
this.localMap.containsValue(entryValue) : false;
}
}
@@ -669,10 +629,9 @@
* @throws IllegalStateException
*
*/
- public V put(K key, V value) throws GroupUpdateException,
- IllegalStateException {
- return put(key, value, isAlwaysLock(), isRemoveOwnedObjectsOnExit(),
- isReleaseLockOnExit(), getTimeToLive(), getLockTimeToLive());
+ public V put(K key, V value) throws GroupUpdateException,
IllegalStateException {
+ return put(key, value, isAlwaysLock(), isRemoveOwnedObjectsOnExit(),
isReleaseLockOnExit(), getTimeToLive(),
+ getLockTimeToLive());
}
/**
@@ -690,9 +649,8 @@
* @throws IllegalStateException
*
*/
- public V put(K key, V value, boolean lock, boolean removeOnExit,
- boolean releaseLockOnExit, long timeToLive, long leaseTime)
- throws GroupUpdateException, IllegalStateException {
+ public V put(K key, V value, boolean lock, boolean removeOnExit, boolean
releaseLockOnExit, long timeToLive,
+ long leaseTime) throws GroupUpdateException, IllegalStateException
{
checkStatus();
EntryKey<K> entryKey = new EntryKey<K>(this.local, key);
entryKey.setLocked(lock);
@@ -756,10 +714,9 @@
* @throws GroupUpdateException
* @throws IllegalStateException
*/
- public void putAll(Map<? extends K, ? extends V> t)
- throws GroupUpdateException, IllegalStateException {
- putAll(t, isAlwaysLock(), isRemoveOwnedObjectsOnExit(),
- isReleaseLockOnExit(), getTimeToLive(), getLockTimeToLive());
+ public void putAll(Map<? extends K, ? extends V> t) throws
GroupUpdateException, IllegalStateException {
+ putAll(t, isAlwaysLock(), isRemoveOwnedObjectsOnExit(),
isReleaseLockOnExit(), getTimeToLive(),
+ getLockTimeToLive());
}
/**
@@ -774,13 +731,10 @@
* @throws GroupUpdateException
* @throws IllegalStateException
*/
- public void putAll(Map<? extends K, ? extends V> t, boolean lock,
- boolean removeOnExit, boolean releaseLockOnExit, long timeToLive,
- long lockTimeToLive) throws GroupUpdateException,
- IllegalStateException {
+ public void putAll(Map<? extends K, ? extends V> t, boolean lock, boolean
removeOnExit, boolean releaseLockOnExit,
+ long timeToLive, long lockTimeToLive) throws GroupUpdateException,
IllegalStateException {
for (java.util.Map.Entry<? extends K, ? extends V> entry :
t.entrySet()) {
- put(entry.getKey(), entry.getValue(), lock, removeOnExit,
- releaseLockOnExit, timeToLive, lockTimeToLive);
+ put(entry.getKey(), entry.getValue(), lock, removeOnExit,
releaseLockOnExit, timeToLive, lockTimeToLive);
}
}
@@ -793,14 +747,12 @@
* @throws IllegalStateException
*
*/
- public V remove(Object key) throws GroupUpdateException,
- IllegalStateException {
+ public V remove(Object key) throws GroupUpdateException,
IllegalStateException {
EntryKey<K> entryKey = new EntryKey<K>(this.local, (K) key);
return doRemove(entryKey);
}
- V doRemove(EntryKey<K> key) throws GroupUpdateException,
- IllegalStateException {
+ V doRemove(EntryKey<K> key) throws GroupUpdateException,
IllegalStateException {
checkStatus();
EntryMessage entryMsg = new EntryMessage();
entryMsg.setKey(key);
@@ -863,8 +815,7 @@
}
/**
- * @return the local member that represents this <CODE>Group</CODE>
- * instance
+ * @return the local member that represents this <CODE>Group</CODE>
instance
*/
public Member getLocalMember() {
return this.local;
@@ -881,8 +832,7 @@
}
boolean result = false;
if (entryValue != null) {
- result = entryValue.getKey().getOwner().getId().equals(
- this.local.getId());
+ result =
entryValue.getKey().getOwner().getId().equals(this.local.getId());
}
return result;
}
@@ -934,8 +884,7 @@
*/
public void broadcastMessage(Object message) throws JMSException {
checkStatus();
- ObjectMessage objMsg = this.messageSession
- .createObjectMessage((Serializable) message);
+ ObjectMessage objMsg =
this.messageSession.createObjectMessage((Serializable) message);
objMsg.setJMSCorrelationID(this.idGenerator.generateId());
objMsg.setJMSType(MESSAGE_TYPE);
objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
@@ -952,8 +901,7 @@
* @return
* @throws JMSException
*/
- public Serializable broadcastMessageRequest(Object message, long timeout)
- throws JMSException {
+ public Serializable broadcastMessageRequest(Object message, long timeout)
throws JMSException {
checkStatus();
Object result = null;
MapRequest request = new MapRequest();
@@ -961,8 +909,7 @@
synchronized (this.messageRequests) {
this.messageRequests.put(id, request);
}
- ObjectMessage objMsg = this.stateSession
- .createObjectMessage((Serializable) message);
+ ObjectMessage objMsg =
this.stateSession.createObjectMessage((Serializable) message);
objMsg.setJMSReplyTo(this.inboxTopic);
objMsg.setJMSCorrelationID(id);
objMsg.setJMSType(MESSAGE_TYPE);
@@ -973,16 +920,14 @@
}
/**
- * Send a message to the group - but only the least loaded member will
- * process it
+ * Send a message to the group - but only the least loaded member will
process it
*
* @param message
* @throws JMSException
*/
public void sendMessage(Object message) throws JMSException {
checkStatus();
- ObjectMessage objMsg = this.messageSession
- .createObjectMessage((Serializable) message);
+ ObjectMessage objMsg =
this.messageSession.createObjectMessage((Serializable) message);
objMsg.setJMSCorrelationID(this.idGenerator.generateId());
objMsg.setJMSType(MESSAGE_TYPE);
objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
@@ -998,8 +943,7 @@
*/
public void sendMessage(Member member, Object message) throws JMSException
{
checkStatus();
- ObjectMessage objMsg = this.messageSession
- .createObjectMessage((Serializable) message);
+ ObjectMessage objMsg =
this.messageSession.createObjectMessage((Serializable) message);
objMsg.setJMSCorrelationID(this.idGenerator.generateId());
objMsg.setJMSType(MESSAGE_TYPE);
objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
@@ -1016,8 +960,7 @@
* @return the request or null
* @throws JMSException
*/
- public Object sendMessageRequest(Member member, Object message, long
timeout)
- throws JMSException {
+ public Object sendMessageRequest(Member member, Object message, long
timeout) throws JMSException {
checkStatus();
Object result = null;
MapRequest request = new MapRequest();
@@ -1025,8 +968,7 @@
synchronized (this.messageRequests) {
this.messageRequests.put(id, request);
}
- ObjectMessage objMsg = this.stateSession
- .createObjectMessage((Serializable) message);
+ ObjectMessage objMsg =
this.stateSession.createObjectMessage((Serializable) message);
objMsg.setJMSReplyTo(this.inboxTopic);
objMsg.setJMSCorrelationID(id);
objMsg.setJMSType(MESSAGE_TYPE);
@@ -1044,11 +986,9 @@
* @param message
* @throws JMSException
*/
- public void sendMessageResponse(Member member, String replyId,
- Object message) throws JMSException {
+ public void sendMessageResponse(Member member, String replyId, Object
message) throws JMSException {
checkStatus();
- ObjectMessage objMsg = this.messageSession
- .createObjectMessage((Serializable) message);
+ ObjectMessage objMsg =
this.messageSession.createObjectMessage((Serializable) message);
objMsg.setJMSCorrelationID(replyId);
objMsg.setJMSType(MESSAGE_TYPE);
objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
@@ -1056,24 +996,21 @@
}
/**
- * Select a coordinator - coordinator weighting is used - or if everything
- * is equal - a comparison of member ids.
+ * Select a coordinator - coordinator weighting is used - or if everything
is equal - a comparison of member ids.
*
* @param members
* @return
*/
protected Member selectCordinator(List<Member> list) {
List<Member> sorted = sortMemberList(list);
- Member result = sorted.isEmpty() ? this.local : sorted
- .get(list.size() - 1);
+ Member result = sorted.isEmpty() ? this.local : sorted.get(list.size()
- 1);
return result;
}
protected List<Member> sortMemberList(List<Member> list) {
Collections.sort(list, new Comparator<Member>() {
public int compare(Member m1, Member m2) {
- int result = m1.getCoordinatorWeight()
- - m2.getCoordinatorWeight();
+ int result = m1.getCoordinatorWeight() -
m2.getCoordinatorWeight();
if (result == 0) {
result = m1.getId().compareTo(m2.getId());
}
@@ -1091,8 +1028,7 @@
this.stateRequests.put(id, request);
}
try {
- ObjectMessage objMsg = this.stateSession
- .createObjectMessage(payload);
+ ObjectMessage objMsg =
this.stateSession.createObjectMessage(payload);
objMsg.setJMSReplyTo(this.inboxTopic);
objMsg.setJMSCorrelationID(id);
objMsg.setJMSType(STATE_TYPE);
@@ -1113,8 +1049,7 @@
return result;
}
- void sendAsyncStateRequest(AsyncMapRequest asyncRequest, Member member,
- Serializable payload) {
+ void sendAsyncStateRequest(AsyncMapRequest asyncRequest, Member member,
Serializable payload) {
MapRequest request = new MapRequest();
String id = this.idGenerator.generateId();
asyncRequest.add(id, request);
@@ -1122,8 +1057,7 @@
this.stateRequests.put(id, request);
}
try {
- ObjectMessage objMsg = this.stateSession
- .createObjectMessage(payload);
+ ObjectMessage objMsg =
this.stateSession.createObjectMessage(payload);
objMsg.setJMSReplyTo(this.inboxTopic);
objMsg.setJMSCorrelationID(id);
objMsg.setJMSType(STATE_TYPE);
@@ -1142,8 +1076,7 @@
processRequest(id, reply);
} else {
try {
- ObjectMessage replyMsg = this.stateSession
- .createObjectMessage((Serializable) reply);
+ ObjectMessage replyMsg =
this.stateSession.createObjectMessage((Serializable) reply);
replyMsg.setJMSCorrelationID(id);
replyMsg.setJMSType(STATE_TYPE);
this.stateProducer.send(replyTo, replyMsg);
@@ -1162,8 +1095,7 @@
try {
EntryMessage copy = entry.copy();
copy.setMapUpdate(true);
- ObjectMessage objMsg = this.stateSession
- .createObjectMessage(copy);
+ ObjectMessage objMsg =
this.stateSession.createObjectMessage(copy);
objMsg.setJMSCorrelationID(correlationId);
objMsg.setJMSType(STATE_TYPE);
this.stateProducer.send(this.stateTopic, objMsg);
@@ -1230,22 +1162,18 @@
}
}
- void processLockUpdate(EntryMessage entryMsg, Destination replyTo,
- String correlationId) {
+ void processLockUpdate(EntryMessage entryMsg, Destination replyTo, String
correlationId) {
waitForElection();
synchronized (this.mapMutex) {
boolean newLock = entryMsg.getKey().isLocked();
Member newOwner = entryMsg.getKey().getOwner();
- long newLockExpiration = newLock ? entryMsg.getKey()
- .getLockExpiration() : 0l;
+ long newLockExpiration = newLock ?
entryMsg.getKey().getLockExpiration() : 0l;
if (isCoordinator() && !entryMsg.isMapUpdate()) {
EntryKey originalKey = getKey(entryMsg.getKey().getKey());
if (originalKey != null) {
if (originalKey.isLocked()) {
- if (!originalKey.getOwner().equals(
- entryMsg.getKey().getOwner())) {
- Serializable reply = new GroupUpdateException(
- "Owned by " + originalKey.getOwner());
+ if (!originalKey.getOwner().equals(entryMsg.getKey().getOwner())) {
+ Serializable reply = new GroupUpdateException("Owned by " + originalKey.getOwner());
sendReply(reply, replyTo, correlationId);
} else {
originalKey.setLocked(newLock);
@@ -1271,13 +1199,11 @@
}
}
- void processEntryMessage(EntryMessage entryMsg, Destination replyTo,
- String correlationId) {
+ void processEntryMessage(EntryMessage entryMsg, Destination replyTo,
String correlationId) {
waitForElection();
if (isCoordinator()) {
EntryKey<K> key = entryMsg.getKey();
- EntryValue<V> value = new EntryValue<V>(key, (V) entryMsg
- .getValue());
+ EntryValue<V> value = new EntryValue<V>(key, (V)
entryMsg.getValue());
boolean insert = entryMsg.isInsert();
boolean containsKey = false;
synchronized (this.mapMutex) {
@@ -1285,8 +1211,7 @@
}
if (containsKey) {
EntryKey originalKey = getKey(key.getKey());
- if (originalKey.equals(key.getOwner())
- || !originalKey.isLocked()) {
+ if (originalKey.equals(key.getOwner()) ||
!originalKey.isLocked()) {
EntryValue<V> old = null;
if (insert) {
synchronized (this.mapMutex) {
@@ -1299,11 +1224,9 @@
}
entryMsg.setOldValue(old.getValue());
broadcastMapUpdate(entryMsg, correlationId);
- fireMapChanged(key.getOwner(), key.getKey(),
- old.getValue(), value.getValue(), false);
+ fireMapChanged(key.getOwner(), key.getKey(),
old.getValue(), value.getValue(), false);
} else {
- Serializable reply = new GroupUpdateException(
- "Owned by " + originalKey.getOwner());
+ Serializable reply = new GroupUpdateException("Owned by "
+ originalKey.getOwner());
sendReply(reply, replyTo, correlationId);
}
} else {
@@ -1312,8 +1235,7 @@
this.localMap.put(key.getKey(), value);
}
broadcastMapUpdate(entryMsg, correlationId);
- fireMapChanged(key.getOwner(), key.getKey(), null, value
- .getValue(), false);
+ fireMapChanged(key.getOwner(), key.getKey(), null,
value.getValue(), false);
} else {
sendReply(null, replyTo, correlationId);
}
@@ -1349,24 +1271,20 @@
value.setValue(null);
}
}
- fireMapChanged(key.getOwner(), key.getKey(),
- old.getValue(), value.getValue(), entryMsg
- .isExpired());
+ fireMapChanged(key.getOwner(), key.getKey(),
old.getValue(), value.getValue(), entryMsg.isExpired());
}
} else {
if (insert) {
synchronized (this.mapMutex) {
this.localMap.put(key.getKey(), value);
}
- fireMapChanged(key.getOwner(), key.getKey(), null, value
- .getValue(), false);
+ fireMapChanged(key.getOwner(), key.getKey(), null,
value.getValue(), false);
}
}
}
}
- void processGroupMessage(String memberId, String replyId,
- Destination replyTo, Object payload) {
+ void processGroupMessage(String memberId, String replyId, Destination
replyTo, Object payload) {
Member member = this.members.get(memberId);
if (member != null) {
fireMemberMessage(member, replyId, payload);
@@ -1412,8 +1330,7 @@
void handleConsumerEvents(ConsumerEvent event) {
if (!event.isStarted()) {
- Member member = this.members.remove(event.getConsumerId()
- .toString());
+ Member member =
this.members.remove(event.getConsumerId().toString());
if (member != null) {
fireMemberStopped(member);
election(member, false);
@@ -1423,8 +1340,7 @@
void checkMembership() {
if (this.started.get() && this.electionFinished.get()) {
- long checkTime = System.currentTimeMillis()
- - getHeartBeatInterval();
+ long checkTime = System.currentTimeMillis() -
getHeartBeatInterval();
boolean doElection = false;
for (Member member : this.members.values()) {
if (member.getTimeStamp() < checkTime) {
@@ -1442,8 +1358,7 @@
void expirationSweep() {
waitForElection();
- if (isCoordinator() && this.started.get()
- && this.electionFinished.get()) {
+ if (isCoordinator() && this.started.get() &&
this.electionFinished.get()) {
List<EntryKey> expiredMessages = null;
List<EntryKey> expiredLocks = null;
synchronized (this.mapMutex) {
@@ -1455,13 +1370,12 @@
if (k.isExpired(currentTime)) {
if (expiredMessages == null) {
expiredMessages = new ArrayList<EntryKey>();
- expiredMessages.add(k);
}
+ expiredMessages.add(k);
} else if (k.isLockExpired(currentTime)) {
k.setLocked(false);
if (expiredLocks == null) {
expiredLocks = new ArrayList<EntryKey>();
- expiredLocks.add(k);
}
expiredLocks.add(k);
}
@@ -1489,8 +1403,7 @@
}
void doMessageExpiration(List<EntryKey> list) {
- if (this.started.get() && this.electionFinished.get()
- && isCoordinator()) {
+ if (this.started.get() && this.electionFinished.get() &&
isCoordinator()) {
for (EntryKey k : list) {
EntryValue<V> old = null;
synchronized (this.mapMutex) {
@@ -1503,16 +1416,14 @@
entryMsg.setKey(k);
entryMsg.setValue(old.getValue());
broadcastMapUpdate(entryMsg, "");
- fireMapChanged(k.getOwner(), k.getKey(), old.getValue(),
- null, true);
+ fireMapChanged(k.getOwner(), k.getKey(), old.getValue(),
null, true);
}
}
}
}
void doLockExpiration(List<EntryKey> list) {
- if (this.started.get() && this.electionFinished.get()
- && isCoordinator()) {
+ if (this.started.get() && this.electionFinished.get() &&
isCoordinator()) {
for (EntryKey k : list) {
EntryMessage entryMsg = new EntryMessage();
entryMsg.setType(EntryMessage.MessageType.DELETE);
@@ -1530,8 +1441,7 @@
void sendHeartBeat(Destination destination) {
if (this.started.get()) {
try {
- ObjectMessage msg = this.stateSession
- .createObjectMessage(this.local);
+ ObjectMessage msg =
this.stateSession.createObjectMessage(this.local);
msg.setJMSType(STATE_TYPE);
this.stateProducer.send(destination, msg);
} catch (javax.jms.IllegalStateException e) {
@@ -1548,8 +1458,7 @@
List<Map.Entry<K, EntryValue<V>>> list = new ArrayList<Map.Entry<K,
EntryValue<V>>>();
synchronized (this.mapMutex) {
if (this.localMap != null) {
- for (Map.Entry<K, EntryValue<V>> entry : this.localMap
- .entrySet()) {
+ for (Map.Entry<K, EntryValue<V>> entry :
this.localMap.entrySet()) {
list.add(entry);
}
}
@@ -1561,12 +1470,10 @@
entryMsg.setValue(entry.getValue().getValue());
entryMsg.setType(EntryMessage.MessageType.SYNC);
entryMsg.setMapUpdate(true);
- ObjectMessage objMsg = this.stateSession
- .createObjectMessage(entryMsg);
+ ObjectMessage objMsg =
this.stateSession.createObjectMessage(entryMsg);
if (!member.equals(entry.getValue().getKey().getOwner())) {
objMsg.setJMSType(STATE_TYPE);
- this.stateProducer.send(member.getInBoxDestination(),
- objMsg);
+ this.stateProducer.send(member.getInBoxDestination(),
objMsg);
}
}
} catch (javax.jms.IllegalStateException e) {
@@ -1615,16 +1522,13 @@
synchronized (this.mapMutex) {
value = this.localMap.remove(entryKey);
}
- fireMapChanged(member, entryKey.getKey(), value.getValue(),
- null, false);
+ fireMapChanged(member, entryKey.getKey(), value.getValue(),
null, false);
}
}
}
- void fireMemberMessage(final Member member, final String replyId,
- final Object message) {
- if (this.started.get() && this.stateExecutor != null
- && !this.messageExecutor.isShutdown()) {
+ void fireMemberMessage(final Member member, final String replyId, final
Object message) {
+ if (this.started.get() && this.stateExecutor != null &&
!this.messageExecutor.isShutdown()) {
this.messageExecutor.execute(new Runnable() {
public void run() {
doFireMemberMessage(member, replyId, message);
@@ -1641,10 +1545,9 @@
}
}
- void fireMapChanged(final Member owner, final Object key,
- final Object oldValue, final Object newValue, final boolean
expired) {
- if (this.started.get() && this.stateExecutor != null
- && !this.stateExecutor.isShutdown()) {
+ void fireMapChanged(final Member owner, final Object key, final Object
oldValue, final Object newValue,
+ final boolean expired) {
+ if (this.started.get() && this.stateExecutor != null &&
!this.stateExecutor.isShutdown()) {
this.stateExecutor.execute(new Runnable() {
public void run() {
doFireMapChanged(owner, key, oldValue, newValue, expired);
@@ -1653,8 +1556,7 @@
}
}
- void doFireMapChanged(Member owner, Object key, Object oldValue,
- Object newValue, boolean expired) {
+ void doFireMapChanged(Member owner, Object key, Object oldValue, Object
newValue, boolean expired) {
if (this.started.get()) {
for (GroupStateChangedListener l : this.mapChangedListeners) {
if (oldValue == null) {
@@ -1670,28 +1572,24 @@
void checkStatus() throws IllegalStateException {
if (!started.get()) {
- throw new IllegalStateException("GroupMap " + this.local.getName()
- + " not started");
+ throw new IllegalStateException("GroupMap " + this.local.getName()
+ " not started");
}
waitForElection();
}
public String toString() {
- return "Group:" + getName() + "{id=" + this.local.getId()
- + ",coordinator=" + isCoordinator() + ",inbox="
+ return "Group:" + getName() + "{id=" + this.local.getId() +
",coordinator=" + isCoordinator() + ",inbox="
+ this.local.getInBoxDestination() + "}";
}
void election(final Member member, final boolean memberStarted) {
- if (this.started.get() && this.stateExecutor != null
- && !this.electionExecutor.isShutdown()) {
+ if (this.started.get() && this.electionExecutor != null &&
!this.electionExecutor.isShutdown()) {
synchronized (this.electionFinished) {
this.electionFinished.set(false);
}
synchronized (this.electionExecutor) {
// remove any queued election tasks
- List<Runnable> list = new ArrayList<Runnable>(
- this.electionExecutor.getQueue());
+ List<Runnable> list = new
ArrayList<Runnable>(this.electionExecutor.getQueue());
for (Runnable r : list) {
ElectionService es = (ElectionService) r;
es.stop();
@@ -1723,8 +1621,7 @@
return result;
}
- void processElectionMessage(ElectionMessage msg, Destination replyTo,
- String correlationId) {
+ void processElectionMessage(ElectionMessage msg, Destination replyTo,
String correlationId) {
if (msg.isElection()) {
msg.setType(ElectionMessage.MessageType.ANSWER);
msg.setMember(this.local);
@@ -1745,16 +1642,14 @@
ElectionMessage msg = new ElectionMessage();
msg.setMember(this.local);
msg.setType(type);
- ObjectMessage objMsg = this.stateSession
- .createObjectMessage(msg);
+ ObjectMessage objMsg =
this.stateSession.createObjectMessage(msg);
objMsg.setJMSType(STATE_TYPE);
this.stateProducer.send(this.stateTopic, objMsg);
} catch (javax.jms.IllegalStateException e) {
// ignore - we are stopping
} catch (JMSException e) {
if (this.started.get()) {
- LOG.error("Failed to broadcast election message: " + type,
- e);
+ LOG.error("Failed to broadcast election message: " + type,
e);
}
}
}
@@ -1795,30 +1690,26 @@
}
void doElection() {
- if ((this.member == null || (!this.member.equals(Group.this.local)
|| Group.this.members
- .size() == getMinimumGroupSize()))) {
+ if ((this.member == null || (!this.member.equals(Group.this.local)
|| Group.this.members.size() == getMinimumGroupSize()))) {
boolean wasCoordinator = isCoordinatorMatch() && !isEmpty();
// call an election
while (!callElection() && isStarted() && this.started.get())
;
if (isStarted() && this.started.get()) {
- List<Member> members = new ArrayList<Member>(
- Group.this.members.values());
+ List<Member> members = new
ArrayList<Member>(Group.this.members.values());
Group.this.coordinator = selectCordinator(members);
if (isCoordinatorMatch()) {
broadcastElectionType(ElectionMessage.MessageType.COORDINATOR);
}
if (this.memberStarted && this.member != null) {
- if (wasCoordinator || isCoordinator()
- && this.started.get()) {
+ if (wasCoordinator || isCoordinator() &&
this.started.get()) {
updateNewMemberMap(this.member);
}
}
if (!isElectionFinished() && this.started.get()) {
try {
synchronized (Group.this.electionFinished) {
- Group.this.electionFinished
- .wait(Group.this.heartBeatInterval *
2);
+ Group.this.electionFinished.wait(Group.this.heartBeatInterval * 2);
}
} catch (InterruptedException e) {
}