Author: rajikak
Date: Mon Jul 29 19:01:07 2013
New Revision: 1508181
URL: http://svn.apache.org/r1508181
Log:
removed dead code.
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportConstant.java
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportConstant.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportConstant.java?rev=1508181&r1=1508180&r2=1508181&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportConstant.java
(original)
+++
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportConstant.java
Mon Jul 29 19:01:07 2013
@@ -105,13 +105,15 @@ public final class AMQPTransportConstant
/**
* True if requesting a mandatory publishing.
*/
- public static final String PARAMETER_PUBLISHER_MANDATORY_PUBLISH =
"transport.amqp.MandatoryPublish";
+ public static final String PARAMETER_PUBLISHER_MANDATORY_PUBLISH =
+ "transport.amqp.MandatoryPublish";
/**
* True if requesting an immediate publishing.
*/
- public static final String PARAMETER_PUBLISHER_IMMEDIATE_PUBLISH =
"transport.amqp.ImmediatePublish";
+ public static final String PARAMETER_PUBLISHER_IMMEDIATE_PUBLISH =
+ "transport.amqp.ImmediatePublish";
/**
@@ -150,18 +152,20 @@ public final class AMQPTransportConstant
/**
- * True if the polling task should wait until the she processed the
accepted message. This can be used
- * in conjunction with a single thread polling task(in the whole
transport, i.e. only a single AMQP proxy per flow)
- * to achieve in order delivery.
+ * True if the polling task should wait until it has processed the
accepted message.
+ * This can be used in conjunction with a single thread polling task(in
the whole transport,
+ * i.e. only a single AMQP proxy per flow) to achieve in order delivery.
*/
- public static final String PARAMETER_OPERATE_ON_BLOCKING_MODE =
"transport.amqp.OperateOnBlockingMode";
+ public static final String PARAMETER_OPERATE_ON_BLOCKING_MODE =
+ "transport.amqp.OperateOnBlockingMode";
/**
- * If a polling task encounter an exception due to some reason(most
probably due to broker outage) the number of
- * milliseconds it should be suspended before next re-try.
+ * If a polling task encounters an exception for some reason (most
probably due to a broker
+ * outage), the number of milliseconds it should be suspended before the next
re-try.
*/
- public static final String PARAMETER_INITIAL_RE_CONNECTION_DURATION =
"transport.amqp.InitialReconnectDuration";
+ public static final String PARAMETER_INITIAL_RE_CONNECTION_DURATION =
+ "transport.amqp.InitialReconnectDuration";
/**
* If the polling task fails again after the initial re-connection duration
@@ -169,25 +173,31 @@ public final class AMQPTransportConstant
* next suspend duration will be calculated using this
* (PARAMETER_RE_CONNECTION_PROGRESSION_FACTOR *
PARAMETER_INITIAL_RE_CONNECTION_DURATION).
*/
- public static final String PARAMETER_RE_CONNECTION_PROGRESSION_FACTOR =
"transport.amqp.ReconnectionProgressionFactor";
+ public static final String PARAMETER_RE_CONNECTION_PROGRESSION_FACTOR =
+ "transport.amqp.ReconnectionProgressionFactor";
/**
- * The maximum duration to suspend the polling task in case of an error.
The current suspend duration will reach this
+ * The maximum duration to suspend the polling task in case of an error.
The current suspend
+ * duration will reach this
* value by following the series,
- * PARAMETER_RE_CONNECTION_PROGRESSION_FACTOR *
PARAMETER_INITIAL_RE_CONNECTION_DURATION. This upper bound is there
+ * PARAMETER_RE_CONNECTION_PROGRESSION_FACTOR *
PARAMETER_INITIAL_RE_CONNECTION_DURATION.
+ * This upper bound is there
* because nobody wants to wait a long time until the next re-try if the
broker is alive.
*/
- public static final String PARAMETER_MAX_RE_CONNECTION_DURATION =
"transport.amqp.MaximumReconnectionDuration";
+ public static final String PARAMETER_MAX_RE_CONNECTION_DURATION =
+ "transport.amqp.MaximumReconnectionDuration";
/**
* The connection factory to be used either with consumer or producer.
*/
- public static final String PARAMETER_CONNECTION_FACTORY_NAME =
"transport.amqp.ConnectionFactoryName";
+ public static final String PARAMETER_CONNECTION_FACTORY_NAME =
+ "transport.amqp.ConnectionFactoryName";
/**
- * In a two-way scenario which connection factory of the senders' should
be used to send the response
+ * In a two-way scenario which connection factory of the senders' should
be used to send
+ * the response
*/
public static final String PARAMETER_RESPONSE_CONNECTION_FACTORY_NAME =
"transport.amqp.ResponseConnectionFactoryName";
@@ -211,23 +221,27 @@ public final class AMQPTransportConstant
* {@link AMQPTransportConstant#PARAMETER_SCHEDULED_TASK_INITIAL_DELAY} and
* {@link AMQPTransportConstant#PARAMETER_SCHEDULED_TASK_DELAY}.
*/
- public static final String PARAMETER_SCHEDULED_TASK_TIME_UNIT =
"transport.amqp.ScheduledTaskTimeUnit";
+ public static final String PARAMETER_SCHEDULED_TASK_TIME_UNIT =
+ "transport.amqp.ScheduledTaskTimeUnit";
/**
* Number of concurrent consumers per polling task.
*/
- public static final String PARAMETER_NO_OF_CONCURRENT_CONSUMERS =
"transport.amqp.NoOfConcurrentConsumers";
+ public static final String PARAMETER_NO_OF_CONCURRENT_CONSUMERS =
+ "transport.amqp.NoOfConcurrentConsumers";
/**
* Number of dispatching tasks used to hand over request messages to the actual
processing task.
*/
- public static final String PARAMETER_DISPATCHING_TASK_SIZE =
"transport.amqp.NoOfDispatchingTask";
+ public static final String PARAMETER_DISPATCHING_TASK_SIZE =
+ "transport.amqp.NoOfDispatchingTask";
/**
* Use the given channel number if possible. See
*
http://www.rabbitmq.com/releases/rabbitmq-java-client/v3.0.1/rabbitmq-java-client-javadoc-3.0.1/com/rabbitmq/client/Connection.html#createChannel(int)
*/
- public static final String PARAMETER_AMQP_CHANNEL_NUMBER =
"transport.amqp.ChannelNumber";
+ public static final String PARAMETER_AMQP_CHANNEL_NUMBER =
+ "transport.amqp.ChannelNumber";
/**
* Configure the content type as a service parameter
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java?rev=1508181&r1=1508180&r2=1508181&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
(original)
+++
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTask.java
Mon Jul 29 19:01:07 2013
@@ -113,7 +113,9 @@ public class AMQPTransportPollingTask {
/**
- * The name of the queue this consumer should bind to.{@link
AMQPTransportConstant#PARAMETER_QUEUE_NAME}
+ * The name of the queue this consumer should bind to.
+ *
+ * {@link AMQPTransportConstant#PARAMETER_QUEUE_NAME}
*/
private String queueName = null;
@@ -460,14 +462,20 @@ public class AMQPTransportPollingTask {
} else {
// assume default exchange and bindings - simple consumer
- channel.queueDeclare(queueName, isQueueDurable,
isQueueRestricted, isQueueAutoDelete, null);
+ channel.queueDeclare(
+ queueName,
+ isQueueDurable,
+ isQueueRestricted,
+ isQueueAutoDelete,
+ null);
}
} catch (IOException e) {
handleException(e.getMessage(), e);
}
- // schedule dispatching tasks to handover messages from the internal
buffer to actual processing task
+ // schedule dispatching tasks to handover messages from the internal
buffer to actual
+ // processing task
for (int i = 0; i < noOfDispatchingTask; i++) {
pollingTaskScheduler.execute(new MessageDispatchTask(buffers));
}
@@ -490,8 +498,8 @@ public class AMQPTransportPollingTask {
}
if (log.isDebugEnabled()) {
- log.debug("A polling task started listening on the queue '" +
queueName + "' on behalf of the service '" +
- serviceName + "'");
+ log.debug("A polling task started listening on the queue '" +
queueName + "' on " +
+ "behalf of the service '" + serviceName + "'");
}
}
@@ -547,8 +555,8 @@ public class AMQPTransportPollingTask {
log.error("Polling task for service '" + serviceName + "'
received a " +
"shutdown signal", e);
} catch (ConsumerCancelledException e) {
- log.error("Polling task for service '" + serviceName + "'
received a cancellation " +
- "signal");
+ log.error("Polling task for service '" + serviceName + "'
received a " +
+ "cancellation signal");
}
}
}
@@ -601,8 +609,8 @@ public class AMQPTransportPollingTask {
handleFaultMessage(message, buffers, axisFault);
} catch (Exception e) {
// do not let the task die
- log.error("Error while sending the fault message to the
client. Client will not" +
- " receive any errors!", e);
+ log.error("Error while sending the fault message to the
client. Client will " +
+ "not receive any errors!", e);
}
}
}
@@ -629,15 +637,17 @@ public class AMQPTransportPollingTask {
Map<String, Object> trpHeaders = message.getHeaders();
if (message.getReplyTo() != null) {
- // this may not be the optimal way to check if this
message should send a reply
- // a one way message can be send with 'reply to' set
+ // this may not be the optimal way to check if this
message should send
+ // a reply; a one-way message can be sent with 'reply
to' set
msgContext.setProperty(Constants.OUT_TRANSPORT_INFO,
new AMQPOutTransportInfo(contentType,
responseConnectionFactory,
message.getReplyTo()));
msgContext.setProperty(AMQPTransportConstant.PROPERTY_AMQP_REPLY_TO,
message.getReplyTo());
- // cache the connection factory so that it can be used
for sending the response
-
msgContext.setProperty(AMQPTransportConstant.RESPONSE_CONNECTION_FACTORY_NAME,
+ // cache the connection factory so that it can be used
for sending the
+ // response
+ msgContext.setProperty(
+
AMQPTransportConstant.RESPONSE_CONNECTION_FACTORY_NAME,
responseConnectionFactory);
}
@@ -653,8 +663,10 @@ public class AMQPTransportPollingTask {
msgContext.setProperty(MessageContext.TRANSPORT_HEADERS,
trpHeaders);
Builder builder =
BuilderUtil.getBuilderFromSelector(contentType, msgContext);
- InputStream gzipInputStream =
HTTPTransportUtils.handleGZip(msgContext, inputStream);
- OMElement documentElement =
builder.processDocument(gzipInputStream, contentType, msgContext);
+ InputStream gzipInputStream =
HTTPTransportUtils.handleGZip(
+ msgContext, inputStream);
+ OMElement documentElement = builder.processDocument(
+ gzipInputStream, contentType, msgContext);
msgContext.setEnvelope(TransportUtils.createSOAPEnvelope(documentElement));
isSOAP11 = msgContext.isSOAP11();
@@ -708,4 +720,4 @@ public class AMQPTransportPollingTask {
log.error(msg, t);
throw new AMQPTransportException(msg, t);
}
-}
+}
\ No newline at end of file
Modified:
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java?rev=1508181&r1=1508180&r2=1508181&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java
(original)
+++
synapse/trunk/java/modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java
Mon Jul 29 19:01:07 2013
@@ -94,7 +94,8 @@ public class AMQPTransportPollingTaskFac
pt.setConnectionFactoryName(connectionFactory.getName());
String responseConFac = AMQPTransportUtils.getOptionalStringParameter(
-
AMQPTransportConstant.PARAMETER_RESPONSE_CONNECTION_FACTORY_NAME, svcParam,
conFacParam);
+
AMQPTransportConstant.PARAMETER_RESPONSE_CONNECTION_FACTORY_NAME,
+ svcParam, conFacParam);
if (responseConFac != null) {
pt.setResponseConnectionFactory(responseConFac);
}
@@ -157,7 +158,8 @@ public class AMQPTransportPollingTaskFac
try {
Integer noOfConsumers = AMQPTransportUtils.getOptionalIntParameter(
-
AMQPTransportConstant.PARAMETER_NO_OF_CONCURRENT_CONSUMERS, svcParam,
conFacParam);
+ AMQPTransportConstant.PARAMETER_NO_OF_CONCURRENT_CONSUMERS,
+ svcParam, conFacParam);
if (noOfConsumers != null) {
pt.setNoOfConcurrentConsumers(noOfConsumers);
}
@@ -167,7 +169,8 @@ public class AMQPTransportPollingTaskFac
try {
Integer initialReconectionDuration =
AMQPTransportUtils.getOptionalIntParameter(
-
AMQPTransportConstant.PARAMETER_INITIAL_RE_CONNECTION_DURATION, svcParam,
conFacParam);
+
AMQPTransportConstant.PARAMETER_INITIAL_RE_CONNECTION_DURATION,
+ svcParam, conFacParam);
if (initialReconectionDuration != null) {
pt.setInitialReconnectDuration(initialReconectionDuration);
}
@@ -177,7 +180,8 @@ public class AMQPTransportPollingTaskFac
try {
Integer reconnectionFactor =
AMQPTransportUtils.getOptionalIntParameter(
-
AMQPTransportConstant.PARAMETER_RE_CONNECTION_PROGRESSION_FACTOR, svcParam,
conFacParam);
+
AMQPTransportConstant.PARAMETER_RE_CONNECTION_PROGRESSION_FACTOR,
+ svcParam, conFacParam);
if (reconnectionFactor != null) {
pt.setReconnectionFactor(reconnectionFactor);
}
@@ -187,7 +191,8 @@ public class AMQPTransportPollingTaskFac
try {
Integer dispatchingTask =
AMQPTransportUtils.getOptionalIntParameter(
- AMQPTransportConstant.PARAMETER_DISPATCHING_TASK_SIZE,
svcParam, conFacParam);
+ AMQPTransportConstant.PARAMETER_DISPATCHING_TASK_SIZE,
+ svcParam, conFacParam);
if (dispatchingTask != null) {
pt.setNoOfDispatchingTask(dispatchingTask);
}
@@ -203,7 +208,8 @@ public class AMQPTransportPollingTaskFac
try {
Integer initialDelay = AMQPTransportUtils.getOptionalIntParameter(
-
AMQPTransportConstant.PARAMETER_SCHEDULED_TASK_INITIAL_DELAY, svcParam,
conFacParam);
+
AMQPTransportConstant.PARAMETER_SCHEDULED_TASK_INITIAL_DELAY,
+ svcParam, conFacParam);
if (initialDelay != null) {
pt.setScheduledTaskInitialDelay(initialDelay.intValue());
}
@@ -213,7 +219,8 @@ public class AMQPTransportPollingTaskFac
try {
Integer delay = AMQPTransportUtils.getOptionalIntParameter(
-
AMQPTransportConstant.PARAMETER_SCHEDULED_TASK_INITIAL_DELAY, svcParam,
conFacParam);
+
AMQPTransportConstant.PARAMETER_SCHEDULED_TASK_INITIAL_DELAY,
+ svcParam, conFacParam);
if (delay != null) {
pt.setScheduledTaskDelay(delay.intValue());
}
@@ -222,7 +229,8 @@ public class AMQPTransportPollingTaskFac
}
String timeUnit = AMQPTransportUtils.getOptionalStringParameter(
- AMQPTransportConstant.PARAMETER_SCHEDULED_TASK_TIME_UNIT,
svcParam, conFacParam);
+ AMQPTransportConstant.PARAMETER_SCHEDULED_TASK_TIME_UNIT,
+ svcParam, conFacParam);
if (timeUnit != null) {
pt.setScheduledTaskTimeUnit(getTimeUnit(timeUnit));