Hi,

On 3/13/07, Bryan Davis <[EMAIL PROTECTED]> wrote:
I am working on a response to the many recent additions to this thread
(hopefully will have something later today).

If you're interested, see below for some code I drafted together last
year when this subject was up earlier. I quickly updated the code to
match the latest changes in Jackrabbit. The class is just a quick
prototype, i.e. it compiles but is not tested and not really
documented.

PS. How about moving this discussion to the development mailing list?

BR,

Jukka Zitting

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.core.persistence.db;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;

import javax.naming.InitialContext;
import javax.sql.DataSource;

import org.apache.jackrabbit.core.NodeId;
import org.apache.jackrabbit.core.PropertyId;
import org.apache.jackrabbit.core.fs.BasedFileSystem;
import org.apache.jackrabbit.core.fs.FileSystem;
import org.apache.jackrabbit.core.persistence.PMContext;
import org.apache.jackrabbit.core.persistence.PersistenceManager;
import org.apache.jackrabbit.core.persistence.util.BLOBStore;
import org.apache.jackrabbit.core.persistence.util.FileSystemBLOBStore;
import org.apache.jackrabbit.core.persistence.util.Serializer;
import org.apache.jackrabbit.core.state.ChangeLog;
import org.apache.jackrabbit.core.state.ItemState;
import org.apache.jackrabbit.core.state.ItemStateException;
import org.apache.jackrabbit.core.state.NoSuchItemStateException;
import org.apache.jackrabbit.core.state.NodeReferences;
import org.apache.jackrabbit.core.state.NodeReferencesId;
import org.apache.jackrabbit.core.state.NodeState;
import org.apache.jackrabbit.core.state.PropertyState;

public class DataSourcePersistenceManager implements PersistenceManager {

   /**
    * The underlying data source.
    */
   private DataSource database;

   /**
    * JNDI location of the data source used to acquire database connections.
    */
   private String location;

   /**
    * Schema object prefix.
    */
   private String prefix;

   /**
    * Blob store.
    */
   private BLOBStore blobs;

   private String nodeExistsSQL;
   private String propExistsSQL;
   private String refsExistsSQL;

   private String nodeSelectSQL;
   private String propSelectSQL;
   private String refsSelectSQL;

   private String nodeInsertSQL;
   private String propInsertSQL;
   private String refsInsertSQL;

   private String nodeUpdateSQL;
   private String propUpdateSQL;
   private String refsUpdateSQL;

   private String nodeDeleteSQL;
   private String propDeleteSQL;
   private String refsDeleteSQL;

   //----------------------------------------------------< setters & getters >

   /**
    * Returns the JNDI location of the data source.
    *
    * @return data source location
    */
   public String getDataSourceLocation() {
       return location;
   }

   /**
    * Sets the JNDI location of the data source.
    *
    * @param location data source location
    */
   public void setDataSourceLocation(String location) {
       this.location = location;
   }

   /**
    * Returns the schema object prefix.
    *
    * @return schema object prefix
    */
   public String getSchemaObjectPrefix() {
       return prefix;
   }

   /**
    * Sets the schema object prefix.
    *
    * @param prefix
    */
   public void setSchemaObjectPrefix(String prefix) {
       this.prefix = prefix.toUpperCase();
   }

   //--------------------------------------------------< PersistenceManager >

   /**
    * Initializes this persistence manager.
    */
   public void init(PMContext context) throws Exception {
       database = (DataSource) new InitialContext().lookup(location);

       FileSystem filesystem =
           new BasedFileSystem(context.getFileSystem(), "blobs");
       filesystem.init();
       blobs = new FileSystemBLOBStore(filesystem);

       nodeExistsSQL = "SELECT 1 FROM " + prefix + "NODE WHERE NODE_ID=?";
       propExistsSQL = "SELECT 1 FROM " + prefix + "PROP WHERE PROP_ID=?";
       refsExistsSQL = "SELECT 1 FROM " + prefix + "REFS WHERE NODE_ID=?";
       nodeSelectSQL = "SELECT NODE_DATA FROM " + prefix + "NODE
WHERE NODE_ID=?";
       propSelectSQL = "SELECT PROP_DATA FROM " + prefix + "PROP
WHERE PROP_ID=?";
       refsSelectSQL = "SELECT REFS_DATA FROM " + prefix + "REFS
WHERE NODE_ID=?";
       nodeInsertSQL = "INSERT INTO " + prefix + "NODE
(NODE_DATA,NODE_ID) VALUES (?,?)";
       propInsertSQL = "INSERT INTO " + prefix + "PROP
(PROP_DATA,PROP_ID) VALUES (?,?)";
       refsInsertSQL = "INSERT INTO " + prefix + "REFS
(REFS_DATA,NODE_ID) VALUES (?,?)";
       nodeUpdateSQL = "UPDATE " + prefix + "NODE SET NODE_DATA=?
WHERE NODE_ID=?";
       propUpdateSQL = "UPDATE " + prefix + "PROP SET PROP_DATA=?
WHERE PROP_ID=?";
       refsUpdateSQL = "UPDATE " + prefix + "REFS SET REFS_DATA=?
WHERE NODE_ID=?";
       nodeDeleteSQL = "DELETE FROM " + prefix + "NODE WHERE NODE_ID=?";
       propDeleteSQL = "DELETE FROM " + prefix + "PROP WHERE PROP_ID=?";
       refsDeleteSQL = "DELETE FROM " + prefix + "REFS WHERE NODE_ID=?";
   }

   /**
    * Closes this persistence manager.
    */
   public void close() {
       database = null;
       blobs = null;
   }

   /**
    * Creates a new node state instance.
    *
    * @param id node identifier
    * @return node state
    */
   public NodeState createNew(NodeId id) {
       return new NodeState(id, null, null, NodeState.STATUS_NEW, false);
   }

   /**
    * Creates a new property state instance.
    *
    * @param id property identifier
    * @return property state
    */
   public PropertyState createNew(PropertyId id) {
       return new PropertyState(id, PropertyState.STATUS_NEW, false);
   }

   /**
    * Checks whether the identified node state exists.
    *
    * @param id node identifier
    * @return <code>true</code> if the node state exists,
    *         <code>false</code> otherwise
    * @throws ItemStateException if a database error occurred
    */
   public boolean exists(NodeId id) throws ItemStateException {
       return exists(nodeExistsSQL, id.toString());
   }

   /**
    * Checks whether the identified property state exists.
    *
    * @param id property identifier
    * @return <code>true</code> if the property state exists,
    *         <code>false</code> otherwise
    * @throws ItemStateException if a database error occurred
    */
   public boolean exists(PropertyId id) throws ItemStateException {
       return exists(propExistsSQL, id.toString());
   }

   /**
    * Checks whether references to the identified node exists.
    *
    * @param targetId reference identifier
    * @return <code>true</code> if references to the identified node exist,
    *         <code>false</code> otherwise
    * @throws ItemStateException if a database error occurred
    */
   public boolean exists(NodeReferencesId targetId) throws ItemStateException {
       return exists(refsExistsSQL, targetId.toString());
   }

   /**
    * Loads the identified node state.
    *
    * @param id node identifier
    * @return node state
    * @throws NoSuchItemStateException if the node state does not exist
    * @throws ItemStateException if a database error occurred
    */
   public NodeState load(NodeId id)
           throws NoSuchItemStateException, ItemStateException {
       final NodeState state = createNew(id);
       load(nodeSelectSQL, id.toString(), new RecordReader() {
           public void readRecord(InputStream stream) throws Exception {
               Serializer.deserialize(state, stream);
           }
       });
       return state;
   }

   /**
    * Loads the identified property state.
    *
    * @param id property identifier
    * @return property state
    * @throws NoSuchItemStateException if the property state does not exist
    * @throws ItemStateException if a database error occurred
    */
   public PropertyState load(PropertyId id)
           throws NoSuchItemStateException, ItemStateException {
       final PropertyState state = createNew(id);
       load(propSelectSQL, id.toString(), new RecordReader() {
           public void readRecord(InputStream stream) throws Exception {
               Serializer.deserialize(state, stream, blobs);
           }
       });
       return state;
   }

   /**
    * Loads references to the identified node.
    *
    * @param id reference identifier
    * @return node references
    * @throws NoSuchItemStateException if there are no references to the node
    * @throws ItemStateException if a database error occurred
    */
   public NodeReferences load(NodeReferencesId id)
           throws NoSuchItemStateException, ItemStateException {
       final NodeReferences references = new NodeReferences(id);
       load(refsSelectSQL, id.toString(), new RecordReader() {
           public void readRecord(InputStream stream) throws Exception {
               Serializer.deserialize(references, stream);
           }
       });
       return references;
   }

   /**
    * Persists all the changes in the given change log. No changes are
    * persisted if an error occurs.
    *
    * @param changeLog change log
    * @throws ItemStateException if a database error occurred
    */
   public void store(ChangeLog changeLog) throws ItemStateException {
       try {
           Connection connection = database.getConnection();
           try {
               storeItemStates(
                       connection, changeLog.addedStates(),
                       nodeInsertSQL, propInsertSQL);
               storeItemStates(
                       connection, changeLog.modifiedStates(),
                       nodeUpdateSQL, propUpdateSQL);
               deleteItemStates(connection, changeLog.deletedStates());
               storeNodeReferences(connection, changeLog.modifiedRefs());
           } finally {
               connection.close();
           }
       } catch (SQLException e) {
           throw new ItemStateException("Database error", e);
       }
   }

   //-------------------------------------------------------------< private >

   private interface RecordReader {

       void readRecord(InputStream stream) throws Exception;

   }

   private interface RecordWriter {

       String getId(Object record);

       void writeRecord(Object record, OutputStream stream) throws Exception;

   }

   /**
    * Checks whether the identified database record exists.
    *
    * @param sql the SQL SELECT statement to use for the check
    * @param id record identifier
    * @return <code>true</code> if the identified record exists,
    *         <code>false</code> otherwise
    * @throws ItemStateException if a database error occurred
    */
   private boolean exists(String sql, String id) throws ItemStateException {
       try {
           Connection connection = database.getConnection();
           try {
               PreparedStatement select = connection.prepareStatement(sql);
               try {
                   select.setString(1, id.toString());
                   ResultSet rs = select.executeQuery();
                   try {
                       return rs.next();
                   } finally {
                       rs.close();
                   }
               } finally {
                   select.close();
               }
           } finally {
               connection.close();
           }
       } catch (SQLException e) {
           throw new ItemStateException("Database error", e);
       }
   }

   /**
    * Loads the identified database record. The record is deserialized using
    * the given deserializer instance.
    *
    * @param sql the SQL SELECT statement to use for loading the record
    * @param id record identifier
    * @param reader record reader
    * @throws NoSuchItemStateException if the record does not exist
    * @throws ItemStateException if a database error occurred
    */
   private void load(String sql, String id, RecordReader reader)
           throws NoSuchItemStateException, ItemStateException {
       try {
           Connection connection = database.getConnection();
           try {
               PreparedStatement select = connection.prepareStatement(sql);
               try {
                   select.setString(1, id);
                   ResultSet rs = select.executeQuery();
                   try {
                       if (rs.next()) {
                           InputStream stream = rs.getBinaryStream(1);
                           try {
                               reader.readRecord(stream);
                           } catch (Exception e) {
                               throw new ItemStateException(
                                       "Deserialization failed: " + id, e);
                           } finally {
                               stream.close();
                           }
                       } else {
                           throw new NoSuchItemStateException(id);
                       }
                   } catch (IOException e) {
                       throw new ItemStateException("Database error", e);
                   } finally {
                       rs.close();
                   }
               } finally {
                   select.close();
               }
           } finally {
               connection.close();
           }
       } catch (SQLException e) {
           throw new ItemStateException("Database error", e);
       }
   }

   private void classifyItemStates(
           Iterator iterator, Collection nodes, Collection props) {
       while (iterator.hasNext()) {
           ItemState state = (ItemState) iterator.next();
           if (state.isNode()) {
               nodes.add(state);
           } else {
               props.add(state);
           }
       }
   }

   private void storeItemStates(
           Connection connection, Iterator iterator,
           String nodeSQL, String propSQL) throws SQLException {
       Collection nodes = new ArrayList();
       Collection props = new ArrayList();
       classifyItemStates(iterator, nodes, props);
       if (!nodes.isEmpty()) {
           storeRecords(connection, nodeSQL, new RecordWriter() {
               public String getId(Object record) {
                   return ((NodeState) record).getId().toString();
               }
               public void writeRecord(
                       Object record, OutputStream stream) throws Exception {
                   Serializer.serialize((NodeState) record, stream);
               }
           }, nodes.iterator());
       }
       if (!props.isEmpty()) {
           storeRecords(connection, propSQL, new RecordWriter() {
               public String getId(Object record) {
                   return ((PropertyState) record).getId().toString();
               }
               public void writeRecord(
                       Object record, OutputStream stream) throws Exception {
                   Serializer.serialize((PropertyState) record, stream, blobs);
               }
           }, props.iterator());
       }
   }

   private void storeRecords(
           Connection connection, String sql,
           RecordWriter writer, Iterator iterator) throws SQLException {
       PreparedStatement statement = connection.prepareStatement(sql);
       try {
           while (iterator.hasNext()) {
               Object record = iterator.next();
               ByteArrayOutputStream buffer = new ByteArrayOutputStream();
               try {
                   writer.writeRecord(record, buffer);
               } catch (Exception e) {
                   throw new SQLException("Serialization failed: " + record);
               }
               byte[] bytes = buffer.toByteArray();
               statement.setBinaryStream(
                       1, new ByteArrayInputStream(bytes), bytes.length);
               statement.setString(2, writer.getId(record));
               statement.execute();
           }
       } finally {
           statement.close();
       }
   }

   private void deleteItemStates(Connection connection, Iterator iterator)
           throws SQLException {
       Collection nodes = new ArrayList();
       Collection props = new ArrayList();
       classifyItemStates(iterator, nodes, props);
       if (!nodes.isEmpty()) {
           deleteItemStates(connection, nodeDeleteSQL, nodes.iterator());
       }
       if (!props.isEmpty()) {
           deleteItemStates(connection, propDeleteSQL, props.iterator());
       }
   }

   private void deleteItemStates(
           Connection connection, String sql, Iterator iterator)
           throws SQLException {
       PreparedStatement statement = connection.prepareStatement(sql);
       try {
           while (iterator.hasNext()) {
               ItemState state = (ItemState) iterator.next();
               statement.setString(1, state.getId().toString());
               statement.execute();
           }
       } finally {
           statement.close();
       }
   }

   private void storeNodeReferences(
           Connection connection, Iterator iterator) throws SQLException {
       if (iterator.hasNext()) {
           Collection insert = new ArrayList();
           Collection update = new ArrayList();
           Collection delete = new ArrayList();
           classifyNodeReferences(connection, iterator, insert,
update, delete);

           if (!insert.isEmpty()) {
               storeNodeReferences(
                       connection, refsInsertSQL, insert.iterator());
           }
           if (!update.isEmpty()) {
               storeNodeReferences(
                       connection, refsUpdateSQL, update.iterator());
           }
           if (!delete.isEmpty()) {
               deleteNodeReferences(connection, delete.iterator());
           }
       }
   }

   private void storeNodeReferences(
           Connection connection, String sql, Iterator iterator)
           throws SQLException {
       storeRecords(connection, sql, new RecordWriter() {
           public String getId(Object record) {
               return ((NodeReferences) record).getId().toString();
           }
           public void writeRecord(Object record, OutputStream stream)
                   throws Exception {
               Serializer.serialize((NodeReferences) record, stream);
           }
       }, iterator);
   }

   private void deleteNodeReferences(Connection connection, Iterator iterator)
           throws SQLException {
       PreparedStatement delete = connection.prepareStatement(refsDeleteSQL);
       try {
           while (iterator.hasNext()) {
               NodeReferences references = (NodeReferences) iterator.next();
               delete.setString(1, references.getId().toString());
               delete.execute();
           }
       } finally {
           delete.close();
       }
   }

   private void classifyNodeReferences(
           Connection connection, Iterator iterator,
           Collection insert, Collection update, Collection delete)
           throws SQLException {
       PreparedStatement select = connection.prepareStatement(refsExistsSQL);
       try {
           while (iterator.hasNext()) {
               NodeReferences references = (NodeReferences) iterator.next();
               if (!references.hasReferences()) {
                   delete.add(references);
               } else {
                   select.setString(1, references.getId().toString());
                   ResultSet rs = select.executeQuery();
                   try {
                       if (rs.next()) {
                           update.add(references);
                       } else {
                           insert.add(references);
                       }
                   } finally {
                       rs.close();
                   }
               }
           }
       } finally {
           select.close();
       }
   }

}

Reply via email to