Author: peter_firmstone Date: Fri Mar 1 12:25:32 2013 New Revision: 1451573
URL: http://svn.apache.org/r1451573 Log: Distributed serialization updates Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/io/MarshalInputStream.java river/jtsk/skunk/qa_refactor/trunk/src/net/jini/io/MarshalOutputStream.java river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/DistributePermission.java river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/Distributed.java river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/DistributedObjectInputStream.java river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/DistributedObjectOutputStream.java river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/SerialFactory.java Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/io/MarshalInputStream.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/io/MarshalInputStream.java?rev=1451573&r1=1451572&r2=1451573&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/io/MarshalInputStream.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/io/MarshalInputStream.java Fri Mar 1 12:25:32 2013 @@ -25,11 +25,14 @@ import java.io.ObjectInputStream; import java.io.ObjectStreamClass; import java.net.MalformedURLException; import java.rmi.server.RMIClassLoader; +import java.security.AccessController; +import java.security.PrivilegedAction; import java.util.Collection; import java.util.HashMap; import java.util.Map; import net.jini.loader.ClassLoading; import net.jini.security.Security; +import org.apache.river.api.io.DistributedObjectInputStream; /** * An extension of <code>ObjectInputStream</code> that implements the @@ -83,7 +86,7 @@ import net.jini.security.Security; * @since 2.0 **/ public class MarshalInputStream - extends ObjectInputStream + extends DistributedObjectInputStream implements ObjectStreamContext { /** @@ -199,6 +202,16 @@ public class MarshalInputStream 
this.verifyCodebaseIntegrity = verifyCodebaseIntegrity; this.verifierLoader = verifierLoader; this.context = context; + AccessController.doPrivileged(new PrivilegedAction<Object>(){ + + @Override + public Object run() { + enableResolveObject(true); + return null; + } + + }); + } /** Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/io/MarshalOutputStream.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/io/MarshalOutputStream.java?rev=1451573&r1=1451572&r2=1451573&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/io/MarshalOutputStream.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/io/MarshalOutputStream.java Fri Mar 1 12:25:32 2013 @@ -22,7 +22,10 @@ import java.io.IOException; import java.io.ObjectOutputStream; import java.io.OutputStream; import java.rmi.server.RMIClassLoader; +import java.security.AccessController; +import java.security.PrivilegedAction; import java.util.Collection; +import org.apache.river.api.io.DistributedObjectOutputStream; /** * An extension of <code>ObjectOutputStream</code> that implements the @@ -54,7 +57,7 @@ import java.util.Collection; * @since 2.0 **/ public class MarshalOutputStream - extends ObjectOutputStream + extends DistributedObjectOutputStream implements ObjectStreamContext { /** context for ObjectStreamContext implementation */ @@ -95,6 +98,16 @@ public class MarshalOutputStream throw new NullPointerException(); } this.context = context; + + AccessController.doPrivileged(new PrivilegedAction<Object>(){ + + @Override + public Object run() { + enableReplaceObject(true); + return null; + } + + }); } /** Modified: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/DistributePermission.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/DistributePermission.java?rev=1451573&r1=1451572&r2=1451573&view=diff 
============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/DistributePermission.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/DistributePermission.java Fri Mar 1 12:25:32 2013 @@ -16,11 +16,16 @@ */ package org.apache.river.api.io; +import java.io.ObjectOutput; import java.security.BasicPermission; /** - * + * This Permission allows an object to be Distributed by an implementation of + * ObjectOutput + * * @author peter + * @see SerialFactory + * @see ObjectOutput */ public class DistributePermission extends BasicPermission{ private static final long serialVersionUID = 1L; Modified: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/Distributed.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/Distributed.java?rev=1451573&r1=1451572&r2=1451573&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/Distributed.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/Distributed.java Fri Mar 1 12:25:32 2013 @@ -16,8 +16,6 @@ */ package org.apache.river.api.io; -import java.io.Serializable; - /** * Distributed objects are immutable value objects with final fields that may * be freely replicated. @@ -30,15 +28,12 @@ import java.io.Serializable; * purposes. * <p> * Distributed objects have no version, instead SerialFactory contains all - * information required to distribute and recreate any Distributed Object. - * <p> - * Distributed object all have a common serial form, defined by SerialFactory. - * <p> - * Distributed objects are value objects in domain driven design. + * information required to distribute and recreate any Distributed Object using + * reflection. * <p> - * Remote objects are entity or service objects in domain driven design context. 
+ * Distributed objects are value objects from a domain driven design perspective. * <p> - * Although final is not enforced, all fields must be final and safe + * Although final is not enforced, all fields must be final, safe * construction must be honored, distributed objects will be exposed to multiple * threads on multiple nodes, without synchronization or transactions. * <p> @@ -48,5 +43,15 @@ import java.io.Serializable; * @author Peter Firmstone. */ public interface Distributed { + + /** + * Substitutes an Object in an ObjectOutput with a SerialFactory, + * ObjectInput uses SerialFactory to reconstruct the Object at the + * remote end using reflection to call a constructor, static method or + * object method. + * + * @return SerialFactory for object instantiation using reflection to call + * a constructor, static method or object method. + */ SerialFactory substitute(); } Modified: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/DistributedObjectInputStream.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/DistributedObjectInputStream.java?rev=1451573&r1=1451572&r2=1451573&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/DistributedObjectInputStream.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/DistributedObjectInputStream.java Fri Mar 1 12:25:32 2013 @@ -43,11 +43,6 @@ public class DistributedObjectInputStrea */ protected DistributedObjectInputStream(InputStream in) throws IOException{ super(in); - try { - super.enableResolveObject(true); - } catch (SecurityException e){ - // Ignore, will be called from privileged context if create method used. 
- } } private void enableResolveObject(){ Modified: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/DistributedObjectOutputStream.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/DistributedObjectOutputStream.java?rev=1451573&r1=1451572&r2=1451573&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/DistributedObjectOutputStream.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/DistributedObjectOutputStream.java Fri Mar 1 12:25:32 2013 @@ -36,7 +36,6 @@ public class DistributedObjectOutputStre protected DistributedObjectOutputStream (OutputStream out) throws IOException{ super(out); - super.enableReplaceObject(true); } protected final Object replaceObject(Object o){ Modified: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/SerialFactory.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/SerialFactory.java?rev=1451573&r1=1451572&r2=1451573&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/SerialFactory.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/SerialFactory.java Fri Mar 1 12:25:32 2013 @@ -20,25 +20,19 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.io.StreamCorruptedException; import java.lang.reflect.Constructor; -import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.security.AccessController; -import java.security.BasicPermission; import java.security.Guard; -import java.security.PrivilegedActionException; -import java.security.PrivilegedExceptionAction; -import java.util.logging.Level; -import 
java.util.logging.Logger; /** - * Distributed form, required for Object reconstruction, using constructor, - * static factory method instantiation or Object builder instantiation. + * Distributed form, required for reflective calls to instantiate remotely, + * using constructor, static method or Object method. * * This object must be Thread confined, it is not thread safe. * - * Internal state is guarded. + * Internal state is guarded, arrays are not defensively copied. * * @author Peter Firmstone. * @see Distributed @@ -49,25 +43,77 @@ public final class SerialFactory impleme /* Guard private state */ private static final Guard distributable = new DistributePermission(); + /* Minimal protocol to write primitives directly to stream. + * Strings are written as Objects, since they are a special case, identical + * strings are sent by reference, not written to the stream again. + * Class is also a special case, it also is not duplicated in streams. + * + * Only primitives are written separately, the following indicates the + * next parameter type in the stream. + */ + private static final byte BOOLEAN = 0; + private static final byte BYTE = 1; + private static final byte CHAR = 2; + private static final byte SHORT = 3; + private static final byte INT = 4; + private static final byte LONG = 5; + private static final byte FLOAT = 6; + private static final byte DOUBLE = 7; + private static final byte OBJECT = 8; + private static final byte NULL = 9; + + private Object classOrObject; private String method; private Class [] parameterTypes; private Object [] parameters; private final boolean constructed; // default value is false. + /** + * Public no-argument constructor provided for the serialization framework. + */ public SerialFactory(){ constructed = false; } /** - * - * + * Reflection is used at the remote end, with information provided to + * SerialFactory, to call a constructor, static method + * or object method after de-serialization by DistributedObjectInputStream. 
+ * <p> + * Information given to SerialFactory is guarded by DistributePermission. + * <p> + * Instantiation of a Distributed object at the remote end proceeds as follows: + * <ul><li> + * If factoryClassOrObject is a Class and methodName is null, a constructor + * of that Class is called reflectively with parameterTypes and parameters. + * </li><li> + * If factoryClassOrObject is a Class and methodName is defined, a static + * method with the defined name is called reflectively on that Class with + * parameterTypes and parameters. + * </li><li> + * If factoryClassOrObject is an Object and methodName is defined, a method + * with the defined name is called reflectively on that Object with + * parameterTypes and parameters. + * </li></ul> + * <p> + * Tip: Object versions of primitive values and String parameters + * are relatively fast as are primitive array parameters. + * Object versions of primitive parameters are externalized + * as primitives, arrays of Objects are treated as object parameters. + * <p> + * If you really need ultimate serialization performance, consider using a + * constructor that accepts a single parameter byte[] array. + * Remember to compress your bytes, this will minimize the size of the + * byte stream. + * <p> * * @param factoryClassOrObject will be used for constructor, factory static method, * or builder Object. * @param methodName name of static factory method, null if using a constructor. - * @param parameterTypes Type signature of method or constructor - * @param parameters Object to be passed to constructor. + * @param parameterTypes Type signature of method or constructor, or null. + * @param parameters array of Objects to be passed to constructor, or null. 
+ * @see DistributePermission */ public SerialFactory(Object factoryClassOrObject, String methodName, Class[] parameterTypes, Object [] parameters){ classOrObject = factoryClassOrObject; @@ -75,6 +121,10 @@ public final class SerialFactory impleme this.parameterTypes = parameterTypes; this.parameters = parameters; constructed = true; + if ( (parameterTypes != null && parameterTypes.length != parameters.length) || (parameters != null && parameters.length > 127)) + throw new IllegalArgumentException("Array lengths don't match, or arrays are too long," + + " parameter array limit 127, " + + "you need to see a shrink if you need this many parameters"); } Object create() throws IOException { @@ -112,7 +162,6 @@ public final class SerialFactory impleme } } - @Override public void writeExternal(ObjectOutput out) throws IOException { distributable.checkGuard(null); out.writeObject(classOrObject); @@ -121,97 +170,110 @@ public final class SerialFactory impleme * object to do so if needs to. */ out.writeObject(parameterTypes); - out.writeObject(parameters); + int l = parameters != null ? parameters.length : 0; + // Write length to stream. + out.writeByte(l); + for (int i = 0; i < l; i++){ + writeObject(parameters[i], out); + } } - + + /** + * Object primitive values parameters are sent as their values to avoid + * Serialization overhead, this is only performed because primitives aren't + * Objects so can't be used directly. Primitive arrays are Objects so + * they can be used, therefore there's no need to handle them here. 
+ */ + private void writeObject(Object o, ObjectOutput out ) throws IOException{ + if (o == null) { + out.writeByte(NULL); + return; + } + if (o instanceof Boolean){ + out.writeByte(BOOLEAN); + out.writeBoolean(((Boolean) o).booleanValue()); + return; + } + if (o instanceof Byte){ + out.writeByte(BYTE); + out.writeByte(((Byte)o).byteValue()); + return; + } + if (o instanceof Character){ + out.writeByte(CHAR); + out.writeChar(((Character)o).charValue()); + } + if (o instanceof Short){ + out.writeByte(SHORT); + out.writeShort(((Short)o).shortValue()); + return; + } + if (o instanceof Integer){ + out.writeByte(INT); + out.writeInt(((Integer)o).intValue()); + return; + } + if (o instanceof Long){ + out.writeByte(LONG); + out.writeLong(((Long)o).longValue()); + return; + } + if (o instanceof Float){ + out.writeByte(FLOAT); + out.writeFloat(((Float)o).floatValue()); + return; + } + if (o instanceof Double){ + out.writeByte(DOUBLE); + out.writeDouble(((Double)o).doubleValue()); + return; + } + // Arrays are treated as Objects, java serialization is relatively + // efficient with primitive arrays. 
+ out.writeByte(OBJECT); + out.writeObject(o); + } + + private Object readObject(ObjectInput in) throws IOException, ClassNotFoundException{ + byte b = in.readByte(); + switch(b){ + case 0: boolean bool = in.readBoolean(); + return Boolean.valueOf(bool); + case 1: byte bite = in.readByte(); + return Byte.valueOf(bite); + case 2: char ch = in.readChar(); + return Character.valueOf(ch); + case 3: short sh = in.readShort(); + return Short.valueOf(sh); + case 4: int i = in.readInt(); + return Integer.valueOf(i); + case 5: long l = in.readLong(); + return Long.valueOf(l); + case 6: float f = in.readFloat(); + return Float.valueOf(f); + case 7: double d = in.readDouble(); + return Double.valueOf(d); + case 8: return in.readObject(); + case 9: return null; + default: throw new StreamCorruptedException("out of range byte read from stream"); + } + + } + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { if (constructed) throw new IllegalStateException("Object already constructed"); /* Don't defensively copy arrays, the object is used immediately after * deserialization to construct the Distributed Object, the fields are - * not accessed again. - * - * DistributedObjectOutputStream. + * not accessed again, it is up to creator methods to preserve invariants. */ classOrObject = in.readObject(); method = (String) in.readObject(); parameterTypes = (Class[]) in.readObject(); - parameters = (Object[]) in.readObject(); - - // All this hurts performance for little benefit. - // Read in before changing accessibility of fields. 
-// Object clas = in.readObject(); -// Object methName = in.readObject(); -// Object paramTypes = in.readObject(); -// Object param = in.readObject(); -// final String [] fieldNames = {"clazz", "method", "parameterTypes", "parameters"}; -// Field [] fields = null; -// try { -// fields = AccessController.doPrivileged(new Action(fieldNames)); -// } catch (PrivilegedActionException ex) { -// Exception e = ex.getException(); -// if (e instanceof NoSuchFieldException) throw new ClassNotFoundException("No such field", e); -// if (e instanceof SecurityException ) throw (SecurityException)e; -// throw new IOException("Unable to instantiate fields", e); -// } -// // Don't worry about defensive copy arrays, the constructor or factory -// // method will be called soon. -// try { -// if (clas instanceof Class) fields[0].set(this, clas); -// if (methName instanceof String) fields[1].set(this, methName); -// if (paramTypes instanceof Class[]) fields[2].set(this,paramTypes); -// if (param instanceof Object[]) fields[3].set(this, param); -// } catch (IllegalArgumentException ex) { -// Logger.getLogger(SerialFactory.class.getName()).log(Level.SEVERE, null, ex); -// } catch (IllegalAccessException ex) { -// Logger.getLogger(SerialFactory.class.getName()).log(Level.SEVERE, null, ex); -// } -// try { -// AccessController.doPrivileged(new RestoreProtection(fields)); -// } catch (PrivilegedActionException ex) { -// Exception e = ex.getException(); -// if (e instanceof SecurityException ) throw (SecurityException)e; -// throw new IOException("Unable to restore access control on final fields", e); -// } - } - -// private class Action implements PrivilegedExceptionAction<Field[]>{ -// private final String [] names; -// -// Action(String [] names){ -// this.names = names; -// } -// -// @Override -// public Field[] run() throws Exception { -// int l = names.length; -// Field [] result = new Field[l]; -// for (int i = 0; i < l; i++){ -// result [i] = 
SerialFactory.class.getDeclaredField(names[i]); -// result [i].setAccessible(true); -// } -// return result; -// } -// -// } -// -// private class RestoreProtection implements PrivilegedExceptionAction<Boolean>{ -// private final Field [] fields; -// -// RestoreProtection(Field [] f){ -// fields = f; -// } -// @Override -// public Boolean run() throws Exception { -// int l = fields.length; -// for (int i = 0; i < l; i++){ -// fields[i].setAccessible(false); -// } -// return Boolean.TRUE; -// } -// -// } - - - + byte len = in.readByte(); + parameters = len == 0 ? null : new Object[len]; + for (int i = 0; i < len; i++){ + parameters[i] = readObject(in); + } + } }
