Author: rgodfrey
Date: Mon Oct 13 10:52:27 2014
New Revision: 1631345
URL: http://svn.apache.org/r1631345
Log:
Merge from trunk
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/ (props changed)
qpid/branches/QPID-6125-ProtocolRefactoring/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
Propchange: qpid/branches/QPID-6125-ProtocolRefactoring/java/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java:r1630747-1631344
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java?rev=1631345&r1=1631344&r2=1631345&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
Mon Oct 13 10:52:27 2014
@@ -300,6 +300,12 @@ public class MessageConsumerImpl impleme
}
}
+ Message receiveRecoveredMessage()
+ {
+ return _replaymessages.isEmpty() ? null : _replaymessages.remove(0);
+
+ }
+
Message receive0(final long timeout)
{
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java?rev=1631345&r1=1631344&r2=1631345&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
Mon Oct 13 10:52:27 2014
@@ -922,7 +922,15 @@ public class SessionImpl implements Sess
else
{
consumer = _messageConsumerList.remove(0);
- msg = consumer.receive0(0L);
+ msg = consumer.receiveRecoveredMessage();
+ if(msg == null)
+ {
+ msg = consumer.receive0(0L);
+ }
+ else
+ {
+ recoveredMessage = true;
+ }
}
MessageListener listener = consumer._messageListener;
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1631345&r1=1631344&r2=1631345&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
Mon Oct 13 10:52:27 2014
@@ -107,7 +107,7 @@ public abstract class AbstractBDBMessage
{
new Upgrader(getEnvironmentFacade().getEnvironment(),
getParent()).upgradeIfNecessary();
}
- catch(DatabaseException e)
+ catch(RuntimeException e)
{
throw getEnvironmentFacade().handleDatabaseException("Cannot
upgrade store", e);
}
@@ -138,7 +138,7 @@ public abstract class AbstractBDBMessage
MESSAGE_METADATA_SEQ_CONFIG);
newMessageId = mmdSeq.get(null, 1);
}
- catch (DatabaseException de)
+ catch (RuntimeException de)
{
throw getEnvironmentFacade().handleDatabaseException("Cannot get
sequence value for new message", de);
}
@@ -216,7 +216,7 @@ public abstract class AbstractBDBMessage
}
}
}
- catch (DatabaseException e)
+ catch (RuntimeException e)
{
throw getEnvironmentFacade().handleDatabaseException("Cannot visit
message instances", e);
}
@@ -259,7 +259,7 @@ public abstract class AbstractBDBMessage
entries.add(entry);
}
}
- catch (DatabaseException e)
+ catch (RuntimeException e)
{
throw getEnvironmentFacade().handleDatabaseException("Cannot visit
message instances", e);
}
@@ -306,7 +306,7 @@ public abstract class AbstractBDBMessage
}
}
- catch (DatabaseException e)
+ catch (RuntimeException e)
{
throw getEnvironmentFacade().handleDatabaseException("Cannot
recover distributed transactions", e);
}
@@ -350,7 +350,7 @@ public abstract class AbstractBDBMessage
return mdd;
}
- catch (DatabaseException e)
+ catch (RuntimeException e)
{
throw getEnvironmentFacade().handleDatabaseException("Error
reading message metadata for message with id "
+ messageId
@@ -424,7 +424,7 @@ public abstract class AbstractBDBMessage
tx.abort();
}
}
- catch(DatabaseException e2)
+ catch(RuntimeException e2)
{
getLogger().warn(
"Unable to abort transaction after
LockConflictException on removal of message with id "
@@ -465,7 +465,7 @@ public abstract class AbstractBDBMessage
}
while(!complete);
}
- catch (DatabaseException e)
+ catch (RuntimeException e)
{
getLogger().error("Unexpected BDB exception", e);
@@ -550,7 +550,7 @@ public abstract class AbstractBDBMessage
}
return written;
}
- catch (DatabaseException e)
+ catch (RuntimeException e)
{
throw getEnvironmentFacade().handleDatabaseException("Error
getting AMQMessage with id "
+ messageId
@@ -587,7 +587,7 @@ public abstract class AbstractBDBMessage
}
}
- catch (DatabaseException e)
+ catch (RuntimeException e)
{
throw getEnvironmentFacade().handleDatabaseException("Error
getting AMQMessage with id "
+ messageId
@@ -618,7 +618,7 @@ public abstract class AbstractBDBMessage
}
}
}
- catch (DatabaseException e)
+ catch (RuntimeException e)
{
throw environmentFacade.handleDatabaseException("Cannot visit
messages", e);
}
@@ -630,7 +630,7 @@ public abstract class AbstractBDBMessage
{
cursor.close();
}
- catch(DatabaseException e)
+ catch(RuntimeException e)
{
throw environmentFacade.handleDatabaseException("Cannot
close cursor", e);
}
@@ -659,7 +659,7 @@ public abstract class AbstractBDBMessage
}
}
- catch (DatabaseException e)
+ catch (RuntimeException e)
{
throw environmentFacade.handleDatabaseException("Cannot visit
messages", e);
}
@@ -697,7 +697,7 @@ public abstract class AbstractBDBMessage
}
}
- catch (DatabaseException e)
+ catch (RuntimeException e)
{
throw getEnvironmentFacade().handleDatabaseException("Error
writing AMQMessage with id "
+ messageId
@@ -740,7 +740,7 @@ public abstract class AbstractBDBMessage
getLogger().debug("Storing message metadata for message id " +
messageId + " in transaction " + tx);
}
}
- catch (DatabaseException e)
+ catch (RuntimeException e)
{
throw getEnvironmentFacade().handleDatabaseException("Error
writing message metadata with id "
+ messageId
@@ -779,7 +779,7 @@ public abstract class AbstractBDBMessage
}
getDeliveryDb().put(tx, key, value);
}
- catch (DatabaseException e)
+ catch (RuntimeException e)
{
getLogger().error("Failed to enqueue: " + e.getMessage(), e);
throw getEnvironmentFacade().handleDatabaseException("Error
writing enqueued message with id "
@@ -838,7 +838,7 @@ public abstract class AbstractBDBMessage
}
}
- catch (DatabaseException e)
+ catch (RuntimeException e)
{
getLogger().error("Failed to dequeue message " + messageId + " in
transaction " + tx, e);
@@ -879,7 +879,7 @@ public abstract class AbstractBDBMessage
getXidDb().put(txn, key, value);
return postActions;
}
- catch (DatabaseException e)
+ catch (RuntimeException e)
{
getLogger().error("Failed to write xid: " + e.getMessage(), e);
throw getEnvironmentFacade().handleDatabaseException("Error
writing xid to database", e);
@@ -910,7 +910,7 @@ public abstract class AbstractBDBMessage
}
}
- catch (DatabaseException e)
+ catch (RuntimeException e)
{
getLogger().error("Failed to remove xid in transaction " + txn, e);
@@ -963,7 +963,7 @@ public abstract class AbstractBDBMessage
{
tx.abort();
}
- catch (DatabaseException e)
+ catch (RuntimeException e)
{
throw getEnvironmentFacade().handleDatabaseException("Error
aborting transaction: " + e.getMessage(), e);
}
@@ -975,7 +975,7 @@ public abstract class AbstractBDBMessage
{
storedSizeChange(delta);
}
- catch(DatabaseException e)
+ catch(RuntimeException e)
{
throw getEnvironmentFacade().handleDatabaseException("Stored size
change exception", e);
}
@@ -1415,7 +1415,7 @@ public abstract class AbstractBDBMessage
txn =
getEnvironmentFacade().getEnvironment().beginTransaction(
null, null);
}
- catch (DatabaseException e)
+ catch (RuntimeException e)
{
throw
getEnvironmentFacade().handleDatabaseException("failed to begin transaction",
e);
}
@@ -1476,7 +1476,7 @@ public abstract class AbstractBDBMessage
{
_txn =
getEnvironmentFacade().getEnvironment().beginTransaction(null, null);
}
- catch(DatabaseException e)
+ catch(RuntimeException e)
{
throw getEnvironmentFacade().handleDatabaseException("Cannot
create store transaction", e);
}
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java?rev=1631345&r1=1631344&r2=1631345&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
Mon Oct 13 10:52:27 2014
@@ -140,7 +140,7 @@ public class BDBConfigurationStore imple
}
_initialRecords = new ConfiguredObjectRecord[0];
}
- catch(DatabaseException e)
+ catch (RuntimeException e)
{
throw _environmentFacade.handleDatabaseException("Cannot upgrade
store", e);
}
@@ -156,7 +156,7 @@ public class BDBConfigurationStore imple
doVisitAllConfiguredObjectRecords(handler);
handler.end();
}
- catch (DatabaseException e)
+ catch (RuntimeException e)
{
throw _environmentFacade.handleDatabaseException("Cannot visit
configured object records", e);
}
@@ -243,7 +243,7 @@ public class BDBConfigurationStore imple
_environmentFacade.close();
_environmentFacade = null;
}
- catch(DatabaseException e)
+ catch (RuntimeException e)
{
throw new StoreException("Exception occurred on message store
close", e);
}
@@ -268,7 +268,7 @@ public class BDBConfigurationStore imple
txn.commit();
txn = null;
}
- catch (DatabaseException e)
+ catch (RuntimeException e)
{
throw _environmentFacade.handleDatabaseException("Error creating
configured object " + configuredObject
+ " in database: " + e.getMessage(), e);
@@ -305,7 +305,7 @@ public class BDBConfigurationStore imple
txn = null;
return removed.toArray(new UUID[removed.size()]);
}
- catch (DatabaseException e)
+ catch (RuntimeException e)
{
throw _environmentFacade.handleDatabaseException("Error deleting
configured objects from database", e);
}
@@ -334,7 +334,7 @@ public class BDBConfigurationStore imple
txn.commit();
txn = null;
}
- catch (DatabaseException e)
+ catch (RuntimeException e)
{
throw _environmentFacade.handleDatabaseException("Error updating
configuration details within the store: " + e,e);
}
@@ -408,7 +408,7 @@ public class BDBConfigurationStore imple
}
writeHierarchyRecords(txn, configuredObject);
}
- catch (DatabaseException e)
+ catch (RuntimeException e)
{
throw _environmentFacade.handleDatabaseException("Error writing
configured object " + configuredObject
+ " to database: " + e.getMessage(), e);
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java?rev=1631345&r1=1631344&r2=1631345&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java
Mon Oct 13 10:52:27 2014
@@ -38,7 +38,7 @@ public class BDBUtils
{
cursor.close();
}
- catch(DatabaseException e)
+ catch (RuntimeException e)
{
// We need the possible side effect of the facade restarting
the environment but don't care about the exception
throw environmentFacade.handleDatabaseException("Cannot close
cursor", e);
@@ -55,7 +55,7 @@ public class BDBUtils
tx.abort();
}
}
- catch (DatabaseException e)
+ catch (RuntimeException e)
{
// We need the possible side effect of the facade restarting the
environment but don't care about the exception
environmentFacade.handleDatabaseException("Cannot abort
transaction", e);
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java?rev=1631345&r1=1631344&r2=1631345&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
Mon Oct 13 10:52:27 2014
@@ -57,7 +57,7 @@ public interface EnvironmentFacade
StoreFuture commit(com.sleepycat.je.Transaction tx, boolean sync);
- DatabaseException handleDatabaseException(String contextMessage,
DatabaseException e);
+ RuntimeException handleDatabaseException(String contextMessage,
RuntimeException e);
String getStoreLocation();
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java?rev=1631345&r1=1631344&r2=1631345&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
Mon Oct 13 10:52:27 2014
@@ -37,6 +37,7 @@ import com.sleepycat.je.SequenceConfig;
import com.sleepycat.je.Transaction;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoreFuture;
public class StandardEnvironmentFacade implements EnvironmentFacade
@@ -266,13 +267,17 @@ public class StandardEnvironmentFacade i
}
@Override
- public DatabaseException handleDatabaseException(String contextMessage,
DatabaseException e)
+ public RuntimeException handleDatabaseException(String contextMessage,
RuntimeException e)
{
if (_environment != null && !_environment.isValid())
{
closeEnvironmentSafely();
}
- return e;
+ if (e instanceof StoreException)
+ {
+ return e;
+ }
+ return new StoreException("Unexpected exception occurred on store operation", e);
}
@Override
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java?rev=1631345&r1=1631344&r2=1631345&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
Mon Oct 13 10:52:27 2014
@@ -79,7 +79,7 @@ public class DatabasePinger
}
}
}
- catch (DatabaseException de)
+ catch (RuntimeException de)
{
facade.handleDatabaseException("DatabaseException from
DatabasePinger ", de);
}
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1631345&r1=1631344&r2=1631345&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
Mon Oct 13 10:52:27 2014
@@ -82,6 +82,7 @@ import com.sleepycat.je.rep.vlsn.VLSNRan
import com.sleepycat.je.utilint.PropUtil;
import com.sleepycat.je.utilint.VLSN;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.StoreException;
import org.codehaus.jackson.map.ObjectMapper;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -371,26 +372,40 @@ public class ReplicatedEnvironmentFacade
}
@Override
- public DatabaseException handleDatabaseException(String contextMessage,
final DatabaseException dbe)
+ public RuntimeException handleDatabaseException(String contextMessage,
final RuntimeException dbe)
{
- boolean noMajority = dbe instanceof InsufficientReplicasException ||
dbe instanceof InsufficientAcksException;
-
- if (noMajority)
+ if (dbe instanceof StoreException || dbe instanceof
ConnectionScopedRuntimeException)
{
- ReplicationGroupListener listener =
_replicationGroupListener.get();
- if (listener != null)
+ return dbe;
+ }
+ else if (dbe instanceof DatabaseException)
+ {
+ boolean noMajority = dbe instanceof InsufficientReplicasException
|| dbe instanceof InsufficientAcksException;
+
+ if (noMajority)
+ {
+ ReplicationGroupListener listener =
_replicationGroupListener.get();
+ if (listener != null)
+ {
+ listener.onNoMajority();
+ }
+ }
+
+ boolean restart = (noMajority || dbe instanceof
RestartRequiredException);
+ if (restart)
{
- listener.onNoMajority();
+ tryToRestartEnvironment((DatabaseException)dbe);
+ return new ConnectionScopedRuntimeException(noMajority ? "Required number of nodes not reachable" : "Underlying JE environment is being restarted", dbe);
}
}
-
- boolean restart = (noMajority || dbe instanceof
RestartRequiredException);
- if (restart)
+ else
{
- tryToRestartEnvironment(dbe);
- throw new ConnectionScopedRuntimeException(noMajority ? "Required
number of nodes not reachable" : "Underlying JE environment is being
restarted", dbe);
+ if (dbe instanceof IllegalStateException && getFacadeState() ==
State.RESTARTING)
+ {
+ return new ConnectionScopedRuntimeException("Underlying JE environment is being restarted", dbe);
+ }
}
- return dbe;
+ return new StoreException("Unexpected exception occurred in replicated environment", dbe);
}
private void tryToRestartEnvironment(final DatabaseException dbe)
@@ -452,12 +467,12 @@ public class ReplicatedEnvironmentFacade
}
if (_state.get() != State.OPEN)
{
- throw new IllegalStateException("Environment facade is not in
opened state");
+ throw new ConnectionScopedRuntimeException("Environment facade is not in opened state");
}
if (!_environment.isValid())
{
- throw new IllegalStateException("Environment is not valid");
+ throw new ConnectionScopedRuntimeException("Environment is not valid");
}
Database cachedHandle = _cachedDatabases.get(name);
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java?rev=1631345&r1=1631344&r2=1631345&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
Mon Oct 13 10:52:27 2014
@@ -343,6 +343,7 @@ public class BDBHAVirtualHostNodeImpl ex
String nodeAddress = node.getHostName() + ":" + node.getPort();
if (!_permittedNodes.contains(nodeAddress))
{
+ getEventLogger().message(getGroupLogSubject(),
HighAvailabilityMessages.INTRUDER_DETECTED(node.getName(), nodeAddress));
shutdownOnIntruder(nodeAddress);
throw new IllegalStateException("Intruder node detected: "
+ nodeAddress);
}
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?rev=1631345&r1=1631344&r2=1631345&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
Mon Oct 13 10:52:27 2014
@@ -283,7 +283,7 @@ public class BDBMessageStoreTest extends
catch (RuntimeException e)
{
assertEquals("Unexpected exception message", "Offset 15 is greater than message size 10 for message id "
- + messageid_0_8 + "!", e.getMessage());
+ + messageid_0_8 + "!", e.getCause().getMessage());
}
// buffer is smaller then message size
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java?rev=1631345&r1=1631344&r2=1631345&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
Mon Oct 13 10:52:27 2014
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.log4j.Logger;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.test.utils.PortHelper;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.test.utils.TestFileUtils;
@@ -146,6 +147,24 @@ public class ReplicatedEnvironmentFacade
assertNotSame("Expecting a new handle after database closure",
handle1, handle3);
}
+ public void testOpenDatabaseWhenFacadeIsNotOpened() throws Exception
+ {
+ DatabaseConfig createIfAbsentDbConfig =
DatabaseConfig.DEFAULT.setAllowCreate(true);
+
+ EnvironmentFacade ef = createMaster();
+ ef.close();
+
+ try
+ {
+ ef.openDatabase("myDatabase", createIfAbsentDbConfig );
+ fail("Database open should fail");
+ }
+ catch(ConnectionScopedRuntimeException e)
+ {
+ assertEquals("Unexpected exception", "Environment facade is not in opened state", e.getMessage());
+ }
+ }
+
public void testGetGroupName() throws Exception
{
assertEquals("Unexpected group name", TEST_GROUP_NAME,
createMaster().getGroupName());
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java?rev=1631345&r1=1631344&r2=1631345&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java
Mon Oct 13 10:52:27 2014
@@ -50,6 +50,8 @@ public class BrokerMessages
public static final String STOPPED_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "broker.stopped";
public static final String STATS_MSGS_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "broker.stats_msgs";
public static final String LISTENING_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "broker.listening";
+ public static final String FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "broker.flow_to_disk_inactive";
+ public static final String FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "broker.flow_to_disk_active";
public static final String MAX_MEMORY_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "broker.max_memory";
public static final String PLATFORM_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "broker.platform";
public static final String SHUTTING_DOWN_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "broker.shutting_down";
@@ -66,6 +68,8 @@ public class BrokerMessages
Logger.getLogger(STOPPED_LOG_HIERARCHY);
Logger.getLogger(STATS_MSGS_LOG_HIERARCHY);
Logger.getLogger(LISTENING_LOG_HIERARCHY);
+ Logger.getLogger(FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY);
+ Logger.getLogger(FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY);
Logger.getLogger(MAX_MEMORY_LOG_HIERARCHY);
Logger.getLogger(PLATFORM_LOG_HIERARCHY);
Logger.getLogger(SHUTTING_DOWN_LOG_HIERARCHY);
@@ -265,6 +269,70 @@ public class BrokerMessages
/**
* Log a Broker message of the Format:
+ * <pre>BRK-1015 : Message flow to disk inactive : Message memory use {0,number,#}KB within threshold {1,number,#.##}KB</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage FLOW_TO_DISK_INACTIVE(Number param1, Number
param2)
+ {
+ String rawMessage = _messages.getString("FLOW_TO_DISK_INACTIVE");
+
+ final Object[] messageArguments = {param1, param2};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage,
_currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY;
+ }
+ };
+ }
+
+ /**
+ * Log a Broker message of the Format:
+ * <pre>BRK-1014 : Message flow to disk active : Message memory use {0,number,#}KB exceeds threshold {1,number,#.##}KB</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage FLOW_TO_DISK_ACTIVE(Number param1, Number param2)
+ {
+ String rawMessage = _messages.getString("FLOW_TO_DISK_ACTIVE");
+
+ final Object[] messageArguments = {param1, param2};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage,
_currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY;
+ }
+ };
+ }
+
+ /**
+ * Log a Broker message of the Format:
* <pre>BRK-1011 : Maximum Memory : {0,number} bytes</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties?rev=1631345&r1=1631344&r2=1631345&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties
Mon Oct 13 10:52:27 2014
@@ -47,4 +47,9 @@ PLATFORM = BRK-1010 : Platform : JVM : {
# 0 Maximum Memory
MAX_MEMORY = BRK-1011 : Maximum Memory : {0,number} bytes
-MANAGEMENT_MODE = BRK-1012 : Management Mode : User Details : {0} / {1}
\ No newline at end of file
+MANAGEMENT_MODE = BRK-1012 : Management Mode : User Details : {0} / {1}
+
+# 0 - Total message size
+# 1 - Target memory size
+FLOW_TO_DISK_ACTIVE = BRK-1014 : Message flow to disk active : Message memory use {0,number,#}KB exceeds threshold {1,number,#.##}KB
+FLOW_TO_DISK_INACTIVE = BRK-1015 : Message flow to disk inactive : Message memory use {0,number,#}KB within threshold {1,number,#.##}KB
\ No newline at end of file
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java?rev=1631345&r1=1631344&r2=1631345&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
Mon Oct 13 10:52:27 2014
@@ -82,6 +82,9 @@ public class BrokerAdapter extends Abstr
private Timer _reportingTimer;
private final StatisticsCounter _messagesDelivered, _dataDelivered,
_messagesReceived, _dataReceived;
+ /** Flags used to control the reporting of flow to disk. Protected by this */
+ private boolean _totalMessageSizeExceedThresholdReported = false,
_totalMessageSizeWithinThresholdReported = true;
+
@ManagedAttributeField
private String _defaultVirtualHost;
@ManagedAttributeField
@@ -99,6 +102,7 @@ public class BrokerAdapter extends Abstr
@ManagedAttributeField
private String _confidentialConfigurationEncryptionProvider;
+
@ManagedObjectFactoryConstructor
public BrokerAdapter(Map<String, Object> attributes,
SystemConfig parent)
@@ -437,6 +441,19 @@ public class BrokerAdapter extends Abstr
}
}
+ if (totalSize > totalTarget &&
!_totalMessageSizeExceedThresholdReported)
+ {
+ _eventLogger.message(BrokerMessages.FLOW_TO_DISK_ACTIVE(totalSize
/ 1024, totalTarget / 1024));
+ _totalMessageSizeExceedThresholdReported = true;
+ _totalMessageSizeWithinThresholdReported = false;
+ }
+ else if (totalSize <= totalTarget &&
!_totalMessageSizeWithinThresholdReported)
+ {
+ _eventLogger.message(BrokerMessages.FLOW_TO_DISK_INACTIVE(totalSize / 1024, totalTarget / 1024));
+ _totalMessageSizeWithinThresholdReported = true;
+ _totalMessageSizeExceedThresholdReported = false;
+ }
+
for(Map.Entry<VirtualHost<?, ?, ?>,Long> entry : vhs.entrySet())
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]