Repository: activemq-artemis Updated Branches: refs/heads/master 2a415a80e -> e458a4327
ARTEMIS-513 Add JDBC Sequential File Factory Impl Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c9b95343 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c9b95343 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c9b95343 Branch: refs/heads/master Commit: c9b953433e011ded66ba87bdb2b2edcf0b87f67c Parents: 2a415a8 Author: Martyn Taylor <[email protected]> Authored: Tue May 3 14:36:10 2016 +0100 Committer: Martyn Taylor <[email protected]> Committed: Wed May 4 12:24:25 2016 +0100 ---------------------------------------------------------------------- .../activemq/artemis/jdbc/store/JDBCUtils.java | 13 + .../jdbc/store/file/JDBCSequentialFile.java | 419 +++++++++++++++++++ .../store/file/JDBCSequentialFileFactory.java | 229 ++++++++++ .../jdbc/store/file/sql/DerbySQLProvider.java | 52 +++ .../jdbc/store/file/sql/GenericSQLProvider.java | 143 +++++++ .../jdbc/store/file/sql/SQLProvider.java | 46 ++ .../jdbc/store/journal/JDBCJournalImpl.java | 7 +- .../file/JDBCSequentialFileFactoryTest.java | 185 ++++++++ .../core/io/nio/NIOSequentialFileFactory.java | 28 +- 9 files changed, 1107 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9b95343/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java index b44f225..bc04ab9 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java @@ -23,6 +23,10 @@ import 
java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import org.apache.activemq.artemis.jdbc.store.file.sql.DerbySQLProvider; +import org.apache.activemq.artemis.jdbc.store.file.sql.GenericSQLProvider; +import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider; + public class JDBCUtils { public static Driver getDriver(String className) throws Exception { @@ -60,4 +64,13 @@ public class JDBCUtils { statement.executeUpdate(sql); } } + + public static SQLProvider getSQLProvider(String driverClass, String tableName) { + if (driverClass.contains("derby")) { + return new DerbySQLProvider(tableName); + } + else { + return new GenericSQLProvider(tableName); + } + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9b95343/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java new file mode 100644 index 0000000..73bec72 --- /dev/null +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jdbc.store.file; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.sql.Blob; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.Executor; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.buffer.TimedBuffer; +import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; +import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider; +import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.jboss.logging.Logger; + +public class JDBCSequentialFile implements SequentialFile { + + private static final Logger logger = Logger.getLogger(JDBCSequentialFile.class); + + private final String filename; + + private final String extension; + + private boolean isOpen = false; + + private boolean isCreated = false; + + private int id = -1; + + private final PreparedStatement appendToFile; + + private final PreparedStatement deleteFile; + + private final PreparedStatement readFile; + + private final PreparedStatement createFile; + + private final 
PreparedStatement selectFileByFileName; + + private final PreparedStatement copyFileRecord; + + private final PreparedStatement renameFile; + + private long readPosition = 0; + + private long writePosition = 0; + + private Executor executor; + + private JDBCSequentialFileFactory fileFactory; + + private int maxSize; + + private SQLProvider sqlProvider; + + private final Object writeLock; + + public JDBCSequentialFile(final JDBCSequentialFileFactory fileFactory, + final String filename, + final SQLProvider sqlProvider, + final Executor executor, + final Object writeLock) throws SQLException { + this.fileFactory = fileFactory; + this.filename = filename; + this.extension = filename.contains(".") ? filename.substring(filename.lastIndexOf(".") + 1, filename.length()) : ""; + this.executor = executor; + this.maxSize = sqlProvider.getMaxBlobSize(); + this.sqlProvider = sqlProvider; + this.writeLock = writeLock; + + Connection connection = fileFactory.getConnection(); + this.appendToFile = connection.prepareStatement(sqlProvider.getAppendToFileSQL()); + this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL()); + this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS); + this.readFile = connection.prepareStatement(sqlProvider.getReadFileSQL()); + this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName()); + this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL()); + this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL()); + } + + @Override + public boolean isOpen() { + return isOpen; + } + + @Override + public boolean exists() { + return isCreated; + } + + @Override + public synchronized void open() throws Exception { + if (!isOpen) { + try { + synchronized (writeLock) { + selectFileByFileName.setString(1, filename); + + try (ResultSet rs = selectFileByFileName.executeQuery()) { + if (!rs.next()) { + 
createFile.setString(1, filename); + createFile.setString(2, extension); + createFile.setBytes(3, new byte[0]); + createFile.executeUpdate(); + try (ResultSet keys = createFile.getGeneratedKeys()) { + keys.next(); + this.id = keys.getInt(1); + } + } + else { + this.id = rs.getInt(1); + this.writePosition = rs.getBlob(4).length(); + } + } + } + } + catch (SQLException e) { + ActiveMQJournalLogger.LOGGER.error("Error retreiving file record", e); + isOpen = false; + } + + isCreated = true; + isOpen = true; + } + } + + @Override + public void open(int maxIO, boolean useExecutor) throws Exception { + open(); + } + + @Override + public boolean fits(int size) { + return writePosition + size <= maxSize; + } + + @Override + public int getAlignment() throws Exception { + return 0; + } + + @Override + public int calculateBlockStart(int position) throws Exception { + return 0; + } + + @Override + public String getFileName() { + return filename; + } + + @Override + public void fill(int size) throws Exception { + // Do nothing + } + + @Override + public void delete() throws IOException, InterruptedException, ActiveMQException { + try { + if (isCreated) { + deleteFile.setInt(1, id); + deleteFile.executeUpdate(); + } + } + catch (SQLException e) { + throw new IOException(e); + } + } + + private synchronized int internalWrite(byte[] data, IOCallback callback) { + try { + synchronized (writeLock) { + int noBytes = data.length; + appendToFile.setBytes(1, data); + appendToFile.setInt(2, id); + int result = appendToFile.executeUpdate(); + if (result < 1) + throw new ActiveMQException("No record found for file id: " + id); + seek(noBytes); + if (callback != null) + callback.done(); + return noBytes; + } + } + catch (Exception e) { + e.printStackTrace(); + if (callback != null) + callback.onError(-1, e.getMessage()); + } + return -1; + } + + public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) { + byte[] data = new byte[buffer.readableBytes()]; + 
buffer.readBytes(data); + return internalWrite(data, callback); + } + + private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) { + return internalWrite(buffer.array(), callback); + } + + public void scheduleWrite(final ActiveMQBuffer bytes, final IOCallback callback) { + executor.execute(new Runnable() { + @Override + public void run() { + internalWrite(bytes, callback); + } + }); + } + + public void scheduleWrite(final ByteBuffer bytes, final IOCallback callback) { + executor.execute(new Runnable() { + @Override + public void run() { + internalWrite(bytes, callback); + } + }); + } + + synchronized void seek(long noBytes) { + writePosition += noBytes; + } + + @Override + public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws Exception { + // We ignore sync since we schedule writes straight away. + scheduleWrite(bytes, callback); + } + + @Override + public void write(ActiveMQBuffer bytes, boolean sync) throws Exception { + write(bytes, sync, null); + } + + @Override + public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws Exception { + ActiveMQBuffer data = ActiveMQBuffers.fixedBuffer(bytes.getEncodeSize()); + bytes.encode(data); + scheduleWrite(data, callback); + } + + @Override + public void write(EncodingSupport bytes, boolean sync) throws Exception { + write(bytes, sync, null); + } + + @Override + public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) { + if (callback == null) { + SimpleWaitIOCallback waitIOCallback = new SimpleWaitIOCallback(); + try { + scheduleWrite(bytes, waitIOCallback); + waitIOCallback.waitCompletion(); + } + catch (Exception e) { + waitIOCallback.onError(-1, e.getMessage()); + } + } + else { + scheduleWrite(bytes, callback); + } + + } + + @Override + public void writeDirect(ByteBuffer bytes, boolean sync) throws Exception { + writeDirect(bytes, sync, null); + // Are we meant to block here? 
+ } + + @Override + public synchronized int read(ByteBuffer bytes, IOCallback callback) throws SQLException { + synchronized (writeLock) { + readFile.setInt(1, id); + try (ResultSet rs = readFile.executeQuery()) { + if (rs.next()) { + Blob blob = rs.getBlob(1); + + long bytesRemaining = blob.length() - readPosition; + byte[] data; + if (bytesRemaining > bytes.remaining()) { + // First index into blob is 1 (not 0) + data = blob.getBytes(readPosition + 1, bytes.remaining()); + } + else { + // First index into blob is 1 (not 0) + data = blob.getBytes(readPosition + 1, (int) bytesRemaining); + } + + bytes.put(data); + readPosition += data.length; + if (callback != null) + callback.done(); + + return data.length; + } + return 0; + } + catch (Exception e) { + if (callback != null) + callback.onError(-1, e.getMessage()); + return 0; + } + } + } + + @Override + public int read(ByteBuffer bytes) throws Exception { + return read(bytes, null); + } + + @Override + public void position(long pos) throws IOException { + readPosition = pos; + } + + @Override + public long position() { + return readPosition; + } + + @Override + public synchronized void close() throws Exception { + isOpen = false; + } + + @Override + public void sync() throws IOException { + // (mtaylor) We always write straight away, so we don't need to do anything here. + // (mtaylor) Is this meant to be blocking? 
+ } + + @Override + public long size() throws Exception { + return writePosition; + } + + @Override + public void renameTo(String newFileName) throws Exception { + renameFile.setString(1, newFileName); + renameFile.setInt(2, id); + renameFile.executeUpdate(); + } + + @Override + public SequentialFile cloneFile() { + try { + JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, sqlProvider, executor, writeLock); + return clone; + } + catch (Exception e) { + logger.error("Error cloning file: " + filename, e); + return null; + } + } + + @Override + public void copyTo(SequentialFile cloneFile) throws Exception { + JDBCSequentialFile clone = (JDBCSequentialFile) cloneFile; + clone.open(); + + copyFileRecord.setInt(1, id); + copyFileRecord.setInt(2, clone.getId()); + copyFileRecord.executeUpdate(); + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getFilename() { + return filename; + } + + public String getExtension() { + return extension; + } + + // Only Used by Journal, no need to implement. + @Override + public void setTimedBuffer(TimedBuffer buffer) { + } + + // Only Used by replication, no need to implement. 
+ @Override + public File getJavaFile() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9b95343/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java new file mode 100644 index 0000000..4231907 --- /dev/null +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.jdbc.store.file; + +import java.io.File; +import java.nio.ByteBuffer; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; + +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.server.ActiveMQComponent; +import org.apache.activemq.artemis.jdbc.store.JDBCUtils; +import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider; +import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; + +public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent { + + private Connection connection; + + private String connectionUrl; + + private final Driver driver; + + private boolean started; + + private final String tableName; + + private List<JDBCSequentialFile> files; + + private PreparedStatement selectFileNamesByExtension; + + private Executor executor; + + private SQLProvider sqlProvider; + + private Map<String, Object> fileLocks = new HashMap<>(); + + public JDBCSequentialFileFactory(final String connectionUrl, + final String tableName, + final String className, + Executor executor) throws Exception { + this.connectionUrl = connectionUrl; + this.executor = executor; + this.tableName = tableName.toUpperCase(); + + files = new ArrayList<>(); + sqlProvider = JDBCUtils.getSQLProvider(JDBCUtils.getDriver(className).getClass().getCanonicalName(), tableName); + driver = JDBCUtils.getDriver(className); + } + + public Connection getConnection() { + return connection; + } + + @Override + public SequentialFile 
createSequentialFile(String fileName) { + try { + fileLocks.putIfAbsent(fileName, new Object()); + JDBCSequentialFile file = new JDBCSequentialFile(this, fileName, sqlProvider, executor, fileLocks.get(fileName)); + files.add(file); + return file; + } + catch (Exception e) { + ActiveMQJournalLogger.LOGGER.error("Could not create file", e); + } + return null; + } + + @Override + public int getMaxIO() { + return 1; + } + + @Override + public List<String> listFiles(String extension) throws Exception { + List<String> fileNames = new ArrayList<>(); + + selectFileNamesByExtension.setString(1, extension); + try (ResultSet rs = selectFileNamesByExtension.executeQuery()) { + while (rs.next()) { + fileNames.add(rs.getString(1)); + } + } + return fileNames; + } + + @Override + public boolean isSupportsCallbacks() { + return true; + } + + @Override + public void onIOError(Exception exception, String message, SequentialFile file) { + } + + @Override + public ByteBuffer allocateDirectBuffer(final int size) { + return NIOSequentialFileFactory.allocateDirectByteBuffer(size); + } + + @Override + public void releaseDirectBuffer(ByteBuffer buffer) { + // nothing we can do on this case. 
we can just have good faith on GC + } + + @Override + public ByteBuffer newBuffer(final int size) { + return ByteBuffer.allocate(size); + } + + @Override + public void clearBuffer(final ByteBuffer buffer) { + final int limit = buffer.limit(); + buffer.rewind(); + + for (int i = 0; i < limit; i++) { + buffer.put((byte) 0); + } + + buffer.rewind(); + } + + @Override + public ByteBuffer wrapBuffer(final byte[] bytes) { + return ByteBuffer.wrap(bytes); + } + + @Override + public int getAlignment() { + return 1; + } + + @Override + public int calculateBlockSize(final int bytes) { + return bytes; + } + + @Override + public void deactivateBuffer() { + } + + @Override + public void releaseBuffer(final ByteBuffer buffer) { + } + + @Override + public void activateBuffer(SequentialFile file) { + + } + + @Override + public File getDirectory() { + return null; + } + + @Override + public synchronized void start() { + try { + if (!started) { + connection = driver.connect(connectionUrl, new Properties()); + JDBCUtils.createTableIfNotExists(connection, tableName, sqlProvider.getCreateFileTableSQL()); + selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL()); + started = true; + } + } + catch (SQLException e) { + ActiveMQJournalLogger.LOGGER.error("Could not start file factory, unable to connect to database"); + started = false; + } + } + + @Override + public synchronized void stop() { + try { + if (false) + connection.close(); + } + catch (SQLException e) { + ActiveMQJournalLogger.LOGGER.error("Error stopping file factory, unable to close db connection"); + } + started = false; + } + + @Override + public boolean isStarted() { + return started; + } + + @Override + public void createDirs() throws Exception { + } + + @Override + public void flush() { + + } + + public synchronized void destroy() throws SQLException { + Statement statement = connection.createStatement(); + statement.executeUpdate(sqlProvider.getDropFileTableSQL()); + 
stop(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9b95343/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/DerbySQLProvider.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/DerbySQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/DerbySQLProvider.java new file mode 100644 index 0000000..c14036e --- /dev/null +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/DerbySQLProvider.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.jdbc.store.file.sql; + +public class DerbySQLProvider extends GenericSQLProvider { + + // Derby max blob size = 2G + private static final int MAX_BLOB_SIZE = 2147483647; + + private final String createFileTableSQL; + + private final String appendToFileSQL; + + public DerbySQLProvider(String tableName) { + super(tableName); + + createFileTableSQL = "CREATE TABLE " + tableName + + "(ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," + + "FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))"; + + appendToFileSQL = "UPDATE " + tableName + " SET DATA = DATA || ? WHERE ID=?"; + } + + @Override + public int getMaxBlobSize() { + return MAX_BLOB_SIZE; + } + + @Override + public String getCreateFileTableSQL() { + return createFileTableSQL; + } + + @Override + public String getAppendToFileSQL() { + return appendToFileSQL; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9b95343/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/GenericSQLProvider.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/GenericSQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/GenericSQLProvider.java new file mode 100644 index 0000000..c95edb3 --- /dev/null +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/GenericSQLProvider.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jdbc.store.file.sql; + +public class GenericSQLProvider implements SQLProvider { + + // Default to lowest (MYSQL = 64k) + private static final int MAX_BLOB_SIZE = 64512; + + private final String tableName; + + private final String createFileTableSQL; + + private final String insertFileSQL; + + private final String selectFileNamesByExtensionSQL; + + private final String selectIdByFileNameSQL; + + private final String appendToFileSQL; + + private final String readFileSQL; + + private final String deleteFileSQL; + + private final String updateFileNameByIdSQL; + + private final String copyFileRecordByIdSQL; + + private final String cloneFileRecordSQL; + + private final String dropFileTableSQL; + + public GenericSQLProvider(String tableName) { + this.tableName = tableName; + + createFileTableSQL = "CREATE TABLE " + tableName + + "(ID INT AUTO_INCREMENT, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))"; + + insertFileSQL = "INSERT INTO " + tableName + + " (FILENAME, EXTENSION, DATA) VALUES (?,?,?)"; + + selectFileNamesByExtensionSQL = "SELECT FILENAME, ID FROM " + tableName + " WHERE EXTENSION=?"; + + selectIdByFileNameSQL = "SELECT ID, FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE fileName=?"; + + appendToFileSQL = "UPDATE " + tableName + " SET DATA = CONCAT(DATA, ?) WHERE ID=?"; + + readFileSQL = "SELECT DATA FROM " + tableName + " WHERE ID=?"; + + deleteFileSQL = "DELETE FROM " + tableName + " WHERE ID=?"; + + updateFileNameByIdSQL = "UPDATE " + tableName + " SET FILENAME=? 
WHERE ID=?"; + + cloneFileRecordSQL = "INSERT INTO " + tableName + "(FILENAME, EXTENSION, DATA) " + + "(SELECT FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE ID=?)"; + + copyFileRecordByIdSQL = "UPDATE " + tableName + " SET DATA = (SELECT DATA FROM " + tableName + " WHERE ID=?) WHERE ID=?"; + + dropFileTableSQL = "DROP TABLE " + tableName; + } + + @Override + public int getMaxBlobSize() { + return MAX_BLOB_SIZE; + } + + @Override + public String getTableName() { + return tableName; + } + + @Override + public String getCreateFileTableSQL() { + return createFileTableSQL; + } + + @Override + public String getInsertFileSQL() { + return insertFileSQL; + } + + @Override + public String getSelectFileByFileName() { + return selectIdByFileNameSQL; + } + + @Override + public String getSelectFileNamesByExtensionSQL() { + return selectFileNamesByExtensionSQL; + } + + @Override + public String getAppendToFileSQL() { + return appendToFileSQL; + } + + @Override + public String getReadFileSQL() { + return readFileSQL; + } + + @Override + public String getDeleteFileSQL() { + return deleteFileSQL; + } + + @Override + public String getUpdateFileNameByIdSQL() { + return updateFileNameByIdSQL; + } + + @Override + public String getCopyFileRecordByIdSQL() { + return copyFileRecordByIdSQL; + } + + @Override + public String getCloneFileRecordByIdSQL() { + return cloneFileRecordSQL; + } + + @Override + public String getDropFileTableSQL() { + return dropFileTableSQL; + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9b95343/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/SQLProvider.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/SQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/SQLProvider.java new file mode 100644 index 0000000..e9fe36c --- /dev/null +++ 
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/SQLProvider.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jdbc.store.file.sql; + +public interface SQLProvider { + + int getMaxBlobSize(); + + String getTableName(); + + String getCreateFileTableSQL(); + + String getInsertFileSQL(); + + String getSelectFileNamesByExtensionSQL(); + + String getSelectFileByFileName(); + + String getAppendToFileSQL(); + + String getReadFileSQL(); + + String getDeleteFileSQL(); + + String getUpdateFileNameByIdSQL(); + + String getCopyFileRecordByIdSQL(); + + String getDropFileTableSQL(); + + String getCloneFileRecordByIdSQL(); +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9b95343/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index 73a8602..f253167 100644 --- 
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -229,12 +229,13 @@ public class JDBCJournalImpl implements Journal { /* We store Transaction reference in memory (once all records associated with a Transaction are Deleted, we remove the Tx Records (i.e. PREPARE, COMMIT)). */ - private void cleanupTxRecords(List<Long> deletedRecords, List<Long> committedTx) throws SQLException { + private synchronized void cleanupTxRecords(List<Long> deletedRecords, List<Long> committedTx) throws SQLException { List<RecordInfo> iterableCopy; List<TransactionHolder> iterableCopyTx = new ArrayList<>(); iterableCopyTx.addAll(transactions.values()); + for (Long txId : committedTx) { transactions.get(txId).committed = true; } @@ -319,7 +320,7 @@ public class JDBCJournalImpl implements Journal { if (callback != null) callback.waitCompletion(); } - private void addTxRecord(JDBCJournalRecord record) { + private synchronized void addTxRecord(JDBCJournalRecord record) { TransactionHolder txHolder = transactions.get(record.getTxId()); if (txHolder == null) { txHolder = new TransactionHolder(record.getTxId()); @@ -341,7 +342,7 @@ public class JDBCJournalImpl implements Journal { } } - private void removeTxRecord(JDBCJournalRecord record) { + private synchronized void removeTxRecord(JDBCJournalRecord record) { TransactionHolder txHolder = transactions.get(record.getTxId()); // We actually only need the record ID in this instance. 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9b95343/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java new file mode 100644 index 0000000..554f36b --- /dev/null +++ b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.jdbc.file; + +import java.nio.ByteBuffer; +import java.sql.SQLException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile; +import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory; +import org.apache.derby.jdbc.EmbeddedDriver; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class JDBCSequentialFileFactoryTest { + + private static String connectionUrl = "jdbc:derby:target/data;create=true"; + + private static String tableName = "FILES"; + + private static String className = EmbeddedDriver.class.getCanonicalName(); + + private JDBCSequentialFileFactory factory; + + @Before + public void setup() throws Exception { + Executor executor = Executors.newSingleThreadExecutor(); + + factory = new JDBCSequentialFileFactory(connectionUrl, tableName, className, executor); + factory.start(); + } + + @After + public void tearDown() throws SQLException { + factory.destroy(); + } + + @Test + public void testJDBCFileFactoryStarted() throws Exception { + assertTrue(factory.isStarted()); + } + + @Test + public void testCreateFiles() throws Exception { + int noFiles = 100; + Set<String> fileNames = new HashSet<String>(); + for (int i = 0; i < noFiles; i++) { + String fileName = 
UUID.randomUUID().toString() + ".txt"; + fileNames.add(fileName); + SequentialFile file = factory.createSequentialFile(fileName); + // We create files on Open + file.open(); + } + + List<String> queryFileNames = factory.listFiles("txt"); + assertTrue(queryFileNames.containsAll(fileNames)); + } + + @Test + public void testAsyncAppendToFile() throws Exception { + + JDBCSequentialFile file = (JDBCSequentialFile) factory.createSequentialFile("test.txt"); + file.open(); + + // Create buffer and fill with test data + int bufferSize = 1024; + ActiveMQBuffer src = ActiveMQBuffers.fixedBuffer(bufferSize); + for (int i = 0; i < bufferSize; i++) { + src.writeByte((byte) 1); + } + + IOCallbackCountdown callback = new IOCallbackCountdown(1); + file.internalWrite(src, callback); + + callback.assertEmpty(5); + checkData(file, src); + } + + @Test + public void testCopyFile() throws Exception { + JDBCSequentialFile file = (JDBCSequentialFile) factory.createSequentialFile("test.txt"); + file.open(); + + // Create buffer and fill with test data + int bufferSize = 1024; + ActiveMQBuffer src = ActiveMQBuffers.fixedBuffer(bufferSize); + for (int i = 0; i < bufferSize; i++) { + src.writeByte((byte) 5); + } + + IOCallbackCountdown callback = new IOCallbackCountdown(1); + file.internalWrite(src, callback); + + JDBCSequentialFile copy = (JDBCSequentialFile) factory.createSequentialFile("copy.txt"); + file.copyTo(copy); + + checkData(copy, src); + checkData(file, src); + } + + @Test + public void testCloneFile() throws Exception { + JDBCSequentialFile file = (JDBCSequentialFile) factory.createSequentialFile("test.txt"); + file.open(); + + // Create buffer and fill with test data + int bufferSize = 1024; + ActiveMQBuffer src = ActiveMQBuffers.fixedBuffer(bufferSize); + for (int i = 0; i < bufferSize; i++) { + src.writeByte((byte) 5); + } + + IOCallbackCountdown callback = new IOCallbackCountdown(1); + file.internalWrite(src, callback); + + JDBCSequentialFile copy = (JDBCSequentialFile) 
file.cloneFile(); + } + + private void checkData(JDBCSequentialFile file, ActiveMQBuffer expectedData) throws SQLException { + expectedData.resetReaderIndex(); + + byte[] resultingBytes = new byte[expectedData.readableBytes()]; + ByteBuffer byteBuffer = ByteBuffer.allocate(expectedData.readableBytes()); + + file.read(byteBuffer, null); + expectedData.getBytes(0, resultingBytes); + + assertArrayEquals(resultingBytes, byteBuffer.array()); + } + + private class IOCallbackCountdown implements IOCallback { + + private final CountDownLatch countDownLatch; + + public IOCallbackCountdown(int size) { + this.countDownLatch = new CountDownLatch(size); + } + + @Override + public void done() { + countDownLatch.countDown(); + } + + @Override + public void onError(int errorCode, String errorMessage) { + fail(errorMessage); + } + + public void assertEmpty(int timeout) throws InterruptedException { + countDownLatch.await(timeout, TimeUnit.SECONDS); + assertEquals(countDownLatch.getCount(), 0); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9b95343/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java index 67c3038..a5884b9 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java @@ -65,18 +65,7 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory { super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, listener); } - @Override - public SequentialFile createSequentialFile(final String fileName) { - return new 
NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor); - } - - @Override - public boolean isSupportsCallbacks() { - return timedBuffer != null; - } - - @Override - public ByteBuffer allocateDirectBuffer(final int size) { + public static ByteBuffer allocateDirectByteBuffer(final int size) { // Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467 ByteBuffer buffer2 = null; try { @@ -105,6 +94,21 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory { } @Override + public SequentialFile createSequentialFile(final String fileName) { + return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor); + } + + @Override + public boolean isSupportsCallbacks() { + return timedBuffer != null; + } + + @Override + public ByteBuffer allocateDirectBuffer(final int size) { + return NIOSequentialFileFactory.allocateDirectByteBuffer(size); + } + + @Override public void releaseDirectBuffer(ByteBuffer buffer) { // nothing we can do on this case. we can just have good faith on GC }
