Shubham Chopra created SPARK-16550:
--------------------------------------

             Summary: Caching data with replication doesn't replicate data
                 Key: SPARK-16550
                 URL: https://issues.apache.org/jira/browse/SPARK-16550
             Project: Spark
          Issue Type: Bug
          Components: Block Manager, Spark Core
    Affects Versions: 2.0.0
            Reporter: Shubham Chopra
            Priority: Blocker


Caching multiple replicas of blocks is currently broken. The following examples
show that replication does not happen in several use cases:

These were run using Spark 2.0.0-preview in local-cluster[2,1,1024] mode.

import org.apache.spark.storage.StorageLevel._

case class TestInteger(i: Int)
val data = sc.parallelize((1 to 1000).map(TestInteger(_)), 10).persist(MEMORY_ONLY_2)
data.count
Counting the cached blocks with

sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data.id).size).sum

shows only 10 blocks instead of the expected 20 (10 partitions × 2 replicas).
Block replication fails on the executors with:
java.lang.RuntimeException: java.lang.ClassNotFoundException: $line14.$read$$iw$$iw$TestInteger
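The expected count is one block per partition per replica, so 10 partitions at
MEMORY_ONLY_2 should give 10 × 2 = 20 blocks. A minimal sketch of that check in
plain Scala with no Spark dependency; the object ReplicationCheck and its methods
are hypothetical names, and the observed total is assumed to come from the
rddBlocksById expression above.

```scala
// Hypothetical helper, not part of Spark: checks whether the number of cached
// blocks matches what the storage level's replication factor implies.
object ReplicationCheck {

  // One block per partition per replica.
  def expectedBlockCount(numPartitions: Int, replication: Int): Int =
    numPartitions * replication

  // True when the observed total across all executors equals the expected total.
  def isFullyReplicated(observedBlocks: Int, numPartitions: Int, replication: Int): Boolean =
    observedBlocks == expectedBlockCount(numPartitions, replication)

  def main(args: Array[String]): Unit = {
    // Figures from this issue: 10 partitions, MEMORY_ONLY_2 (2 replicas), 10 blocks observed.
    val expected = expectedBlockCount(numPartitions = 10, replication = 2)
    val ok = isFullyReplicated(observedBlocks = 10, numPartitions = 10, replication = 2)
    println(s"expected $expected blocks, fully replicated: $ok")
  }
}
```

In spark-shell the inputs could be taken from the RDD itself, e.g.
ReplicationCheck.isFullyReplicated(total, data.getNumPartitions, data.getStorageLevel.replication),
where total is the sum computed above (an illustrative variable name).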

val data1 = sc.parallelize(1 to 1000, 10).persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_2)
data1.count

Block replication fails again, this time with the following error:
16/07/14 14:50:40 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 8567643992794608648
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
        at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
        at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:213)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:775)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:753)

Again,

sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data1.id).size).sum

shows only 10 blocks.

Caching serialized data works for native types, but not for custom classes:

val data3 = sc.parallelize(1 to 1000, 10).persist(MEMORY_ONLY_SER_2)
data3.count

works as intended.

But with the custom class:

val data4 = sc.parallelize((1 to 1000).map(TestInteger(_)), 10).persist(MEMORY_ONLY_SER_2)
data4.count

This again fails to replicate the data, and the executors throw the same ClassNotFoundException.

All of these examples worked and produced the expected results with Spark 1.6.2.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
