Robbie Strickland created ZEPPELIN-332:
------------------------------------------

             Summary: CNFE when running SQL query against Cassandra temp table
                 Key: ZEPPELIN-332
                 URL: https://issues.apache.org/jira/browse/ZEPPELIN-332
             Project: Zeppelin
          Issue Type: Bug
          Components: Interpreters
    Affects Versions: 0.6.0
         Environment: Ubuntu 14.04, Spark 1.4.1, Zeppelin 0.6.0, Hadoop 2.6.0, Cassandra 2.1.8, Cassandra-Spark Connector 1.4.0
            Reporter: Robbie Strickland


When a SQL statement is run against a Cassandra temp table from which no 
records have yet been materialized using the {{SQLContext}}, a 
{{ClassNotFoundException}} is thrown.

For example, we run the following code to register the table:

{code:scala}
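import com.datastax.spark.connector._

// Row type matching the columns selected from the Cassandra table
case class Stats(queue: String, time: Long, host: String, successes: Long)

// Scan prod_analytics_events.stats, restricted to the given time range
val stats2 = sc.cassandraTable[Stats]("prod_analytics_events", "stats").
  select("queue", "time", "host", "successes").
  where("time >= 1442707200000 and time < 1442793600000")

// Convert to a DataFrame and expose it to SQL as a temp table
stats2.toDF.registerTempTable("stats2")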
{code}

If we immediately try to run a {{%sql}} query, such as:

{code:sql}
%sql
select * from stats2 limit 10
{code}

we will get the following stack trace:

{code}
java.lang.ClassNotFoundException: $line551.$read
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at scala.reflect.runtime.JavaMirrors$JavaMirror.javaClass(JavaMirrors.scala:500)
        at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1167)
        at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1159)
        at scala.reflect.runtime.TwoWayCache.toJava(TwoWayCache.scala:49)
        at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1159)
        at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1174)
        at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1159)
        at scala.reflect.runtime.TwoWayCache.toJava(TwoWayCache.scala:49)
        at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1159)
        at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1174)
        at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1159)
        at scala.reflect.runtime.TwoWayCache.toJava(TwoWayCache.scala:49)
        at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1159)
        at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1174)
        at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1159)
        at scala.reflect.runtime.TwoWayCache.toJava(TwoWayCache.scala:49)
        at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1159)
        at scala.reflect.runtime.JavaMirrors$JavaMirror.typeToJavaClass(JavaMirrors.scala:1255)
        at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:202)
        at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:65)
        at com.datastax.spark.connector.rdd.reader.AnyObjectFactory.<init>(AnyObjectFactory.scala:30)
        at com.datastax.spark.connector.rdd.reader.GettableDataToMappedTypeConverter.<init>(GettableDataToMappedTypeConverter.scala:45)
        at com.datastax.spark.connector.rdd.reader.ClassBasedRowReader.<init>(ClassBasedRowReader.scala:22)
        at com.datastax.spark.connector.rdd.reader.ClassBasedRowReaderFactory.rowReader(ClassBasedRowReader.scala:47)
        at com.datastax.spark.connector.rdd.reader.ClassBasedRowReaderFactory.rowReader(ClassBasedRowReader.scala:42)
        at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.rowReader(CassandraTableRowReaderProvider.scala:48)
        at com.datastax.spark.connector.rdd.CassandraTableScanRDD.rowReader$lzycompute(CassandraTableScanRDD.scala:59)
        at com.datastax.spark.connector.rdd.CassandraTableScanRDD.rowReader(CassandraTableScanRDD.scala:59)
        at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:151)
        at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:59)
        at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:143)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:121)
        at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:125)
        at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1269)
        at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1203)
        at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1262)
        at sun.reflect.GeneratedMethodAccessor106.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.zeppelin.spark.ZeppelinContext.showDF(ZeppelinContext.java:300)
        at org.apache.zeppelin.spark.SparkSqlInterpreter.interpret(SparkSqlInterpreter.java:142)
        at org.apache.zeppelin.interpreter.ClassloaderInterpreter.interpret(ClassloaderInterpreter.java:57)
        at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93)
        at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:276)
        at org.apache.zeppelin.scheduler.Job.run(Job.java:170)
        at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:118)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
{code}

However, it is possible to run a query directly using the {{SQLContext}} 
without issue:

{code:scala}
sqlContext.sql("select * from stats2 limit 10").collect
{code}

returns the expected results:

{code}
stats2: com.datastax.spark.connector.rdd.CassandraTableScanRDD[Stats] = 
CassandraTableScanRDD[637] at RDD at CassandraRDD.scala:15
res155: Array[org.apache.spark.sql.Row] = 
Array([events_ANDROID_LocationUpdate,1442707206499,sink4x056,1821024], 
[events_ANDROID_LocationUpdate,1442707207062,sink4x019,1480357], 
[events_ANDROID_LocationUpdate,1442707266854,sink4x056,1821394], 
[events_ANDROID_LocationUpdate,1442707268281,sink4x019,1480675], 
[events_ANDROID_LocationUpdate,1442707329595,sink4x056,1821771], 
[events_ANDROID_LocationUpdate,1442707332608,sink4x019,1480979], 
[events_ANDROID_LocationUpdate,1442707389853,sink4x056,1822088], 
[events_ANDROID_LocationUpdate,1442707393107,sink4x019,1481257], 
[events_ANDROID_LocationUpdate,1442707451639,sink4x056,1822413], 
[events_ANDROID_LocationUpdate,1442707457504,sink4x019,1481591])
{code}

Additionally, if we first materialize some rows using the {{SQLContext}} (such 
as in the above example), further queries using {{%sql}} work fine.
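
A minimal sketch of that workaround, to be run in a Scala paragraph before 
any {{%sql}} paragraph touches the table. It assumes {{stats2}} has already 
been registered as shown earlier and that {{sqlContext}} is the instance 
Zeppelin provides; using {{limit 1}} is an assumption, since only the 
{{limit 10}} query above was observed to have this effect:

{code:scala}
// Workaround sketch (not a fix): force the temp table to be evaluated once
// through the SQLContext so that later %sql paragraphs can query it.
// Assumption: materializing a single row is enough; the report only
// confirms this for the "limit 10" query.
sqlContext.sql("select * from stats2 limit 1").collect
{code}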

Relevant config from zeppelin-env.sh:

{code}
export ZEPPELIN_JAVA_OPTS="-Dspark.jars=/opt/spark/lib/spark-cassandra-connector-assembly.jar:/opt/hadoop/share/hadoop/tools/lib/*:/opt/jars/*:/opt/spark/lib/pyspark-cassandra.jar -Dspark.cassandra.connection.host=x.x.x.x -Dspark.cassandra.read.timeout_ms=300000 -Dspark.cassandra.auth.username=zeppelin -Dspark.cassandra.auth.password=[password]"
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
