http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphSerializer.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphSerializer.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphSerializer.java
index d5ba90d..431e1eb 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphSerializer.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphSerializer.java
@@ -27,11 +27,21 @@ import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.T;
 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
-import org.apache.tinkerpop.gremlin.structure.io.kryoshim.InputShim;
-import org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShim;
-import org.apache.tinkerpop.gremlin.structure.io.kryoshim.OutputShim;
-import org.apache.tinkerpop.gremlin.structure.io.kryoshim.SerializerShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.InputShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.OutputShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim;
 
+/**
+ * Kryo serializer for {@link StarGraph}.  Implements an internal versioning 
capability for backward compatibility.
+ * The single byte at the front of the serialization stream denotes the 
version.  That version can be used to choose
+ * the correct deserialization mechanism.  The limitation is that this 
versioning won't help with backward
+ * compatibility for custom serializers from providers.  Providers should be 
encouraged to write their serializers
+ * with backward compatibility in mind.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
 public class StarGraphSerializer implements SerializerShim<StarGraph> {
 
     private final Direction edgeDirectionToSerialize;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
index 2053280..c19b914 100644
--- 
a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
+++ 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
@@ -18,7 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.structure.io;
 
-import org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimService;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService;
 import org.apache.tinkerpop.shaded.kryo.Kryo;
 import org.apache.tinkerpop.shaded.kryo.io.Input;
 import org.apache.tinkerpop.shaded.kryo.io.Output;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
----------------------------------------------------------------------
diff --git 
a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
index e7a38a5..88f7ee1 100644
--- 
a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
+++ 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
@@ -21,11 +21,9 @@ package org.apache.tinkerpop.gremlin.hadoop.structure.io;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import 
org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimServiceLoader;
-import org.apache.tinkerpop.shaded.kryo.io.Output;
+import 
org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
----------------------------------------------------------------------
diff --git 
a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
index 7ac8e8c..2252ded 100644
--- 
a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
+++ 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java
@@ -21,12 +21,11 @@ package org.apache.tinkerpop.gremlin.hadoop.structure.io;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
-import 
org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimServiceLoader;
+import 
org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
 import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService
----------------------------------------------------------------------
diff --git 
a/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService
 
b/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService
new file mode 100644
index 0000000..0b27e72
--- /dev/null
+++ 
b/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService
@@ -0,0 +1 @@
+org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService # 
HadoopPools provides/caches instances of TinkerPop's shaded Kryo

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimService
----------------------------------------------------------------------
diff --git 
a/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimService
 
b/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimService
deleted file mode 100644
index 0b27e72..0000000
--- 
a/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimService
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService # 
HadoopPools provides/caches instances of TinkerPop's shaded Kryo

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/TinkerPopKryoRegistrator.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/TinkerPopKryoRegistrator.java
 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/TinkerPopKryoRegistrator.java
new file mode 100644
index 0000000..4c99e70
--- /dev/null
+++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/TinkerPopKryoRegistrator.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Preconditions;
+import org.apache.spark.serializer.KryoRegistrator;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.OrderGlobalStep;
+import 
org.apache.tinkerpop.gremlin.process.traversal.util.TraversalExplanation;
+import 
org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
+import 
org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
+import 
org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
+import 
org.apache.tinkerpop.gremlin.spark.structure.io.gryo.ObjectWritableSerializer;
+import 
org.apache.tinkerpop.gremlin.spark.structure.io.gryo.VertexWritableSerializer;
+import 
org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedSerializerAdapter;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.TypeRegistration;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A spark.kryo.registrator implementation that installs TinkerPop types.
+ * This is intended for use with spark.serializer=KryoSerializer, not 
GryoSerializer.
+ */
+public class TinkerPopKryoRegistrator implements KryoRegistrator {
+
+    private static final Logger log = 
LoggerFactory.getLogger(TinkerPopKryoRegistrator.class);
+
+    @Override
+    public void registerClasses(Kryo kryo) {
+        // TinkerPop type registrations copied from GyroSerializer's 
constructor
+        kryo.register(MessagePayload.class);
+        kryo.register(ViewIncomingPayload.class);
+        kryo.register(ViewOutgoingPayload.class);
+        kryo.register(ViewPayload.class);
+        kryo.register(VertexWritable.class, new 
UnshadedSerializerAdapter<>(new VertexWritableSerializer()));
+        kryo.register(ObjectWritable.class, new 
UnshadedSerializerAdapter<>(new ObjectWritableSerializer<>()));
+
+        Set<Class<?>> shimmedClasses = new HashSet<>();
+
+        Set<Class<?>> javaSerializationClasses = new HashSet<>();
+
+        // Copy GryoMapper's default registrations
+        for (TypeRegistration<?> tr : 
GryoMapper.build().create().getTypeRegistrations()) {
+            // Special case for JavaSerializer, which is generally implemented 
in terms of TinkerPop's
+            // problematic static GryoMapper/GryoSerializer pool (these are 
handled below the loop)
+            org.apache.tinkerpop.shaded.kryo.Serializer<?> shadedSerializer = 
tr.getShadedSerializer();
+            SerializerShim<?> serializerShim = tr.getSerializerShim();
+            if (null != shadedSerializer &&
+                    
shadedSerializer.getClass().equals(org.apache.tinkerpop.shaded.kryo.serializers.JavaSerializer.class))
 {
+                javaSerializationClasses.add(tr.getTargetClass());
+            } else if (null != serializerShim) {
+                log.debug("Registering class {} to serializer shim {} 
(serializer shim class {})",
+                        tr.getTargetClass(), serializerShim, 
serializerShim.getClass());
+                kryo.register(tr.getTargetClass(), new 
UnshadedSerializerAdapter<>(serializerShim));
+                shimmedClasses.add(tr.getTargetClass());
+            } else {
+                // Register with the default behavior (FieldSerializer)
+                log.debug("Registering class {} with default serializer", 
tr.getTargetClass());
+                kryo.register(tr.getTargetClass());
+            }
+        }
+
+        Map<Class<?>, Serializer<?>> javaSerializerReplacements = new 
HashMap<>();
+        javaSerializerReplacements.put(GroupStep.GroupBiOperator.class, new 
JavaSerializer());
+        javaSerializerReplacements.put(OrderGlobalStep.OrderBiOperator.class, 
null);
+        javaSerializerReplacements.put(TraversalExplanation.class, null);
+
+        for (Map.Entry<Class<?>, Serializer<?>> e : 
javaSerializerReplacements.entrySet()) {
+            Class<?> c = e.getKey();
+            Serializer<?> s = e.getValue();
+
+            if (javaSerializationClasses.remove(c)) {
+                if (null != s) {
+                    log.debug("Registering class {} with serializer {}", c, s);
+                    kryo.register(c, s);
+                } else {
+                    log.debug("Registering class {} with default serializer", 
c);
+                    kryo.register(c);
+                }
+            } else {
+                log.debug("Registering class {} with JavaSerializer", c);
+                kryo.register(c, new JavaSerializer());
+            }
+        }
+
+        // We really care about StarGraph's shim serializer, so make sure we 
registered it
+        if (!shimmedClasses.contains(StarGraph.class)) {
+            log.warn("No SerializerShim found for StarGraph");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java
 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java
index 21cbc60..4ceb045 100644
--- 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java
+++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java
@@ -20,6 +20,10 @@
 package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
 
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.InputShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.OutputShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim;
 import org.apache.tinkerpop.shaded.kryo.Kryo;
 import org.apache.tinkerpop.shaded.kryo.Serializer;
 import org.apache.tinkerpop.shaded.kryo.io.Input;
@@ -28,16 +32,16 @@ import org.apache.tinkerpop.shaded.kryo.io.Output;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class ObjectWritableSerializer<T> extends 
Serializer<ObjectWritable<T>> {
+public final class ObjectWritableSerializer<T> implements 
SerializerShim<ObjectWritable<T>> {
 
     @Override
-    public void write(final Kryo kryo, final Output output, final 
ObjectWritable<T> objectWritable) {
-        kryo.writeClassAndObject(output, objectWritable.get());
+    public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, 
ObjectWritable<T> starGraph) {
+        kryo.writeClassAndObject(output, starGraph.get());
         output.flush();
     }
 
     @Override
-    public ObjectWritable<T> read(final Kryo kryo, final Input input, final 
Class<ObjectWritable<T>> clazz) {
+    public <I extends InputShim> ObjectWritable<T> read(KryoShim<I, ?> kryo, I 
input, Class<ObjectWritable<T>> clazz) {
         return new ObjectWritable(kryo.readClassAndObject(input));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java
 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java
index 97891f3..f3c1b15 100644
--- 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java
+++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java
@@ -20,6 +20,10 @@
 package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
 
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.InputShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.OutputShim;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim;
 import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
 import org.apache.tinkerpop.shaded.kryo.Kryo;
 import org.apache.tinkerpop.shaded.kryo.Serializer;
@@ -29,14 +33,16 @@ import org.apache.tinkerpop.shaded.kryo.io.Output;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class VertexWritableSerializer extends Serializer<VertexWritable> 
{
+public final class VertexWritableSerializer implements 
SerializerShim<VertexWritable> {
+
     @Override
-    public void write(final Kryo kryo, final Output output, final 
VertexWritable vertexWritable) {
+    public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, 
VertexWritable vertexWritable) {
         kryo.writeObject(output, vertexWritable.get().graph());
+        output.flush();
     }
 
     @Override
-    public VertexWritable read(final Kryo kryo, final Input input, final 
Class<VertexWritable> aClass) {
+    public <I extends InputShim> VertexWritable read(KryoShim<I, ?> kryo, I 
input, Class<VertexWritable> clazz) {
         return new VertexWritable(kryo.readObject(input, 
StarGraph.class).getStarVertex());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedInputAdapter.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedInputAdapter.java
 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedInputAdapter.java
new file mode 100644
index 0000000..c533af7
--- /dev/null
+++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedInputAdapter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Copyright DataStax, Inc.
+ *
+ * Please see the included license file for details.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded;
+
+import com.esotericsoftware.kryo.io.Input;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.InputShim;
+
+public class UnshadedInputAdapter implements InputShim
+{
+
+    private final Input unshadedInput;
+
+    public UnshadedInputAdapter(Input unshadedInput)
+    {
+        this.unshadedInput = unshadedInput;
+    }
+
+    Input getUnshadedInput()
+    {
+        return unshadedInput;
+    }
+
+    @Override
+    public byte readByte()
+    {
+        return unshadedInput.readByte();
+    }
+
+    @Override
+    public byte[] readBytes(int size) {
+        return unshadedInput.readBytes(size);
+    }
+
+    @Override
+    public String readString()
+    {
+        return unshadedInput.readString();
+    }
+
+    @Override
+    public long readLong()
+    {
+        return unshadedInput.readLong();
+    }
+
+    @Override
+    public int readInt() {
+        return unshadedInput.readInt();
+    }
+
+    @Override
+    public double readDouble()
+    {
+        return unshadedInput.readDouble();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoAdapter.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoAdapter.java
 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoAdapter.java
new file mode 100644
index 0000000..b14abe0
--- /dev/null
+++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoAdapter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Copyright DataStax, Inc.
+ *
+ * Please see the included license file for details.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim;
+
+public class UnshadedKryoAdapter implements KryoShim<UnshadedInputAdapter, 
UnshadedOutputAdapter>
+{
+    private final Kryo unshadedKryo;
+
+    public UnshadedKryoAdapter(Kryo unshadedKryo)
+    {
+        this.unshadedKryo = unshadedKryo;
+    }
+
+    @Override
+    public <T> T readObject(UnshadedInputAdapter input, Class<T> type)
+    {
+        return unshadedKryo.readObject(input.getUnshadedInput(), type);
+    }
+
+    @Override
+    public Object readClassAndObject(UnshadedInputAdapter input)
+    {
+        return unshadedKryo.readClassAndObject(input.getUnshadedInput());
+    }
+
+    @Override
+    public void writeObject(UnshadedOutputAdapter output, Object object)
+    {
+        unshadedKryo.writeObject(output.getUnshadedOutput(), object);
+    }
+
+    @Override
+    public void writeClassAndObject(UnshadedOutputAdapter output, Object 
object)
+    {
+        unshadedKryo.writeClassAndObject(output.getUnshadedOutput(), object);
+    }
+
+    @Override
+    public <T> T readObjectOrNull(UnshadedInputAdapter input, Class<T> type)
+    {
+        return unshadedKryo.readObjectOrNull(input.getUnshadedInput(), type);
+    }
+
+    @Override
+    public void writeObjectOrNull(UnshadedOutputAdapter output, Object object, 
Class type)
+    {
+        unshadedKryo.writeObjectOrNull(output.getUnshadedOutput(), object, 
type);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
new file mode 100644
index 0000000..d0411e8
--- /dev/null
+++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Copyright DataStax, Inc.
+ * <p>
+ * Please see the included license file for details.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded;
+
+import com.twitter.chill.KryoInstantiator;
+import com.twitter.chill.KryoPool;
+import com.twitter.chill.SerDeState;
+import org.apache.spark.SparkConf;
+import org.apache.spark.serializer.KryoSerializer;
+import 
org.apache.tinkerpop.gremlin.spark.structure.io.TinkerPopKryoRegistrator;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public class UnshadedKryoShimService implements KryoShimService {
+
+    public static final String SPARK_KRYO_POOL_SIZE_SYSTEM_PROPERTY = 
"tinkerpop.kryo.poolsize";
+
+    private static final Logger log = 
LoggerFactory.getLogger(UnshadedKryoShimService.class);
+    private static final int SPARK_KRYO_POOL_SIZE_DEFAULT = 8;
+
+    private final KryoSerializer sparkKryoSerializer;
+    private final KryoPool kryoPool;
+
+    public UnshadedKryoShimService() {
+        this(TinkerPopKryoRegistrator.class.getCanonicalName(), 
getDefaultKryoPoolSize());
+    }
+
+    public UnshadedKryoShimService(String sparkKryoRegistratorClassname, int 
kryoPoolSize) {
+        SparkConf sparkConf = new SparkConf();
+        sparkConf.set("spark.serializer", 
KryoSerializer.class.getCanonicalName());
+        sparkConf.set("spark.kryo.registrator", sparkKryoRegistratorClassname);
+        sparkKryoSerializer = new KryoSerializer(sparkConf);
+        kryoPool = KryoPool.withByteArrayOutputStream(kryoPoolSize, new 
KryoInstantiator());
+    }
+
+    @Override
+    public Object readClassAndObject(InputStream source) {
+        SerDeState sds = null;
+        try {
+            sds = kryoPool.borrow();
+
+            sds.setInput(source);
+
+            return sds.readClassAndObject();
+        } finally {
+            kryoPool.release(sds);
+        }
+    }
+
+    @Override
+    public void writeClassAndObject(Object o, OutputStream sink) {
+        SerDeState sds = null;
+        try {
+            sds = kryoPool.borrow();
+
+            sds.writeClassAndObject(o); // this writes to an internal buffer
+
+            sds.writeOutputTo(sink); // this copies the internal buffer to sink
+
+            sink.flush();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            kryoPool.release(sds);
+        }
+    }
+
+    @Override
+    public int getPriority() {
+        return 1024;
+    }
+
+    private static int getDefaultKryoPoolSize() {
+        String raw = System.getProperty(SPARK_KRYO_POOL_SIZE_SYSTEM_PROPERTY);
+
+        int size = SPARK_KRYO_POOL_SIZE_DEFAULT;
+        try {
+            size = Integer.valueOf(raw);
+            log.info("Setting kryo pool size to {} according to system 
property {}", size,
+                    SPARK_KRYO_POOL_SIZE_SYSTEM_PROPERTY);
+        } catch (NumberFormatException e) {
+            log.error("System property {}={} could not be parsed as an 
integer, using default value {}",
+                    SPARK_KRYO_POOL_SIZE_SYSTEM_PROPERTY, raw, size, e);
+        }
+
+        return size;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedOutputAdapter.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedOutputAdapter.java
 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedOutputAdapter.java
new file mode 100644
index 0000000..9cc59d4
--- /dev/null
+++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedOutputAdapter.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Copyright DataStax, Inc.
+ *
+ * Please see the included license file for details.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded;
+
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.OutputShim;
+
+public class UnshadedOutputAdapter implements OutputShim
+{
+    private final Output unshadedOutput;
+
+    public UnshadedOutputAdapter(Output unshadedOutput)
+    {
+        this.unshadedOutput = unshadedOutput;
+    }
+
+    Output getUnshadedOutput()
+    {
+        return unshadedOutput;
+    }
+
+    @Override
+    public void writeByte(byte b)
+    {
+        unshadedOutput.writeByte(b);
+    }
+
+    @Override
+    public void writeBytes(byte[] array, int offset, int count) {
+        unshadedOutput.writeBytes(array, offset, count);
+    }
+
+    @Override
+    public void writeString(String s)
+    {
+        unshadedOutput.writeString(s);
+    }
+
+    @Override
+    public void writeLong(long l)
+    {
+        unshadedOutput.writeLong(l);
+    }
+
+    @Override
+    public void writeInt(int i) {
+        unshadedOutput.writeInt(i);
+    }
+
+    @Override
+    public void writeDouble(double d)
+    {
+        unshadedOutput.writeDouble(d);
+    }
+
+    @Override
+    public void flush()
+    {
+        unshadedOutput.flush();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedSerializerAdapter.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedSerializerAdapter.java
 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedSerializerAdapter.java
new file mode 100644
index 0000000..efc9a4f
--- /dev/null
+++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedSerializerAdapter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Copyright DataStax, Inc.
+ *
+ * Please see the included license file for details.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim;
+
+public class UnshadedSerializerAdapter<T> extends Serializer<T>
+{
+
+    SerializerShim<T> serializer;
+
+    public UnshadedSerializerAdapter(SerializerShim<T> serializer) {
+        this.serializer = serializer;
+        setImmutable(this.serializer.isImmutable());
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, T t) {
+        UnshadedKryoAdapter shadedKryoAdapter = new UnshadedKryoAdapter(kryo);
+        UnshadedOutputAdapter shadedOutputAdapter = new 
UnshadedOutputAdapter(output);
+        serializer.write(shadedKryoAdapter, shadedOutputAdapter, t);
+    }
+
+    @Override
+    public T read(Kryo kryo, Input input, Class<T> aClass)
+    {
+        UnshadedKryoAdapter shadedKryoAdapter = new UnshadedKryoAdapter(kryo);
+        UnshadedInputAdapter shadedInputAdapter = new 
UnshadedInputAdapter(input);
+        return serializer.read(shadedKryoAdapter, shadedInputAdapter, aClass);
+    }
+}

Reply via email to