Hi, On 3/13/07, Bryan Davis <[EMAIL PROTECTED]> wrote:
> I am working on a response to the many recent additions to this thread (hopefully will have something later today).
If you're interested, see below for some code I drafted together last year when this subject was up earlier. I quickly updated the code to match the latest changes in Jackrabbit. The class is just a quick prototype, i.e. it compiles but is not tested and not really documented. PS. How about moving this discussion to the development mailing list? BR, Jukka Zitting /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
*/
package org.apache.jackrabbit.core.persistence.db;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Locale;

import javax.naming.InitialContext;
import javax.sql.DataSource;

import org.apache.jackrabbit.core.NodeId;
import org.apache.jackrabbit.core.PropertyId;
import org.apache.jackrabbit.core.fs.BasedFileSystem;
import org.apache.jackrabbit.core.fs.FileSystem;
import org.apache.jackrabbit.core.persistence.PMContext;
import org.apache.jackrabbit.core.persistence.PersistenceManager;
import org.apache.jackrabbit.core.persistence.util.BLOBStore;
import org.apache.jackrabbit.core.persistence.util.FileSystemBLOBStore;
import org.apache.jackrabbit.core.persistence.util.Serializer;
import org.apache.jackrabbit.core.state.ChangeLog;
import org.apache.jackrabbit.core.state.ItemState;
import org.apache.jackrabbit.core.state.ItemStateException;
import org.apache.jackrabbit.core.state.NoSuchItemStateException;
import org.apache.jackrabbit.core.state.NodeReferences;
import org.apache.jackrabbit.core.state.NodeReferencesId;
import org.apache.jackrabbit.core.state.NodeState;
import org.apache.jackrabbit.core.state.PropertyState;

/**
 * Prototype persistence manager that keeps serialized item states in a
 * relational database reached through a JNDI-bound {@link DataSource}.
 * <p>
 * A connection is acquired from the data source for every operation and
 * closed again when the operation ends. Three tables are used, each named
 * with the configured schema object prefix:
 * <ul>
 *   <li><code>&lt;prefix&gt;NODE (NODE_ID, NODE_DATA)</code></li>
 *   <li><code>&lt;prefix&gt;PROP (PROP_ID, PROP_DATA)</code></li>
 *   <li><code>&lt;prefix&gt;REFS (NODE_ID, REFS_DATA)</code></li>
 * </ul>
 * This class issues no DDL; the tables must already exist.
 */
public class DataSourcePersistenceManager implements PersistenceManager {

    /**
     * The underlying data source.
     */
    private DataSource database;

    /**
     * JNDI location of the data source used to acquire database connections.
     */
    private String location;

    /**
     * Schema object prefix. Defaults to the empty string so that the SQL
     * statements never contain the literal text "null".
     */
    private String prefix = "";

    /**
     * Blob store.
     */
    private BLOBStore blobs;

    private String nodeExistsSQL;
    private String propExistsSQL;
    private String refsExistsSQL;

    private String nodeSelectSQL;
    private String propSelectSQL;
    private String refsSelectSQL;

    private String nodeInsertSQL;
    private String propInsertSQL;
    private String refsInsertSQL;

    private String nodeUpdateSQL;
    private String propUpdateSQL;
    private String refsUpdateSQL;

    private String nodeDeleteSQL;
    private String propDeleteSQL;
    private String refsDeleteSQL;

    //----------------------------------------------------< setters & getters >

    /**
     * Returns the JNDI location of the data source.
     *
     * @return data source location
     */
    public String getDataSourceLocation() {
        return location;
    }

    /**
     * Sets the JNDI location of the data source.
     *
     * @param location data source location
     */
    public void setDataSourceLocation(String location) {
        this.location = location;
    }

    /**
     * Returns the schema object prefix.
     *
     * @return schema object prefix
     */
    public String getSchemaObjectPrefix() {
        return prefix;
    }

    /**
     * Sets the schema object prefix. The prefix is stored in upper case;
     * <code>null</code> is treated as "no prefix".
     *
     * @param prefix text prepended to the NODE, PROP and REFS table names,
     *               or <code>null</code> for none
     */
    public void setSchemaObjectPrefix(String prefix) {
        if (prefix == null) {
            this.prefix = "";
        } else {
            // Fixed locale: the default locale must not influence the
            // generated table names (e.g. Turkish dotted/dotless i).
            this.prefix = prefix.toUpperCase(Locale.ENGLISH);
        }
    }

    //--------------------------------------------------< PersistenceManager >

    /**
     * Initializes this persistence manager. Looks up the data source from
     * JNDI, sets up a BLOB store in the "blobs" subdirectory of the context
     * file system, and builds the SQL statements from the schema object
     * prefix.
     *
     * @param context persistence manager context
     * @throws IllegalStateException if no data source location has been set
     * @throws Exception if the JNDI lookup or the file system
     *                   initialization fails
     */
    public void init(PMContext context) throws Exception {
        if (location == null) {
            throw new IllegalStateException(
                    "The data source location has not been set");
        }

        InitialContext naming = new InitialContext();
        try {
            database = (DataSource) naming.lookup(location);
        } finally {
            naming.close();
        }

        FileSystem filesystem =
            new BasedFileSystem(context.getFileSystem(), "blobs");
        filesystem.init();
        blobs = new FileSystemBLOBStore(filesystem);

        nodeExistsSQL =
            "SELECT 1 FROM " + prefix + "NODE WHERE NODE_ID=?";
        propExistsSQL =
            "SELECT 1 FROM " + prefix + "PROP WHERE PROP_ID=?";
        refsExistsSQL =
            "SELECT 1 FROM " + prefix + "REFS WHERE NODE_ID=?";

        nodeSelectSQL =
            "SELECT NODE_DATA FROM " + prefix + "NODE WHERE NODE_ID=?";
        propSelectSQL =
            "SELECT PROP_DATA FROM " + prefix + "PROP WHERE PROP_ID=?";
        refsSelectSQL =
            "SELECT REFS_DATA FROM " + prefix + "REFS WHERE NODE_ID=?";

        nodeInsertSQL =
            "INSERT INTO " + prefix + "NODE (NODE_DATA,NODE_ID) VALUES (?,?)";
        propInsertSQL =
            "INSERT INTO " + prefix + "PROP (PROP_DATA,PROP_ID) VALUES (?,?)";
        refsInsertSQL =
            "INSERT INTO " + prefix + "REFS (REFS_DATA,NODE_ID) VALUES (?,?)";

        nodeUpdateSQL =
            "UPDATE " + prefix + "NODE SET NODE_DATA=? WHERE NODE_ID=?";
        propUpdateSQL =
            "UPDATE " + prefix + "PROP SET PROP_DATA=? WHERE PROP_ID=?";
        refsUpdateSQL =
            "UPDATE " + prefix + "REFS SET REFS_DATA=? WHERE NODE_ID=?";

        nodeDeleteSQL =
            "DELETE FROM " + prefix + "NODE WHERE NODE_ID=?";
        propDeleteSQL =
            "DELETE FROM " + prefix + "PROP WHERE PROP_ID=?";
        refsDeleteSQL =
            "DELETE FROM " + prefix + "REFS WHERE NODE_ID=?";
    }

    /**
     * Closes this persistence manager by dropping the references to the
     * data source and the BLOB store.
     */
    public void close() {
        database = null;
        blobs = null;
    }

    /**
     * Creates a new node state instance.
     *
     * @param id node identifier
     * @return node state
     */
    public NodeState createNew(NodeId id) {
        return new NodeState(id, null, null, NodeState.STATUS_NEW, false);
    }

    /**
     * Creates a new property state instance.
     *
     * @param id property identifier
     * @return property state
     */
    public PropertyState createNew(PropertyId id) {
        return new PropertyState(id, PropertyState.STATUS_NEW, false);
    }

    /**
     * Checks whether the identified node state exists.
     *
     * @param id node identifier
     * @return <code>true</code> if the node state exists,
     *         <code>false</code> otherwise
     * @throws ItemStateException if a database error occurred
     */
    public boolean exists(NodeId id) throws ItemStateException {
        return exists(nodeExistsSQL, id.toString());
    }

    /**
     * Checks whether the identified property state exists.
     *
     * @param id property identifier
     * @return <code>true</code> if the property state exists,
     *         <code>false</code> otherwise
     * @throws ItemStateException if a database error occurred
     */
    public boolean exists(PropertyId id) throws ItemStateException {
        return exists(propExistsSQL, id.toString());
    }

    /**
     * Checks whether references to the identified node exist.
     *
     * @param targetId reference identifier
     * @return <code>true</code> if references to the identified node exist,
     *         <code>false</code> otherwise
     * @throws ItemStateException if a database error occurred
     */
    public boolean exists(NodeReferencesId targetId)
            throws ItemStateException {
        return exists(refsExistsSQL, targetId.toString());
    }

    /**
     * Loads the identified node state.
     *
     * @param id node identifier
     * @return node state
     * @throws NoSuchItemStateException if the node state does not exist
     * @throws ItemStateException if a database error occurred
     */
    public NodeState load(NodeId id)
            throws NoSuchItemStateException, ItemStateException {
        final NodeState state = createNew(id);
        load(nodeSelectSQL, id.toString(), new RecordReader() {
            public void readRecord(InputStream stream) throws Exception {
                Serializer.deserialize(state, stream);
            }
        });
        return state;
    }

    /**
     * Loads the identified property state.
     *
     * @param id property identifier
     * @return property state
     * @throws NoSuchItemStateException if the property state does not exist
     * @throws ItemStateException if a database error occurred
     */
    public PropertyState load(PropertyId id)
            throws NoSuchItemStateException, ItemStateException {
        final PropertyState state = createNew(id);
        load(propSelectSQL, id.toString(), new RecordReader() {
            public void readRecord(InputStream stream) throws Exception {
                Serializer.deserialize(state, stream, blobs);
            }
        });
        return state;
    }

    /**
     * Loads references to the identified node.
     *
     * @param id reference identifier
     * @return node references
     * @throws NoSuchItemStateException if there are no references to the node
     * @throws ItemStateException if a database error occurred
     */
    public NodeReferences load(NodeReferencesId id)
            throws NoSuchItemStateException, ItemStateException {
        final NodeReferences references = new NodeReferences(id);
        load(refsSelectSQL, id.toString(), new RecordReader() {
            public void readRecord(InputStream stream) throws Exception {
                Serializer.deserialize(references, stream);
            }
        });
        return references;
    }

    /**
     * Persists all the changes in the given change log. The database
     * changes are applied in a single transaction: they are committed only
     * if every statement succeeds and rolled back otherwise, so no database
     * changes are persisted if an error occurs.
     * <p>
     * NOTE(review): property serialization is handed the BLOB store;
     * anything it writes there is presumably outside the database
     * transaction and is not rolled back - confirm against
     * <code>Serializer</code>.
     *
     * @param changeLog change log
     * @throws ItemStateException if a database error occurred
     */
    public void store(ChangeLog changeLog) throws ItemStateException {
        try {
            Connection connection = database.getConnection();
            try {
                // Remember the auto-commit mode so that the connection can
                // be handed back in the state in which it was obtained.
                boolean autoCommit = connection.getAutoCommit();
                connection.setAutoCommit(false);
                boolean committed = false;
                try {
                    storeItemStates(
                            connection, changeLog.addedStates(),
                            nodeInsertSQL, propInsertSQL);
                    storeItemStates(
                            connection, changeLog.modifiedStates(),
                            nodeUpdateSQL, propUpdateSQL);
                    deleteItemStates(connection, changeLog.deletedStates());
                    storeNodeReferences(connection, changeLog.modifiedRefs());
                    connection.commit();
                    committed = true;
                } finally {
                    endTransaction(connection, committed, autoCommit);
                }
            } finally {
                connection.close();
            }
        } catch (SQLException e) {
            throw new ItemStateException("Database error", e);
        }
    }

    //-------------------------------------------------------------< private >

    /**
     * Callback that deserializes one database record from a stream.
     */
    private interface RecordReader {

        void readRecord(InputStream stream) throws Exception;

    }

    /**
     * Callback that supplies the identifier of a record and serializes the
     * record to a stream.
     */
    private interface RecordWriter {

        String getId(Object record);

        void writeRecord(Object record, OutputStream stream) throws Exception;

    }

    /**
     * Finishes the transaction started by {@link #store(ChangeLog)}: rolls
     * back unless the changes were committed, then restores the original
     * auto-commit mode.
     *
     * @param connection database connection
     * @param committed whether the transaction was committed
     * @param autoCommit auto-commit mode to restore
     * @throws SQLException if the cleanup fails after a successful commit
     */
    private void endTransaction(
            Connection connection, boolean committed, boolean autoCommit)
            throws SQLException {
        try {
            if (!committed) {
                connection.rollback();
            }
            connection.setAutoCommit(autoCommit);
        } catch (SQLException e) {
            if (committed) {
                throw e;
            }
            // Otherwise an earlier failure is already propagating to the
            // caller. It is the root cause and must not be replaced by
            // this secondary cleanup failure.
        }
    }

    /**
     * Checks whether the identified database record exists.
     *
     * @param sql the SQL SELECT statement to use for the check
     * @param id record identifier
     * @return <code>true</code> if the identified record exists,
     *         <code>false</code> otherwise
     * @throws ItemStateException if a database error occurred
     */
    private boolean exists(String sql, String id) throws ItemStateException {
        try {
            Connection connection = database.getConnection();
            try {
                PreparedStatement select = connection.prepareStatement(sql);
                try {
                    select.setString(1, id);
                    ResultSet rs = select.executeQuery();
                    try {
                        return rs.next();
                    } finally {
                        rs.close();
                    }
                } finally {
                    select.close();
                }
            } finally {
                connection.close();
            }
        } catch (SQLException e) {
            throw new ItemStateException("Database error", e);
        }
    }

    /**
     * Loads the identified database record. The record is deserialized
     * using the given record reader.
     *
     * @param sql the SQL SELECT statement to use for loading the record
     * @param id record identifier
     * @param reader record reader
     * @throws NoSuchItemStateException if the record does not exist
     * @throws ItemStateException if the record holds no data, cannot be
     *                            deserialized, or a database error occurred
     */
    private void load(String sql, String id, RecordReader reader)
            throws NoSuchItemStateException, ItemStateException {
        try {
            Connection connection = database.getConnection();
            try {
                PreparedStatement select = connection.prepareStatement(sql);
                try {
                    select.setString(1, id);
                    ResultSet rs = select.executeQuery();
                    try {
                        if (!rs.next()) {
                            throw new NoSuchItemStateException(id);
                        }
                        InputStream stream = rs.getBinaryStream(1);
                        if (stream == null) {
                            // getBinaryStream returns null for SQL NULL;
                            // report that instead of failing with a
                            // NullPointerException further down.
                            throw new ItemStateException(
                                    "Record contains no data: " + id);
                        }
                        try {
                            reader.readRecord(stream);
                        } catch (Exception e) {
                            throw new ItemStateException(
                                    "Deserialization failed: " + id, e);
                        } finally {
                            stream.close();
                        }
                    } catch (IOException e) {
                        throw new ItemStateException("Database error", e);
                    } finally {
                        rs.close();
                    }
                } finally {
                    select.close();
                }
            } finally {
                connection.close();
            }
        } catch (SQLException e) {
            throw new ItemStateException("Database error", e);
        }
    }

    /**
     * Splits the item states returned by the iterator into node states and
     * property states.
     *
     * @param iterator iterator over {@link ItemState} instances
     * @param nodes collection that receives the node states
     * @param props collection that receives the property states
     */
    private void classifyItemStates(
            Iterator iterator, Collection nodes, Collection props) {
        while (iterator.hasNext()) {
            ItemState state = (ItemState) iterator.next();
            if (state.isNode()) {
                nodes.add(state);
            } else {
                props.add(state);
            }
        }
    }

    /**
     * Writes the given item states, using one statement for node states and
     * another for property states. Node states are written first.
     *
     * @param connection database connection
     * @param iterator iterator over {@link ItemState} instances
     * @param nodeSQL INSERT or UPDATE statement for node states
     * @param propSQL INSERT or UPDATE statement for property states
     * @throws SQLException if a database or serialization error occurred
     */
    private void storeItemStates(
            Connection connection, Iterator iterator,
            String nodeSQL, String propSQL) throws SQLException {
        Collection nodes = new ArrayList();
        Collection props = new ArrayList();
        classifyItemStates(iterator, nodes, props);

        if (!nodes.isEmpty()) {
            storeRecords(connection, nodeSQL, new RecordWriter() {
                public String getId(Object record) {
                    return ((NodeState) record).getId().toString();
                }
                public void writeRecord(Object record, OutputStream stream)
                        throws Exception {
                    Serializer.serialize((NodeState) record, stream);
                }
            }, nodes.iterator());
        }

        if (!props.isEmpty()) {
            storeRecords(connection, propSQL, new RecordWriter() {
                public String getId(Object record) {
                    return ((PropertyState) record).getId().toString();
                }
                public void writeRecord(Object record, OutputStream stream)
                        throws Exception {
                    Serializer.serialize(
                            (PropertyState) record, stream, blobs);
                }
            }, props.iterator());
        }
    }

    /**
     * Serializes each record and executes the given statement with the
     * serialized data as parameter 1 and the record identifier as
     * parameter 2.
     *
     * @param connection database connection
     * @param sql INSERT or UPDATE statement with (data, id) parameters
     * @param writer record writer
     * @param iterator iterator over the records to write
     * @throws SQLException if a database error occurred or a record could
     *                      not be serialized (the serialization failure is
     *                      attached as the cause)
     */
    private void storeRecords(
            Connection connection, String sql,
            RecordWriter writer, Iterator iterator) throws SQLException {
        PreparedStatement statement = connection.prepareStatement(sql);
        try {
            while (iterator.hasNext()) {
                Object record = iterator.next();
                String id = writer.getId(record);

                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                try {
                    writer.writeRecord(record, buffer);
                } catch (Exception e) {
                    // Keep the root cause. initCause is used rather than a
                    // newer SQLException constructor so that this compiles
                    // on old Java versions as well.
                    SQLException failure =
                        new SQLException("Serialization failed: " + id);
                    failure.initCause(e);
                    throw failure;
                }

                byte[] bytes = buffer.toByteArray();
                statement.setBinaryStream(
                        1, new ByteArrayInputStream(bytes), bytes.length);
                statement.setString(2, id);
                statement.execute();
            }
        } finally {
            statement.close();
        }
    }

    /**
     * Deletes the given item states. Node states are deleted before
     * property states.
     *
     * @param connection database connection
     * @param iterator iterator over {@link ItemState} instances
     * @throws SQLException if a database error occurred
     */
    private void deleteItemStates(Connection connection, Iterator iterator)
            throws SQLException {
        Collection nodes = new ArrayList();
        Collection props = new ArrayList();
        classifyItemStates(iterator, nodes, props);

        if (!nodes.isEmpty()) {
            deleteItemStates(connection, nodeDeleteSQL, nodes.iterator());
        }
        if (!props.isEmpty()) {
            deleteItemStates(connection, propDeleteSQL, props.iterator());
        }
    }

    /**
     * Executes the given DELETE statement once for each item state.
     *
     * @param connection database connection
     * @param sql DELETE statement with a single id parameter
     * @param iterator iterator over {@link ItemState} instances
     * @throws SQLException if a database error occurred
     */
    private void deleteItemStates(
            Connection connection, String sql, Iterator iterator)
            throws SQLException {
        PreparedStatement statement = connection.prepareStatement(sql);
        try {
            while (iterator.hasNext()) {
                ItemState state = (ItemState) iterator.next();
                statement.setString(1, state.getId().toString());
                statement.execute();
            }
        } finally {
            statement.close();
        }
    }

    /**
     * Persists the given node references: reference collections that are
     * empty are deleted, the others are inserted or updated depending on
     * whether a record already exists.
     *
     * @param connection database connection
     * @param iterator iterator over {@link NodeReferences} instances
     * @throws SQLException if a database or serialization error occurred
     */
    private void storeNodeReferences(
            Connection connection, Iterator iterator) throws SQLException {
        if (iterator.hasNext()) {
            Collection insert = new ArrayList();
            Collection update = new ArrayList();
            Collection delete = new ArrayList();
            classifyNodeReferences(
                    connection, iterator, insert, update, delete);

            if (!insert.isEmpty()) {
                storeNodeReferences(
                        connection, refsInsertSQL, insert.iterator());
            }
            if (!update.isEmpty()) {
                storeNodeReferences(
                        connection, refsUpdateSQL, update.iterator());
            }
            if (!delete.isEmpty()) {
                deleteNodeReferences(connection, delete.iterator());
            }
        }
    }

    /**
     * Writes the given node references with the given statement.
     *
     * @param connection database connection
     * @param sql INSERT or UPDATE statement with (data, id) parameters
     * @param iterator iterator over {@link NodeReferences} instances
     * @throws SQLException if a database or serialization error occurred
     */
    private void storeNodeReferences(
            Connection connection, String sql, Iterator iterator)
            throws SQLException {
        storeRecords(connection, sql, new RecordWriter() {
            public String getId(Object record) {
                return ((NodeReferences) record).getId().toString();
            }
            public void writeRecord(Object record, OutputStream stream)
                    throws Exception {
                Serializer.serialize((NodeReferences) record, stream);
            }
        }, iterator);
    }

    /**
     * Deletes the records of the given node references.
     *
     * @param connection database connection
     * @param iterator iterator over {@link NodeReferences} instances
     * @throws SQLException if a database error occurred
     */
    private void deleteNodeReferences(
            Connection connection, Iterator iterator) throws SQLException {
        PreparedStatement delete = connection.prepareStatement(refsDeleteSQL);
        try {
            while (iterator.hasNext()) {
                NodeReferences references = (NodeReferences) iterator.next();
                delete.setString(1, references.getId().toString());
                delete.execute();
            }
        } finally {
            delete.close();
        }
    }

    /**
     * Sorts the given node references into those to insert, update and
     * delete. References without any entries go to <code>delete</code>;
     * the others go to <code>update</code> if a record with the same
     * identifier already exists and to <code>insert</code> otherwise.
     *
     * @param connection database connection
     * @param iterator iterator over {@link NodeReferences} instances
     * @param insert collection that receives the references to insert
     * @param update collection that receives the references to update
     * @param delete collection that receives the references to delete
     * @throws SQLException if a database error occurred
     */
    private void classifyNodeReferences(
            Connection connection, Iterator iterator,
            Collection insert, Collection update, Collection delete)
            throws SQLException {
        PreparedStatement select = connection.prepareStatement(refsExistsSQL);
        try {
            while (iterator.hasNext()) {
                NodeReferences references = (NodeReferences) iterator.next();
                if (!references.hasReferences()) {
                    delete.add(references);
                } else {
                    select.setString(1, references.getId().toString());
                    ResultSet rs = select.executeQuery();
                    try {
                        if (rs.next()) {
                            update.add(references);
                        } else {
                            insert.add(references);
                        }
                    } finally {
                        rs.close();
                    }
                }
            }
        } finally {
            select.close();
        }
    }

}
