Author: lquack
Date: Thu Jul 14 16:01:22 2016
New Revision: 1752682
URL: http://svn.apache.org/viewvc?rev=1752682&view=rev
Log:
QPID-7337: [Java Broker] Implement Preference Store JDBC/Derby backend
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/preferences/AbstractJDBCPreferenceStore.java
qpid/java/trunk/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyPreferenceStoreTest.java
Modified:
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBPreferenceStoreTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/preferences/PreferenceTestHelper.java
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbySystemConfig.java
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNode.java
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCSystemConfig.java
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNode.java
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java
Modified:
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBPreferenceStoreTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBPreferenceStoreTest.java?rev=1752682&r1=1752681&r2=1752682&view=diff
==============================================================================
---
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBPreferenceStoreTest.java
(original)
+++
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBPreferenceStoreTest.java
Thu Jul 14 16:01:22 2016
@@ -29,7 +29,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.UUID;
@@ -44,6 +43,7 @@ import com.sleepycat.je.EnvironmentConfi
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.preferences.PreferenceTestHelper;
import org.apache.qpid.server.store.berkeleydb.tuple.MapBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
import org.apache.qpid.server.store.preferences.PreferenceRecord;
@@ -103,7 +103,7 @@ public class BDBPreferenceStoreTest exte
AbstractBDBPreferenceStore.StoreState.OPENED,
_preferenceStore.getStoreState());
assertNotNull("Store was not properly opened",
_preferenceStore.getEnvironmentFacade());
- assertRecords(_testInitialRecords, recovered);
+ PreferenceTestHelper.assertRecords(_testInitialRecords, recovered);
}
public void testClose() throws Exception
@@ -131,7 +131,7 @@ public class BDBPreferenceStoreTest exte
Collection<PreferenceRecord> recovered =
_preferenceStore.openAndLoad(_updater);
List<PreferenceRecord> expected = new ArrayList<>(records);
expected.add(_testInitialRecords.get(1));
- assertRecords(expected, recovered);
+ PreferenceTestHelper.assertRecords(expected, recovered);
}
public void testReplace() throws Exception
@@ -149,7 +149,7 @@ public class BDBPreferenceStoreTest exte
_preferenceStore.close();
Collection<PreferenceRecord> recovered =
_preferenceStore.openAndLoad(_updater);
- assertRecords(recordsToAddUpdate, recovered);
+ PreferenceTestHelper.assertRecords(recordsToAddUpdate, recovered);
}
public void testUpdateFailIfNotOpened() throws Exception
@@ -178,31 +178,6 @@ public class BDBPreferenceStoreTest exte
}
}
- private void assertRecords(final Collection<PreferenceRecord> expected,
final Collection<PreferenceRecord> actual)
- {
- assertEquals("Unexpected number of records", expected.size(),
actual.size());
-
- for (PreferenceRecord expectedRecord : expected)
- {
- PreferenceRecord actualRecord = null;
- for (PreferenceRecord record : actual)
- {
- if (record.getId().equals(expectedRecord.getId()))
- {
- actualRecord = record;
- break;
- }
- }
- assertNotNull(String.format("No actual record found for expected
record '%s'", expectedRecord.getId()),
- actualRecord);
- assertEquals(String.format("Expected attributes are different from
actual: %s vs %s",
-
expectedRecord.getAttributes().toString(),
-
actualRecord.getAttributes().toString()),
- new HashMap<>(expectedRecord.getAttributes()),
- new HashMap<>(actualRecord.getAttributes()));
- }
- }
-
private void populateTestData(final List<PreferenceRecord> records)
{
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/preferences/AbstractJDBCPreferenceStore.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/preferences/AbstractJDBCPreferenceStore.java?rev=1752682&view=auto
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/preferences/AbstractJDBCPreferenceStore.java
(added)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/preferences/AbstractJDBCPreferenceStore.java
Thu Jul 14 16:01:22 2016
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.store.preferences;
+
+import static org.apache.qpid.server.store.JdbcUtils.tableExists;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.ModelVersion;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.util.BaseAction;
+
+public abstract class AbstractJDBCPreferenceStore implements PreferenceStore
+{
+ private static final String PREFERENCES_VERSION_TABLE_NAME =
"PREFERENCES_VERSION";
+ private static final String PREFERENCES_TABLE_NAME = "PREFERENCES";
+
+ private static final String CREATE_PREFERENCES_VERSION_TABLE =
+ "CREATE TABLE " + PREFERENCES_VERSION_TABLE_NAME + " ( version
VARCHAR(20) NOT NULL )";
+ private static final String INSERT_INTO_PREFERENCES_VERSION =
+ "INSERT INTO " + PREFERENCES_VERSION_TABLE_NAME + " ( version )
VALUES ( ? )";
+ private static final String SELECT_FROM_PREFERENCES_VERSION =
+ "SELECT version FROM " + PREFERENCES_VERSION_TABLE_NAME;
+
+ private static final String INSERT_INTO_PREFERENCES = "INSERT INTO " +
PREFERENCES_TABLE_NAME + " ( id, attributes ) VALUES ( ?, ? )";
+ private static final String DELETE_FROM_PREFERENCES = "DELETE FROM " +
PREFERENCES_TABLE_NAME + " where id = ?";
+ private static final String SELECT_FROM_PREFERENCES = "SELECT id,
attributes FROM " + PREFERENCES_TABLE_NAME;
+ private static final String FIND_PREFERENCE = "SELECT attributes FROM " +
PREFERENCES_TABLE_NAME + " WHERE id = ?";
+ private static final String UPDATE_PREFERENCES = "UPDATE " +
PREFERENCES_TABLE_NAME + " SET attributes = ? WHERE id = ?";
+
+ private final AtomicReference<StoreState> _storeState = new
AtomicReference<>(StoreState.CLOSED);
+
+ @Override
+ public Collection<PreferenceRecord> openAndLoad(final
PreferenceStoreUpdater updater) throws StoreException
+ {
+ if (!_storeState.compareAndSet(StoreState.CLOSED, StoreState.OPENING))
+ {
+ throw new IllegalStateException(String.format("PreferenceStore
cannot be opened when in state '%s'",
+ _storeState.get()));
+ }
+
+ try
+ {
+ Collection<PreferenceRecord> records;
+
+ try (Connection connection = getConnection())
+ {
+ createVersionTable(connection);
+ createPreferencesTable(connection);
+ ModelVersion preferencesVersion =
getPreferencesVersion(connection);
+ ModelVersion brokerModelVersion =
ModelVersion.fromString(BrokerModel.MODEL_VERSION);
+ if (brokerModelVersion.lessThan(preferencesVersion))
+ {
+ throw new StoreException(String.format("Cannot downgrade
preference store from '%s' to '%s'", preferencesVersion, brokerModelVersion));
+ }
+
+ records = getPreferenceRecords(connection);
+
+ if (preferencesVersion.lessThan(brokerModelVersion))
+ {
+ final Collection<UUID> ids = new HashSet<>();
+ for (PreferenceRecord record : records)
+ {
+ ids.add(record.getId());
+ }
+
+ records =
updater.updatePreferences(preferencesVersion.toString(), records);
+ replace(ids, records);
+ }
+ }
+
+ _storeState.set(StoreState.OPENED);
+ return records;
+ }
+ catch (SQLException e)
+ {
+ _storeState.set(StoreState.ERRORED);
+ close();
+ throw new StoreException(e);
+ }
+ }
+
+ @Override
+ public void updateOrCreate(final Collection<PreferenceRecord>
preferenceRecords)
+ {
+ if (!getStoreState().equals(StoreState.OPENED))
+ {
+ throw new IllegalStateException("PreferenceStore is not opened");
+ }
+
+ performSafeTransaction(new BaseAction<Connection, Exception>()
+ {
+ @Override
+ public void performAction(final Connection connection) throws
Exception
+ {
+ updateOrCreateInternal(connection, preferenceRecords);
+ }
+ });
+ }
+
+ @Override
+ public void replace(final Collection<UUID> preferenceRecordsToRemove,
+ final Collection<PreferenceRecord>
preferenceRecordsToAdd)
+ {
+ if (!getStoreState().equals(StoreState.OPENED))
+ {
+ throw new IllegalStateException("PreferenceStore is not opened");
+ }
+
+ performSafeTransaction(new BaseAction<Connection, Exception>()
+ {
+ @Override
+ public void performAction(final Connection connection) throws
Exception
+ {
+ for (UUID id : preferenceRecordsToRemove)
+ {
+ try (PreparedStatement deleteStatement =
connection.prepareStatement(DELETE_FROM_PREFERENCES))
+ {
+ deleteStatement.setString(1, id.toString());
+ int deletedCount = deleteStatement.executeUpdate();
+ if (deletedCount == 1)
+ {
+ getLogger().debug(String.format("Failed to delete
preference with id %s : no such record", id));
+ }
+ }
+ }
+ updateOrCreateInternal(connection, preferenceRecordsToAdd);
+ }
+ });
+ }
+
+ @Override
+ public void close()
+ {
+ while (true)
+ {
+ StoreState storeState = getStoreState();
+ if (storeState.equals(StoreState.OPENED) ||
storeState.equals(StoreState.ERRORED))
+ {
+ if (_storeState.compareAndSet(storeState, StoreState.CLOSING))
+ {
+ break;
+ }
+ }
+ else if (storeState.equals(StoreState.CLOSED) ||
storeState.equals(StoreState.CLOSING))
+ {
+ return;
+ }
+ }
+
+ doClose();
+
+ _storeState.set(StoreState.CLOSED);
+ }
+
+ protected abstract void doClose();
+
+ protected abstract Logger getLogger();
+
+ protected abstract Connection getConnection() throws SQLException;
+
+ protected abstract String getSqlBlobType();
+
+ protected abstract String getBlobAsString(ResultSet rs, int col) throws
SQLException;
+
+ StoreState getStoreState()
+ {
+ return _storeState.get();
+ }
+
+ private void updateOrCreateInternal(final Connection conn,
+ final Collection<PreferenceRecord>
preferenceRecords)
+ throws SQLException, JsonProcessingException
+ {
+ for (PreferenceRecord record : preferenceRecords)
+ {
+ try (PreparedStatement stmt =
conn.prepareStatement(FIND_PREFERENCE))
+ {
+ stmt.setString(1, record.getId().toString());
+ try (ResultSet rs = stmt.executeQuery())
+ {
+ if (rs.next())
+ {
+ try (PreparedStatement updateStatement =
conn.prepareStatement(UPDATE_PREFERENCES))
+ {
+ setAttributesAsBlob(updateStatement, 1,
record.getAttributes());
+ updateStatement.setString(2,
record.getId().toString());
+ updateStatement.execute();
+ }
+ }
+ else
+ {
+ try (PreparedStatement insertStatement =
conn.prepareStatement(INSERT_INTO_PREFERENCES))
+ {
+ insertStatement.setString(1,
record.getId().toString());
+ setAttributesAsBlob(insertStatement, 2,
record.getAttributes());
+ insertStatement.execute();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private void performSafeTransaction(final BaseAction<Connection,
Exception> transactedAction)
+ {
+ Connection connection = null;
+ try
+ {
+ connection = getTransactedConnection();
+ transactedAction.performAction(connection);
+ connection.commit();
+ }
+ catch (Exception e)
+ {
+ try
+ {
+ if (connection != null)
+ {
+ connection.rollback();
+ }
+ }
+ catch (SQLException e1)
+ {
+ getLogger().error("Failed to rollback transaction", e1);
+ }
+ throw new StoreException(e);
+ }
+ finally
+ {
+ try
+ {
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ getLogger().warn("Failed to close JDBC connection", e);
+ }
+ }
+ }
+
+ private Connection getTransactedConnection() throws SQLException
+ {
+ final Connection connection = getConnection();
+ connection.setAutoCommit(false);
+
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+ return connection;
+ }
+
+ private void setAttributesAsBlob(final PreparedStatement
preparedSqlStatement,
+ final int parameterIndex,
+ final Map<String, Object> attributes)
+ throws JsonProcessingException, SQLException
+ {
+ final ObjectMapper objectMapper = new ObjectMapper();
+ if (attributes != null)
+ {
+ byte[] attributesAsBytes =
objectMapper.writeValueAsBytes(attributes);
+ ByteArrayInputStream bis = new
ByteArrayInputStream(attributesAsBytes);
+ preparedSqlStatement.setBinaryStream(parameterIndex, bis,
attributesAsBytes.length);
+ }
+ else
+ {
+ preparedSqlStatement.setNull(parameterIndex, Types.BLOB);
+ }
+ }
+
+ private void createVersionTable(final Connection conn) throws SQLException
+ {
+ if (!tableExists(PREFERENCES_VERSION_TABLE_NAME, conn))
+ {
+ try (Statement stmt = conn.createStatement())
+ {
+ stmt.execute(CREATE_PREFERENCES_VERSION_TABLE);
+ }
+
+ try (PreparedStatement pstmt =
conn.prepareStatement(INSERT_INTO_PREFERENCES_VERSION))
+ {
+ pstmt.setString(1, BrokerModel.MODEL_VERSION);
+ pstmt.execute();
+ }
+ }
+ }
+
+ private void createPreferencesTable(final Connection conn) throws
SQLException
+ {
+ if (!tableExists(PREFERENCES_TABLE_NAME, conn))
+ {
+ try (Statement stmt = conn.createStatement())
+ {
+ stmt.execute("CREATE TABLE "
+ + PREFERENCES_TABLE_NAME
+ + " ( id VARCHAR(36) not null, attributes "
+ + getSqlBlobType()
+ + ", PRIMARY KEY (id))");
+ }
+ }
+ }
+
+ private ModelVersion getPreferencesVersion(Connection conn) throws
SQLException
+ {
+ try (Statement stmt = conn.createStatement())
+ {
+ try (ResultSet rs =
stmt.executeQuery(SELECT_FROM_PREFERENCES_VERSION))
+ {
+ if (rs.next())
+ {
+ String versionString = rs.getString(1);
+ try
+ {
+ return ModelVersion.fromString(versionString);
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new StoreException("preference store version is
malformed", e);
+ }
+ }
+ throw new StoreException("No preferences version found");
+ }
+ }
+ }
+
+ private Collection<PreferenceRecord> getPreferenceRecords(final Connection
connection) throws SQLException
+ {
+ Collection<PreferenceRecord> records = new LinkedHashSet<>();
+ final ObjectMapper objectMapper = new ObjectMapper();
+ try (PreparedStatement stmt =
connection.prepareStatement(SELECT_FROM_PREFERENCES))
+ {
+ try (ResultSet rs = stmt.executeQuery())
+ {
+ while (rs.next())
+ {
+ String id = rs.getString(1);
+ String attributes = getBlobAsString(rs, 2);
+ final PreferenceRecord preferenceRecord = new
PreferenceRecordImpl(UUID.fromString(id), objectMapper.readValue(attributes,
Map.class));
+ records.add(preferenceRecord);
+ }
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Error recovering persistent state: "
+ e.getMessage(), e);
+ }
+ }
+ return records;
+ }
+
+ enum StoreState
+ {
+ CLOSED, OPENING, OPENED, CLOSING, ERRORED;
+ }
+}
Modified:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/preferences/PreferenceTestHelper.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/preferences/PreferenceTestHelper.java?rev=1752682&r1=1752681&r2=1752682&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/preferences/PreferenceTestHelper.java
(original)
+++
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/preferences/PreferenceTestHelper.java
Thu Jul 14 16:01:22 2016
@@ -19,11 +19,17 @@
package org.apache.qpid.server.model.preferences;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import org.apache.qpid.server.store.preferences.PreferenceRecord;
+
public class PreferenceTestHelper
{
public static Map<String, Object> createPreferenceAttributes(UUID
associatedObjectId,
@@ -47,4 +53,30 @@ public class PreferenceTestHelper
preferenceAttributes.put(Preference.VALUE_ATTRIBUTE,
preferenceValueAttributes);
return preferenceAttributes;
}
+
+ public static void assertRecords(final Collection<PreferenceRecord>
expected,
+ final Collection<PreferenceRecord> actual)
+ {
+ assertEquals("Unexpected number of records", expected.size(),
actual.size());
+
+ for (PreferenceRecord expectedRecord : expected)
+ {
+ PreferenceRecord actualRecord = null;
+ for (PreferenceRecord record : actual)
+ {
+ if (record.getId().equals(expectedRecord.getId()))
+ {
+ actualRecord = record;
+ break;
+ }
+ }
+ assertNotNull(String.format("No actual record found for expected
record '%s'", expectedRecord.getId()),
+ actualRecord);
+ assertEquals(String.format("Expected attributes are different from
actual: %s vs %s",
+
expectedRecord.getAttributes().toString(),
+
actualRecord.getAttributes().toString()),
+ new HashMap<>(expectedRecord.getAttributes()),
+ new HashMap<>(actualRecord.getAttributes()));
+ }
+ }
}
Modified:
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java?rev=1752682&r1=1752681&r2=1752682&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java
(original)
+++
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java
Thu Jul 14 16:01:22 2016
@@ -39,6 +39,7 @@ import org.apache.qpid.server.store.File
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreProvider;
import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.preferences.AbstractJDBCPreferenceStore;
import org.apache.qpid.util.FileUtils;
/**
@@ -52,6 +53,7 @@ public class DerbyConfigurationStore ext
private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean();
private final ProvidedMessageStore _providedMessageStore = new
ProvidedMessageStore();
+ private final ProvidedPreferenceStore _providedPreferenceStore = new
ProvidedPreferenceStore();
private String _connectionURL;
@@ -236,4 +238,39 @@ public class DerbyConfigurationStore ext
return DerbyConfigurationStore.this.getLogger();
}
}
+
+ private class ProvidedPreferenceStore extends AbstractJDBCPreferenceStore
+ {
+ private final Logger LOGGER =
LoggerFactory.getLogger(ProvidedPreferenceStore.class);
+
+ @Override
+ protected Logger getLogger()
+ {
+ return LOGGER;
+ }
+
+ @Override
+ protected Connection getConnection() throws SQLException
+ {
+ return DerbyConfigurationStore.this.getConnection();
+ }
+
+ @Override
+ protected String getSqlBlobType()
+ {
+ return "blob";
+ }
+
+ @Override
+ protected String getBlobAsString(final ResultSet rs, final int col)
throws SQLException
+ {
+ return DerbyUtils.getBlobAsString(rs, col);
+ }
+
+ @Override
+ public void doClose()
+ {
+ // noop
+ }
+ }
}
Modified:
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbySystemConfig.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbySystemConfig.java?rev=1752682&r1=1752681&r2=1752682&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbySystemConfig.java
(original)
+++
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbySystemConfig.java
Thu Jul 14 16:01:22 2016
@@ -39,6 +39,6 @@ public interface DerbySystemConfig<X ext
Long getStoreOverfullSize();
@ManagedAttribute( description = "Configuration for the preference store,
e.g. type, path, etc.",
- defaultValue = "{\"type\": \"Noop\"}")
+ defaultValue = "{\"type\": \"Provided\"}")
PreferenceStoreAttributes getPreferenceStoreAttributes();
}
Modified:
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNode.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNode.java?rev=1752682&r1=1752681&r2=1752682&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNode.java
(original)
+++
qpid/java/trunk/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNode.java
Thu Jul 14 16:01:22 2016
@@ -21,12 +21,18 @@ package org.apache.qpid.server.virtualho
import org.apache.qpid.server.model.ManagedAttribute;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.store.FileBasedSettings;
import org.apache.qpid.server.store.preferences.PreferenceStoreAttributes;
-public interface DerbyVirtualHostNode<X extends DerbyVirtualHostNode<X>>
extends org.apache.qpid.server.model.VirtualHostNode<X>,
org.apache.qpid.server.store.FileBasedSettings
+public interface DerbyVirtualHostNode<X extends DerbyVirtualHostNode<X>>
extends VirtualHostNode<X>, FileBasedSettings
{
String STORE_PATH = "storePath";
@ManagedAttribute(mandatory = true, defaultValue =
"${qpid.work_dir}${file.separator}${this:name}${file.separator}config")
String getStorePath();
+
+ @ManagedAttribute( description = "Configuration for the preference store,
e.g. type, path, etc.",
+ defaultValue = "{\"type\": \"Provided\"}")
+ PreferenceStoreAttributes getPreferenceStoreAttributes();
}
Added:
qpid/java/trunk/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyPreferenceStoreTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyPreferenceStoreTest.java?rev=1752682&view=auto
==============================================================================
---
qpid/java/trunk/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyPreferenceStoreTest.java
(added)
+++
qpid/java/trunk/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyPreferenceStoreTest.java
Thu Jul 14 16:01:22 2016
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.store.derby;
+
+import static
org.apache.qpid.server.model.preferences.PreferenceTestHelper.assertRecords;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.preferences.AbstractJDBCPreferenceStore;
+import org.apache.qpid.server.store.preferences.PreferenceRecord;
+import org.apache.qpid.server.store.preferences.PreferenceRecordImpl;
+import org.apache.qpid.server.store.preferences.PreferenceStoreUpdater;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class DerbyPreferenceStoreTest extends QpidTestCase
+{
+ private PreferenceStoreUpdater _updater;
+ private DerbyTestPreferenceStore _preferenceStore;
+ private List<PreferenceRecord> _testRecords;
+ private String _connectionUrl;
+ private Connection _testConnection;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ _updater = mock(PreferenceStoreUpdater.class);
+
when(_updater.getLatestVersion()).thenReturn(BrokerModel.MODEL_VERSION);
+
+ final ConfiguredObject<?> parent = mock(ConfiguredObject.class);
+ when(parent.getContext()).thenReturn(Collections.<String,
String>emptyMap());
+
when(parent.getContextKeys(anyBoolean())).thenReturn(Collections.<String>emptySet());
+
+ _connectionUrl = DerbyUtils.createConnectionUrl(getTestName(),
"memory:");
+ _preferenceStore = new DerbyTestPreferenceStore(_connectionUrl);
+
+ _testRecords = Arrays.<PreferenceRecord>asList(
+ new PreferenceRecordImpl(UUID.randomUUID(),
Collections.<String, Object>singletonMap("name", "test")),
+ new PreferenceRecordImpl(UUID.randomUUID(),
Collections.<String, Object>singletonMap("name", "test1")));
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ if (_testConnection != null)
+ {
+ _testConnection.close();
+ }
+ _preferenceStore.close();
+ shutdownDerby();
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+
+ public void testOpenAndLoadEmptyStore() throws Exception
+ {
+ Collection<PreferenceRecord> records =
_preferenceStore.openAndLoad(_updater);
+ assertEquals("Unexpected number of records", 0, records.size());
+
+ _testConnection = DriverManager.getConnection(_connectionUrl);
+
+ DerbyUtils.tableExists("PREFERENCES", _testConnection);
+ DerbyUtils.tableExists("PREFERENCES_VERSION", _testConnection);
+
+ List<String> versions = new ArrayList<>();
+ try (PreparedStatement selectStatement =
_testConnection.prepareStatement(
+ "select version from PREFERENCES_VERSION"))
+ {
+ try (ResultSet resultSet = selectStatement.executeQuery())
+ {
+ while (resultSet.next())
+ {
+ versions.add(resultSet.getString(1));
+ }
+ }
+ }
+
+ assertEquals("Unexpected versions size", 1, versions.size());
+ assertEquals("Unexpected version", BrokerModel.MODEL_VERSION,
versions.get(0));
+ }
+
+ public void testOpenAndLoadNonEmptyStore() throws Exception
+ {
+ populateTestData();
+ Collection<PreferenceRecord> records =
_preferenceStore.openAndLoad(_updater);
+
+ assertRecords(_testRecords, records);
+ }
+
+ public void testClose() throws Exception
+ {
+ _preferenceStore.openAndLoad(_updater);
+ _preferenceStore.close();
+
+ try
+ {
+ _preferenceStore.updateOrCreate(_testRecords);
+ fail("Business operation on closed store should fail");
+ }
+ catch (IllegalStateException e)
+ {
+ // pass
+ }
+ }
+
+ public void testUpdateOrCreate() throws Exception
+ {
+ _preferenceStore.openAndLoad(_updater);
+ _preferenceStore.updateOrCreate(_testRecords);
+
+ _testConnection = DriverManager.getConnection(_connectionUrl);
+ List<PreferenceRecord> records = getPreferenceRecords();
+
+ assertRecords(_testRecords, records);
+ }
+
+ public void testReplace() throws Exception
+ {
+ populateTestData();
+ _preferenceStore.openAndLoad(_updater);
+
+ Collection<PreferenceRecord> testRecords = new ArrayList<>();
+ testRecords.add(new PreferenceRecordImpl(UUID.randomUUID(),
Collections.<String, Object>singletonMap("name", "newOne")));
+
+
_preferenceStore.replace(Collections.singleton(_testRecords.get(0).getId()),
testRecords);
+
+ testRecords.add(_testRecords.get(1));
+
+ _testConnection = DriverManager.getConnection(_connectionUrl);
+ List<PreferenceRecord> records = getPreferenceRecords();
+
+ assertRecords(testRecords, records);
+ }
+
+ public void testUpdateFailIfNotOpened() throws Exception
+ {
+ populateTestData();
+ try
+ {
+ _preferenceStore.updateOrCreate(_testRecords);
+ fail("Business operation on not opened store should fail");
+ }
+ catch (IllegalStateException e)
+ {
+ e.printStackTrace();
+ // pass
+ }
+ }
+
+ public void testReplaceFailIfNotOpened() throws Exception
+ {
+ populateTestData();
+ try
+ {
+ _preferenceStore.replace(Collections.<UUID>emptyList(),
_testRecords);
+ fail("Business operation on not opened store should fail");
+ }
+ catch (IllegalStateException e)
+ {
+ // pass
+ }
+ }
+
+
+ private void populateTestData()
+ {
+ DerbyTestPreferenceStore store = new
DerbyTestPreferenceStore(_connectionUrl);
+ try
+ {
+ store.openAndLoad(_updater);
+ store.updateOrCreate(_testRecords);
+ }
+ finally
+ {
+ store.close();
+ }
+ }
+
+ private List<PreferenceRecord> getPreferenceRecords() throws SQLException,
java.io.IOException
+ {
+ List<PreferenceRecord> records = new ArrayList<>();
+ ObjectMapper objectMapper = new ObjectMapper();
+ try (PreparedStatement selectStatement =
_testConnection.prepareStatement(
+ "select id,attributes from PREFERENCES"))
+ {
+ try (ResultSet resultSet = selectStatement.executeQuery())
+ {
+ while (resultSet.next())
+ {
+ records.add(new PreferenceRecordImpl(
+ UUID.fromString(resultSet.getString(1)),
+
objectMapper.readValue(DerbyUtils.getBlobAsString(resultSet, 2), Map.class)));
+ }
+ }
+ }
+ return records;
+ }
+
+ private void shutdownDerby() throws SQLException
+ {
+ Connection connection = null;
+ try
+ {
+ connection = DriverManager.getConnection("jdbc:derby:memory:/" +
getTestName() + ";shutdown=true");
+ }
+ catch (SQLException e)
+ {
+ if (e.getSQLState().equalsIgnoreCase("08006"))
+ {
+ //expected and represents a clean shutdown of this database
only, do nothing.
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ finally
+ {
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }
+ }
+
+ private static class DerbyTestPreferenceStore extends
AbstractJDBCPreferenceStore
+ {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DerbyTestPreferenceStore.class);
+ private final String _connectionURL;
+
+
+ public DerbyTestPreferenceStore(final String connectionUrl)
+ {
+ _connectionURL = connectionUrl;
+ DerbyUtils.loadDerbyDriver();
+ }
+
+ @Override
+ protected Logger getLogger()
+ {
+ return LOGGER;
+ }
+
+ @Override
+ protected Connection getConnection() throws SQLException
+ {
+ return DriverManager.getConnection(_connectionURL);
+ }
+
+ @Override
+ protected String getSqlBlobType()
+ {
+ return "blob";
+ }
+
+ @Override
+ protected String getBlobAsString(final ResultSet rs, final int col)
throws SQLException
+ {
+ return DerbyUtils.getBlobAsString(rs, col);
+ }
+
+ @Override
+ protected void doClose()
+ {
+ // noop
+ }
+ }
+}
Modified:
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java?rev=1752682&r1=1752681&r2=1752682&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
(original)
+++
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
Thu Jul 14 16:01:22 2016
@@ -41,6 +41,8 @@ import org.apache.qpid.server.store.Conf
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreProvider;
import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.preferences.AbstractJDBCPreferenceStore;
+import org.apache.qpid.server.store.preferences.PreferenceStore;
/**
* Implementation of a DurableConfigurationStore backed by Generic JDBC
Database
@@ -54,6 +56,7 @@ public class GenericJDBCConfigurationSto
private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean();
private final MessageStore _providedMessageStore = new
ProvidedMessageStore();
+ private final PreferenceStore _providedPreferenceStore = new
ProvidedPreferenceStore();
private String _connectionURL;
private ConnectionProvider _connectionProvider;
@@ -246,6 +249,11 @@ public class GenericJDBCConfigurationSto
return _providedMessageStore;
}
+ public PreferenceStore getPreferenceStore()
+ {
+ return _providedPreferenceStore;
+ }
+
private class ProvidedMessageStore extends GenericAbstractJDBCMessageStore
{
@Override
@@ -309,5 +317,38 @@ public class GenericJDBCConfigurationSto
}
}
+ private class ProvidedPreferenceStore extends AbstractJDBCPreferenceStore
+ {
+ private final Logger LOGGER =
LoggerFactory.getLogger(ProvidedPreferenceStore.class);
+
+ @Override
+ protected Logger getLogger()
+ {
+ return LOGGER;
+ }
+
+ @Override
+ protected Connection getConnection() throws SQLException
+ {
+ return GenericJDBCConfigurationStore.this.getConnection();
+ }
+ @Override
+ protected String getSqlBlobType()
+ {
+ return GenericJDBCConfigurationStore.this.getSqlBlobType();
+ }
+
+ @Override
+ protected String getBlobAsString(final ResultSet rs, final int col)
throws SQLException
+ {
+ return GenericJDBCConfigurationStore.this.getBlobAsString(rs, col);
+ }
+
+ @Override
+ public void doClose()
+ {
+ // noop
+ }
+ }
}
Modified:
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCSystemConfig.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCSystemConfig.java?rev=1752682&r1=1752681&r2=1752682&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCSystemConfig.java
(original)
+++
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCSystemConfig.java
Thu Jul 14 16:01:22 2016
@@ -39,6 +39,6 @@ public interface JDBCSystemConfig<X exte
String getPassword();
@ManagedAttribute( description = "Configuration for the preference store,
e.g. type, path, etc.",
- defaultValue = "{\"type\": \"Noop\"}")
+ defaultValue = "{\"type\": \"Provided\"}")
PreferenceStoreAttributes getPreferenceStoreAttributes();
}
Modified:
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNode.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNode.java?rev=1752682&r1=1752681&r2=1752682&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNode.java
(original)
+++
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNode.java
Thu Jul 14 16:01:22 2016
@@ -25,8 +25,10 @@ import org.apache.qpid.server.model.Virt
import org.apache.qpid.server.store.jdbc.DefaultConnectionProviderFactory;
import org.apache.qpid.server.store.jdbc.JDBCSettings;
import org.apache.qpid.server.store.preferences.PreferenceStoreAttributes;
+import org.apache.qpid.server.store.preferences.PreferenceStoreProvider;
-public interface JDBCVirtualHostNode<X extends JDBCVirtualHostNode<X>> extends
VirtualHostNode<X>, JDBCSettings
+public interface JDBCVirtualHostNode<X extends JDBCVirtualHostNode<X>> extends
VirtualHostNode<X>, JDBCSettings,
+
PreferenceStoreProvider
{
@ManagedAttribute(mandatory=true)
String getConnectionUrl();
@@ -41,4 +43,7 @@ public interface JDBCVirtualHostNode<X e
@ManagedAttribute(secure=true)
String getPassword();
+ @ManagedAttribute( description = "Configuration for the preference store,
e.g. type, path, etc.",
+ defaultValue = "{\"type\": \"Provided\"}")
+ PreferenceStoreAttributes getPreferenceStoreAttributes();
}
Modified:
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java?rev=1752682&r1=1752681&r2=1752682&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java
(original)
+++
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java
Thu Jul 14 16:01:22 2016
@@ -31,6 +31,7 @@ import org.apache.qpid.server.model.Mana
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.jdbc.GenericJDBCConfigurationStore;
+import org.apache.qpid.server.store.preferences.PreferenceStore;
import org.apache.qpid.server.virtualhostnode.AbstractStandardVirtualHostNode;
@ManagedObject(type = JDBCVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE, category
= false ,
@@ -106,4 +107,10 @@ public class JDBCVirtualHostNodeImpl ext
{
return Collections.singletonMap(VirtualHost.class.getSimpleName(),
getSupportedVirtualHostTypes(true));
}
+
+ @Override
+ public PreferenceStore getPreferenceStore()
+ {
+ return ((GenericJDBCConfigurationStore)
getConfigurationStore()).getPreferenceStore();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]