Hi,

Spark 1.2 changed the APIs a bit, which is what's causing the problem with 
es-spark 2.1.0.Beta3. This was addressed
a while back in es-spark proper; you can get hold of the dev build (the 
upcoming 2.1.Beta4) here [1].
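To be concrete about the failure mode: the root cause appears to be binary incompatibility. Adding a parameter to a Scala case class, even with a default value, changes the constructor's bytecode signature, so a jar compiled against the old three-argument StructField throws NoSuchMethodError at link time. A minimal sketch of the same effect, with hypothetical case classes standing in for the Spark types:

```scala
// Hypothetical stand-ins illustrating the binary break (not the actual Spark types).
// "v1" of a library ships:
case class FieldV1(name: String, nullable: Boolean)

// "v2" adds a defaulted parameter. This stays source-compatible, but the old
// 2-arg constructor is gone from the bytecode; only a 3-arg one remains:
case class FieldV2(name: String, nullable: Boolean,
                   metadata: Map[String, String] = Map.empty)

object BinaryCompatDemo extends App {
  // Recompiling against v2 works fine -- the default fills in:
  val f = FieldV2("schemaName", nullable = true)
  println(f)
  // But a jar that was *compiled against* a 2-arg FieldV2(String, Boolean)
  // would invoke a constructor that no longer exists in v2's bytecode and
  // fail at runtime with java.lang.NoSuchMethodError.
}
```

That is why rebuilding against a connector compiled for Spark 1.2 (rather than tweaking the user code) is the fix.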

P.S. Do note that a lot has happened in the es-hadoop/spark space since 
Spark Summit '14, and I strongly recommend
reading the docs [2].

Cheers,

[1] 
http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/master/install.html#download-dev
[2] http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/master/spark.html
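If you'd rather wire the dev build straight into the sbt project, something along these lines should work. The snapshot repository URL and version string below are assumptions on my part; take the exact coordinates from the download page in [1]:

```scala
// build.sbt fragment -- hypothetical coordinates, verify against [1]
resolvers += "Sonatype OSS Snapshots" at
  "https://oss.sonatype.org/content/repositories/snapshots"

libraryDependencies +=
  "org.elasticsearch" %% "elasticsearch-spark" % "2.1.0.BUILD-SNAPSHOT"
```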

On 2/9/15 9:33 PM, Aris wrote:
Hello Spark community and Holden,

I am trying to follow Holden Karau's SparkSQL and ElasticSearch tutorial from 
Spark Summit 2014. I am trying to use
elasticsearch-spark 2.1.0.Beta3 and SparkSQL 1.2 together.

https://github.com/holdenk/elasticsearchspark
(Side note: this very nice tutorial does NOT BUILD with sbt 0.13, 
sbt-assembly 0.12.0-M1, and Spark 1.2... so many
problems that I just gave up.)

In any case, I am trying to work from Holden's tutorial to see ElasticSearch 
and SparkSQL interoperate, with the data
being a bunch of JSON documents in ElasticSearch.

The problem is with the following code, at the line with sqlCtx.esRDD(..., ...):

// this is my def main(args) inside of my test Object
    val Array(esResource, esNodes) = args.take(2)
    val conf = new SparkConf().setAppName("TestEsSpark")
    conf.set("es.index.auto.create", "true")
    val sc = new SparkContext(conf)
    val sqlCtx = new SQLContext(sc)
    import sqlCtx._
    val query = """{"query":{"match":{"schemaName":"SuperControllerRequest.json"}}}"""
    val searched = sqlCtx.esRDD(esResource, query)  // <---- PROBLEM HERE
    println(searched.schema)

I can assemble this with sbt assembly, after much work getting SBT to 
cooperate. However, at RUN TIME I get the
following output: the call to sqlCtx.esRDD() fails inside the ElasticSearch 
code with a NoSuchMethodError on org.apache.spark.sql.catalyst.types.StructField.

This is a nightmare and I cannot get it to work, does anybody know how to make 
this extremely simple test work?

Further below is my SBT build file that I managed to get to work, borrowing 
from Holden's build.sbt.

// RUN time exception
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.types.StructField.<init>(Ljava/lang/String;Lorg/apache/spark/sql/catalyst/types/DataType;Z)V
        at org.elasticsearch.spark.sql.MappingUtils$.org$elasticsearch$spark$sql$MappingUtils$$convertField(MappingUtils.scala:75)
        at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
        at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
        at org.elasticsearch.spark.sql.MappingUtils$.convertToStruct(MappingUtils.scala:54)
        at org.elasticsearch.spark.sql.MappingUtils$.discoverMapping(MappingUtils.scala:47)
        at org.elasticsearch.spark.sql.EsSparkSQL$.esRDD(EsSparkSQL.scala:27)
        at org.elasticsearch.spark.sql.EsSparkSQL$.esRDD(EsSparkSQL.scala:24)
        at org.elasticsearch.spark.sql.package$SQLContextFunctions.esRDD(package.scala:17)
        at testesspark.TestEsSpark$.main(TestEsSpark.scala:41)  // <---- PROBLEM HERE points to the esRDD line in the code above
        at testesspark.TestEsSpark.main(TestEsSpark.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


//build.sbt
import sbtassembly.PathList

name := "testesspark"

version := "0.1.1"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-core" % "1.2.0"
     exclude("org.eclipse.jetty.orbit", "javax.servlet")
     exclude("org.eclipse.jetty.orbit", "javax.transaction")
     exclude("org.eclipse.jetty.orbit", "javax.mail")
     exclude("org.eclipse.jetty.orbit", "javax.activation")
     exclude("commons-beanutils", "commons-beanutils-core")
     exclude("commons-collections", "commons-collections")
     exclude("com.esotericsoftware.minlog", "minlog")
     exclude("org.slf4j", "jcl-over-slf4j" )
     exclude("org.apache.hadoop","hadoop-yarn-api")
     exclude("org.slf4j","slf4j-api"),
   "org.apache.spark" %% "spark-sql" % "1.2.0"
     exclude("org.eclipse.jetty.orbit", "javax.servlet")
     exclude("org.eclipse.jetty.orbit", "javax.transaction")
     exclude("org.eclipse.jetty.orbit", "javax.mail")
     exclude("org.eclipse.jetty.orbit", "javax.activation")
     exclude("commons-beanutils", "commons-beanutils-core")
     exclude("commons-collections", "commons-collections")
     exclude("com.esotericsoftware.minlog", "minlog")
     exclude("org.slf4j", "jcl-over-slf4j" )
     exclude("org.slf4j","slf4j-api"),
   "org.elasticsearch" %% "elasticsearch-spark" % "2.1.0.Beta3"
)

resolvers ++= Seq(
   Resolver.sonatypeRepo("snapshots"),
   Resolver.sonatypeRepo("public"),
   "conjars.org <http://conjars.org>" at "http://conjars.org/repo";,
   "JBoss Repository" at 
"http://repository.jboss.org/nexus/content/repositories/releases/";,
   "Spray Repository" at "http://repo.spray.cc/";,
   "Cloudera Repository" at 
"https://repository.cloudera.com/artifactory/cloudera-repos/";,
   "Akka Repository" at "http://repo.akka.io/releases/";,
   "Twitter4J Repository" at "http://twitter4j.org/maven2/";,
   "Apache HBase" at 
"https://repository.apache.org/content/repositories/releases";,
   "Twitter Maven Repo" at "http://maven.twttr.com/";,
   "scala-tools" at "https://oss.sonatype.org/content/groups/scala-tools";,
   "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/";,
   "Second Typesafe repo" at "http://repo.typesafe.com/typesafe/maven-releases/";
)

assemblyMergeStrategy in assembly :=
{
   case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
   case m if m.startsWith("META-INF") => MergeStrategy.discard
   case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
   case PathList("org", "apache", xs @ _*) => MergeStrategy.first
   case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
   case "about.html"  => MergeStrategy.rename
   case "reference.conf" => MergeStrategy.concat
   case _ => MergeStrategy.first
}

mainClass in assembly := Some("testesspark.TestEsSpark")


--
Costin


