Author: lquack
Date: Thu Mar 23 16:52:47 2017
New Revision: 1788287
URL: http://svn.apache.org/viewvc?rev=1788287&view=rev
Log:
QPID-7663: [Java Broker] Implement persistent AMQP 1.0 Link store
Added:
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/pom.xml
- copied, changed from r1788233, qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStore.java
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStoreFactory.java
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/apache/
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/apache/qpid/
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/apache/qpid/server/
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/apache/qpid/server/protocol/
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/apache/qpid/server/protocol/v1_0/
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStoreTest.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/AbstractLinkStore.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUtils.java
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCContainer.java
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValueEntryBinding.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreTestCase.java
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHostImpl.java
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNodeImpl.java
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHostImpl.java
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java
qpid/java/trunk/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCDetailsTest.java
qpid/java/trunk/broker/pom.xml
qpid/java/trunk/pom.xml
qpid/java/trunk/systests/pom.xml
qpid/java/trunk/systests/qpid-systests-jms_2.0/pom.xml
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml Thu Mar 23 16:52:47 2017
@@ -31,7 +31,8 @@
</parent>
<groupId>org.apache.qpid</groupId>
- <artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-store</artifactId>
+
<artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-link-store</artifactId>
+ <name>Apache Qpid AMQP 1.0 BDB Link Store Plug-in</name>
<dependencies>
<dependency>
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java Thu Mar 23 16:52:47 2017
@@ -24,7 +24,6 @@ import static org.apache.qpid.server.sto
import java.util.Collection;
import java.util.HashSet;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.sleepycat.bind.tuple.LongBinding;
import com.sleepycat.bind.tuple.StringBinding;
@@ -44,22 +43,20 @@ import org.apache.qpid.server.model.Mode
import org.apache.qpid.server.protocol.v1_0.LinkDefinition;
import org.apache.qpid.server.protocol.v1_0.LinkDefinitionImpl;
import org.apache.qpid.server.protocol.v1_0.LinkKey;
-import org.apache.qpid.server.protocol.v1_0.store.LinkStore;
+import org.apache.qpid.server.protocol.v1_0.store.AbstractLinkStore;
import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUpdater;
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.berkeleydb.BDBEnvironmentContainer;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
-public class BDBLinkStore implements LinkStore
+public class BDBLinkStore extends AbstractLinkStore
{
private static final Logger LOGGER =
LoggerFactory.getLogger(BDBLinkStore.class);
private static final String LINKS_DB_NAME = "AMQP_1_0_LINKS";
private static final String LINKS_VERSION_DB_NAME =
"AMQP_1_0_LINKS_VERSION";
- private final ReentrantReadWriteLock _useOrCloseRWLock = new
ReentrantReadWriteLock(true);
private final BDBEnvironmentContainer<?> _environmentContainer;
- private volatile StoreState _storeState = StoreState.CLOSED;
BDBLinkStore(final BDBEnvironmentContainer<?> environmentContainer)
{
@@ -67,36 +64,23 @@ public class BDBLinkStore implements Lin
}
@Override
- public Collection<LinkDefinition> openAndLoad(final LinkStoreUpdater
updater) throws StoreException
+ protected Collection<LinkDefinition> doOpenAndLoad(final LinkStoreUpdater
updater) throws StoreException
{
- _useOrCloseRWLock.readLock().lock();
try
{
- Collection<LinkDefinition> links = getLinkDefinitions(updater);
- _storeState = StoreState.OPENED;
- return links;
+ return getLinkDefinitions(updater);
}
catch (RuntimeException e)
{
throw getEnvironmentFacade().handleDatabaseException("Failed
recovery of links", e);
}
- finally
- {
- _useOrCloseRWLock.readLock().unlock();
- }
}
@Override
- public void saveLink(final LinkDefinition link)
+ protected void doSaveLink(final LinkDefinition link)
{
- _useOrCloseRWLock.readLock().lock();
try
{
- if (_storeState != StoreState.OPENED)
- {
- throw new StoreException("Store is not opened");
- }
-
Database linksDatabase =
getEnvironmentFacade().openDatabase(LINKS_DB_NAME, DEFAULT_DATABASE_CONFIG);
save(linksDatabase, null, link);
}
@@ -104,24 +88,14 @@ public class BDBLinkStore implements Lin
{
throw
getEnvironmentFacade().handleDatabaseException(String.format("Failed saving of
link '%s'", new LinkKey(link)), e);
}
- finally
- {
- _useOrCloseRWLock.readLock().unlock();
- }
}
@Override
- public void deleteLink(final LinkDefinition linkDefinition)
+ protected void doDeleteLink(final LinkDefinition linkDefinition)
{
LinkKey linkKey = new LinkKey(linkDefinition);
- _useOrCloseRWLock.readLock().lock();
try
{
- if (_storeState != StoreState.OPENED)
- {
- throw new StoreException("Store is not opened");
- }
-
Database linksDatabase =
getEnvironmentFacade().openDatabase(LINKS_DB_NAME, DEFAULT_DATABASE_CONFIG);
final DatabaseEntry databaseEntry = new DatabaseEntry();
@@ -136,34 +110,19 @@ public class BDBLinkStore implements Lin
{
throw
getEnvironmentFacade().handleDatabaseException(String.format("Failed deletion
of link '%s'", linkKey), e);
}
- finally
- {
- _useOrCloseRWLock.readLock().unlock();
- }
}
@Override
- public void close()
+ protected void doClose()
{
- _useOrCloseRWLock.writeLock().lock();
- try
- {
- _storeState = StoreState.CLOSED;
- }
- finally
- {
- _useOrCloseRWLock.writeLock().unlock();
- }
}
@Override
- public void delete()
+ protected void doDelete()
{
- _useOrCloseRWLock.writeLock().lock();
try
{
- close();
getEnvironmentFacade().deleteDatabase(LINKS_DB_NAME);
getEnvironmentFacade().deleteDatabase(LINKS_VERSION_DB_NAME);
}
@@ -172,10 +131,6 @@ public class BDBLinkStore implements Lin
getEnvironmentFacade().handleDatabaseException("Failed deletion of
database", e);
LOGGER.info("Failed to delete links database", e);
}
- finally
- {
- _useOrCloseRWLock.writeLock().unlock();
- }
}
@Override
@@ -229,6 +184,7 @@ public class BDBLinkStore implements Lin
{
save(linksDatabase, txn, link);
}
+ updateVersion(txn, currentVersion.toString());
txn.commit();
linksDatabase.close();
}
@@ -242,6 +198,16 @@ public class BDBLinkStore implements Lin
return links;
}
+ private void updateVersion(final Transaction txn, final String
currentVersion)
+ {
+ Database linksVersionDb =
getEnvironmentFacade().openDatabase(LINKS_VERSION_DB_NAME,
DEFAULT_DATABASE_CONFIG);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ StringBinding.stringToEntry(currentVersion, key);
+ LongBinding.longToEntry(System.currentTimeMillis(), value);
+ linksVersionDb.put(txn, key, value);
+ }
+
private void save(Database database, Transaction txn, final LinkDefinition
link)
{
DatabaseEntry key = new DatabaseEntry();
@@ -297,19 +263,10 @@ public class BDBLinkStore implements Lin
}
catch (DatabaseNotFoundException e)
{
+ updateVersion(null, BrokerModel.MODEL_VERSION);
linksVersionDb =
getEnvironmentFacade().openDatabase(LINKS_VERSION_DB_NAME,
DEFAULT_DATABASE_CONFIG);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- StringBinding.stringToEntry(BrokerModel.MODEL_VERSION, key);
- LongBinding.longToEntry(System.currentTimeMillis(), value);
- linksVersionDb.put(null, key, value);
}
return linksVersionDb;
}
-
- enum StoreState
- {
- CLOSED, OPENED
- }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValueEntryBinding.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValueEntryBinding.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValueEntryBinding.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValueEntryBinding.java Thu Mar 23 16:52:47 2017
@@ -23,11 +23,7 @@ import com.sleepycat.bind.tuple.TupleBin
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
-import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
-import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
-import
org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUtils;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.store.StoreException;
@@ -36,9 +32,6 @@ public class LinkValueEntryBinding exten
{
private static final LinkValueEntryBinding INSTANCE = new
LinkValueEntryBinding();
- private AMQPDescribedTypeRegistry _describedTypeRegistry =
-
AMQPDescribedTypeRegistry.newInstance().registerTransportLayer().registerMessagingLayer();
-
private LinkValueEntryBinding()
{
}
@@ -84,33 +77,16 @@ public class LinkValueEntryBinding exten
byte[] bytes = new byte[size];
input.read(bytes);
- QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(bytes);
- ValueHandler valueHandler = new ValueHandler(_describedTypeRegistry);
- Object object;
- try
- {
- object = valueHandler.parse(qpidByteBuffer);
- }
- catch (AmqpErrorException e)
- {
- throw new StoreException("Unexpected serialized data", e);
- }
- finally
- {
- qpidByteBuffer.dispose();
- }
- return object;
+ return LinkStoreUtils.amqpBytesToObject(bytes);
}
+
+
private void write(final Object object, final TupleOutput output)
{
- ValueWriter valueWriter =
_describedTypeRegistry.getValueWriter(object);
- int encodedSize = valueWriter.getEncodedSize();
- QpidByteBuffer qpidByteBuffer = QpidByteBuffer.allocate(encodedSize);
- valueWriter.writeToBuffer(qpidByteBuffer);
-
- output.writeInt(encodedSize);
- output.write(qpidByteBuffer.array());
- qpidByteBuffer.dispose();
+ byte[] bytes = LinkStoreUtils.objectToAmqpBytes(object);
+ output.writeInt(bytes.length);
+ output.write(bytes);
}
+
}
Copied: qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/pom.xml (from r1788233, qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/pom.xml?p2=qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/pom.xml&p1=qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml&r1=1788233&r2=1788287&rev=1788287&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/pom.xml Thu Mar 23 16:52:47 2017
@@ -1,21 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
~
- ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
@@ -31,14 +32,10 @@
</parent>
<groupId>org.apache.qpid</groupId>
- <artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-store</artifactId>
+
<artifactId>qpid-broker-plugins-amqp-1-0-protocol-jdbc-link-store</artifactId>
+ <name>Apache Qpid AMQP 1.0 JDBC Link Store Plug-in</name>
<dependencies>
- <dependency>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-broker-core</artifactId>
- <version>${project.version}</version>
- </dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
@@ -49,7 +46,7 @@
<dependency>
<groupId>org.apache.qpid</groupId>
- <artifactId>qpid-bdbstore</artifactId>
+ <artifactId>qpid-broker-plugins-jdbc-store</artifactId>
<version>${project.version}</version>
</dependency>
@@ -59,12 +56,6 @@
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>com.sleepycat</groupId>
- <artifactId>je</artifactId>
- <scope>provided</scope>
- </dependency>
-
<!-- test dependencies -->
<dependency>
<groupId>org.apache.qpid</groupId>
@@ -80,5 +71,13 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
-</project>
+
+</project>
\ No newline at end of file
Added: qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStore.java?rev=1788287&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStore.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStore.java Thu Mar 23 16:52:47 2017
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0.store.jdbc;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.sql.Blob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CharStreams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.ModelVersion;
+import org.apache.qpid.server.protocol.v1_0.LinkDefinition;
+import org.apache.qpid.server.protocol.v1_0.LinkDefinitionImpl;
+import org.apache.qpid.server.protocol.v1_0.LinkKey;
+import org.apache.qpid.server.protocol.v1_0.store.AbstractLinkStore;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUpdater;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUtils;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.jdbc.JDBCContainer;
+import org.apache.qpid.server.store.jdbc.JDBCDetails;
+import org.apache.qpid.server.store.jdbc.JdbcUtils;
+
+public class JDBCLinkStore extends AbstractLinkStore
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(JDBCLinkStore.class);
+ private static final String LINKS_TABLE_NAME_SUFFIX = "AMQP_1_0_LINKS";
+ private static final String VERSION_TABLE_NAME_SUFFIX =
"AMQP_1_0_LINKS_VERSION";
+ private final JDBCContainer _jdbcContainer;
+ private final String _tableNamePrefix;
+ private final String _sqlBlobType;
+ private final boolean _isUseBytesMethodsForBlob;
+
+ JDBCLinkStore(final JDBCContainer jdbcContainer)
+ {
+ _jdbcContainer = jdbcContainer;
+ _tableNamePrefix = jdbcContainer.getTableNamePrefix();
+ JDBCDetails jdbcDetails = jdbcContainer.getJDBCDetails();
+ _sqlBlobType = jdbcDetails.getBlobType();
+ _isUseBytesMethodsForBlob = jdbcDetails.isUseBytesMethodsForBlob();
+ }
+
+ @Override
+ protected Collection<LinkDefinition> doOpenAndLoad(final LinkStoreUpdater
updater) throws StoreException
+ {
+ Collection<LinkDefinition> linkDefinitions;
+ try
+ {
+ checkTransactionIsolationLevel();
+ createOrOpenStoreDatabase();
+ linkDefinitions = getLinks();
+ ModelVersion storedVersion = getStoredVersion();
+ ModelVersion currentVersion =
+ new ModelVersion(BrokerModel.MODEL_MAJOR_VERSION,
BrokerModel.MODEL_MINOR_VERSION);
+ if (storedVersion.lessThan(currentVersion))
+ {
+ linkDefinitions = performUpdate(updater, linkDefinitions,
storedVersion, currentVersion);
+ }
+ else if (currentVersion.lessThan(storedVersion))
+ {
+ throw new StoreException(String.format("Cannot downgrade the
store from %s to %s",
+ storedVersion,
+ currentVersion));
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Cannot open link store", e);
+ }
+ return linkDefinitions;
+ }
+
+ @Override
+ protected void doClose() throws StoreException
+ {
+
+ }
+
+ @Override
+ protected void doSaveLink(final LinkDefinition link) throws StoreException
+ {
+ String linkKey = generateLinkKey(link);
+ Connection connection = getConnection();
+ try
+ {
+ connection.setAutoCommit(false);
+
connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+ try (PreparedStatement preparedStatement =
connection.prepareStatement(
+ String.format(
+ "SELECT remote_container_id, link_name, link_role,
source, target FROM %s WHERE link_key = ?",
+ getLinksTableName())))
+ {
+ preparedStatement.setString(1, linkKey);
+ try (ResultSet resultSet = preparedStatement.executeQuery())
+ {
+ if (resultSet.next())
+ {
+ update(connection, linkKey, link);
+ }
+ else
+ {
+ insert(connection, linkKey, link);
+ }
+ }
+ }
+ connection.commit();
+ }
+ catch (SQLException e)
+ {
+ try
+ {
+ connection.rollback();
+ }
+ catch (SQLException re)
+ {
+ LOGGER.debug("Rollback failed on rolling back saving link
transaction", re);
+ }
+ throw new StoreException(String.format("Cannot save link %s", new
LinkKey(link)), e);
+ }
+ finally
+ {
+ JdbcUtils.closeConnection(connection, LOGGER);
+ }
+ }
+
+ @Override
+ protected void doDeleteLink(final LinkDefinition link) throws
StoreException
+ {
+
+ try (Connection connection = getConnection();
+ PreparedStatement preparedStatement = connection.prepareStatement(
+ String.format("DELETE FROM %s WHERE link_key = ?",
getLinksTableName())))
+ {
+ preparedStatement.setString(1, generateLinkKey(link));
+ preparedStatement.execute();
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException(String.format("Cannot delete link %s",
new LinkKey(link)), e);
+ }
+ }
+
+
+ @Override
+ protected void doDelete()
+ {
+ try (Connection connection = getConnection();
+ Statement dropLinksStatement = connection.createStatement();
+ Statement dropVersionsStatement = connection.createStatement())
+ {
+ dropLinksStatement.execute(String.format("DROP TABLE %s",
getLinksTableName()));
+ dropVersionsStatement.execute(String.format("DROP TABLE %s",
getVersionTableName()));
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error deleting Link store", e);
+ }
+ }
+
+ @Override
+ public TerminusDurability getHighestSupportedTerminusDurability()
+ {
+ return TerminusDurability.CONFIGURATION;
+ }
+
+ private void checkTransactionIsolationLevel() throws SQLException
+ {
+ try (Connection connection = getConnection())
+ {
+ DatabaseMetaData metaData = connection.getMetaData();
+ if
(!metaData.supportsTransactionIsolationLevel(Connection.TRANSACTION_SERIALIZABLE))
+ {
+ throw new StoreException(String.format(
+ "The RDBMS '%s' does not support required transaction
isolation level 'serializable'",
+ metaData.getDatabaseProductName()));
+ }
+ }
+ }
+
+ private Connection getConnection()
+ {
+ return _jdbcContainer.getConnection();
+ }
+
+ private void createOrOpenStoreDatabase() throws SQLException
+ {
+ try (Connection conn = getConnection())
+ {
+ conn.setAutoCommit(true);
+
+ createLinksTable(conn);
+ createVersionTable(conn);
+ }
+ }
+
+ private void createVersionTable(final Connection conn) throws SQLException
+ {
+ String versionTableName = getVersionTableName();
+ if (!JdbcUtils.tableExists(versionTableName, conn))
+ {
+ try (Statement stmt = conn.createStatement())
+ {
+ stmt.execute(String.format("CREATE TABLE %s"
+ + " (version varchar(10) PRIMARY
KEY ,"
+ + " version_time TIMESTAMP)",
versionTableName));
+ }
+ updateVersion(conn,
ModelVersion.fromString(BrokerModel.MODEL_VERSION));
+ }
+ }
+
+
+ private void createLinksTable(final Connection conn) throws SQLException
+ {
+ if (!JdbcUtils.tableExists(getLinksTableName(), conn))
+ {
+ try (Statement stmt = conn.createStatement())
+ {
+ stmt.execute(String.format("CREATE TABLE %1$s"
+ + " ( link_key varchar(44) PRIMARY
KEY ,"
+ + " remote_container_id %2$s, "
+ + " link_name %2$s,"
+ + " link_role INTEGER,"
+ + " source %2$s,"
+ + " target %2$s )",
getLinksTableName(), _sqlBlobType));
+ }
+ }
+ }
+
+ private String getLinksTableName()
+ {
+ return _tableNamePrefix + LINKS_TABLE_NAME_SUFFIX;
+ }
+
+ private String getVersionTableName()
+ {
+ return _tableNamePrefix + VERSION_TABLE_NAME_SUFFIX;
+ }
+
+ private Collection<LinkDefinition> performUpdate(final LinkStoreUpdater
updater,
+
Collection<LinkDefinition> linkDefinitions,
+ final ModelVersion
storedVersion,
+ final ModelVersion
currentVersion) throws SQLException
+ {
+ linkDefinitions = updater.update(storedVersion.toString(),
linkDefinitions);
+ Connection connection = getConnection();
+ try
+ {
+ connection.setAutoCommit(false);
+
+ try (Statement statement = connection.createStatement())
+ {
+ statement.execute("DELETE FROM " + getLinksTableName());
+ }
+
+ for (LinkDefinition linkDefinition : linkDefinitions)
+ {
+ insert(connection, generateLinkKey(linkDefinition),
linkDefinition);
+ }
+ updateVersion(connection, currentVersion);
+ connection.commit();
+ }
+ catch (SQLException e)
+ {
+ try
+ {
+ connection.rollback();
+ }
+ catch (SQLException re)
+ {
+ LOGGER.debug("Cannot rollback transaction", re);
+ }
+ throw e;
+ }
+ finally
+ {
+ JdbcUtils.closeConnection(connection, LOGGER);
+ }
+ return linkDefinitions;
+ }
+
+ private Collection<LinkDefinition> getLinks() throws SQLException
+ {
+ Collection<LinkDefinition> links = new ArrayList<>();
+ try (Connection connection = getConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(String.format(
+ "SELECT remote_container_id, link_name, link_role,
source, target FROM %s",
+ getLinksTableName())))
+ {
+ while (resultSet.next())
+ {
+ String remoteContainerId = getBlobValueAsString(resultSet, 1);
+ String linkName = getBlobValueAsString(resultSet, 2);
+ Role role = Role.valueOf(resultSet.getBoolean(3));
+ Source source = (Source) getBlobAsAmqpObject(resultSet, 4);
+ Target target = (Target) getBlobAsAmqpObject(resultSet, 5);
+
+ links.add(new LinkDefinitionImpl(remoteContainerId, linkName,
role, source, target));
+ }
+ }
+ return links;
+ }
+
+ private Object getBlobAsAmqpObject(final ResultSet resultSet, final int
index) throws SQLException
+ {
+ byte[] sourceBytes;
+ if (_isUseBytesMethodsForBlob)
+ {
+ sourceBytes = resultSet.getBytes(index);
+ }
+ else
+ {
+ Blob blob = resultSet.getBlob(index);
+ try (InputStream is = blob.getBinaryStream())
+ {
+ sourceBytes = ByteStreams.toByteArray(is);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Cannot convert blob to string", e);
+ }
+ finally
+ {
+ blob.free();
+ }
+ }
+ return LinkStoreUtils.amqpBytesToObject(sourceBytes);
+ }
+
+ private String getBlobValueAsString(final ResultSet resultSet, final int
index) throws SQLException
+ {
+ if (_isUseBytesMethodsForBlob)
+ {
+ return new String(resultSet.getBytes(index), UTF_8);
+ }
+
+ Blob blob = resultSet.getBlob(index);
+ try (InputStream is = blob.getBinaryStream();
+ InputStreamReader isr = new InputStreamReader(is, UTF_8))
+ {
+ return CharStreams.toString(isr);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Cannot convert blob to string", e);
+ }
+ finally
+ {
+ blob.free();
+ }
+ }
+
+ private ModelVersion getStoredVersion() throws SQLException
+ {
+ ModelVersion version = null;
+ try (Connection connection = getConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet =
statement.executeQuery(String.format("SELECT version FROM %s",
+
getVersionTableName())))
+ {
+ while (resultSet.next())
+ {
+ ModelVersion storedVersion =
ModelVersion.fromString(resultSet.getString(1));
+ if (version == null || version.lessThan(storedVersion))
+ {
+ version = storedVersion;
+ }
+ }
+ }
+ if (version == null)
+ {
+ throw new StoreException("Version of links is not found");
+ }
+ return version;
+ }
+
+ private void updateVersion(final Connection connection, final ModelVersion
currentVersion) throws SQLException
+ {
+ String version = currentVersion.toString();
+ try (PreparedStatement statement =
connection.prepareStatement(String.format(
+ "INSERT INTO %s (version, version_time) VALUES (?,?)",
+ getVersionTableName())))
+ {
+ statement.setString(1, version);
+ statement.setDate(2, new
java.sql.Date(System.currentTimeMillis()));
+ if (statement.executeUpdate() != 1)
+ {
+ throw new StoreException(String.format("Cannot insert version
'%s' into version table", version));
+ }
+ }
+ }
+
+
+ private void insert(final Connection connection, final String linkKey,
final LinkDefinition linkDefinition)
+ throws SQLException
+ {
+ try (PreparedStatement statement =
connection.prepareStatement(String.format(
+ "INSERT INTO %s (link_key, remote_container_id, link_name,
link_role, source, target) VALUES (?,?,?,?,?,?)",
+ getLinksTableName())))
+ {
+ statement.setString(1, linkKey);
+ saveStringAsBlob(statement, 2,
linkDefinition.getRemoteContainerId());
+ saveStringAsBlob(statement, 3, linkDefinition.getName());
+ statement.setInt(4, linkDefinition.getRole().getValue() ? 1 : 0);
+ saveObjectAsBlob(statement, 5, linkDefinition.getSource());
+ saveObjectAsBlob(statement, 6, linkDefinition.getTarget());
+ if (statement.executeUpdate() != 1)
+ {
+ throw new StoreException(String.format("Cannot save link %s",
new LinkKey(linkDefinition)));
+ }
+ }
+ }
+
+ private void update(final Connection connection, final String linkKey,
final LinkDefinition linkDefinition)
+ throws SQLException
+ {
+ try (PreparedStatement statement =
connection.prepareStatement(String.format(
+ "UPDATE %s SET source = ?, target = ? WHERE link_key = ?",
+ getLinksTableName())))
+ {
+ saveObjectAsBlob(statement, 1, linkDefinition.getSource());
+ saveObjectAsBlob(statement, 2, linkDefinition.getTarget());
+ statement.setString(3, linkKey);
+ if (statement.executeUpdate() != 1)
+ {
+ throw new StoreException(String.format("Cannot save link %s",
new LinkKey(linkDefinition)));
+ }
+ }
+ }
+
+ private void saveObjectAsBlob(final PreparedStatement statement, final int
index, final Object object)
+ throws SQLException
+ {
+ saveBytesAsBlob(statement, index,
LinkStoreUtils.objectToAmqpBytes(object));
+ }
+
+ private void saveBytesAsBlob(final PreparedStatement statement, final int
index, final byte[] bytes)
+ throws SQLException
+ {
+ if (_isUseBytesMethodsForBlob)
+ {
+ statement.setBytes(index, bytes);
+ }
+ else
+ {
+ try (InputStream inputStream = new ByteArrayInputStream(bytes))
+ {
+ statement.setBlob(index, inputStream);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Cannot save link", e);
+ }
+ }
+ }
+
+ private void saveStringAsBlob(final PreparedStatement statement, final int
index, final String value)
+ throws SQLException
+ {
+ saveBytesAsBlob(statement, index, value.getBytes(UTF_8));
+ }
+
+ private String generateLinkKey(final LinkDefinition linkDefinition)
+ {
+ MessageDigest md;
+ try
+ {
+ md = MessageDigest.getInstance("SHA-256");
+ }
+ catch (NoSuchAlgorithmException e)
+ {
+ throw new StoreException("Cannot generate SHA-256 checksum", e);
+ }
+
+ md.update(linkDefinition.getRemoteContainerId().getBytes(UTF_8));
+ md.update(linkDefinition.getName().getBytes(UTF_8));
+ md.update(linkDefinition.getRole().getValue() ? (byte) 1 : (byte) 0);
+
+ return Base64.getEncoder().encodeToString(md.digest());
+ }
+}
Added: qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStoreFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStoreFactory.java?rev=1788287&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStoreFactory.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStoreFactory.java Thu Mar 23 16:52:47 2017
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0.store.jdbc;
+
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.plugin.PluggableService;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStore;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStoreFactory;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.jdbc.JDBCContainer;
+
+/**
+ * {@link LinkStoreFactory} that creates {@link JDBCLinkStore} instances.
+ * <p>
+ * An address space is supported when it is itself a {@link JDBCContainer}, or when it is a
+ * {@link VirtualHost} whose parent is a {@link JDBCContainer}.
+ */
+@SuppressWarnings("unused")
+@PluggableService
+public class JDBCLinkStoreFactory implements LinkStoreFactory
+{
+    public static final String TYPE = "JDBC";
+
+    @Override
+    public String getType()
+    {
+        return TYPE;
+    }
+
+    /**
+     * Creates a link store backed by the JDBC container associated with the address space.
+     *
+     * @param addressSpace address space the store is created for
+     * @return a new {@link JDBCLinkStore}
+     * @throws StoreException if no {@link JDBCContainer} can be resolved for the address space
+     */
+    @Override
+    public LinkStore create(final NamedAddressSpace addressSpace)
+    {
+        final JDBCContainer jdbcContainer = findJDBCContainer(addressSpace);
+        if (jdbcContainer == null)
+        {
+            throw new StoreException(String.format(
+                    "Named address space '%s' is not supported by link store of type '%s'",
+                    addressSpace.getName(),
+                    TYPE));
+        }
+
+        return new JDBCLinkStore(jdbcContainer);
+    }
+
+    /**
+     * @param addressSpace candidate address space
+     * @return {@code true} when a {@link JDBCContainer} can be resolved for the address space
+     */
+    @Override
+    public boolean supports(final NamedAddressSpace addressSpace)
+    {
+        return findJDBCContainer(addressSpace) != null;
+    }
+
+    @Override
+    public int getPriority()
+    {
+        return 100;
+    }
+
+    /**
+     * Resolves the JDBC container for an address space: the address space itself when it
+     * implements {@link JDBCContainer}, else its parent when it is a virtual host with a
+     * {@link JDBCContainer} parent, else {@code null}.
+     */
+    private static JDBCContainer findJDBCContainer(final NamedAddressSpace addressSpace)
+    {
+        if (addressSpace instanceof JDBCContainer)
+        {
+            return (JDBCContainer) addressSpace;
+        }
+        if (addressSpace instanceof VirtualHost)
+        {
+            final Object parent = ((VirtualHost) addressSpace).getParent();
+            if (parent instanceof JDBCContainer)
+            {
+                return (JDBCContainer) parent;
+            }
+        }
+        return null;
+    }
+}
Added:
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStoreTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStoreTest.java?rev=1788287&view=auto
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStoreTest.java
(added)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStoreTest.java
Thu Mar 23 16:52:47 2017
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0.store.jdbc;
+
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+import org.apache.qpid.server.protocol.v1_0.store.LinkStore;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStoreTestCase;
+import org.apache.qpid.server.store.jdbc.JDBCContainer;
+import org.apache.qpid.server.store.jdbc.JDBCDetails;
+
+/**
+ * Runs the shared {@link LinkStoreTestCase} scenarios against {@link JDBCLinkStore}, using a mocked
+ * {@link JDBCContainer} that hands out connections to a Derby in-memory database named after the
+ * current test.
+ */
+public class JDBCLinkStoreTest extends LinkStoreTestCase
+{
+    /**
+     * Builds a {@link JDBCLinkStore} over mocked JDBC details (blob type {@code "blob"}, stream-based
+     * blob access) and a connection factory that creates the in-memory database on demand.
+     */
+    @Override
+    protected LinkStore createLinkStore()
+    {
+        final JDBCDetails details = mock(JDBCDetails.class);
+        when(details.getBlobType()).thenReturn("blob");
+        when(details.isUseBytesMethodsForBlob()).thenReturn(false);
+
+        JDBCContainer jdbcContainer = mock(JDBCContainer.class);
+        when(jdbcContainer.getJDBCDetails()).thenReturn(details);
+        when(jdbcContainer.getTableNamePrefix()).thenReturn("testTablePrefix");
+        // thenAnswer so that each getConnection() call opens a fresh connection.
+        when(jdbcContainer.getConnection()).thenAnswer(invocation -> DriverManager.getConnection(getUrl() + ";create=true"));
+
+        return new JDBCLinkStore(jdbcContainer);
+    }
+
+    /**
+     * Opens and closes a connection to the test database, tolerating SQL state {@code 08006}.
+     * <p>
+     * NOTE(review): the URL carries no {@code ;drop=true} or {@code ;shutdown=true} attribute, so
+     * this only opens a plain connection - confirm whether the database is meant to be dropped.
+     */
+    @Override
+    protected void deleteLinkStore()
+    {
+        Connection connection = null;
+        try
+        {
+            connection = DriverManager.getConnection(getUrl());
+        }
+        catch (SQLException e)
+        {
+            // getSQLState() may be null, so compare from the constant to avoid masking the
+            // original failure with a NullPointerException.
+            if ("08006".equalsIgnoreCase(e.getSQLState()))
+            {
+                //expected and represents a clean shutdown of this database only, do nothing.
+            }
+            else
+            {
+                throw new RuntimeException(e);
+            }
+        }
+        finally
+        {
+            if (connection != null)
+            {
+                try
+                {
+                    connection.close();
+                }
+                catch (SQLException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
+    /** Returns the Derby in-memory JDBC URL for the currently running test. */
+    private String getUrl()
+    {
+        return String.format("jdbc:derby:memory:/%s", getTestName());
+    }
+}
Added:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/AbstractLinkStore.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/AbstractLinkStore.java?rev=1788287&view=auto
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/AbstractLinkStore.java
(added)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/AbstractLinkStore.java
Thu Mar 23 16:52:47 2017
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.protocol.v1_0.store;
+
+import java.util.Collection;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.qpid.server.protocol.v1_0.LinkDefinition;
+import org.apache.qpid.server.protocol.v1_0.LinkKey;
+import org.apache.qpid.server.store.StoreException;
+
+/**
+ * Base class for {@link LinkStore} implementations that tracks whether the store is open and
+ * guards every public operation with a fair {@link ReentrantReadWriteLock}.
+ * <p>
+ * Per-link operations ({@link #saveLink} and {@link #deleteLink}) hold the read lock, so they may
+ * run alongside each other. Operations that change the store state ({@link #openAndLoad},
+ * {@link #close} and {@link #delete}) hold the write lock, so they run exclusively.
+ * Subclasses supply the actual persistence through the {@code do*} methods.
+ */
+public abstract class AbstractLinkStore implements LinkStore
+{
+    private final ReentrantReadWriteLock _useOrCloseRWLock = new ReentrantReadWriteLock(true);
+    private volatile StoreState _storeState = StoreState.CLOSED;
+
+    protected abstract Collection<LinkDefinition> doOpenAndLoad(final LinkStoreUpdater updater);
+    protected abstract void doClose();
+    protected abstract void doDelete();
+    protected abstract void doSaveLink(final LinkDefinition link);
+    protected abstract void doDeleteLink(final LinkDefinition link);
+
+    /**
+     * Opens the store and returns the link definitions produced by {@link #doOpenAndLoad}.
+     *
+     * @param updater passed through to {@link #doOpenAndLoad}
+     * @return the loaded link definitions
+     * @throws StoreException if the store is not currently closed
+     */
+    @Override
+    public final Collection<LinkDefinition> openAndLoad(final LinkStoreUpdater updater) throws StoreException
+    {
+        // The state is checked and then changed here, so the exclusive lock is required: under the
+        // shared read lock two concurrent callers could both observe CLOSED and both open the store.
+        _useOrCloseRWLock.writeLock().lock();
+        try
+        {
+            if (_storeState != StoreState.CLOSED)
+            {
+                throw new StoreException("Store is already opened");
+            }
+
+            Collection<LinkDefinition> linkDefinitions = doOpenAndLoad(updater);
+            _storeState = StoreState.OPENED;
+            return linkDefinitions;
+        }
+        finally
+        {
+            _useOrCloseRWLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Closes the store by delegating to {@link #doClose} and then marking it closed.
+     * No state check is made, so calling this on a closed store still invokes {@link #doClose}.
+     */
+    @Override
+    public final void close() throws StoreException
+    {
+        _useOrCloseRWLock.writeLock().lock();
+        try
+        {
+            doClose();
+            _storeState = StoreState.CLOSED;
+        }
+        finally
+        {
+            _useOrCloseRWLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Persists a link through {@link #doSaveLink}.
+     *
+     * @throws StoreException if the store is not opened
+     */
+    @Override
+    public final void saveLink(final LinkDefinition link) throws StoreException
+    {
+        _useOrCloseRWLock.readLock().lock();
+        try
+        {
+            if (_storeState != StoreState.OPENED)
+            {
+                throw new StoreException("Store is not opened");
+            }
+
+            doSaveLink(link);
+        }
+        finally
+        {
+            _useOrCloseRWLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Removes a link through {@link #doDeleteLink}.
+     *
+     * @throws StoreException if the store is not opened
+     */
+    @Override
+    public final void deleteLink(final LinkDefinition link) throws StoreException
+    {
+        _useOrCloseRWLock.readLock().lock();
+        try
+        {
+            if (_storeState != StoreState.OPENED)
+            {
+                throw new StoreException("Store is not opened");
+            }
+
+            doDeleteLink(link);
+        }
+        finally
+        {
+            _useOrCloseRWLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Closes the store and then delegates to {@link #doDelete}.
+     */
+    @Override
+    public final void delete()
+    {
+        _useOrCloseRWLock.writeLock().lock();
+        try
+        {
+            // close() re-acquires the write lock; this is safe because the lock is reentrant.
+            close();
+            doDelete();
+        }
+        finally
+        {
+            _useOrCloseRWLock.writeLock().unlock();
+        }
+    }
+
+    enum StoreState
+    {
+        CLOSED, OPENED
+    }
+}
Added:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUtils.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUtils.java?rev=1788287&view=auto
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUtils.java
(added)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUtils.java
Thu Mar 23 16:52:47 2017
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.protocol.v1_0.store;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import
org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.store.StoreException;
+
+/**
+ * Static helpers that convert between objects and their AMQP-encoded byte form, using a described
+ * type registry with the transport and messaging layers registered.
+ */
+public class LinkStoreUtils
+{
+    private static final AMQPDescribedTypeRegistry DESCRIBED_TYPE_REGISTRY =
+            AMQPDescribedTypeRegistry.newInstance().registerTransportLayer().registerMessagingLayer();
+
+    /**
+     * Decodes a single AMQP-encoded value.
+     *
+     * @param bytes encoded form of the value
+     * @return the object produced by the {@link ValueHandler}
+     * @throws StoreException if the bytes cannot be parsed
+     */
+    public static Object amqpBytesToObject(final byte[] bytes)
+    {
+        final QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(bytes);
+        try
+        {
+            final ValueHandler valueHandler = new ValueHandler(DESCRIBED_TYPE_REGISTRY);
+            return valueHandler.parse(qpidByteBuffer);
+        }
+        catch (AmqpErrorException e)
+        {
+            throw new StoreException("Unexpected serialized data", e);
+        }
+        finally
+        {
+            qpidByteBuffer.dispose();
+        }
+    }
+
+    /**
+     * Encodes an object with the value writer the registry provides for it.
+     *
+     * @param object value to encode
+     * @return the backing array of a buffer allocated with the writer's encoded size
+     */
+    public static byte[] objectToAmqpBytes(final Object object)
+    {
+        final ValueWriter valueWriter = DESCRIBED_TYPE_REGISTRY.getValueWriter(object);
+        final int encodedSize = valueWriter.getEncodedSize();
+        final QpidByteBuffer qpidByteBuffer = QpidByteBuffer.allocate(encodedSize);
+        try
+        {
+            valueWriter.writeToBuffer(qpidByteBuffer);
+            return qpidByteBuffer.array();
+        }
+        finally
+        {
+            // Dispose even when encoding fails, mirroring amqpBytesToObject.
+            qpidByteBuffer.dispose();
+        }
+    }
+}
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreTestCase.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreTestCase.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreTestCase.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreTestCase.java
Thu Mar 23 16:52:47 2017
@@ -70,7 +70,8 @@ public abstract class LinkStoreTestCase
_source.setTimeout(new UnsignedInteger(1));
_target.setTimeout(new UnsignedInteger(2));
-
_target.setDynamicNodeProperties(Collections.singletonMap("targetDynamicProperty",
"targetDynamicPropertyValue"));
+
_target.setDynamicNodeProperties(Collections.singletonMap("targetDynamicProperty",
+
"targetDynamicPropertyValue"));
_target.setDynamic(Boolean.TRUE);
_target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
_target.setAddress("bar");
@@ -87,9 +88,19 @@ public abstract class LinkStoreTestCase
public void testOpenAndLoad() throws Exception
{
- Collection<LinkDefinition> links = _linkStore.openAndLoad(new
LinkStoreUpdaterImpl());
+ Collection<LinkDefinition> links = _linkStore.openAndLoad(new
LinkStoreUpdaterImpl());
assertTrue("Unexpected links", links.isEmpty());
+ try
+ {
+ _linkStore.openAndLoad(new LinkStoreUpdaterImpl());
+ fail("Repeated open of already opened store should fail");
+ }
+ catch (StoreException e)
+ {
+ // pass
+ }
+
LinkDefinition linkDefinition = createLinkDefinition("1", "test");
_linkStore.saveLink(linkDefinition);
_linkStore.close();
@@ -99,7 +110,6 @@ public abstract class LinkStoreTestCase
}
-
public void testClose() throws Exception
{
_linkStore.openAndLoad(new LinkStoreUpdaterImpl());
@@ -111,7 +121,7 @@ public abstract class LinkStoreTestCase
_linkStore.saveLink(linkDefinition);
fail("Saving link with close store should fail");
}
- catch(StoreException e)
+ catch (StoreException e)
{
// pass
}
@@ -119,11 +129,22 @@ public abstract class LinkStoreTestCase
public void testSaveLink() throws Exception
{
- _linkStore.openAndLoad(new LinkStoreUpdaterImpl());
+
LinkDefinition linkDefinition = createLinkDefinition("1", "test");
+ _linkStore.openAndLoad(new LinkStoreUpdaterImpl());
_linkStore.saveLink(linkDefinition);
_linkStore.close();
+ try
+ {
+ _linkStore.saveLink(createLinkDefinition("2", "test2"));
+ fail("Save on unopened database should fail");
+ }
+ catch (StoreException e)
+ {
+ // pass
+ }
+
Collection<LinkDefinition> links = _linkStore.openAndLoad(new
LinkStoreUpdaterImpl());
assertEquals("Unexpected link number", 1, links.size());
@@ -149,6 +170,16 @@ public abstract class LinkStoreTestCase
_linkStore.deleteLink(linkDefinition2);
_linkStore.close();
+ try
+ {
+ _linkStore.deleteLink(linkDefinition);
+ fail("Delete on unopened database should fail");
+ }
+ catch (StoreException e)
+ {
+ // pass
+ }
+
Collection<LinkDefinition> links = _linkStore.openAndLoad(new
LinkStoreUpdaterImpl());
assertEquals("Unexpected link number", 1, links.size());
@@ -171,9 +202,13 @@ public abstract class LinkStoreTestCase
LinkDefinition linkDefinition2 = createLinkDefinition("2", "test2");
_linkStore.saveLink(linkDefinition2);
+ _linkStore.close();
+ Collection<LinkDefinition> links = _linkStore.openAndLoad(new
LinkStoreUpdaterImpl());
+ assertEquals("Unexpected link number", 2, links.size());
+
_linkStore.delete();
- Collection<LinkDefinition> links = _linkStore.openAndLoad(new
LinkStoreUpdaterImpl());
+ links = _linkStore.openAndLoad(new LinkStoreUpdaterImpl());
assertEquals("Unexpected link number", 0, links.size());
}
Modified:
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java
(original)
+++
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java
Thu Mar 23 16:52:47 2017
@@ -90,7 +90,7 @@ public class DerbyConfigurationStore ext
}
@Override
- protected Connection getConnection() throws SQLException
+ public Connection getConnection() throws SQLException
{
return DriverManager.getConnection(_connectionURL);
}
@@ -206,7 +206,7 @@ public class DerbyConfigurationStore ext
}
@Override
- protected Connection getConnection() throws SQLException
+ public Connection getConnection() throws SQLException
{
checkMessageStoreOpen();
return DerbyConfigurationStore.this.getConnection();
Modified:
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
(original)
+++
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
Thu Mar 23 16:52:47 2017
@@ -52,7 +52,7 @@ public class DerbyMessageStore extends A
}
@Override
- protected Connection getConnection() throws SQLException
+ public Connection getConnection() throws SQLException
{
checkMessageStoreOpen();
return DriverManager.getConnection(_connectionURL);
Modified:
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHostImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHostImpl.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHostImpl.java
(original)
+++
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHostImpl.java
Thu Mar 23 16:52:47 2017
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.virtualhost.derby;
+import java.sql.Connection;
+import java.sql.SQLException;
import java.util.Map;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -29,10 +31,14 @@ import org.apache.qpid.server.model.Virt
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.derby.DerbyMessageStore;
import org.apache.qpid.server.store.derby.DerbyUtils;
+import org.apache.qpid.server.store.jdbc.JDBCContainer;
+import org.apache.qpid.server.store.jdbc.JDBCDetails;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.FileHelper;
import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
-public class DerbyVirtualHostImpl extends
AbstractVirtualHost<DerbyVirtualHostImpl> implements
DerbyVirtualHost<DerbyVirtualHostImpl>
+public class DerbyVirtualHostImpl extends
AbstractVirtualHost<DerbyVirtualHostImpl>
+ implements DerbyVirtualHost<DerbyVirtualHostImpl>, JDBCContainer
{
public static final String VIRTUAL_HOST_TYPE = "DERBY";
@@ -90,4 +96,30 @@ public class DerbyVirtualHostImpl extend
throw new IllegalConfigurationException("The store path is not
writable directory");
}
}
+
+    /**
+     * Returns the JDBC details for the {@code "derby"} vendor, resolved with this virtual host as
+     * the configured object.
+     */
+    @Override
+    public JDBCDetails getJDBCDetails()
+    {
+        final String vendor = "derby";
+        return JDBCDetails.getJdbcDetails(vendor, this);
+    }
+
+    /**
+     * Obtains a JDBC connection from this virtual host's {@link DerbyMessageStore}.
+     *
+     * @return the connection supplied by the message store
+     * @throws ConnectionScopedRuntimeException if the store raises an {@link SQLException}; the
+     *                                          original exception is attached as the cause
+     */
+    @Override
+    public Connection getConnection()
+    {
+        try
+        {
+            return ((DerbyMessageStore) getMessageStore()).getConnection();
+        }
+        catch (SQLException e)
+        {
+            // Keep the SQLException as the cause so the underlying database error is not lost.
+            throw new ConnectionScopedRuntimeException(String.format(
+                    "Error opening connection to database for VirtualHost '%s'", getName()), e);
+        }
+    }
+
+    /**
+     * Returns the table name prefix for this virtual host, which is always the empty string.
+     */
+    @Override
+    public String getTableNamePrefix()
+    {
+        return "";
+    }
}
Modified:
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNodeImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNodeImpl.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNodeImpl.java
(original)
+++
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNodeImpl.java
Thu Mar 23 16:52:47 2017
@@ -21,6 +21,8 @@
package org.apache.qpid.server.virtualhostnode.derby;
+import java.sql.Connection;
+import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
@@ -35,14 +37,18 @@ import org.apache.qpid.server.model.Virt
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.derby.DerbyConfigurationStore;
import org.apache.qpid.server.store.derby.DerbyUtils;
+import org.apache.qpid.server.store.jdbc.JDBCContainer;
+import org.apache.qpid.server.store.jdbc.JDBCDetails;
import org.apache.qpid.server.store.preferences.PreferenceStore;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.FileHelper;
import org.apache.qpid.server.virtualhostnode.AbstractStandardVirtualHostNode;
@ManagedObject( category = false,
type = DerbyVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE,
validChildTypes =
"org.apache.qpid.server.virtualhostnode.derby.DerbyVirtualHostNodeImpl#getSupportedChildTypes()"
)
-public class DerbyVirtualHostNodeImpl extends
AbstractStandardVirtualHostNode<DerbyVirtualHostNodeImpl> implements
DerbyVirtualHostNode<DerbyVirtualHostNodeImpl>
+public class DerbyVirtualHostNodeImpl extends
AbstractStandardVirtualHostNode<DerbyVirtualHostNodeImpl>
+ implements DerbyVirtualHostNode<DerbyVirtualHostNodeImpl>,
JDBCContainer
{
public static final String VIRTUAL_HOST_NODE_TYPE = "DERBY";
@@ -104,4 +110,31 @@ public class DerbyVirtualHostNodeImpl ex
{
return
((DerbyConfigurationStore)getConfigurationStore()).getPreferenceStore();
}
+
+    /**
+     * Returns the JDBC details for the {@code "derby"} vendor, resolved with this virtual host node
+     * as the configured object.
+     */
+    @Override
+    public JDBCDetails getJDBCDetails()
+    {
+        final String vendor = "derby";
+        return JDBCDetails.getJdbcDetails(vendor, this);
+    }
+
+    /**
+     * Obtains a JDBC connection from this node's {@link DerbyConfigurationStore}.
+     *
+     * @return the connection supplied by the configuration store
+     * @throws ConnectionScopedRuntimeException if the store raises an {@link SQLException}; the
+     *                                          original exception is attached as the cause
+     */
+    @Override
+    public Connection getConnection()
+    {
+        try
+        {
+            return ((DerbyConfigurationStore) getConfigurationStore()).getConnection();
+        }
+        catch (SQLException e)
+        {
+            // Keep the SQLException as the cause so the underlying database error is not lost.
+            throw new ConnectionScopedRuntimeException(String.format(
+                    "Error opening connection to database for VirtualHostNode '%s'",
+                    getName()), e);
+        }
+    }
+
+    /**
+     * Returns the table name prefix for this virtual host node, which is always the empty string.
+     */
+    @Override
+    public String getTableNamePrefix()
+    {
+        return "";
+    }
}
Modified:
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
(original)
+++
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
Thu Mar 23 16:52:47 2017
@@ -533,7 +533,7 @@ public abstract class AbstractJDBCMessag
return connection;
}
- protected abstract Connection getConnection() throws SQLException;
+ public abstract Connection getConnection() throws SQLException;
@Override
public Transaction newTransaction()
Modified:
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
(original)
+++
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
Thu Mar 23 16:52:47 2017
@@ -145,7 +145,7 @@ public class GenericJDBCConfigurationSto
}
@Override
- protected Connection getConnection() throws SQLException
+ public Connection getConnection() throws SQLException
{
return _connectionProvider.getConnection();
}
@@ -250,7 +250,7 @@ public class GenericJDBCConfigurationSto
}
@Override
- protected Connection getConnection() throws SQLException
+ public Connection getConnection() throws SQLException
{
return GenericJDBCConfigurationStore.this.getConnection();
}
Modified:
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
(original)
+++
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
Thu Mar 23 16:52:47 2017
@@ -118,7 +118,7 @@ public class GenericJDBCMessageStore ext
}
@Override
- protected Connection getConnection() throws SQLException
+ public Connection getConnection() throws SQLException
{
return _connectionProvider.getConnection();
}
Added:
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCContainer.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCContainer.java?rev=1788287&view=auto
==============================================================================
---
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCContainer.java
(added)
+++
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCContainer.java
Thu Mar 23 16:52:47 2017
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.store.jdbc;
+
+import java.sql.Connection;
+
+/**
+ * Implemented by objects that can supply what a JDBC-backed store needs: the JDBC details, a
+ * connection, and a table name prefix.
+ */
+public interface JDBCContainer
+{
+    /**
+     * @return the {@link JDBCDetails} for this container
+     */
+    JDBCDetails getJDBCDetails();
+
+    /**
+     * Returns a JDBC connection. The method declares no checked exception.
+     * NOTE(review): presumably the caller is responsible for closing the returned connection -
+     * confirm against the callers.
+     *
+     * @return a JDBC connection
+     */
+    Connection getConnection();
+
+    /**
+     * @return the table name prefix for this container
+     */
+    String getTableNamePrefix();
+}
Modified:
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java
(original)
+++
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java
Thu Mar 23 16:52:47 2017
@@ -231,6 +231,19 @@ public abstract class JDBCDetails
+    /**
+     * Resolves JDBC details from a JDBC URL.
+     * <p>
+     * The URL is split on {@code ':'} into at most three parts and the second part is taken as the
+     * vendor. When the URL contains no {@code ':'} the vendor is passed on as {@code null}.
+     *
+     * @param jdbcUrl JDBC URL to inspect; must not be {@code null}
+     * @param object  configured object handed to {@code getJdbcDetails}
+     * @return the details returned by {@code getJdbcDetails(vendor, object)}
+     */
 public static JDBCDetails getDetailsForJdbcUrl(String jdbcUrl, final ConfiguredObject<?> object)
 {
+        final String[] components = jdbcUrl.split(":", 3);
+        final String vendor = components.length >= 2 ? components[1] : null;
+
+        return getJdbcDetails(vendor, object);
+    }
+
+ public static JDBCDetails getJdbcDetails(final String vendor, final
ConfiguredObject<?> object)
+ {
final Set<String> contextKeys = object.getContextKeys(false);
Map<String,String> mapConversion = new AbstractMap<String, String>()
{
@@ -296,15 +309,14 @@ public abstract class JDBCDetails
};
}
};
- return getDetailsForJdbcUrl(jdbcUrl, mapConversion);
+ return getJdbcDetails(vendor, mapConversion);
}
- public static JDBCDetails getDetailsForJdbcUrl(String jdbcUrl, final
Map<String, String> contextMap)
+
+ static JDBCDetails getJdbcDetails(final String vendor, final Map<String,
String> contextMap)
{
- String[] components = jdbcUrl.split(":", 3);
final JDBCDetails details;
- if(components.length >= 2)
+ if (vendor != null)
{
- String vendor = components[1];
if (KnownJDBCDetails.VENDOR_DETAILS.containsKey(vendor))
{
details = KnownJDBCDetails.VENDOR_DETAILS.get(vendor);
@@ -319,7 +331,6 @@ public abstract class JDBCDetails
details = KnownJDBCDetails.FALLBACK;
}
-
return new JDBCDetails()
{
@Override
@@ -371,7 +382,5 @@ public abstract class JDBCDetails
|| contextMap.containsKey(CONTEXT_JDBCSTORE_BLOBTYPE);
}
};
-
}
-
}
Modified:
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHostImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHostImpl.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHostImpl.java
(original)
+++
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHostImpl.java
Thu Mar 23 16:52:47 2017
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.virtualhost.jdbc;
+import java.sql.Connection;
+import java.sql.SQLException;
import java.util.Map;
import org.apache.qpid.server.model.ManagedAttributeField;
@@ -27,11 +29,16 @@ import org.apache.qpid.server.model.Mana
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.jdbc.AbstractJDBCMessageStore;
import org.apache.qpid.server.store.jdbc.GenericJDBCMessageStore;
+import org.apache.qpid.server.store.jdbc.JDBCContainer;
+import org.apache.qpid.server.store.jdbc.JDBCDetails;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
@ManagedObject(category = false, type = JDBCVirtualHostImpl.VIRTUAL_HOST_TYPE)
-public class JDBCVirtualHostImpl extends AbstractVirtualHost<JDBCVirtualHostImpl> implements JDBCVirtualHost<JDBCVirtualHostImpl>
+public class JDBCVirtualHostImpl extends AbstractVirtualHost<JDBCVirtualHostImpl>
+ implements JDBCVirtualHost<JDBCVirtualHostImpl>, JDBCContainer
{
public static final String VIRTUAL_HOST_TYPE = "JDBC";
@@ -94,6 +101,27 @@ public class JDBCVirtualHostImpl extends
}
@Override
+ public JDBCDetails getJDBCDetails()
+ {
+ return JDBCDetails.getDetailsForJdbcUrl(getConnectionUrl(), this);
+ }
+
+ @Override
+ public Connection getConnection()
+ {
+ try
+ {
+ return ((AbstractJDBCMessageStore) getMessageStore()).getConnection();
+ }
+ catch (SQLException e)
+ {
+ throw new ConnectionScopedRuntimeException(String.format(
+ "Error opening connection to database for VirtualHost '%s'",
+ getName()));
+ }
+ }
+
+ @Override
public String toString()
{
return getClass().getSimpleName() + " [id=" + getId() + ", name=" + getName() +
Modified:
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java (original)
+++ qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java Thu Mar 23 16:52:47 2017
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.virtualhostnode.jdbc;
+import java.sql.Connection;
+import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
@@ -31,12 +33,16 @@ import org.apache.qpid.server.model.Mana
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.jdbc.GenericJDBCConfigurationStore;
+import org.apache.qpid.server.store.jdbc.JDBCContainer;
+import org.apache.qpid.server.store.jdbc.JDBCDetails;
import org.apache.qpid.server.store.preferences.PreferenceStore;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhostnode.AbstractStandardVirtualHostNode;
@ManagedObject(type = JDBCVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE, category = false ,
validChildTypes = "org.apache.qpid.server.virtualhostnode.jdbc.JDBCVirtualHostNodeImpl#getSupportedChildTypes()")
-public class JDBCVirtualHostNodeImpl extends AbstractStandardVirtualHostNode<JDBCVirtualHostNodeImpl> implements JDBCVirtualHostNode<JDBCVirtualHostNodeImpl>
+public class JDBCVirtualHostNodeImpl extends AbstractStandardVirtualHostNode<JDBCVirtualHostNodeImpl>
+ implements JDBCVirtualHostNode<JDBCVirtualHostNodeImpl>, JDBCContainer
{
public static final String VIRTUAL_HOST_NODE_TYPE = "JDBC";
@@ -103,6 +109,27 @@ public class JDBCVirtualHostNodeImpl ext
}
@Override
+ public JDBCDetails getJDBCDetails()
+ {
+ return JDBCDetails.getDetailsForJdbcUrl(getConnectionUrl(), this);
+ }
+
+ @Override
+ public Connection getConnection()
+ {
+ try
+ {
+ return ((GenericJDBCConfigurationStore) getConfigurationStore()).getConnection();
+ }
+ catch (SQLException e)
+ {
+ throw new ConnectionScopedRuntimeException(String.format(
+ "Error opening connection to database for VirtualHostNode '%s'",
+ getName()));
+ }
+ }
+
+ @Override
public String toString()
{
return getClass().getSimpleName() + " [id=" + getId() + ", name=" + getName() +
Modified:
qpid/java/trunk/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCDetailsTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCDetailsTest.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCDetailsTest.java (original)
+++ qpid/java/trunk/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCDetailsTest.java Thu Mar 23 16:52:47 2017
@@ -29,8 +29,7 @@ public class JDBCDetailsTest extends Qpi
{
public void testDerby()
{
- JDBCDetails derbyDetails = JDBCDetails.getDetailsForJdbcUrl("jdbc:derby:sample",
- Collections.<String, String>emptyMap());
+ JDBCDetails derbyDetails = JDBCDetails.getJdbcDetails("derby", Collections.emptyMap());
assertEquals("derby", derbyDetails.getVendor());
assertEquals("varchar(%d) for bit data", derbyDetails.getVarBinaryType());
assertEquals("bigint", derbyDetails.getBigintType());
@@ -43,7 +42,7 @@ public class JDBCDetailsTest extends Qpi
public void testUnknownVendor_UsesFallbackDetails()
{
- JDBCDetails details = JDBCDetails.getDetailsForJdbcUrl("jdbc:homedb:", Collections.<String, String>emptyMap());
+ JDBCDetails details = JDBCDetails.getJdbcDetails("homedb", Collections.emptyMap());
assertEquals("fallback", details.getVendor());
assertEquals("varchar(%d) for bit data", details.getVarBinaryType());
assertEquals("bigint", details.getBigintType());
@@ -59,7 +58,7 @@ public class JDBCDetailsTest extends Qpi
Map<String, String> contextMap = new HashMap<>();
contextMap.put(JDBCDetails.CONTEXT_JDBCSTORE_VARBINARYTYPE, "myvarbin");
- JDBCDetails derbyDetails = JDBCDetails.getDetailsForJdbcUrl("jdbc:derby:sample", contextMap);
+ JDBCDetails derbyDetails = JDBCDetails.getJdbcDetails("derby", contextMap);
assertEquals("derby", derbyDetails.getVendor());
assertEquals("myvarbin", derbyDetails.getVarBinaryType());
assertEquals("bigint", derbyDetails.getBigintType());
@@ -81,7 +80,7 @@ public class JDBCDetailsTest extends Qpi
contextMap.put(JDBCDetails.CONTEXT_JDBCSTORE_BLOBTYPE, "myblob");
contextMap.put(JDBCDetails.CONTEXT_JDBCSTORE_USEBYTESFORBLOB, "true");
- JDBCDetails details = JDBCDetails.getDetailsForJdbcUrl("jdbc:sybase:", contextMap);
+ JDBCDetails details = JDBCDetails.getJdbcDetails("sybase", contextMap);
assertEquals("sybase", details.getVendor());
assertEquals("myvarbin", details.getVarBinaryType());
assertEquals("mybigint", details.getBigintType());
Modified: qpid/java/trunk/broker/pom.xml
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker/pom.xml?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- qpid/java/trunk/broker/pom.xml (original)
+++ qpid/java/trunk/broker/pom.xml Thu Mar 23 16:52:47 2017
@@ -127,6 +127,13 @@
<dependency>
<groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-plugins-amqp-1-0-protocol-jdbc-link-store</artifactId>
+ <version>${project.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-plugins-management-http</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
@@ -164,7 +171,7 @@
<dependency>
<groupId>org.apache.qpid</groupId>
- <artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-store</artifactId>
+ <artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-link-store</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
<optional>true</optional>
Modified: qpid/java/trunk/pom.xml
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/pom.xml?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- qpid/java/trunk/pom.xml (original)
+++ qpid/java/trunk/pom.xml Thu Mar 23 16:52:47 2017
@@ -178,6 +178,7 @@
<module>broker-plugins/memory-store</module>
<module>broker-plugins/websocket</module>
<module>broker-plugins/amqp-1-0-bdb-store</module>
+ <module>broker-plugins/amqp-1-0-jdbc-store</module>
<module>tools</module>
<module>qpid-systests-parent</module>
Modified: qpid/java/trunk/systests/pom.xml
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/pom.xml?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- qpid/java/trunk/systests/pom.xml (original)
+++ qpid/java/trunk/systests/pom.xml Thu Mar 23 16:52:47 2017
@@ -137,11 +137,18 @@
<dependency>
<groupId>org.apache.qpid</groupId>
- <artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-store</artifactId>
+ <artifactId>qpid-broker-plugins-amqp-1-0-protocol-jdbc-link-store</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-link-store</artifactId>
+ <version>${project.version}</version>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-bdbstore</artifactId>
<version>${project.version}</version>
Modified: qpid/java/trunk/systests/qpid-systests-jms_2.0/pom.xml
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/qpid-systests-jms_2.0/pom.xml?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- qpid/java/trunk/systests/qpid-systests-jms_2.0/pom.xml (original)
+++ qpid/java/trunk/systests/qpid-systests-jms_2.0/pom.xml Thu Mar 23 16:52:47 2017
@@ -75,8 +75,9 @@
<dependency>
<groupId>org.apache.qpid</groupId>
- <artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-store</artifactId>
+ <artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-link-store</artifactId>
<version>${project.version}</version>
+ <optional>true</optional>
</dependency>
</dependencies>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]