Now able to test both shim and non-shim models in Spark. Also got configuration 
with ProgramTest working.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/e7003635
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/e7003635
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/e7003635

Branch: refs/heads/TINKERPOP-1331
Commit: e7003635e27c625b3f30492111f20f4fe4e24eb5
Parents: 0cd31bf
Author: Marko A. Rodriguez <okramma...@gmail.com>
Authored: Mon Jun 6 14:52:53 2016 -0600
Committer: Marko A. Rodriguez <okramma...@gmail.com>
Committed: Mon Jun 6 14:52:53 2016 -0600

----------------------------------------------------------------------
 .../structure/io/gryo/GryoSerializers.java      |  8 +-
 .../structure/io/gryo/GryoRegistrator.java      | 90 ++++++++++++++++----
 .../computer/SparkHadoopGraphProvider.java      | 11 ++-
 3 files changed, 88 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e7003635/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java
index 16fbe85..2042a4a 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java
@@ -47,7 +47,7 @@ public final class GryoSerializers {
     /**
      * Serializes any {@link Edge} implementation encountered to a {@link 
DetachedEdge}.
      */
-    final static class EdgeSerializer implements SerializerShim<Edge> {
+    public final static class EdgeSerializer implements SerializerShim<Edge> {
         @Override
         public <O extends OutputShim> void write(KryoShim<?, O> kryo, O 
output, Edge edge) {
             kryo.writeClassAndObject(output, DetachedFactory.detach(edge, 
true));
@@ -63,7 +63,7 @@ public final class GryoSerializers {
     /**
      * Serializes any {@link Vertex} implementation encountered to an {@link 
DetachedVertex}.
      */
-    final static class VertexSerializer implements SerializerShim<Vertex> {
+    public final static class VertexSerializer implements 
SerializerShim<Vertex> {
         @Override
         public <O extends OutputShim> void write(KryoShim<?, O> kryo, O 
output, Vertex vertex) {
             kryo.writeClassAndObject(output, DetachedFactory.detach(vertex, 
true));
@@ -78,7 +78,7 @@ public final class GryoSerializers {
     /**
      * Serializes any {@link Property} implementation encountered to an {@link 
DetachedProperty}.
      */
-    final static class PropertySerializer implements SerializerShim<Property> {
+    public final static class PropertySerializer implements 
SerializerShim<Property> {
         @Override
         public <O extends OutputShim> void write(KryoShim<?, O> kryo, O 
output, Property property) {
             kryo.writeClassAndObject(output, property instanceof 
VertexProperty ? DetachedFactory.detach((VertexProperty) property, true) : 
DetachedFactory.detach(property));
@@ -93,7 +93,7 @@ public final class GryoSerializers {
     /**
      * Serializes any {@link VertexProperty} implementation encountered to an 
{@link DetachedVertexProperty}.
      */
-    final static class VertexPropertySerializer implements 
SerializerShim<VertexProperty> {
+    public final static class VertexPropertySerializer implements 
SerializerShim<VertexProperty> {
         @Override
         public <O extends OutputShim> void write(KryoShim<?, O> kryo, O 
output, VertexProperty vertexProperty) {
             kryo.writeClassAndObject(output, 
DetachedFactory.detach(vertexProperty, true));

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e7003635/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoRegistrator.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoRegistrator.java
 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoRegistrator.java
index 1ae8c5c..9563408 100644
--- 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoRegistrator.java
+++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoRegistrator.java
@@ -22,19 +22,44 @@ import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import org.apache.spark.serializer.KryoRegistrator;
+import org.apache.spark.util.SerializableConfiguration;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopEdge;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopProperty;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertex;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertexProperty;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
+import org.apache.tinkerpop.gremlin.process.traversal.Path;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.ImmutablePath;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.MutablePath;
+import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.B_LP_O_P_S_SE_SL_Traverser;
+import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.B_LP_O_S_SE_SL_Traverser;
+import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.B_O_S_SE_SL_Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.B_O_Traverser;
+import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.LP_O_OB_P_S_SE_SL_Traverser;
+import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.LP_O_OB_S_SE_SL_Traverser;
+import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.O_OB_S_SE_SL_Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.O_Traverser;
 import 
org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
 import 
org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
 import 
org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
 import 
org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedSerializerAdapter;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Property;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.VertexProperty;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoSerializers;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.TypeRegistration;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim;
 import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraphSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.collection.mutable.WrappedArray;
 
 import java.util.*;
 
@@ -55,11 +80,11 @@ public class GryoRegistrator implements KryoRegistrator {
      * Register TinkerPop's classes with the supplied {@link Kryo} instance
      * while honoring optional overrides and optional class blacklist 
("blackset"?).
      *
-     * @param kryo the Kryo serializer instance with which to register types
+     * @param kryo                the Kryo serializer instance with which to 
register types
      * @param serializerOverrides serializer mappings that override this 
class's defaults
-     * @param blacklist classes which should not be registered at all, even if 
there is an override entry
-     *                  or if they would be registered by this class by 
default (does not affect Kryo's
-     *                  built-in registrations, e.g. String.class).
+     * @param blacklist           classes which should not be registered at 
all, even if there is an override entry
+     *                            or if they would be registered by this class 
by default (does not affect Kryo's
+     *                            built-in registrations, e.g. String.class).
      */
     public void registerClasses(Kryo kryo, Map<Class<?>, Serializer<?>> 
serializerOverrides, Set<Class<?>> blacklist) {
         // Apply TinkerPop type registrations copied from GyroSerializer's 
constructor
@@ -111,11 +136,11 @@ public class GryoRegistrator implements KryoRegistrator {
                 } else {
                     // There's supposed to be a check in GryoMapper that 
prevents this from happening
                     log.error("GryoMapper's default serialization registration 
for {} is a {}. " +
-                              "This is probably a bug in TinkerPop (this is 
not a valid default registration). " +
-                              "I am configuring Spark to use Kryo's default 
serializer for this class, " +
-                              "but this may cause serialization failures at 
runtime.",
-                              tr.getTargetClass(),
-                              
org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName());
+                                    "This is probably a bug in TinkerPop (this 
is not a valid default registration). " +
+                                    "I am configuring Spark to use Kryo's 
default serializer for this class, " +
+                                    "but this may cause serialization failures 
at runtime.",
+                            tr.getTargetClass(),
+                            
org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName());
                     kryo.register(tr.getTargetClass());
                 }
             } else if (null != serializerShim) {
@@ -127,12 +152,12 @@ public class GryoRegistrator implements KryoRegistrator {
             } else if (null != functionOfShadedKryo) {
                 // As with shaded serializers, there's supposed to be a check 
in GryoMapper that prevents this from happening
                 log.error("GryoMapper's default serialization registration for 
{} is a Function<{},{}>.  " +
-                          "This is probably a bug in TinkerPop (this is not a 
valid default registration). " +
-                          "I am configuring Spark to use Kryo's default 
serializer instead of this function, " +
-                          "but this may cause serialization failures at 
runtime.",
-                          tr.getTargetClass(),
-                          
org.apache.tinkerpop.shaded.kryo.Kryo.class.getCanonicalName(),
-                          
org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName());
+                                "This is probably a bug in TinkerPop (this is 
not a valid default registration). " +
+                                "I am configuring Spark to use Kryo's default 
serializer instead of this function, " +
+                                "but this may cause serialization failures at 
runtime.",
+                        tr.getTargetClass(),
+                        
org.apache.tinkerpop.shaded.kryo.Kryo.class.getCanonicalName(),
+                        
org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName());
                 kryo.register(tr.getTargetClass());
             } else {
                 // Register all other classes with the default behavior 
(FieldSerializer)
@@ -164,13 +189,46 @@ public class GryoRegistrator implements KryoRegistrator {
         // duplication, but it would be a bit cumbersome to do so without 
disturbing
         // the ordering of the existing entries in that constructor, since not 
all
         // of the entries are for TinkerPop (and the ordering is significant).
+        if (Boolean.valueOf(System.getProperty("is.testing", "false"))) {
+            try {
+                m.put(Class.forName("scala.reflect.ClassTag$$anon$1"), new 
JavaSerializer());
+                m.put(Class.forName("scala.reflect.ManifestFactory$$anon$1"), 
new JavaSerializer());
+            } catch (final ClassNotFoundException e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+        m.put(WrappedArray.ofRef.class, null);
         m.put(MessagePayload.class, null);
         m.put(ViewIncomingPayload.class, null);
         m.put(ViewOutgoingPayload.class, null);
         m.put(ViewPayload.class, null);
         m.put(VertexWritable.class, new UnshadedSerializerAdapter<>(new 
VertexWritableSerializer()));
         m.put(ObjectWritable.class, new UnshadedSerializerAdapter<>(new 
ObjectWritableSerializer<>()));
-
+        //
+        m.put(HadoopConfiguration.class, null);
+        //
+        m.put(HadoopVertex.class, new UnshadedSerializerAdapter<>(new 
GryoSerializers.VertexSerializer()));
+        m.put(HadoopVertexProperty.class, new UnshadedSerializerAdapter<>(new 
GryoSerializers.VertexPropertySerializer()));
+        m.put(HadoopProperty.class, new UnshadedSerializerAdapter<>(new 
GryoSerializers.PropertySerializer()));
+        m.put(HadoopEdge.class, new UnshadedSerializerAdapter<>(new 
GryoSerializers.EdgeSerializer()));
+        //
+        m.put(ComputerGraph.ComputerVertex.class, new 
UnshadedSerializerAdapter<>(new GryoSerializers.VertexSerializer()));
+        m.put(ComputerGraph.ComputerVertexProperty.class, new 
UnshadedSerializerAdapter<>(new GryoSerializers.VertexPropertySerializer()));
+        m.put(ComputerGraph.ComputerProperty.class, new 
UnshadedSerializerAdapter<>(new GryoSerializers.PropertySerializer()));
+        m.put(ComputerGraph.ComputerEdge.class, new 
UnshadedSerializerAdapter<>(new GryoSerializers.EdgeSerializer()));
+        //
+        m.put(StarGraph.StarEdge.class, new UnshadedSerializerAdapter<>(new 
GryoSerializers.EdgeSerializer()));
+        m.put(StarGraph.StarVertex.class, new UnshadedSerializerAdapter<>(new 
GryoSerializers.VertexSerializer()));
+        m.put(StarGraph.StarProperty.class, new 
UnshadedSerializerAdapter<>(new GryoSerializers.PropertySerializer()));
+        m.put(StarGraph.StarVertexProperty.class, new 
UnshadedSerializerAdapter<>(new GryoSerializers.VertexPropertySerializer()));
+        //
+        m.put(MutablePath.class, new UnshadedSerializerAdapter<>(new 
GryoSerializers.PathSerializer()));
+        m.put(ImmutablePath.class, new UnshadedSerializerAdapter<>(new 
GryoSerializers.PathSerializer()));
+        try {
+            m.put(Class.forName(ImmutablePath.class.getCanonicalName() + 
"$TailPath"), new UnshadedSerializerAdapter<>(new 
GryoSerializers.PathSerializer()));
+        } catch (final ClassNotFoundException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
         return m;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e7003635/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
 
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index 85552ce..7737d1e 100644
--- 
a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ 
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -26,6 +26,7 @@ import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
 import 
org.apache.tinkerpop.gremlin.hadoop.groovy.plugin.HadoopGremlinPluginCheck;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorageCheck;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService;
 import org.apache.tinkerpop.gremlin.process.computer.Computer;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import 
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
@@ -40,10 +41,14 @@ import 
org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorageCheck;
 import org.apache.tinkerpop.gremlin.spark.structure.io.ToyGraphInputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator;
 import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
+import 
org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService;
 import org.apache.tinkerpop.gremlin.structure.Graph;
+import 
org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 
 import java.util.Map;
 
+import static 
org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader.SHIM_CLASS_SYSTEM_PROPERTY;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
@@ -83,9 +88,13 @@ public final class SparkHadoopGraphProvider extends 
HadoopGraphProvider {
 
 
         config.put("spark.master", "local[4]");
-        if (false) {
+        if (RANDOM.nextBoolean()) {
+            System.setProperty(SHIM_CLASS_SYSTEM_PROPERTY, 
HadoopPoolShimService.class.getCanonicalName());
+            KryoShimServiceLoader.load(true);
             config.put("spark.serializer", 
GryoSerializer.class.getCanonicalName());
         } else {
+            System.setProperty(SHIM_CLASS_SYSTEM_PROPERTY, 
UnshadedKryoShimService.class.getCanonicalName());
+            KryoShimServiceLoader.load(true);
             config.put("spark.serializer", 
KryoSerializer.class.getCanonicalName());
             config.put("spark.kryo.registrator", 
GryoRegistrator.class.getCanonicalName());
         }

Reply via email to