[
https://issues.apache.org/jira/browse/TINKERPOP-1341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15355382#comment-15355382
]
Dan LaRocque commented on TINKERPOP-1341:
-----------------------------------------
In my preceding comments, I tried to figure out which classes I had missed by
reading the relevant TinkerPop, Spark, and Chill source. This comment is about
approaching the problem from the opposite direction: instantiate a pair of new
and old serializers, then try to programmatically determine which classes are
registered with the old but not with the new.
This is not completely straightforward. As far as I know, I can't invoke some
handy public Kryo method that returns a {{Set<Class>}} of registrations and
compare old vs new. Kryo instances abstract the idea of registered-ness behind
the ClassResolver interface. {{ClassResolver.getRegistration(Class)}} is the
method most of Kryo calls to figure out whether a class is registered. This is
sort of a registered-ness predicate in that it returns a {{Registration}}
instance for those classes which are registered and null for those which are
not.
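As a rough sketch of that predicate pattern (hypothetical names, not Kryo's actual internals), a resolver hides its registration state behind a lookup that returns a registration object for known classes and null otherwise:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal illustrative sketch of the ClassResolver idea: registered-ness is
// not exposed as a Set<Class>, only as a nullable lookup per class.
public class ResolverSketch {
    static class Registration {
        final Class<?> type;
        final int id;
        Registration(Class<?> type, int id) { this.type = type; this.id = id; }
    }

    static class SimpleResolver {
        private final Map<Class<?>, Registration> classToRegistration = new HashMap<>();

        Registration register(Class<?> type) {
            Registration r = new Registration(type, classToRegistration.size());
            classToRegistration.put(type, r);
            return r;
        }

        // The predicate: non-null result means "registered"
        Registration getRegistration(Class<?> type) {
            return classToRegistration.get(type);
        }
    }

    public static void main(String[] args) {
        SimpleResolver resolver = new SimpleResolver();
        resolver.register(String.class);
        System.out.println(resolver.getRegistration(String.class) != null);  // true
        System.out.println(resolver.getRegistration(Integer.class) != null); // false
    }
}
```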
TinkerPop's old serialization code has a custom ClassResolver impl called
{{GryoClassResolver}}. Its internals look quite a bit like those of Kryo's
{{DefaultClassResolver}}, down to field names and its memoization technique.
At first glance, one might think we could just compare the contents of the map
named {{classToRegistration}} in {{GryoClassResolver}} (under the old system)
vs the same-named map in {{DefaultClassResolver}} (under the new system).
This is a useful comparison, but an imperfect one. It does not account for
the five case statements in {{GryoClassResolver.getRegistration}} that consider
effectively all subtypes of Vertex, Edge, Property, VertexProperty, and Path to
be registered, and which work in concert with custom serializers that coerce
instances of such types into detached equivalents. In other words, some logic
in the old serializer code's {{GryoClassResolver.getRegistration}} method
bypasses {{classToRegistration}}. I think all affected classes are registered
by the new serialization code's {{GryoRegistrator.getExtraRegistrations}},
which explicitly registers each of the relevant subtypes that TP needs (e.g.
HadoopVertex, ComputerEdge, StarProperty, ...), obviating the custom resolver.
Although that set of registrations was sufficient for the test suite, it is
theoretically possible that {{GryoRegistrator.getExtraRegistrations}} omits
some subtype of Vertex/Prop/etc. Such an omission would not be apparent in a
comparison of {{classToRegistration}} maps.
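To make the caveat concrete, here is a hedged sketch (illustrative names only, not the actual GryoClassResolver code) of the kind of fallback logic that makes a map comparison blind to subtype registrations:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: a resolver whose getRegistration treats every subtype of a base
// interface as registered, bypassing the classToRegistration map entirely.
public class SubtypeResolverSketch {
    interface Vertex {}
    static class HadoopishVertex implements Vertex {} // stand-in subtype

    static class Registration {
        final Class<?> type;
        Registration(Class<?> t) { type = t; }
    }

    private final Map<Class<?>, Registration> classToRegistration = new HashMap<>();
    private final Registration vertexRegistration = new Registration(Vertex.class);

    Registration getRegistration(Class<?> type) {
        Registration r = classToRegistration.get(type);
        if (r != null) return r;
        // case-statement-style fallback: all Vertex subtypes count as registered
        if (Vertex.class.isAssignableFrom(type)) return vertexRegistration;
        return null; // genuinely unregistered
    }

    public static void main(String[] args) {
        SubtypeResolverSketch resolver = new SubtypeResolverSketch();
        // HadoopishVertex never appears in classToRegistration, yet it resolves:
        System.out.println(resolver.getRegistration(HadoopishVertex.class) != null); // true
        // ...so diffing classToRegistration maps alone would never surface it.
        System.out.println(resolver.classToRegistration.containsKey(HadoopishVertex.class)); // false
    }
}
```

This is why an omitted Vertex/Property/etc. subtype would deserialize fine under the old resolver yet be invisible to the map comparison below.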
Despite that caveat, I still think it's worth comparing {{classToRegistration}}
on new vs old. Here's that comparison.
{noformat}
// Get set of classes registered under old system
gremlin> sc = new org.apache.spark.SparkConf(true)
==>org.apache.spark.SparkConf@285dc41f
gremlin> gs = new org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer(sc)
==>org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer@4f99a5d8
gremlin> oldClassResolverKeys = gs.getGryoPool().takeKryo().getClassResolver().classToRegistration.keys().toSet() ; []
// Get set of classes registered under new system
gremlin> sc.set('spark.kryo.registrator', 'org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator')
==>org.apache.spark.SparkConf@285dc41f
gremlin> sc.set('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
==>org.apache.spark.SparkConf@285dc41f
gremlin> ks = new org.apache.spark.serializer.KryoSerializer(sc)
==>org.apache.spark.serializer.KryoSerializer@12670955
gremlin> newClassResolverKeys = ks.newKryo().getClassResolver().classToRegistration.keys().toSet() ; []
// Show classes registered in the old system but not in the new one
gremlin> oldClassResolverKeys - oldClassResolverKeys.intersect(newClassResolverKeys)
==>void
==>class scala.reflect.ClassTag$$anon$1
==>class java.lang.Void
==>class [Lorg.apache.spark.util.collection.CompactBuffer;
==>class scala.reflect.ManifestFactory$$anon$1
{noformat}
ManifestFactory$$anon$1 and ClassTag$$anon$1 are pathological cases that
apparently only come up in testing. Setting the system property
{{is.testing=true}} activates a condition in {{GryoRegistrator}} that registers
them, and they then disappear from the old-only set:
{noformat}
gremlin> System.setProperty('is.testing', 'true')
==>null
gremlin> ks = new org.apache.spark.serializer.KryoSerializer(sc)
==>org.apache.spark.serializer.KryoSerializer@4aefcefb
gremlin> newClassResolverKeys = ks.newKryo().getClassResolver().classToRegistration.keys().toSet() ; []
gremlin> oldClassResolverKeys - oldClassResolverKeys.intersect(newClassResolverKeys)
==>void
==>class java.lang.Void
==>class [Lorg.apache.spark.util.collection.CompactBuffer;
gremlin>
{noformat}
This leaves void, Void, and CompactBuffer[] (note leading "[L"). I'll add Void
and void alongside CompactBuffer[] in my PR.
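As a small aside on that "[L" prefix: it is the JVM's binary name for an object-array class, so the array class is a distinct {{Class}} object needing its own registration, and {{void}} and {{java.lang.Void}} are likewise distinct. A quick stdlib-only check:

```java
// Demonstrates why CompactBuffer[].class needs a registration separate from
// CompactBuffer.class: array classes are distinct Class objects whose JVM
// names carry the "[L...;" prefix. void vs java.lang.Void are also distinct.
public class ArrayClassNames {
    public static void main(String[] args) {
        System.out.println(String[].class.getName());            // [Ljava.lang.String;
        System.out.println(String.class.getName());              // java.lang.String
        System.out.println(String[].class.equals(String.class)); // false
        System.out.println(void.class == Void.TYPE);             // true
        System.out.println(void.class.equals(Void.class));       // false
    }
}
```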
To check my assertion that, under the new system, CompactBuffer itself is
registered but the CompactBuffer array class is not:
{noformat}
gremlin> oldClassResolverKeys.contains(org.apache.spark.util.collection.CompactBuffer.class)
==>true
gremlin> oldClassResolverKeys.contains(org.apache.spark.util.collection.CompactBuffer[].class)
==>true
gremlin> newClassResolverKeys.contains(org.apache.spark.util.collection.CompactBuffer.class)
==>true
gremlin> newClassResolverKeys.contains(org.apache.spark.util.collection.CompactBuffer[].class)
==>false
{noformat}
It was interesting to see that BoxedUnit is apparently already getting
registered somewhere. This might be happening in the chill library that Spark
uses, or it might be happening somewhere in Spark proper that I overlooked. I
won't need to register it in my PR.
{noformat}
gremlin> newClassResolverKeys.contains(scala.runtime.BoxedUnit.class)
==>true
{noformat}
> UnshadedKryoAdapter fails to deserialize StarGraph when SparkConf sets
> spark.rdd.compress=true whereas GryoSerializer works
> ---------------------------------------------------------------------------------------------------------------------------
>
> Key: TINKERPOP-1341
> URL: https://issues.apache.org/jira/browse/TINKERPOP-1341
> Project: TinkerPop
> Issue Type: Bug
> Components: io
> Affects Versions: 3.2.1, 3.3.0
> Reporter: Dylan Bethune-Waddell
> Priority: Minor
>
> When trying to bulk load a large dataset into Titan I was running into OOM
> errors and decided to try tweaking some Spark configuration settings. I am
> having trouble bulk loading with the new GryoRegistrator/UnshadedKryo
> serialization shim in master: a few hundred tasks into the edge loading stage
> (stage 5), exceptions are thrown complaining that CompactBuffer[].class must
> be explicitly registered with Kryo, and with spark.rdd.compress=true it
> instead fails a few hundred tasks into the vertex loading stage (stage 1) of
> BulkLoaderVertexProgram. GryoSerializer, rather than KryoSerializer with
> GryoRegistrator, does not fail and successfully loads the data with this
> compression flag on, whereas before I would just get OOM errors until the job
> eventually fell so far behind that it failed. So it seems desirable in some
> instances to use this setting, and the new serialization code appears to
> break it. This could be a Spark upstream issue based on this open JIRA ticket
> (https://issues.apache.org/jira/browse/SPARK-3630). Here is the exception
> that is thrown, with the middle bits cut out:
> com.esotericsoftware.kryo.KryoException: java.io.IOException: PARSING_ERROR(2)
> at com.esotericsoftware.kryo.io.Input.fill(Input.java:142)
> at com.esotericsoftware.kryo.io.Input.require(Input.java:169)
> at com.esotericsoftware.kryo.io.Input.readLong_slow(Input.java:715)
> at com.esotericsoftware.kryo.io.Input.readLong(Input.java:665)
> at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(DefaultSerializers.java:113)
> at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(DefaultSerializers.java:103)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
> at org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoAdapter.readClassAndObject(UnshadedKryoAdapter.java:48)
> at org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoAdapter.readClassAndObject(UnshadedKryoAdapter.java:30)
> at org.apache.tinkerpop.gremlin.structure.util.star.StarGraphSerializer.readEdges(StarGraphSerializer.java:134)
> at org.apache.tinkerpop.gremlin.structure.util.star.StarGraphSerializer.read(StarGraphSerializer.java:91)
> at org.apache.tinkerpop.gremlin.structure.util.star.StarGraphSerializer.read(StarGraphSerializer.java:45)
> at org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedSerializerAdapter.read(UnshadedSerializerAdapter.java:55)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:626)
> at org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoAdapter.readObject(UnshadedKryoAdapter.java:42)
> at org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoAdapter.readObject(UnshadedKryoAdapter.java:30)
> at org.apache.tinkerpop.gremlin.spark.structure.io.gryo.VertexWritableSerializer.read(VertexWritableSerializer.java:46)
> at org.apache.tinkerpop.gremlin.spark.structure.io.gryo.VertexWritableSerializer.read(VertexWritableSerializer.java:36)
> at org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedSerializerAdapter.read(UnshadedSerializerAdapter.java:55)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
> at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:228)
> ........................................................ and so on .....................................
> Caused by: java.io.IOException: PARSING_ERROR(2)
> at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
> at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
> at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594)
> at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:358)
> at org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:167)
> at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:150)
> at com.esotericsoftware.kryo.io.Input.fill(Input.java:140)
> ... 51 more
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)