Author: kwall
Date: Tue Feb  7 11:30:30 2012
New Revision: 1241428

URL: http://svn.apache.org/viewvc?rev=1241428&view=rev
Log:
NO-JIRA: Move members back to top of class where they belong

Modified:
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1241428&r1=1241427&r2=1241428&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Tue Feb  7 11:30:30 2012
@@ -97,6 +97,186 @@ import java.util.concurrent.locks.Reentr
  */
 public abstract class AMQSession<C extends BasicMessageConsumer, P extends 
BasicMessageProducer> extends Closeable implements Session, QueueSession, 
TopicSession
 {
+    /** Used for debugging. */
+    private static final Logger _logger = 
LoggerFactory.getLogger(AMQSession.class);
+
+    /** System property to enable strict AMQP compliance. */
+    public static final String STRICT_AMQP = "STRICT_AMQP";
+
+    /** Strict AMQP default setting. */
+    public static final String STRICT_AMQP_DEFAULT = "false";
+
+    /** System property to enable failure if strict AMQP compliance is 
violated. */
+    public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL";
+
+    /** Strict AMQP failure default. */
+    public static final String STRICT_AMQP_FATAL_DEFAULT = "true";
+
+    /** System property to enable immediate message prefetching. */
+    public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
+
+    /** Immediate message prefetch default. */
+    public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
+
+    public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L;
+
+    /**
+     * The default value for immediate flag used by producers created by this 
session is false. That is, a consumer does
+     * not need to be attached to a queue.
+     */
+    private final boolean _defaultImmediateValue = 
Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
+
+    /**
+     * The default value for mandatory flag used by producers created by this 
session is true. That is, server will not
+     * silently drop messages where no queue is connected to the exchange for 
the message.
+     */
+    private final boolean _defaultMandatoryValue = 
Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
+
+    /**
+     * The period to wait while flow controlled before sending a log message 
confirming that the session is still
+     * waiting on flow control being revoked
+     */
+    private final long _flowControlWaitPeriod = 
Long.getLong("qpid.flow_control_wait_notify_period",5000L);
+
+    /**
+     * The period to wait while flow controlled before declaring a failure
+     */
+    private final long _flowControlWaitFailure = 
Long.getLong("qpid.flow_control_wait_failure",
+                                                                  
DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
+
+    private final boolean _delareQueues =
+        Boolean.parseBoolean(System.getProperty("qpid.declare_queues", 
"true"));
+
+    private final boolean _declareExchanges =
+        Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", 
"true"));
+
+    private final boolean _useAMQPEncodedMapMessage;
+
+    /**
+     * Flag indicating to start dispatcher as a daemon thread
+     */
+    protected final boolean DEAMON_DISPATCHER_THREAD = 
Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER);
+
+    /** The connection to which this session belongs. */
+    private AMQConnection _connection;
+
+    /** Used to indicate whether or not this is a transactional session. */
+    private final boolean _transacted;
+
+    /** Holds the sessions acknowledgement mode. */
+    private final int _acknowledgeMode;
+
+    /** Holds this session unique identifier, used to distinguish it from 
other sessions. */
+    private int _channelId;
+
+    private int _ticket;
+
+    /** Holds the high mark for prefetched message, at which the session is 
suspended. */
+    private int _prefetchHighMark;
+
+    /** Holds the low mark for prefetched messages, below which the session is 
resumed. */
+    private int _prefetchLowMark;
+
+    /** Holds the message listener, if any, which is attached to this session. 
*/
+    private MessageListener _messageListener = null;
+
+    /** Used to indicate that this session has been started at least once. */
+    private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
+
+    private final ConcurrentHashMap<String, TopicSubscriberAdaptor<C>> 
_subscriptions =
+            new ConcurrentHashMap<String, TopicSubscriberAdaptor<C>>();
+
+    private final ConcurrentHashMap<C, String> _reverseSubscriptionMap = new 
ConcurrentHashMap<C, String>();
+
+    private final Lock _subscriberDetails = new ReentrantLock(true);
+    private final Lock _subscriberAccess = new ReentrantLock(true);
+
+    private final FlowControllingBlockingQueue _queue;
+
+    private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
+    private final AtomicLong _rollbackMark = new AtomicLong(-1);
+
+    private ConcurrentLinkedQueue<Long> _prefetchedMessageTags = new 
ConcurrentLinkedQueue<Long>();
+
+    private ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new 
ConcurrentLinkedQueue<Long>();
+
+    private ConcurrentLinkedQueue<Long> _deliveredMessageTags = new 
ConcurrentLinkedQueue<Long>();
+
+    private Dispatcher _dispatcher;
+
+    private Thread _dispatcherThread;
+
+    private MessageFactoryRegistry _messageFactoryRegistry;
+
+    /** Holds all of the producers created by this session, keyed by their 
unique identifiers. */
+    private Map<Long, MessageProducer> _producers = new 
ConcurrentHashMap<Long, MessageProducer>();
+
+    /**
+     * Used as a source of unique identifiers so that the consumers can be 
tagged to match them to BasicConsume
+     * methods.
+     */
+    private int _nextTag = 1;
+
+    private final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>();
+
+    /**
+     * Contains a list of consumers which have been removed but which might 
still have
+     * messages to acknowledge, eg in client ack or transacted modes
+     */
+    private CopyOnWriteArrayList<C> _removedConsumers = new 
CopyOnWriteArrayList<C>();
+
+    /** Provides a count of consumers on destinations, in order to be able to 
know if a destination has consumers. */
+    private ConcurrentHashMap<Destination, AtomicInteger> 
_destinationConsumerCount =
+            new ConcurrentHashMap<Destination, AtomicInteger>();
+
+    /**
+     * Used as a source of unique identifiers for producers within the session.
+     *
+     * <p/> Access to this id does not require to be synchronized since 
according to the JMS specification only one
+     * thread of control is allowed to create producers for any given session 
instance.
+     */
+    private long _nextProducerId;
+
+    /**
+     * Set when recover is called. This is to handle the case where recover() 
is called by application code during
+     * onMessage() processing to ensure that an auto ack is not sent.
+     */
+    private volatile boolean _sessionInRecovery;
+
+    private volatile boolean _usingDispatcherForCleanup;
+
+    /** Used to indicate that the connection to which this session belongs 
has been stopped. */
+    private boolean _connectionStopped;
+
+    /** Used to indicate that this session has a message listener attached to 
it. */
+    private boolean _hasMessageListeners;
+
+    /** Used to indicate that this session has been suspended. */
+    private boolean _suspended;
+
+    /**
+     * Used to protect the suspension of this session, so that critical code 
can be executed during suspension,
+     * without the session being resumed by other threads.
+     */
+    private final Object _suspensionLock = new Object();
+
+    private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
+
+    private final boolean _immediatePrefetch;
+
+    private final boolean _strictAMQP;
+
+    private final boolean _strictAMQPFATAL;
+    private final Object _messageDeliveryLock = new Object();
+
+    /** Session state : used to detect if commit is a) required b) allowed , 
i.e. does the tx span failover. */
+    private boolean _dirty;
+    /** Has failover occurred on this session with outstanding actions to 
commit? */
+    private boolean _failedOverDirty;
+
+    /** Flow control */
+    private FlowControlIndicator _flowControl = new FlowControlIndicator();
+
     /**
      * Used to reference durable subscribers so that requests for unsubscribe 
can be handled correctly.  Note this only
      * keeps a record of subscriptions which have been created in the current 
instance. It does not remember
@@ -334,182 +514,6 @@ public abstract class AMQSession<C exten
         }
     }
 
-    /** Used for debugging. */
-    private static final Logger _logger = 
LoggerFactory.getLogger(AMQSession.class);
-
-    /**
-     * The default value for immediate flag used by producers created by this 
session is false. That is, a consumer does
-     * not need to be attached to a queue.
-     */
-    private final boolean _defaultImmediateValue = 
Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
-
-    /**
-     * The default value for mandatory flag used by producers created by this 
session is true. That is, server will not
-     * silently drop messages where no queue is connected to the exchange for 
the message.
-     */
-    private final boolean _defaultMandatoryValue = 
Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
-
-    /**
-     * The period to wait while flow controlled before sending a log message 
confirming that the session is still
-     * waiting on flow control being revoked
-     */
-    private final long _flowControlWaitPeriod = 
Long.getLong("qpid.flow_control_wait_notify_period",5000L);
-
-    /**
-     * The period to wait while flow controlled before declaring a failure
-     */
-    public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L;
-    private final long _flowControlWaitFailure = 
Long.getLong("qpid.flow_control_wait_failure",
-                                                                  
DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
-
-    private final boolean _delareQueues =
-        Boolean.parseBoolean(System.getProperty("qpid.declare_queues", 
"true"));
-
-    private final boolean _declareExchanges =
-        Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", 
"true"));
-    
-    private final boolean _useAMQPEncodedMapMessage;
-
-    /** System property to enable strict AMQP compliance. */
-    public static final String STRICT_AMQP = "STRICT_AMQP";
-
-    /** Strict AMQP default setting. */
-    public static final String STRICT_AMQP_DEFAULT = "false";
-
-    /** System property to enable failure if strict AMQP compliance is 
violated. */
-    public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL";
-
-    /** Strickt AMQP failure default. */
-    public static final String STRICT_AMQP_FATAL_DEFAULT = "true";
-
-    /** System property to enable immediate message prefetching. */
-    public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
-
-    /** Immediate message prefetch default. */
-    public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
-
-    /**
-     * Flag indicating to start dispatcher as a daemon thread
-     */
-    protected final boolean DEAMON_DISPATCHER_THREAD = 
Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER);
-
-    /** The connection to which this session belongs. */
-    private AMQConnection _connection;
-
-    /** Used to indicate whether or not this is a transactional session. */
-    private final boolean _transacted;
-
-    /** Holds the sessions acknowledgement mode. */
-    private final int _acknowledgeMode;
-
-    /** Holds this session unique identifier, used to distinguish it from 
other sessions. */
-    private int _channelId;
-
-    private int _ticket;
-
-    /** Holds the high mark for prefetched message, at which the session is 
suspended. */
-    private int _prefetchHighMark;
-
-    /** Holds the low mark for prefetched messages, below which the session is 
resumed. */
-    private int _prefetchLowMark;
-
-    /** Holds the message listener, if any, which is attached to this session. 
*/
-    private MessageListener _messageListener = null;
-
-    /** Used to indicate that this session has been started at least once. */
-    private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
-
-    private final ConcurrentHashMap<String, TopicSubscriberAdaptor<C>> 
_subscriptions =
-            new ConcurrentHashMap<String, TopicSubscriberAdaptor<C>>();
-
-    private final ConcurrentHashMap<C, String> _reverseSubscriptionMap = new 
ConcurrentHashMap<C, String>();
-
-    private final Lock _subscriberDetails = new ReentrantLock(true);
-    private final Lock _subscriberAccess = new ReentrantLock(true);
-
-    private final FlowControllingBlockingQueue _queue;
-
-    private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
-    private final AtomicLong _rollbackMark = new AtomicLong(-1);
-
-    private ConcurrentLinkedQueue<Long> _prefetchedMessageTags = new 
ConcurrentLinkedQueue<Long>();
-
-    private ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new 
ConcurrentLinkedQueue<Long>();
-
-    private ConcurrentLinkedQueue<Long> _deliveredMessageTags = new 
ConcurrentLinkedQueue<Long>();
-
-    private Dispatcher _dispatcher;
-
-    private Thread _dispatcherThread;
-
-    private MessageFactoryRegistry _messageFactoryRegistry;
-
-    /** Holds all of the producers created by this session, keyed by their 
unique identifiers. */
-    private Map<Long, MessageProducer> _producers = new 
ConcurrentHashMap<Long, MessageProducer>();
-
-    /**
-     * Used as a source of unique identifiers so that the consumers can be 
tagged to match them to BasicConsume
-     * methods.
-     */
-    private int _nextTag = 1;
-
-    private final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>();
-
-    /**
-     * Contains a list of consumers which have been removed but which might 
still have
-     * messages to acknowledge, eg in client ack or transacted modes
-     */
-    private CopyOnWriteArrayList<C> _removedConsumers = new 
CopyOnWriteArrayList<C>();
-
-    /** Provides a count of consumers on destinations, in order to be able to 
know if a destination has consumers. */
-    private ConcurrentHashMap<Destination, AtomicInteger> 
_destinationConsumerCount =
-            new ConcurrentHashMap<Destination, AtomicInteger>();
-
-    /**
-     * Used as a source of unique identifiers for producers within the session.
-     *
-     * <p/> Access to this id does not require to be synchronized since 
according to the JMS specification only one
-     * thread of control is allowed to create producers for any given session 
instance.
-     */
-    private long _nextProducerId;
-
-    /**
-     * Set when recover is called. This is to handle the case where recover() 
is called by application code during
-     * onMessage() processing to ensure that an auto ack is not sent.
-     */
-    private volatile boolean _sessionInRecovery;
-
-    private volatile boolean _usingDispatcherForCleanup;
-
-    /** Used to indicates that the connection to which this session belongs, 
has been stopped. */
-    private boolean _connectionStopped;
-
-    /** Used to indicate that this session has a message listener attached to 
it. */
-    private boolean _hasMessageListeners;
-
-    /** Used to indicate that this session has been suspended. */
-    private boolean _suspended;
-
-    /**
-     * Used to protect the suspension of this session, so that critical code 
can be executed during suspension,
-     * without the session being resumed by other threads.
-     */
-    private final Object _suspensionLock = new Object();
-
-    private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
-
-    private final boolean _immediatePrefetch;
-
-    private final boolean _strictAMQP;
-
-    private final boolean _strictAMQPFATAL;
-    private final Object _messageDeliveryLock = new Object();
-
-    /** Session state : used to detect if commit is a) required b) allowed , 
i.e. does the tx span failover. */
-    private boolean _dirty;
-    /** Has failover occured on this session with outstanding actions to 
commit? */
-    private boolean _failedOverDirty;
-    
     private static final class FlowControlIndicator
     {
         private volatile boolean _flowControl = true;
@@ -526,9 +530,6 @@ public abstract class AMQSession<C exten
         }
     }
 
-    /** Flow control */
-    private FlowControlIndicator _flowControl = new FlowControlIndicator();
-
     /**
      * Creates a new session on a connection.
      *
@@ -3392,8 +3393,6 @@ public abstract class AMQSession<C exten
                 _dispatcherLogger.info(_dispatcherThread.getName() + " 
started");
             }
 
-            UnprocessedMessage message;
-
             // Allow disptacher to start stopped
             synchronized (_lock)
             {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to