Author: lquack
Date: Thu Mar 23 16:52:47 2017
New Revision: 1788287

URL: http://svn.apache.org/viewvc?rev=1788287&view=rev
Log:
QPID-7663: [Java Broker] Implement persistent AMQP 1.0 Link store

Added:
    qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/
    qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/pom.xml
      - copied, changed from r1788233, qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml
    qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/
    qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/
    qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/
    qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/
    qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/
    
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/
    
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/
    
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/
    
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/
    
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/
    
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/
    
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStore.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStoreFactory.java
    qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/
    qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/
    qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/
    qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/apache/
    
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/apache/qpid/
    
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/apache/qpid/server/
    
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/apache/qpid/server/protocol/
    
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/apache/qpid/server/protocol/v1_0/
    
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/
    
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/
    
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStoreTest.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/AbstractLinkStore.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUtils.java
    
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCContainer.java
Modified:
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml
    
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValueEntryBinding.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreTestCase.java
    
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java
    
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
    
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHostImpl.java
    
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNodeImpl.java
    
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
    
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
    
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
    
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java
    
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHostImpl.java
    
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java
    
qpid/java/trunk/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCDetailsTest.java
    qpid/java/trunk/broker/pom.xml
    qpid/java/trunk/pom.xml
    qpid/java/trunk/systests/pom.xml
    qpid/java/trunk/systests/qpid-systests-jms_2.0/pom.xml

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml Thu Mar 23 16:52:47 2017
@@ -31,7 +31,8 @@
     </parent>
 
     <groupId>org.apache.qpid</groupId>
-    <artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-store</artifactId>
+    <artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-link-store</artifactId>
+    <name>Apache Qpid AMQP 1.0 BDB Link Store Plug-in</name>
 
     <dependencies>
         <dependency>

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java
 Thu Mar 23 16:52:47 2017
@@ -24,7 +24,6 @@ import static org.apache.qpid.server.sto
 
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.sleepycat.bind.tuple.LongBinding;
 import com.sleepycat.bind.tuple.StringBinding;
@@ -44,22 +43,20 @@ import org.apache.qpid.server.model.Mode
 import org.apache.qpid.server.protocol.v1_0.LinkDefinition;
 import org.apache.qpid.server.protocol.v1_0.LinkDefinitionImpl;
 import org.apache.qpid.server.protocol.v1_0.LinkKey;
-import org.apache.qpid.server.protocol.v1_0.store.LinkStore;
+import org.apache.qpid.server.protocol.v1_0.store.AbstractLinkStore;
 import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUpdater;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.store.berkeleydb.BDBEnvironmentContainer;
 import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
 
-public class BDBLinkStore implements LinkStore
+public class BDBLinkStore extends AbstractLinkStore
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(BDBLinkStore.class);
     private static final String LINKS_DB_NAME = "AMQP_1_0_LINKS";
     private static final String LINKS_VERSION_DB_NAME = 
"AMQP_1_0_LINKS_VERSION";
 
-    private final ReentrantReadWriteLock _useOrCloseRWLock = new 
ReentrantReadWriteLock(true);
     private final BDBEnvironmentContainer<?> _environmentContainer;
-    private volatile StoreState _storeState = StoreState.CLOSED;
 
     BDBLinkStore(final BDBEnvironmentContainer<?> environmentContainer)
     {
@@ -67,36 +64,23 @@ public class BDBLinkStore implements Lin
     }
 
     @Override
-    public Collection<LinkDefinition> openAndLoad(final LinkStoreUpdater 
updater) throws StoreException
+    protected Collection<LinkDefinition> doOpenAndLoad(final LinkStoreUpdater 
updater) throws StoreException
     {
-        _useOrCloseRWLock.readLock().lock();
         try
         {
-            Collection<LinkDefinition> links = getLinkDefinitions(updater);
-            _storeState = StoreState.OPENED;
-            return links;
+            return getLinkDefinitions(updater);
         }
         catch (RuntimeException e)
         {
             throw getEnvironmentFacade().handleDatabaseException("Failed 
recovery of links", e);
         }
-        finally
-        {
-            _useOrCloseRWLock.readLock().unlock();
-        }
     }
 
     @Override
-    public void saveLink(final LinkDefinition link)
+    protected void doSaveLink(final LinkDefinition link)
     {
-        _useOrCloseRWLock.readLock().lock();
         try
         {
-            if (_storeState != StoreState.OPENED)
-            {
-                throw new StoreException("Store is not opened");
-            }
-
             Database linksDatabase = 
getEnvironmentFacade().openDatabase(LINKS_DB_NAME, DEFAULT_DATABASE_CONFIG);
             save(linksDatabase, null, link);
         }
@@ -104,24 +88,14 @@ public class BDBLinkStore implements Lin
         {
             throw 
getEnvironmentFacade().handleDatabaseException(String.format("Failed saving of 
link '%s'", new LinkKey(link)), e);
         }
-        finally
-        {
-            _useOrCloseRWLock.readLock().unlock();
-        }
     }
 
     @Override
-    public void deleteLink(final LinkDefinition linkDefinition)
+    protected void doDeleteLink(final LinkDefinition linkDefinition)
     {
         LinkKey linkKey = new LinkKey(linkDefinition);
-        _useOrCloseRWLock.readLock().lock();
         try
         {
-            if (_storeState != StoreState.OPENED)
-            {
-                throw new StoreException("Store is not opened");
-            }
-
             Database linksDatabase = 
getEnvironmentFacade().openDatabase(LINKS_DB_NAME, DEFAULT_DATABASE_CONFIG);
 
             final DatabaseEntry databaseEntry = new DatabaseEntry();
@@ -136,34 +110,19 @@ public class BDBLinkStore implements Lin
         {
             throw 
getEnvironmentFacade().handleDatabaseException(String.format("Failed deletion 
of link '%s'", linkKey), e);
         }
-        finally
-        {
-            _useOrCloseRWLock.readLock().unlock();
-        }
     }
 
 
     @Override
-    public void close()
+    protected void doClose()
     {
-        _useOrCloseRWLock.writeLock().lock();
-        try
-        {
-            _storeState = StoreState.CLOSED;
-        }
-        finally
-        {
-            _useOrCloseRWLock.writeLock().unlock();
-        }
     }
 
     @Override
-    public void delete()
+    protected void doDelete()
     {
-        _useOrCloseRWLock.writeLock().lock();
         try
         {
-            close();
             getEnvironmentFacade().deleteDatabase(LINKS_DB_NAME);
             getEnvironmentFacade().deleteDatabase(LINKS_VERSION_DB_NAME);
         }
@@ -172,10 +131,6 @@ public class BDBLinkStore implements Lin
             getEnvironmentFacade().handleDatabaseException("Failed deletion of 
database", e);
             LOGGER.info("Failed to delete links database", e);
         }
-        finally
-        {
-            _useOrCloseRWLock.writeLock().unlock();
-        }
     }
 
     @Override
@@ -229,6 +184,7 @@ public class BDBLinkStore implements Lin
                 {
                     save(linksDatabase, txn, link);
                 }
+                updateVersion(txn, currentVersion.toString());
                 txn.commit();
                 linksDatabase.close();
             }
@@ -242,6 +198,16 @@ public class BDBLinkStore implements Lin
         return links;
     }
 
+    private void updateVersion(final Transaction txn, final String 
currentVersion)
+    {
+        Database linksVersionDb = 
getEnvironmentFacade().openDatabase(LINKS_VERSION_DB_NAME, 
DEFAULT_DATABASE_CONFIG);
+        DatabaseEntry key = new DatabaseEntry();
+        DatabaseEntry value = new DatabaseEntry();
+        StringBinding.stringToEntry(currentVersion, key);
+        LongBinding.longToEntry(System.currentTimeMillis(), value);
+        linksVersionDb.put(txn, key, value);
+    }
+
     private void save(Database database, Transaction txn, final LinkDefinition 
link)
     {
         DatabaseEntry key = new DatabaseEntry();
@@ -297,19 +263,10 @@ public class BDBLinkStore implements Lin
         }
         catch (DatabaseNotFoundException e)
         {
+            updateVersion(null, BrokerModel.MODEL_VERSION);
             linksVersionDb = 
getEnvironmentFacade().openDatabase(LINKS_VERSION_DB_NAME, 
DEFAULT_DATABASE_CONFIG);
-            DatabaseEntry key = new DatabaseEntry();
-            DatabaseEntry value = new DatabaseEntry();
-            StringBinding.stringToEntry(BrokerModel.MODEL_VERSION, key);
-            LongBinding.longToEntry(System.currentTimeMillis(), value);
-            linksVersionDb.put(null, key, value);
         }
 
         return linksVersionDb;
     }
-
-    enum StoreState
-    {
-        CLOSED, OPENED
-    }
 }

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValueEntryBinding.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValueEntryBinding.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValueEntryBinding.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValueEntryBinding.java
 Thu Mar 23 16:52:47 2017
@@ -23,11 +23,7 @@ import com.sleepycat.bind.tuple.TupleBin
 import com.sleepycat.bind.tuple.TupleInput;
 import com.sleepycat.bind.tuple.TupleOutput;
 
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
-import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
-import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
-import 
org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUtils;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.store.StoreException;
@@ -36,9 +32,6 @@ public class LinkValueEntryBinding exten
 {
     private static final LinkValueEntryBinding INSTANCE = new 
LinkValueEntryBinding();
 
-    private AMQPDescribedTypeRegistry _describedTypeRegistry =
-            
AMQPDescribedTypeRegistry.newInstance().registerTransportLayer().registerMessagingLayer();
-
     private LinkValueEntryBinding()
     {
     }
@@ -84,33 +77,16 @@ public class LinkValueEntryBinding exten
         byte[] bytes = new byte[size];
         input.read(bytes);
 
-        QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(bytes);
-        ValueHandler valueHandler = new ValueHandler(_describedTypeRegistry);
-        Object object;
-        try
-        {
-            object = valueHandler.parse(qpidByteBuffer);
-        }
-        catch (AmqpErrorException e)
-        {
-            throw new StoreException("Unexpected serialized data", e);
-        }
-        finally
-        {
-            qpidByteBuffer.dispose();
-        }
-        return object;
+        return LinkStoreUtils.amqpBytesToObject(bytes);
     }
 
+
+
     private void write(final Object object, final TupleOutput output)
     {
-        ValueWriter valueWriter = 
_describedTypeRegistry.getValueWriter(object);
-        int encodedSize = valueWriter.getEncodedSize();
-        QpidByteBuffer qpidByteBuffer = QpidByteBuffer.allocate(encodedSize);
-        valueWriter.writeToBuffer(qpidByteBuffer);
-
-        output.writeInt(encodedSize);
-        output.write(qpidByteBuffer.array());
-        qpidByteBuffer.dispose();
+        byte[] bytes = LinkStoreUtils.objectToAmqpBytes(object);
+        output.writeInt(bytes.length);
+        output.write(bytes);
     }
+
 }

Copied: qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/pom.xml (from r1788233, qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml)
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/pom.xml?p2=qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/pom.xml&p1=qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml&r1=1788233&r2=1788287&rev=1788287&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/pom.xml Thu Mar 23 16:52:47 2017
@@ -1,21 +1,22 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-  ~  Licensed to the Apache Software Foundation (ASF) under one
-  ~  or more contributor license agreements.  See the NOTICE file
-  ~  distributed with this work for additional information
-  ~  regarding copyright ownership.  The ASF licenses this file
-  ~  to you under the Apache License, Version 2.0 (the
-  ~  "License"); you may not use this file except in compliance
-  ~  with the License.  You may obtain a copy of the License at
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
   ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
   ~
-  ~  Unless required by applicable law or agreed to in writing,
-  ~  software distributed under the License is distributed on an
-  ~  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  ~  KIND, either express or implied.  See the License for the
-  ~  specific language governing permissions and limitations
-  ~  under the License.
   -->
 
 <project xmlns="http://maven.apache.org/POM/4.0.0"
@@ -31,14 +32,10 @@
     </parent>
 
     <groupId>org.apache.qpid</groupId>
-    <artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-store</artifactId>
+    <artifactId>qpid-broker-plugins-amqp-1-0-protocol-jdbc-link-store</artifactId>
+    <name>Apache Qpid AMQP 1.0 JDBC Link Store Plug-in</name>
 
     <dependencies>
-        <dependency>
-            <groupId>org.apache.qpid</groupId>
-            <artifactId>qpid-broker-core</artifactId>
-            <version>${project.version}</version>
-        </dependency>
 
         <dependency>
             <groupId>org.apache.qpid</groupId>
@@ -49,7 +46,7 @@
 
         <dependency>
             <groupId>org.apache.qpid</groupId>
-            <artifactId>qpid-bdbstore</artifactId>
+            <artifactId>qpid-broker-plugins-jdbc-store</artifactId>
             <version>${project.version}</version>
         </dependency>
 
@@ -59,12 +56,6 @@
             <version>${project.version}</version>
         </dependency>
 
-        <dependency>
-            <groupId>com.sleepycat</groupId>
-            <artifactId>je</artifactId>
-            <scope>provided</scope>
-        </dependency>
-
         <!-- test dependencies -->
         <dependency>
             <groupId>org.apache.qpid</groupId>
@@ -80,5 +71,13 @@
             <scope>test</scope>
             <type>test-jar</type>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.derby</groupId>
+            <artifactId>derby</artifactId>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
-</project>
+    
+</project>
\ No newline at end of file

Added: 
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStore.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStore.java?rev=1788287&view=auto
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStore.java
 (added)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStore.java
 Thu Mar 23 16:52:47 2017
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0.store.jdbc;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.sql.Blob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CharStreams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.ModelVersion;
+import org.apache.qpid.server.protocol.v1_0.LinkDefinition;
+import org.apache.qpid.server.protocol.v1_0.LinkDefinitionImpl;
+import org.apache.qpid.server.protocol.v1_0.LinkKey;
+import org.apache.qpid.server.protocol.v1_0.store.AbstractLinkStore;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUpdater;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUtils;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.jdbc.JDBCContainer;
+import org.apache.qpid.server.store.jdbc.JDBCDetails;
+import org.apache.qpid.server.store.jdbc.JdbcUtils;
+
+public class JDBCLinkStore extends AbstractLinkStore
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(JDBCLinkStore.class);
+    private static final String LINKS_TABLE_NAME_SUFFIX = "AMQP_1_0_LINKS";
+    private static final String VERSION_TABLE_NAME_SUFFIX = 
"AMQP_1_0_LINKS_VERSION";
+    private final JDBCContainer _jdbcContainer;
+    private final String _tableNamePrefix;
+    private final String _sqlBlobType;
+    private final boolean _isUseBytesMethodsForBlob;
+
+    JDBCLinkStore(final JDBCContainer jdbcContainer)
+    {
+        _jdbcContainer = jdbcContainer;
+        _tableNamePrefix = jdbcContainer.getTableNamePrefix();
+        JDBCDetails jdbcDetails = jdbcContainer.getJDBCDetails();
+        _sqlBlobType = jdbcDetails.getBlobType();
+        _isUseBytesMethodsForBlob = jdbcDetails.isUseBytesMethodsForBlob();
+    }
+
+    @Override
+    protected Collection<LinkDefinition> doOpenAndLoad(final LinkStoreUpdater 
updater) throws StoreException
+    {
+        Collection<LinkDefinition> linkDefinitions;
+        try
+        {
+            checkTransactionIsolationLevel();
+            createOrOpenStoreDatabase();
+            linkDefinitions = getLinks();
+            ModelVersion storedVersion = getStoredVersion();
+            ModelVersion currentVersion =
+                    new ModelVersion(BrokerModel.MODEL_MAJOR_VERSION, 
BrokerModel.MODEL_MINOR_VERSION);
+            if (storedVersion.lessThan(currentVersion))
+            {
+                linkDefinitions = performUpdate(updater, linkDefinitions, 
storedVersion, currentVersion);
+            }
+            else if (currentVersion.lessThan(storedVersion))
+            {
+                throw new StoreException(String.format("Cannot downgrade the 
store from %s to %s",
+                                                       storedVersion,
+                                                       currentVersion));
+            }
+        }
+        catch (SQLException e)
+        {
+            throw new StoreException("Cannot open link store", e);
+        }
+        return linkDefinitions;
+    }
+
+    @Override
+    protected void doClose() throws StoreException
+    {
+
+    }
+
+    @Override
+    protected void doSaveLink(final LinkDefinition link) throws StoreException
+    {
+        String linkKey = generateLinkKey(link);
+        Connection connection = getConnection();
+        try
+        {
+            connection.setAutoCommit(false);
+            
connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+            try (PreparedStatement preparedStatement = 
connection.prepareStatement(
+                    String.format(
+                            "SELECT remote_container_id, link_name, link_role, 
source, target FROM %s WHERE link_key = ?",
+                            getLinksTableName())))
+            {
+                preparedStatement.setString(1, linkKey);
+                try (ResultSet resultSet = preparedStatement.executeQuery())
+                {
+                    if (resultSet.next())
+                    {
+                        update(connection, linkKey, link);
+                    }
+                    else
+                    {
+                        insert(connection, linkKey, link);
+                    }
+                }
+            }
+            connection.commit();
+        }
+        catch (SQLException e)
+        {
+            try
+            {
+                connection.rollback();
+            }
+            catch (SQLException re)
+            {
+                LOGGER.debug("Rollback failed on rolling back saving link 
transaction", re);
+            }
+            throw new StoreException(String.format("Cannot save link %s", new 
LinkKey(link)), e);
+        }
+        finally
+        {
+            JdbcUtils.closeConnection(connection, LOGGER);
+        }
+    }
+
+    @Override
+    protected void doDeleteLink(final LinkDefinition link) throws 
StoreException
+    {
+
+        try (Connection connection = getConnection();
+             PreparedStatement preparedStatement = connection.prepareStatement(
+                     String.format("DELETE FROM %s WHERE link_key = ?", 
getLinksTableName())))
+        {
+            preparedStatement.setString(1, generateLinkKey(link));
+            preparedStatement.execute();
+        }
+        catch (SQLException e)
+        {
+            throw new StoreException(String.format("Cannot delete link %s", 
new LinkKey(link)), e);
+        }
+    }
+
+
+    @Override
+    protected void doDelete()
+    {
+        try (Connection connection = getConnection();
+             Statement dropLinksStatement = connection.createStatement();
+             Statement dropVersionsStatement = connection.createStatement())
+        {
+            dropLinksStatement.execute(String.format("DROP TABLE %s", 
getLinksTableName()));
+            dropVersionsStatement.execute(String.format("DROP TABLE %s", 
getVersionTableName()));
+        }
+        catch (SQLException e)
+        {
+            throw new StoreException("Error deleting Link store", e);
+        }
+    }
+
+    @Override
+    public TerminusDurability getHighestSupportedTerminusDurability()
+    {
+        return TerminusDurability.CONFIGURATION;
+    }
+
+    private void checkTransactionIsolationLevel() throws SQLException
+    {
+        try (Connection connection = getConnection())
+        {
+            DatabaseMetaData metaData = connection.getMetaData();
+            if 
(!metaData.supportsTransactionIsolationLevel(Connection.TRANSACTION_SERIALIZABLE))
+            {
+                throw new StoreException(String.format(
+                        "The RDBMS '%s' does not support required transaction 
isolation level 'serializable'",
+                        metaData.getDatabaseProductName()));
+            }
+        }
+    }
+
+    private Connection getConnection()
+    {
+        return _jdbcContainer.getConnection();
+    }
+
+    private void createOrOpenStoreDatabase() throws SQLException
+    {
+        try (Connection conn = getConnection())
+        {
+            conn.setAutoCommit(true);
+
+            createLinksTable(conn);
+            createVersionTable(conn);
+        }
+    }
+
+    private void createVersionTable(final Connection conn) throws SQLException
+    {
+        String versionTableName = getVersionTableName();
+        if (!JdbcUtils.tableExists(versionTableName, conn))
+        {
+            try (Statement stmt = conn.createStatement())
+            {
+                stmt.execute(String.format("CREATE TABLE %s"
+                                           + " (version varchar(10) PRIMARY 
KEY ,"
+                                           + " version_time TIMESTAMP)", 
versionTableName));
+            }
+            updateVersion(conn, 
ModelVersion.fromString(BrokerModel.MODEL_VERSION));
+        }
+    }
+
+
+    private void createLinksTable(final Connection conn) throws SQLException
+    {
+        if (!JdbcUtils.tableExists(getLinksTableName(), conn))
+        {
+            try (Statement stmt = conn.createStatement())
+            {
+                stmt.execute(String.format("CREATE TABLE %1$s"
+                                           + " ( link_key varchar(44) PRIMARY 
KEY ,"
+                                           + " remote_container_id %2$s, "
+                                           + " link_name %2$s,"
+                                           + " link_role INTEGER,"
+                                           + " source %2$s,"
+                                           + " target %2$s )", 
getLinksTableName(), _sqlBlobType));
+            }
+        }
+    }
+
+    private String getLinksTableName()
+    {
+        return _tableNamePrefix + LINKS_TABLE_NAME_SUFFIX;
+    }
+
+    private String getVersionTableName()
+    {
+        return _tableNamePrefix + VERSION_TABLE_NAME_SUFFIX;
+    }
+
+    /**
+     * Upgrades the stored links to the current model version inside a single transaction: the links
+     * table is emptied, the upgraded definitions are re-inserted and the new version is recorded.
+     *
+     * @param updater         converts link definitions written by {@code storedVersion}
+     * @param linkDefinitions the definitions as currently stored
+     * @param storedVersion   the model version the stored definitions were written with
+     * @param currentVersion  the model version to record once the upgrade succeeded
+     * @return the upgraded link definitions, as returned by the updater
+     * @throws SQLException if any database operation fails; the transaction is rolled back first
+     */
+    private Collection<LinkDefinition> performUpdate(final LinkStoreUpdater updater,
+                                                     Collection<LinkDefinition> linkDefinitions,
+                                                     final ModelVersion storedVersion,
+                                                     final ModelVersion currentVersion) throws SQLException
+    {
+        linkDefinitions = updater.update(storedVersion.toString(), linkDefinitions);
+        Connection connection = getConnection();
+        try
+        {
+            connection.setAutoCommit(false);
+
+            try (Statement statement = connection.createStatement())
+            {
+                statement.execute("DELETE FROM " + getLinksTableName());
+            }
+
+            for (LinkDefinition linkDefinition : linkDefinitions)
+            {
+                insert(connection, generateLinkKey(linkDefinition), linkDefinition);
+            }
+            updateVersion(connection, currentVersion);
+            connection.commit();
+        }
+        catch (SQLException | RuntimeException e)
+        {
+            // insert() and updateVersion() report failures as unchecked StoreException as well; those
+            // must roll back too, because closing a connection with an open transaction has
+            // driver-defined results and could otherwise commit the half-finished DELETE/INSERT work.
+            try
+            {
+                connection.rollback();
+            }
+            catch (SQLException re)
+            {
+                LOGGER.debug("Cannot rollback transaction", re);
+            }
+            throw e;
+        }
+        finally
+        {
+            JdbcUtils.closeConnection(connection, LOGGER);
+        }
+        return linkDefinitions;
+    }
+
+    private Collection<LinkDefinition> getLinks() throws SQLException
+    {
+        Collection<LinkDefinition> links = new ArrayList<>();
+        try (Connection connection = getConnection();
+             Statement statement = connection.createStatement();
+             ResultSet resultSet = statement.executeQuery(String.format(
+                     "SELECT remote_container_id, link_name, link_role, 
source, target FROM %s",
+                     getLinksTableName())))
+        {
+            while (resultSet.next())
+            {
+                String remoteContainerId = getBlobValueAsString(resultSet, 1);
+                String linkName = getBlobValueAsString(resultSet, 2);
+                Role role = Role.valueOf(resultSet.getBoolean(3));
+                Source source = (Source) getBlobAsAmqpObject(resultSet, 4);
+                Target target = (Target) getBlobAsAmqpObject(resultSet, 5);
+
+                links.add(new LinkDefinitionImpl(remoteContainerId, linkName, 
role, source, target));
+            }
+        }
+        return links;
+    }
+
+    /**
+     * Reads a blob column and decodes its bytes with {@link LinkStoreUtils#amqpBytesToObject(byte[])}.
+     * Depending on the JDBC details, the column is read either with {@code getBytes} or as a
+     * {@link Blob} stream.
+     *
+     * @param resultSet result set positioned on the row to read
+     * @param index     1-based column index of the blob column
+     * @return the decoded object
+     * @throws SQLException   if the column cannot be read
+     * @throws StoreException if streaming the blob content fails
+     */
+    private Object getBlobAsAmqpObject(final ResultSet resultSet, final int index) throws SQLException
+    {
+        byte[] sourceBytes;
+        if (_isUseBytesMethodsForBlob)
+        {
+            sourceBytes = resultSet.getBytes(index);
+        }
+        else
+        {
+            Blob blob = resultSet.getBlob(index);
+            try (InputStream is = blob.getBinaryStream())
+            {
+                sourceBytes = ByteStreams.toByteArray(is);
+            }
+            catch (IOException e)
+            {
+                // This path yields an AMQP object, not a string, so the message says so.
+                throw new StoreException("Cannot convert blob to AMQP object", e);
+            }
+            finally
+            {
+                blob.free();
+            }
+        }
+        return LinkStoreUtils.amqpBytesToObject(sourceBytes);
+    }
+
+    private String getBlobValueAsString(final ResultSet resultSet, final int 
index) throws SQLException
+    {
+        if (_isUseBytesMethodsForBlob)
+        {
+            return new String(resultSet.getBytes(index), UTF_8);
+        }
+
+        Blob blob = resultSet.getBlob(index);
+        try (InputStream is = blob.getBinaryStream();
+             InputStreamReader isr = new InputStreamReader(is, UTF_8))
+        {
+            return CharStreams.toString(isr);
+        }
+        catch (IOException e)
+        {
+            throw new StoreException("Cannot convert blob to string", e);
+        }
+        finally
+        {
+            blob.free();
+        }
+    }
+
+    private ModelVersion getStoredVersion() throws SQLException
+    {
+        ModelVersion version = null;
+        try (Connection connection = getConnection();
+             Statement statement = connection.createStatement();
+             ResultSet resultSet = 
statement.executeQuery(String.format("SELECT version FROM %s",
+                                                                        
getVersionTableName())))
+        {
+            while (resultSet.next())
+            {
+                ModelVersion storedVersion = 
ModelVersion.fromString(resultSet.getString(1));
+                if (version == null || version.lessThan(storedVersion))
+                {
+                    version = storedVersion;
+                }
+            }
+        }
+        if (version == null)
+        {
+            throw new StoreException("Version of links is not found");
+        }
+        return version;
+    }
+
+    /**
+     * Records the given model version, together with the current time, in the version table.
+     *
+     * @param connection     connection on which the insert is executed; commit handling is left to the caller
+     * @param currentVersion the version to record
+     * @throws SQLException   if the insert fails
+     * @throws StoreException if the insert does not affect exactly one row
+     */
+    private void updateVersion(final Connection connection, final ModelVersion currentVersion) throws SQLException
+    {
+        String version = currentVersion.toString();
+        try (PreparedStatement statement = connection.prepareStatement(String.format(
+                "INSERT INTO %s (version, version_time) VALUES (?,?)",
+                getVersionTableName())))
+        {
+            statement.setString(1, version);
+            // version_time is a TIMESTAMP column; java.sql.Date would drop the time-of-day part.
+            statement.setTimestamp(2, new java.sql.Timestamp(System.currentTimeMillis()));
+            if (statement.executeUpdate() != 1)
+            {
+                throw new StoreException(String.format("Cannot insert version '%s' into version table", version));
+            }
+        }
+    }
+
+
+    private void insert(final Connection connection, final String linkKey, 
final LinkDefinition linkDefinition)
+            throws SQLException
+    {
+        try (PreparedStatement statement = 
connection.prepareStatement(String.format(
+                "INSERT INTO %s (link_key, remote_container_id, link_name, 
link_role, source, target) VALUES (?,?,?,?,?,?)",
+                getLinksTableName())))
+        {
+            statement.setString(1, linkKey);
+            saveStringAsBlob(statement, 2, 
linkDefinition.getRemoteContainerId());
+            saveStringAsBlob(statement, 3, linkDefinition.getName());
+            statement.setInt(4, linkDefinition.getRole().getValue() ? 1 : 0);
+            saveObjectAsBlob(statement, 5, linkDefinition.getSource());
+            saveObjectAsBlob(statement, 6, linkDefinition.getTarget());
+            if (statement.executeUpdate() != 1)
+            {
+                throw new StoreException(String.format("Cannot save link %s", 
new LinkKey(linkDefinition)));
+            }
+        }
+    }
+
+    private void update(final Connection connection, final String linkKey, 
final LinkDefinition linkDefinition)
+            throws SQLException
+    {
+        try (PreparedStatement statement = 
connection.prepareStatement(String.format(
+                "UPDATE %s SET source = ?, target = ? WHERE link_key = ?",
+                getLinksTableName())))
+        {
+            saveObjectAsBlob(statement, 1, linkDefinition.getSource());
+            saveObjectAsBlob(statement, 2, linkDefinition.getTarget());
+            statement.setString(3, linkKey);
+            if (statement.executeUpdate() != 1)
+            {
+                throw new StoreException(String.format("Cannot save link %s", 
new LinkKey(linkDefinition)));
+            }
+        }
+    }
+
+    private void saveObjectAsBlob(final PreparedStatement statement, final int 
index, final Object object)
+            throws SQLException
+    {
+        saveBytesAsBlob(statement, index, 
LinkStoreUtils.objectToAmqpBytes(object));
+    }
+
+    /**
+     * Binds raw bytes to a blob parameter, either with {@code setBytes} or as a stream via
+     * {@code setBlob}, depending on the JDBC details.
+     *
+     * @param statement statement receiving the parameter
+     * @param index     1-based parameter index
+     * @param bytes     content to bind
+     * @throws SQLException   if binding the parameter fails
+     * @throws StoreException if closing the intermediate stream fails
+     */
+    private void saveBytesAsBlob(final PreparedStatement statement, final int index, final byte[] bytes)
+            throws SQLException
+    {
+        if (_isUseBytesMethodsForBlob)
+        {
+            statement.setBytes(index, bytes);
+            return;
+        }
+
+        try (InputStream content = new ByteArrayInputStream(bytes))
+        {
+            statement.setBlob(index, content);
+        }
+        catch (IOException e)
+        {
+            throw new StoreException("Cannot save link", e);
+        }
+    }
+
+    private void saveStringAsBlob(final PreparedStatement statement, final int 
index, final String value)
+            throws SQLException
+    {
+        saveBytesAsBlob(statement, index, value.getBytes(UTF_8));
+    }
+
+    private String generateLinkKey(final LinkDefinition linkDefinition)
+    {
+        MessageDigest md;
+        try
+        {
+            md = MessageDigest.getInstance("SHA-256");
+        }
+        catch (NoSuchAlgorithmException e)
+        {
+            throw new StoreException("Cannot generate SHA-256 checksum", e);
+        }
+
+        md.update(linkDefinition.getRemoteContainerId().getBytes(UTF_8));
+        md.update(linkDefinition.getName().getBytes(UTF_8));
+        md.update(linkDefinition.getRole().getValue() ? (byte) 1 : (byte) 0);
+
+        return Base64.getEncoder().encodeToString(md.digest());
+    }
+}

Added: 
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStoreFactory.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStoreFactory.java?rev=1788287&view=auto
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStoreFactory.java
 (added)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStoreFactory.java
 Thu Mar 23 16:52:47 2017
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0.store.jdbc;
+
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.plugin.PluggableService;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStore;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStoreFactory;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.jdbc.JDBCContainer;
+
+/**
+ * Factory for {@link JDBCLinkStore} instances. An address space is supported when it is itself a
+ * {@link JDBCContainer}, or when it is a {@link VirtualHost} whose parent is one.
+ */
+@SuppressWarnings("unused")
+@PluggableService
+public class JDBCLinkStoreFactory implements LinkStoreFactory
+{
+    public static final String TYPE = "JDBC";
+
+    @Override
+    public String getType()
+    {
+        return TYPE;
+    }
+
+    /**
+     * Creates a link store backed by the JDBC container of the given address space.
+     *
+     * @param addressSpace the address space to create the store for
+     * @return a new {@link JDBCLinkStore}
+     * @throws StoreException if no JDBC container can be derived from {@code addressSpace}
+     */
+    @Override
+    public LinkStore create(final NamedAddressSpace addressSpace)
+    {
+        final JDBCContainer jdbcContainer = findJdbcContainer(addressSpace);
+        if (jdbcContainer == null)
+        {
+            throw new StoreException(String.format("Named address space '%s' is not supported by link store of type '%s'",
+                                                   addressSpace.getName(),
+                                                   TYPE));
+        }
+
+        return new JDBCLinkStore(jdbcContainer);
+    }
+
+    /**
+     * @param addressSpace the address space to check
+     * @return {@code true} when {@link #create(NamedAddressSpace)} can build a store for it
+     */
+    @Override
+    public boolean supports(final NamedAddressSpace addressSpace)
+    {
+        return findJdbcContainer(addressSpace) != null;
+    }
+
+    @Override
+    public int getPriority()
+    {
+        return 100;
+    }
+
+    /**
+     * Single place for the rule shared by {@code create} and {@code supports}, so the two cannot
+     * drift apart.
+     *
+     * @return the address space itself or its virtual host parent when that is a JDBC container,
+     *         otherwise {@code null}
+     */
+    private static JDBCContainer findJdbcContainer(final NamedAddressSpace addressSpace)
+    {
+        if (addressSpace instanceof JDBCContainer)
+        {
+            return (JDBCContainer) addressSpace;
+        }
+        if (addressSpace instanceof VirtualHost
+            && ((VirtualHost) addressSpace).getParent() instanceof JDBCContainer)
+        {
+            return (JDBCContainer) ((VirtualHost) addressSpace).getParent();
+        }
+        return null;
+    }
+}

Added: 
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStoreTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStoreTest.java?rev=1788287&view=auto
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStoreTest.java
 (added)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-jdbc-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/jdbc/JDBCLinkStoreTest.java
 Thu Mar 23 16:52:47 2017
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0.store.jdbc;
+
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+import org.apache.qpid.server.protocol.v1_0.store.LinkStore;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStoreTestCase;
+import org.apache.qpid.server.store.jdbc.JDBCContainer;
+import org.apache.qpid.server.store.jdbc.JDBCDetails;
+
+/**
+ * Runs the shared {@link LinkStoreTestCase} scenarios against a {@link JDBCLinkStore} that talks to
+ * an in-memory Derby database named after the running test.
+ */
+public class JDBCLinkStoreTest extends LinkStoreTestCase
+{
+    /**
+     * Builds a store around a mocked container which opens (and if needed creates) the in-memory
+     * database on every {@code getConnection()} call.
+     */
+    @Override
+    protected LinkStore createLinkStore()
+    {
+        final JDBCDetails details = mock(JDBCDetails.class);
+        when(details.getBlobType()).thenReturn("blob");
+        when(details.isUseBytesMethodsForBlob()).thenReturn(false);
+
+        JDBCContainer jdbcContainer = mock(JDBCContainer.class);
+        when(jdbcContainer.getJDBCDetails()).thenReturn(details);
+        when(jdbcContainer.getTableNamePrefix()).thenReturn("testTablePrefix");
+        when(jdbcContainer.getConnection()).thenAnswer(invocation -> DriverManager.getConnection(getUrl() + ";create=true"));
+
+        return new JDBCLinkStore(jdbcContainer);
+    }
+
+    /**
+     * Connects to the test database and closes the connection again; SQL state 08006 is tolerated,
+     * any other connection failure is rethrown as a {@link RuntimeException}.
+     */
+    @Override
+    protected void deleteLinkStore()
+    {
+        Connection connection = null;
+        try
+        {
+            connection = DriverManager.getConnection(getUrl());
+        }
+        catch (SQLException e)
+        {
+            // getSQLState() may be null; comparing from the literal keeps the real failure visible
+            // instead of masking it with a NullPointerException.
+            if ("08006".equalsIgnoreCase(e.getSQLState()))
+            {
+                //expected and represents a clean shutdown of this database only, do nothing.
+            }
+            else
+            {
+                throw new RuntimeException(e);
+            }
+        }
+        finally
+        {
+            if (connection != null)
+            {
+                try
+                {
+                    connection.close();
+                }
+                catch (SQLException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
+    /**
+     * @return the JDBC URL of the in-memory Derby database dedicated to the current test
+     */
+    private String getUrl()
+    {
+        return String.format("jdbc:derby:memory:/%s", getTestName());
+    }
+}

Added: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/AbstractLinkStore.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/AbstractLinkStore.java?rev=1788287&view=auto
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/AbstractLinkStore.java
 (added)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/AbstractLinkStore.java
 Thu Mar 23 16:52:47 2017
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.protocol.v1_0.store;
+
+import java.util.Collection;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.qpid.server.protocol.v1_0.LinkDefinition;
+import org.apache.qpid.server.protocol.v1_0.LinkKey;
+import org.apache.qpid.server.store.StoreException;
+
+/**
+ * Base class for link stores that tracks whether the store is open and guards every operation with
+ * a read/write lock: state transitions (open, close, delete) take the write lock, while saving and
+ * deleting individual links take the read lock.
+ */
+public abstract class AbstractLinkStore implements LinkStore
+{
+    private final ReentrantReadWriteLock _useOrCloseRWLock = new ReentrantReadWriteLock(true);
+    private volatile StoreState _storeState = StoreState.CLOSED;
+
+    protected abstract Collection<LinkDefinition> doOpenAndLoad(final LinkStoreUpdater updater);
+    protected abstract void doClose();
+    protected abstract void doDelete();
+    protected abstract void doSaveLink(final LinkDefinition link);
+    protected abstract void doDeleteLink(final LinkDefinition link);
+
+    /**
+     * Opens the store and returns the links it contains.
+     *
+     * @param updater passed through to {@link #doOpenAndLoad(LinkStoreUpdater)}
+     * @return the link definitions produced by the subclass
+     * @throws StoreException if the store is not currently closed
+     */
+    @Override
+    public final Collection<LinkDefinition> openAndLoad(final LinkStoreUpdater updater) throws StoreException
+    {
+        // Opening checks and then changes _storeState. Under the shared read lock two concurrent
+        // callers could both see CLOSED and both open the store, so the exclusive lock is required.
+        _useOrCloseRWLock.writeLock().lock();
+        try
+        {
+            if (_storeState != StoreState.CLOSED)
+            {
+                throw new StoreException("Store is already opened");
+            }
+
+            Collection<LinkDefinition> linkDefinitions = doOpenAndLoad(updater);
+            _storeState = StoreState.OPENED;
+            return linkDefinitions;
+        }
+        finally
+        {
+            _useOrCloseRWLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Closes the store and marks it as closed. No state check is made, so calling this on a closed
+     * store still invokes {@link #doClose()}.
+     */
+    @Override
+    public final void close() throws StoreException
+    {
+        _useOrCloseRWLock.writeLock().lock();
+        try
+        {
+            doClose();
+            _storeState = StoreState.CLOSED;
+        }
+        finally
+        {
+            _useOrCloseRWLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Persists a link.
+     *
+     * @param link the link to save
+     * @throws StoreException if the store is not open
+     */
+    @Override
+    public final void saveLink(final LinkDefinition link) throws StoreException
+    {
+        _useOrCloseRWLock.readLock().lock();
+        try
+        {
+            if (_storeState != StoreState.OPENED)
+            {
+                throw new StoreException("Store is not opened");
+            }
+
+            doSaveLink(link);
+        }
+        finally
+        {
+            _useOrCloseRWLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Removes a link.
+     *
+     * @param link the link to delete
+     * @throws StoreException if the store is not open
+     */
+    @Override
+    public final void deleteLink(final LinkDefinition link) throws StoreException
+    {
+        _useOrCloseRWLock.readLock().lock();
+        try
+        {
+            if (_storeState != StoreState.OPENED)
+            {
+                throw new StoreException("Store is not opened");
+            }
+
+            doDeleteLink(link);
+        }
+        finally
+        {
+            _useOrCloseRWLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Closes the store and then deletes it. The write lock is held across both steps; the nested
+     * {@link #close()} re-acquires it, which the reentrant lock allows.
+     */
+    @Override
+    public final void delete()
+    {
+        _useOrCloseRWLock.writeLock().lock();
+        try
+        {
+            close();
+            doDelete();
+        }
+        finally
+        {
+            _useOrCloseRWLock.writeLock().unlock();
+        }
+    }
+
+    enum StoreState
+    {
+        CLOSED, OPENED
+    }
+}

Added: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUtils.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUtils.java?rev=1788287&view=auto
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUtils.java
 (added)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUtils.java
 Thu Mar 23 16:52:47 2017
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.protocol.v1_0.store;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import 
org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.store.StoreException;
+
+public class LinkStoreUtils
+{
+    private static AMQPDescribedTypeRegistry DESCRIBED_TYPE_REGISTRY =
+            
AMQPDescribedTypeRegistry.newInstance().registerTransportLayer().registerMessagingLayer();
+
+    public static Object amqpBytesToObject(final byte[] bytes)
+    {
+        QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(bytes);
+        ValueHandler valueHandler = new ValueHandler(DESCRIBED_TYPE_REGISTRY);
+        Object object;
+        try
+        {
+            object = valueHandler.parse(qpidByteBuffer);
+        }
+        catch (AmqpErrorException e)
+        {
+            throw new StoreException("Unexpected serialized data", e);
+        }
+        finally
+        {
+            qpidByteBuffer.dispose();
+        }
+        return object;
+    }
+
+    public static byte[] objectToAmqpBytes(final Object object)
+    {
+        ValueWriter valueWriter = 
DESCRIBED_TYPE_REGISTRY.getValueWriter(object);
+        int encodedSize = valueWriter.getEncodedSize();
+        QpidByteBuffer qpidByteBuffer = QpidByteBuffer.allocate(encodedSize);
+        valueWriter.writeToBuffer(qpidByteBuffer);
+
+        byte[] bytes = qpidByteBuffer.array();
+        qpidByteBuffer.dispose();
+        return bytes;
+    }
+}

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreTestCase.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreTestCase.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreTestCase.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreTestCase.java
 Thu Mar 23 16:52:47 2017
@@ -70,7 +70,8 @@ public abstract class LinkStoreTestCase
         _source.setTimeout(new UnsignedInteger(1));
 
         _target.setTimeout(new UnsignedInteger(2));
-        
_target.setDynamicNodeProperties(Collections.singletonMap("targetDynamicProperty",
 "targetDynamicPropertyValue"));
+        
_target.setDynamicNodeProperties(Collections.singletonMap("targetDynamicProperty",
+                                                                  
"targetDynamicPropertyValue"));
         _target.setDynamic(Boolean.TRUE);
         _target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
         _target.setAddress("bar");
@@ -87,9 +88,19 @@ public abstract class LinkStoreTestCase
 
     public void testOpenAndLoad() throws Exception
     {
-        Collection<LinkDefinition>  links = _linkStore.openAndLoad(new 
LinkStoreUpdaterImpl());
+        Collection<LinkDefinition> links = _linkStore.openAndLoad(new 
LinkStoreUpdaterImpl());
         assertTrue("Unexpected links", links.isEmpty());
 
+        try
+        {
+            _linkStore.openAndLoad(new LinkStoreUpdaterImpl());
+            fail("Repeated open of already opened store should fail");
+        }
+        catch (StoreException e)
+        {
+            // pass
+        }
+
         LinkDefinition linkDefinition = createLinkDefinition("1", "test");
         _linkStore.saveLink(linkDefinition);
         _linkStore.close();
@@ -99,7 +110,6 @@ public abstract class LinkStoreTestCase
     }
 
 
-
     public void testClose() throws Exception
     {
         _linkStore.openAndLoad(new LinkStoreUpdaterImpl());
@@ -111,7 +121,7 @@ public abstract class LinkStoreTestCase
             _linkStore.saveLink(linkDefinition);
             fail("Saving link with close store should fail");
         }
-        catch(StoreException e)
+        catch (StoreException e)
         {
             // pass
         }
@@ -119,11 +129,22 @@ public abstract class LinkStoreTestCase
 
     public void testSaveLink() throws Exception
     {
-        _linkStore.openAndLoad(new LinkStoreUpdaterImpl());
+
         LinkDefinition linkDefinition = createLinkDefinition("1", "test");
+        _linkStore.openAndLoad(new LinkStoreUpdaterImpl());
         _linkStore.saveLink(linkDefinition);
         _linkStore.close();
 
+        try
+        {
+            _linkStore.saveLink(createLinkDefinition("2", "test2"));
+            fail("Save on unopened database should fail");
+        }
+        catch (StoreException e)
+        {
+            // pass
+        }
+
         Collection<LinkDefinition> links = _linkStore.openAndLoad(new 
LinkStoreUpdaterImpl());
         assertEquals("Unexpected link number", 1, links.size());
 
@@ -149,6 +170,16 @@ public abstract class LinkStoreTestCase
         _linkStore.deleteLink(linkDefinition2);
         _linkStore.close();
 
+        try
+        {
+            _linkStore.deleteLink(linkDefinition);
+            fail("Delete on unopened database should fail");
+        }
+        catch (StoreException e)
+        {
+            // pass
+        }
+
         Collection<LinkDefinition> links = _linkStore.openAndLoad(new 
LinkStoreUpdaterImpl());
         assertEquals("Unexpected link number", 1, links.size());
 
@@ -171,9 +202,13 @@ public abstract class LinkStoreTestCase
         LinkDefinition linkDefinition2 = createLinkDefinition("2", "test2");
         _linkStore.saveLink(linkDefinition2);
 
+        _linkStore.close();
+        Collection<LinkDefinition> links = _linkStore.openAndLoad(new 
LinkStoreUpdaterImpl());
+        assertEquals("Unexpected link number", 2, links.size());
+
         _linkStore.delete();
 
-        Collection<LinkDefinition> links = _linkStore.openAndLoad(new 
LinkStoreUpdaterImpl());
+        links = _linkStore.openAndLoad(new LinkStoreUpdaterImpl());
         assertEquals("Unexpected link number", 0, links.size());
     }
 

Modified: 
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java
 Thu Mar 23 16:52:47 2017
@@ -90,7 +90,7 @@ public class DerbyConfigurationStore ext
     }
 
     @Override
-    protected Connection getConnection() throws SQLException
+    public Connection getConnection() throws SQLException
     {
         return DriverManager.getConnection(_connectionURL);
     }
@@ -206,7 +206,7 @@ public class DerbyConfigurationStore ext
         }
 
         @Override
-        protected Connection getConnection() throws SQLException
+        public Connection getConnection() throws SQLException
         {
             checkMessageStoreOpen();
             return DerbyConfigurationStore.this.getConnection();

Modified: 
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
 Thu Mar 23 16:52:47 2017
@@ -52,7 +52,7 @@ public class DerbyMessageStore extends A
     }
 
     @Override
-    protected Connection getConnection() throws SQLException
+    public Connection getConnection() throws SQLException
     {
         checkMessageStoreOpen();
         return DriverManager.getConnection(_connectionURL);

Modified: 
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHostImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHostImpl.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHostImpl.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHostImpl.java
 Thu Mar 23 16:52:47 2017
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.virtualhost.derby;
 
+import java.sql.Connection;
+import java.sql.SQLException;
 import java.util.Map;
 
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -29,10 +31,14 @@ import org.apache.qpid.server.model.Virt
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.derby.DerbyMessageStore;
 import org.apache.qpid.server.store.derby.DerbyUtils;
+import org.apache.qpid.server.store.jdbc.JDBCContainer;
+import org.apache.qpid.server.store.jdbc.JDBCDetails;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.FileHelper;
 import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
 
-public class DerbyVirtualHostImpl extends 
AbstractVirtualHost<DerbyVirtualHostImpl> implements 
DerbyVirtualHost<DerbyVirtualHostImpl>
+public class DerbyVirtualHostImpl extends 
AbstractVirtualHost<DerbyVirtualHostImpl>
+        implements DerbyVirtualHost<DerbyVirtualHostImpl>, JDBCContainer
 {
     public static final String VIRTUAL_HOST_TYPE = "DERBY";
 
@@ -90,4 +96,30 @@ public class DerbyVirtualHostImpl extend
             throw new IllegalConfigurationException("The store path is not 
writable directory");
         }
     }
+
+    @Override
+    public JDBCDetails getJDBCDetails()
+    {
+        return JDBCDetails.getJdbcDetails("derby", this);
+    }
+
+    @Override
+    public Connection getConnection()
+    {
+        try
+        {
+            return ((DerbyMessageStore) getMessageStore()).getConnection();
+        }
+        catch (SQLException e)
+        {
+            final String message = String.format("Error opening connection to database for VirtualHost '%s'", getName());
+            throw new ConnectionScopedRuntimeException(message, e); // keep the SQLException as the cause
+        }
+    }
+
+    @Override
+    public String getTableNamePrefix()
+    {
+        return "";
+    }
 }

Modified: 
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNodeImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNodeImpl.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNodeImpl.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNodeImpl.java
 Thu Mar 23 16:52:47 2017
@@ -21,6 +21,8 @@
 
 package org.apache.qpid.server.virtualhostnode.derby;
 
+import java.sql.Connection;
+import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
@@ -35,14 +37,18 @@ import org.apache.qpid.server.model.Virt
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.derby.DerbyConfigurationStore;
 import org.apache.qpid.server.store.derby.DerbyUtils;
+import org.apache.qpid.server.store.jdbc.JDBCContainer;
+import org.apache.qpid.server.store.jdbc.JDBCDetails;
 import org.apache.qpid.server.store.preferences.PreferenceStore;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.FileHelper;
 import org.apache.qpid.server.virtualhostnode.AbstractStandardVirtualHostNode;
 
 @ManagedObject( category = false,
                 type = DerbyVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE,
                 validChildTypes = 
"org.apache.qpid.server.virtualhostnode.derby.DerbyVirtualHostNodeImpl#getSupportedChildTypes()"
 )
-public class DerbyVirtualHostNodeImpl extends 
AbstractStandardVirtualHostNode<DerbyVirtualHostNodeImpl> implements 
DerbyVirtualHostNode<DerbyVirtualHostNodeImpl>
+public class DerbyVirtualHostNodeImpl extends 
AbstractStandardVirtualHostNode<DerbyVirtualHostNodeImpl>
+        implements DerbyVirtualHostNode<DerbyVirtualHostNodeImpl>, 
JDBCContainer
 {
     public static final String VIRTUAL_HOST_NODE_TYPE = "DERBY";
 
@@ -104,4 +110,31 @@ public class DerbyVirtualHostNodeImpl ex
     {
         return 
((DerbyConfigurationStore)getConfigurationStore()).getPreferenceStore();
     }
+
+    @Override
+    public JDBCDetails getJDBCDetails()
+    {
+        return JDBCDetails.getJdbcDetails("derby", this);
+    }
+
+    @Override
+    public Connection getConnection()
+    {
+        try
+        {
+            return ((DerbyConfigurationStore) getConfigurationStore()).getConnection();
+        }
+        catch (SQLException e)
+        {
+            // Chain the SQLException so the database root cause is not lost.
+            final String message = String.format("Error opening connection to database for VirtualHostNode '%s'", getName());
+            throw new ConnectionScopedRuntimeException(message, e);
+        }
+    }
+
+    @Override
+    public String getTableNamePrefix()
+    {
+        return "";
+    }
 }

Modified: 
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
 Thu Mar 23 16:52:47 2017
@@ -533,7 +533,7 @@ public abstract class AbstractJDBCMessag
         return connection;
     }
 
-    protected abstract Connection getConnection() throws SQLException;
+    public abstract Connection getConnection() throws SQLException;
 
     @Override
     public Transaction newTransaction()

Modified: 
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
 Thu Mar 23 16:52:47 2017
@@ -145,7 +145,7 @@ public class GenericJDBCConfigurationSto
     }
 
     @Override
-    protected Connection getConnection() throws SQLException
+    public Connection getConnection() throws SQLException
     {
         return _connectionProvider.getConnection();
     }
@@ -250,7 +250,7 @@ public class GenericJDBCConfigurationSto
         }
 
         @Override
-        protected Connection getConnection() throws SQLException
+        public Connection getConnection() throws SQLException
         {
             return GenericJDBCConfigurationStore.this.getConnection();
         }

Modified: 
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
 Thu Mar 23 16:52:47 2017
@@ -118,7 +118,7 @@ public class GenericJDBCMessageStore ext
     }
 
     @Override
-    protected Connection getConnection() throws SQLException
+    public Connection getConnection() throws SQLException
     {
         return _connectionProvider.getConnection();
     }

Added: 
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCContainer.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCContainer.java?rev=1788287&view=auto
==============================================================================
--- 
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCContainer.java
 (added)
+++ 
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCContainer.java
 Thu Mar 23 16:52:47 2017
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.store.jdbc;
+
+import java.sql.Connection;
+
+public interface JDBCContainer
+{
+    JDBCDetails getJDBCDetails();
+
+    Connection getConnection();
+
+    String getTableNamePrefix();
+}

Modified: 
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java
 Thu Mar 23 16:52:47 2017
@@ -231,6 +231,19 @@ public abstract class JDBCDetails
 
     public static JDBCDetails getDetailsForJdbcUrl(String jdbcUrl, final ConfiguredObject<?> object)
     {
+        // The vendor is the second ':'-separated token of the URL (e.g. "derby" in "jdbc:derby:sample").
+        String[] components = jdbcUrl.split(":", 3);
+        String vendor = null;
+        if(components.length >= 2)
+        {
+            vendor = components[1];
+        }
+
+        return getJdbcDetails(vendor, object);
+    }
+
+    public static JDBCDetails getJdbcDetails(final String vendor, final 
ConfiguredObject<?> object)
+    {
         final Set<String> contextKeys = object.getContextKeys(false);
         Map<String,String> mapConversion = new AbstractMap<String, String>()
         {
@@ -296,15 +309,14 @@ public abstract class JDBCDetails
                 };
             }
         };
-        return getDetailsForJdbcUrl(jdbcUrl, mapConversion);
+        return getJdbcDetails(vendor, mapConversion);
     }
-    public static JDBCDetails getDetailsForJdbcUrl(String jdbcUrl, final 
Map<String, String> contextMap)
+
+    static JDBCDetails getJdbcDetails(final String vendor, final Map<String, 
String> contextMap)
     {
-        String[] components = jdbcUrl.split(":", 3);
         final JDBCDetails details;
-        if(components.length >= 2)
+        if (vendor != null)
         {
-            String vendor = components[1];
             if (KnownJDBCDetails.VENDOR_DETAILS.containsKey(vendor))
             {
                 details = KnownJDBCDetails.VENDOR_DETAILS.get(vendor);
@@ -319,7 +331,6 @@ public abstract class JDBCDetails
             details = KnownJDBCDetails.FALLBACK;
         }
 
-
         return new JDBCDetails()
         {
             @Override
@@ -371,7 +382,5 @@ public abstract class JDBCDetails
                         || contextMap.containsKey(CONTEXT_JDBCSTORE_BLOBTYPE);
             }
         };
-
     }
-
 }

Modified: 
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHostImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHostImpl.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHostImpl.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHostImpl.java
 Thu Mar 23 16:52:47 2017
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.virtualhost.jdbc;
 
+import java.sql.Connection;
+import java.sql.SQLException;
 import java.util.Map;
 
 import org.apache.qpid.server.model.ManagedAttributeField;
@@ -27,11 +29,16 @@ import org.apache.qpid.server.model.Mana
 import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
 import org.apache.qpid.server.model.VirtualHostNode;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.jdbc.AbstractJDBCMessageStore;
 import org.apache.qpid.server.store.jdbc.GenericJDBCMessageStore;
+import org.apache.qpid.server.store.jdbc.JDBCContainer;
+import org.apache.qpid.server.store.jdbc.JDBCDetails;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
 
 @ManagedObject(category = false, type = JDBCVirtualHostImpl.VIRTUAL_HOST_TYPE)
-public class JDBCVirtualHostImpl extends 
AbstractVirtualHost<JDBCVirtualHostImpl> implements 
JDBCVirtualHost<JDBCVirtualHostImpl>
+public class JDBCVirtualHostImpl extends 
AbstractVirtualHost<JDBCVirtualHostImpl>
+        implements JDBCVirtualHost<JDBCVirtualHostImpl>, JDBCContainer
 {
     public static final String VIRTUAL_HOST_TYPE = "JDBC";
 
@@ -94,6 +101,27 @@ public class JDBCVirtualHostImpl extends
     }
 
     @Override
+    public JDBCDetails getJDBCDetails()
+    {
+        return JDBCDetails.getDetailsForJdbcUrl(getConnectionUrl(), this);
+    }
+
+    @Override
+    public Connection getConnection()
+    {
+        try
+        {
+            return ((AbstractJDBCMessageStore) getMessageStore()).getConnection();
+        }
+        catch (SQLException e)
+        {
+            // Chain the SQLException so the database root cause is not lost.
+            final String message = String.format("Error opening connection to database for VirtualHost '%s'", getName());
+            throw new ConnectionScopedRuntimeException(message, e);
+        }
+    }
+
+    @Override
     public String toString()
     {
         return getClass().getSimpleName() + " [id=" + getId() + ", name=" + 
getName() +

Modified: 
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java
 Thu Mar 23 16:52:47 2017
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.virtualhostnode.jdbc;
 
+import java.sql.Connection;
+import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
@@ -31,12 +33,16 @@ import org.apache.qpid.server.model.Mana
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.jdbc.GenericJDBCConfigurationStore;
+import org.apache.qpid.server.store.jdbc.JDBCContainer;
+import org.apache.qpid.server.store.jdbc.JDBCDetails;
 import org.apache.qpid.server.store.preferences.PreferenceStore;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.virtualhostnode.AbstractStandardVirtualHostNode;
 
 @ManagedObject(type = JDBCVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE, category 
= false ,
                validChildTypes = 
"org.apache.qpid.server.virtualhostnode.jdbc.JDBCVirtualHostNodeImpl#getSupportedChildTypes()")
-public class JDBCVirtualHostNodeImpl extends 
AbstractStandardVirtualHostNode<JDBCVirtualHostNodeImpl> implements 
JDBCVirtualHostNode<JDBCVirtualHostNodeImpl>
+public class JDBCVirtualHostNodeImpl extends 
AbstractStandardVirtualHostNode<JDBCVirtualHostNodeImpl>
+        implements JDBCVirtualHostNode<JDBCVirtualHostNodeImpl>, JDBCContainer
 {
     public static final String VIRTUAL_HOST_NODE_TYPE = "JDBC";
 
@@ -103,6 +109,27 @@ public class JDBCVirtualHostNodeImpl ext
     }
 
     @Override
+    public JDBCDetails getJDBCDetails()
+    {
+        return JDBCDetails.getDetailsForJdbcUrl(getConnectionUrl(), this);
+    }
+
+    @Override
+    public Connection getConnection()
+    {
+        try
+        {
+            return ((GenericJDBCConfigurationStore) getConfigurationStore()).getConnection();
+        }
+        catch (SQLException e)
+        {
+            // Chain the SQLException so the database root cause is not lost.
+            final String message = String.format("Error opening connection to database for VirtualHostNode '%s'", getName());
+            throw new ConnectionScopedRuntimeException(message, e);
+        }
+    }
+
+    @Override
     public String toString()
     {
         return getClass().getSimpleName() + " [id=" + getId() + ", name=" + 
getName() +

Modified: 
qpid/java/trunk/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCDetailsTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCDetailsTest.java?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCDetailsTest.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCDetailsTest.java
 Thu Mar 23 16:52:47 2017
@@ -29,8 +29,7 @@ public class JDBCDetailsTest extends Qpi
 {
     public void testDerby()
     {
-        JDBCDetails derbyDetails = 
JDBCDetails.getDetailsForJdbcUrl("jdbc:derby:sample",
-                                                               
Collections.<String, String>emptyMap());
+        JDBCDetails derbyDetails = JDBCDetails.getJdbcDetails("derby", 
Collections.emptyMap());
         assertEquals("derby", derbyDetails.getVendor());
         assertEquals("varchar(%d) for bit data", 
derbyDetails.getVarBinaryType());
         assertEquals("bigint", derbyDetails.getBigintType());
@@ -43,7 +42,7 @@ public class JDBCDetailsTest extends Qpi
 
     public void testUnknownVendor_UsesFallbackDetails()
     {
-        JDBCDetails details = JDBCDetails.getDetailsForJdbcUrl("jdbc:homedb:", 
Collections.<String, String>emptyMap());
+        JDBCDetails details = JDBCDetails.getJdbcDetails("homedb", 
Collections.emptyMap());
         assertEquals("fallback", details.getVendor());
         assertEquals("varchar(%d) for bit data", details.getVarBinaryType());
         assertEquals("bigint", details.getBigintType());
@@ -59,7 +58,7 @@ public class JDBCDetailsTest extends Qpi
         Map<String, String> contextMap = new HashMap<>();
         contextMap.put(JDBCDetails.CONTEXT_JDBCSTORE_VARBINARYTYPE, 
"myvarbin");
 
-        JDBCDetails derbyDetails = 
JDBCDetails.getDetailsForJdbcUrl("jdbc:derby:sample", contextMap);
+        JDBCDetails derbyDetails = JDBCDetails.getJdbcDetails("derby", 
contextMap);
         assertEquals("derby", derbyDetails.getVendor());
         assertEquals("myvarbin", derbyDetails.getVarBinaryType());
         assertEquals("bigint", derbyDetails.getBigintType());
@@ -81,7 +80,7 @@ public class JDBCDetailsTest extends Qpi
         contextMap.put(JDBCDetails.CONTEXT_JDBCSTORE_BLOBTYPE, "myblob");
         contextMap.put(JDBCDetails.CONTEXT_JDBCSTORE_USEBYTESFORBLOB, "true");
 
-        JDBCDetails details = JDBCDetails.getDetailsForJdbcUrl("jdbc:sybase:", 
contextMap);
+        JDBCDetails details = JDBCDetails.getJdbcDetails("sybase", contextMap);
         assertEquals("sybase", details.getVendor());
         assertEquals("myvarbin", details.getVarBinaryType());
         assertEquals("mybigint", details.getBigintType());

Modified: qpid/java/trunk/broker/pom.xml
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker/pom.xml?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- qpid/java/trunk/broker/pom.xml (original)
+++ qpid/java/trunk/broker/pom.xml Thu Mar 23 16:52:47 2017
@@ -127,6 +127,13 @@
 
     <dependency>
       <groupId>org.apache.qpid</groupId>
+      
<artifactId>qpid-broker-plugins-amqp-1-0-protocol-jdbc-link-store</artifactId>
+      <version>${project.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-broker-plugins-management-http</artifactId>
       <version>${project.version}</version>
       <scope>runtime</scope>
@@ -164,7 +171,7 @@
 
     <dependency>
       <groupId>org.apache.qpid</groupId>
-      <artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-store</artifactId>
+      
<artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-link-store</artifactId>
       <version>${project.version}</version>
       <scope>runtime</scope>
       <optional>true</optional>

Modified: qpid/java/trunk/pom.xml
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/pom.xml?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- qpid/java/trunk/pom.xml (original)
+++ qpid/java/trunk/pom.xml Thu Mar 23 16:52:47 2017
@@ -178,6 +178,7 @@
     <module>broker-plugins/memory-store</module>
     <module>broker-plugins/websocket</module>
     <module>broker-plugins/amqp-1-0-bdb-store</module>
+    <module>broker-plugins/amqp-1-0-jdbc-store</module>
     <module>tools</module>
 
     <module>qpid-systests-parent</module>

Modified: qpid/java/trunk/systests/pom.xml
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/systests/pom.xml?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- qpid/java/trunk/systests/pom.xml (original)
+++ qpid/java/trunk/systests/pom.xml Thu Mar 23 16:52:47 2017
@@ -137,11 +137,18 @@
 
     <dependency>
       <groupId>org.apache.qpid</groupId>
-      <artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-store</artifactId>
+      
<artifactId>qpid-broker-plugins-amqp-1-0-protocol-jdbc-link-store</artifactId>
       <version>${project.version}</version>
     </dependency>
 
     <dependency>
+      <groupId>org.apache.qpid</groupId>
+      
<artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-link-store</artifactId>
+      <version>${project.version}</version>
+      <optional>true</optional>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-bdbstore</artifactId>
       <version>${project.version}</version>

Modified: qpid/java/trunk/systests/qpid-systests-jms_2.0/pom.xml
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/systests/qpid-systests-jms_2.0/pom.xml?rev=1788287&r1=1788286&r2=1788287&view=diff
==============================================================================
--- qpid/java/trunk/systests/qpid-systests-jms_2.0/pom.xml (original)
+++ qpid/java/trunk/systests/qpid-systests-jms_2.0/pom.xml Thu Mar 23 16:52:47 
2017
@@ -75,8 +75,9 @@
 
     <dependency>
       <groupId>org.apache.qpid</groupId>
-      <artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-store</artifactId>
+      
<artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-link-store</artifactId>
       <version>${project.version}</version>
+      <optional>true</optional>
     </dependency>
 
   </dependencies>



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to