Author: indika
Date: Tue Dec 23 09:11:38 2008
New Revision: 729040
URL: http://svn.apache.org/viewvc?rev=729040&view=rev
Log:
all local states replication methods are removed
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointContext.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/AlgorithmContext.java
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointContext.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointContext.java?rev=729040&r1=729039&r2=729040&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointContext.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointContext.java
Tue Dec 23 09:11:38 2008
@@ -18,13 +18,12 @@
*/
package org.apache.synapse.endpoints;
-import org.apache.axis2.clustering.ClusteringFault;
-import org.apache.axis2.clustering.context.Replicator;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
+import org.apache.synapse.util.Replicator;
import java.util.Date;
@@ -84,7 +83,7 @@
* @param cfgCtx the Axis2 configurationContext for clustering
*/
public EndpointContext(String endpointName, EndpointDefinition
endpointDefinition,
- boolean clustered, ConfigurationContext cfgCtx) {
+ boolean clustered, ConfigurationContext cfgCtx) {
if (clustered) {
if (endpointName == null) {
@@ -101,28 +100,29 @@
this.endpointName = endpointDefinition.toString();
}
- STATE_KEY = KEY_PREFIX + endpointName + STATE;
- NEXT_RETRY_TIME_KEY = KEY_PREFIX + endpointName +
NEXT_RETRY_TIME;
- REMAINING_RETRIES_KEY = KEY_PREFIX + endpointName +
REMAINING_RETRIES;
+ STATE_KEY = KEY_PREFIX + endpointName + STATE;
+ NEXT_RETRY_TIME_KEY = KEY_PREFIX + endpointName + NEXT_RETRY_TIME;
+ REMAINING_RETRIES_KEY = KEY_PREFIX + endpointName + REMAINING_RETRIES;
LAST_SUSPEND_DURATION_KEY = KEY_PREFIX + endpointName +
LAST_SUSPEND_DURATION;
}
/**
* Update the internal state of the endpoint
+ *
* @param state the new state of the endpoint
*/
private void setState(int state) {
if (isClustered) {
- setAndReplicateState(STATE_KEY, state);
+ Replicator.setAndReplicateState(STATE_KEY, state, cfgCtx);
switch (state) {
- case ST_ACTIVE : {
- setAndReplicateState(REMAINING_RETRIES_KEY,
- definition.getRetriesOnTimeoutBeforeSuspend());
- setAndReplicateState(LAST_SUSPEND_DURATION_KEY, null);
+ case ST_ACTIVE: {
+ Replicator.setAndReplicateState(REMAINING_RETRIES_KEY,
+ definition.getRetriesOnTimeoutBeforeSuspend(),
cfgCtx);
+ Replicator.setAndReplicateState(LAST_SUSPEND_DURATION_KEY,
null, cfgCtx);
break;
}
- case ST_TIMEOUT : {
+ case ST_TIMEOUT: {
Integer retries = (Integer)
cfgCtx.getPropertyNonReplicable(REMAINING_RETRIES_KEY);
if (retries == null) {
retries =
definition.getRetriesOnTimeoutBeforeSuspend();
@@ -135,27 +135,27 @@
setState(ST_SUSPENDED);
} else {
- setAndReplicateState(REMAINING_RETRIES_KEY,
(retries-1));
+ Replicator.setAndReplicateState(REMAINING_RETRIES_KEY,
(retries - 1), cfgCtx);
long nextRetry = System.currentTimeMillis() +
definition.getRetryDurationOnTimeout();
- setAndReplicateState(NEXT_RETRY_TIME_KEY, nextRetry);
+ Replicator.setAndReplicateState(NEXT_RETRY_TIME_KEY,
nextRetry, cfgCtx);
if (log.isDebugEnabled()) {
log.debug("Endpoint : " + endpointName + " is
marked as TIMEOUT and " +
- "will be retried : " + (retries-1) + " more
time/s after : " +
- new Date(nextRetry) + " until its marked
SUSPENDED for failure");
+ "will be retried : " + (retries - 1) + "
more time/s after : " +
+ new Date(nextRetry) + " until its marked
SUSPENDED for failure");
}
}
break;
}
- case ST_SUSPENDED : {
+ case ST_SUSPENDED: {
computeNextRetryTimeForSuspended();
break;
}
case ST_OFF: {
// mark as in maintenence, and reset all other information
- setAndReplicateState(REMAINING_RETRIES_KEY,
- definition == null ? -1 :
definition.getRetriesOnTimeoutBeforeSuspend());
- setAndReplicateState(LAST_SUSPEND_DURATION_KEY, null);
+ Replicator.setAndReplicateState(REMAINING_RETRIES_KEY,
+ definition == null ? -1 :
definition.getRetriesOnTimeoutBeforeSuspend(), cfgCtx);
+ Replicator.setAndReplicateState(LAST_SUSPEND_DURATION_KEY,
null, cfgCtx);
break;
}
}
@@ -169,7 +169,7 @@
localLastSuspendDuration = -1;
break;
}
- case ST_TIMEOUT : {
+ case ST_TIMEOUT: {
int retries = localRemainingRetries;
if (retries == -1) {
retries =
definition.getRetriesOnTimeoutBeforeSuspend();
@@ -182,26 +182,26 @@
setState(ST_SUSPENDED);
} else {
- localRemainingRetries = retries -1;
+ localRemainingRetries = retries - 1;
localNextRetryTime =
- System.currentTimeMillis() +
definition.getRetryDurationOnTimeout();
+ System.currentTimeMillis() +
definition.getRetryDurationOnTimeout();
if (log.isDebugEnabled()) {
log.debug("Endpoint : " + endpointName + " is
marked as TIMEOUT and " +
- "will be retried : " + localRemainingRetries +
" more time/s after : " +
- new Date(localNextRetryTime) + " until its
marked SUSPENDED for failure");
+ "will be retried : " +
localRemainingRetries + " more time/s after : " +
+ new Date(localNextRetryTime) + " until its
marked SUSPENDED for failure");
}
}
break;
}
- case ST_SUSPENDED : {
+ case ST_SUSPENDED: {
computeNextRetryTimeForSuspended();
break;
}
case ST_OFF: {
// mark as in maintenence, and reset all other information
localRemainingRetries = definition == null ?
- -1 : definition.getRetriesOnTimeoutBeforeSuspend();
+ -1 : definition.getRetriesOnTimeoutBeforeSuspend();
localLastSuspendDuration = -1;
break;
}
@@ -221,14 +221,14 @@
} else {
if (localState != ST_ACTIVE && localState != ST_OFF) {
log.info("Endpoint : " + endpointName + " currently " +
getStateAsString() +
- " will now be marked active since it processed its last
message");
+ " will now be marked active since it processed its
last message");
setState(ST_ACTIVE);
}
}
}
/**
- * Endpoint failed processing a message
+ * Endpoint failed processing a message
*/
public void onFault() {
log.warn("Endpoint : " + endpointName + " will be marked SUSPENDED as
it failed");
@@ -236,7 +236,7 @@
}
/**
- * Endpoint timeout processing a message
+ * Endpoint timeout processing a message
*/
public void onTimeout() {
if (log.isDebugEnabled()) {
@@ -264,8 +264,8 @@
}
long nextSuspendDuration = (notYetSuspended ?
- definition.getInitialSuspendDuration() :
- (long) (lastSuspendDuration *
definition.getSuspendProgressionFactor()));
+ definition.getInitialSuspendDuration() :
+ (long) (lastSuspendDuration *
definition.getSuspendProgressionFactor()));
if (nextSuspendDuration > definition.getSuspendMaximumDuration()) {
nextSuspendDuration = definition.getSuspendMaximumDuration();
@@ -276,8 +276,8 @@
long nextRetryTime = System.currentTimeMillis() + nextSuspendDuration;
if (isClustered) {
- setAndReplicateState(LAST_SUSPEND_DURATION_KEY,
nextSuspendDuration);
- setAndReplicateState(NEXT_RETRY_TIME_KEY, nextRetryTime);
+ Replicator.setAndReplicateState(LAST_SUSPEND_DURATION_KEY,
nextSuspendDuration, cfgCtx);
+ Replicator.setAndReplicateState(NEXT_RETRY_TIME_KEY,
nextRetryTime, cfgCtx);
} else {
localLastSuspendDuration = nextSuspendDuration;
localNextRetryTime = nextRetryTime;
@@ -285,10 +285,10 @@
if (log.isDebugEnabled()) {
log.debug("Suspending endpoint : " + endpointName +
- (notYetSuspended ? " -" :
- " - last suspend duration was : " + lastSuspendDuration +
"ms and") +
- " current suspend duration is : " + nextSuspendDuration + "ms
- " +
- "Next retry after : " + new Date(nextRetryTime));
+ (notYetSuspended ? " -" :
+ " - last suspend duration was : " +
lastSuspendDuration + "ms and") +
+ " current suspend duration is : " + nextSuspendDuration +
"ms - " +
+ "Next retry after : " + new Date(nextRetryTime));
}
}
@@ -302,11 +302,11 @@
if (log.isDebugEnabled()) {
log.debug("Checking if endpoint : " + endpointName + " currently
at state " +
- getStateAsString() + " can be used now?");
+ getStateAsString() + " can be used now?");
}
if (isClustered) {
-
+
// gets the value from configuration context (The shared state
across all instances)
Integer state = (Integer)
cfgCtx.getPropertyNonReplicable(STATE_KEY);
Integer remainingRetries = (Integer)
cfgCtx.getPropertyNonReplicable(REMAINING_RETRIES_KEY);
@@ -329,17 +329,17 @@
// if we are in the ST_TIMEOUT state, reduce a remaining
retry
if (state == ST_TIMEOUT) {
remainingRetries--;
- setAndReplicateState(REMAINING_RETRIES_KEY,
remainingRetries);
+ Replicator.setAndReplicateState(REMAINING_RETRIES_KEY,
remainingRetries, cfgCtx);
if (log.isDebugEnabled()) {
log.debug("Endpoint : " + endpointName + "
currently in timeout state" +
- " is ready to retry. Remaining retries before
suspension : " + remainingRetries);
+ " is ready to retry. Remaining retries
before suspension : " + remainingRetries);
}
-
+
} else {
if (log.isDebugEnabled()) {
log.debug("Endpoint : " + endpointName + "
currently SUSPENDED," +
- " is ready to retry now");
+ " is ready to retry now");
}
}
return true;
@@ -364,13 +364,13 @@
if (log.isDebugEnabled()) {
log.debug("Endpoint : " + endpointName + " currently
in timeout state" +
- " is ready to retry. Remaining retries before
suspension : " + localRemainingRetries);
+ " is ready to retry. Remaining retries before
suspension : " + localRemainingRetries);
}
} else {
if (log.isDebugEnabled()) {
log.debug("Endpoint : " + endpointName + " currently
SUSPENDED," +
- " is ready to retry now");
+ " is ready to retry now");
}
}
return true;
@@ -379,7 +379,7 @@
if (log.isDebugEnabled()) {
log.debug("Endpoint : " + endpointName + " not ready and is
currently : " +
- getStateAsString() + " Next retry will be after : " + new
Date(localNextRetryTime));
+ getStateAsString() + " Next retry will be after : " + new
Date(localNextRetryTime));
}
return false;
@@ -414,6 +414,7 @@
/**
* Private method to return the current state as a loggable string
+ *
* @return the current state as a string
*/
private String getStateAsString() {
@@ -434,31 +435,6 @@
}
/**
- * Helper method to replicates states of the property with given key
- * replicates the given state so that all instances across cluster can
see this state
- *
- * @param key The key of the property
- * @param value The value of the property
- */
- private void setAndReplicateState(String key, Object value) {
-
- if (cfgCtx != null && key != null && value != null) {
-
- try {
- if (log.isDebugEnabled()) {
- log.debug("Replicating property key : " + key + " as : " +
value);
- }
- cfgCtx.setProperty(key, value);
- Replicator.replicate(cfgCtx, new String[]{key});
-
- } catch (ClusteringFault clusteringFault) {
- handleException("Error replicating property : " + key + " as :
" +
- value, clusteringFault);
- }
- }
- }
-
- /**
* Helper methods for handle errors.
*
* @param msg The error message
@@ -468,17 +444,6 @@
throw new SynapseException(msg);
}
- /**
- * Helper methods for handle errors.
- *
- * @param msg The error message
- * @param e The exception
- */
- private void handleException(String msg, Exception e) {
- log.error(msg, e);
- throw new SynapseException(msg, e);
- }
-
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("[ Name : ").append(endpointName).
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/AlgorithmContext.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/AlgorithmContext.java?rev=729040&r1=729039&r2=729040&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/AlgorithmContext.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/AlgorithmContext.java
Tue Dec 23 09:11:38 2008
@@ -18,12 +18,11 @@
*/
package org.apache.synapse.endpoints.algorithms;
-import org.apache.axis2.clustering.ClusteringFault;
-import org.apache.axis2.clustering.context.Replicator;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.SynapseException;
+import org.apache.synapse.util.Replicator;
import java.util.HashMap;
import java.util.Map;
@@ -42,11 +41,14 @@
private ConfigurationContext cfgCtx;
/* Are we supporting clustering ? */
- private Boolean isClusteringEnabled = null;
+ private boolean isClusteringEnabled = false;
/* The key for 'currentEPR' attribute when replicated in a clsuter */
private String CURRENT_EPR_PROP_KEY;
+ /* Prefix for uniquely identify properties of a particular endpoint */
+ private String PROPERTY_KEY_PREFIX;
+
/* The pointer to current epr - The position of the current EPR */
private int currentEPR = 0;
@@ -54,14 +56,16 @@
private Map<String, Object> localProperties;
public AlgorithmContext(boolean clusteringEnabled, ConfigurationContext
cfgCtx, String endpointName) {
+
this.cfgCtx = cfgCtx;
- if (clusteringEnabled) {
- isClusteringEnabled = Boolean.TRUE;
- } else {
- isClusteringEnabled = Boolean.FALSE;
+ this.isClusteringEnabled = clusteringEnabled;
+
+ if (!clusteringEnabled) {
localProperties = new HashMap<String, Object>();
+ } else {
+ PROPERTY_KEY_PREFIX = KEY_PREFIX + endpointName;
+ CURRENT_EPR_PROP_KEY = PROPERTY_KEY_PREFIX + CURRENT_EPR;
}
- CURRENT_EPR_PROP_KEY = KEY_PREFIX + endpointName + CURRENT_EPR;
}
/**
@@ -71,7 +75,7 @@
*/
public int getCurrentEndpointIndex() {
- if (Boolean.TRUE.equals(isClusteringEnabled)) {
+ if (isClusteringEnabled) {
Object value =
cfgCtx.getPropertyNonReplicable(this.CURRENT_EPR_PROP_KEY);
if (value == null) {
@@ -92,13 +96,12 @@
*/
public void setCurrentEndpointIndex(int currentEPR) {
- if (Boolean.TRUE.equals(isClusteringEnabled)) {
+ if (isClusteringEnabled) {
if (log.isDebugEnabled()) {
log.debug("Set EPR with key : " + CURRENT_EPR_PROP_KEY + " as
: " + currentEPR);
}
- setAndReplicateState(CURRENT_EPR_PROP_KEY, currentEPR);
-
+ Replicator.setAndReplicateState(CURRENT_EPR_PROP_KEY, currentEPR,
cfgCtx);
} else {
if (log.isDebugEnabled()) {
log.debug("Setting the current EPR as : " + currentEPR);
@@ -117,53 +120,6 @@
}
/**
- * Helper methods for handle errors.
- *
- * @param msg The error message
- */
- protected void handleException(String msg) {
- log.error(msg);
- throw new SynapseException(msg);
- }
-
- /**
- * Helper methods for handle errors.
- *
- * @param msg The error message
- * @param e The exception
- */
- protected void handleException(String msg, Exception e) {
- log.error(msg, e);
- throw new SynapseException(msg, e);
- }
-
- /**
- * Helper method to replicates states of the property with given key
- * Sets property and replicates the current state so that all instances
- * across cluster can see this state
- *
- * @param key The key of the property
- * @param value The value of the property
- */
- private void setAndReplicateState(String key, Object value) {
-
- if (cfgCtx != null && key != null && value != null) {
-
- try {
- if (log.isDebugEnabled()) {
- log.debug("Replicating property key : " + key + " as : " +
value);
- }
- cfgCtx.setProperty(key, value);
- Replicator.replicate(cfgCtx, new String[]{key});
-
- } catch (ClusteringFault clusteringFault) {
- handleException("Error replicating property : " + key + " as :
" +
- value, clusteringFault);
- }
- }
- }
-
- /**
* Get the property value corresponding to a specified key
*
* @param key The key of the property
@@ -171,7 +127,7 @@
*/
public Object getProperty(String key) {
if (Boolean.TRUE.equals(isClusteringEnabled)) {
- return cfgCtx.getPropertyNonReplicable(key);
+ return cfgCtx.getPropertyNonReplicable(PROPERTY_KEY_PREFIX + key);
} else {
return localProperties.get(key);
}
@@ -183,14 +139,14 @@
* In non-clustered environments properties will be stored in a local
property
* map.
*
- * @param key The key of the property
+ * @param key The key of the property
* @param value The value of the property
*/
public void setProperty(String key, Object value) {
if (key != null && value != null) {
if (Boolean.TRUE.equals(isClusteringEnabled)) {
- setAndReplicateState(key, value);
+ Replicator.setAndReplicateState(PROPERTY_KEY_PREFIX + key,
value, cfgCtx);
} else {
localProperties.put(key, value);
}