Hi, Spark 1.2 changed the APIs a bit, which is what's causing the problem with es-spark 2.1.0.Beta3. This was addressed a while back in es-spark proper; you can get hold of the dev build (the upcoming 2.1.Beta4) here [1].
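For sbt users, picking up a dev build usually amounts to adding a snapshot resolver and bumping the version. A minimal sketch only — the repository URL and version string below are assumptions, and the actual coordinates are listed on the download page [1]:

```scala
// Hypothetical build.sbt fragment -- the resolver URL and snapshot version
// are assumptions; check the download page [1] for the real coordinates
// of the upcoming 2.1.Beta4.
resolvers += "ES dev builds (assumed)" at
  "https://oss.sonatype.org/content/repositories/snapshots"

libraryDependencies +=
  "org.elasticsearch" %% "elasticsearch-spark" % "2.1.0.BUILD-SNAPSHOT" // assumed version
```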
P.S. Do note that a lot has happened in the es-hadoop/Spark space since Spark Summit '14, and I strongly recommend reading through the docs [2].

Cheers,

[1] http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/master/install.html#download-dev
[2] http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/master/spark.html

On 2/9/15 9:33 PM, Aris wrote:
Hello Spark community and Holden,

I am trying to follow Holden Karau's SparkSQL and ElasticSearch tutorial from Spark Summit 2014, using elasticsearch-spark 2.1.0.Beta3 and Spark SQL 1.2 together.

https://github.com/holdenk/elasticsearchspark

(Side note: this very nice tutorial does NOT build with sbt 0.13, sbt-assembly 0.12.0-M1, and Spark 1.2 -- so many problems that I just gave up.)

In any case, I am working from Holden's tutorial to see ElasticSearch and Spark SQL interoperate, with the data being a bunch of JSON documents in ElasticSearch. The problem is with the following code, at the line calling sqlCtx.esRDD(..., ...):

    // this is my def main(args) inside of my test object
    val Array(esResource, esNodes) = args.take(2)
    val conf = new SparkConf().setAppName("TestEsSpark")
    conf.set("es.index.auto.create", "true")
    val sc = new SparkContext(conf)
    val sqlCtx = new SQLContext(sc)
    import sqlCtx._

    val query = """{"query":{"match":{"schemaName":"SuperControllerRequest.json"}}}"""
    val searched = sqlCtx.esRDD(esResource, query) // <---- PROBLEM HERE <---------------------
    println(searched.schema)

I can assemble this with sbt assembly, after much work getting sbt to cooperate. At RUN time, however, I get the output below: the call to sqlCtx.esRDD() fails with a NoSuchMethodError on org.apache.spark.sql.catalyst.types.StructField, raised from ElasticSearch code. This is a nightmare and I cannot get this extremely simple test to work -- does anybody know how to fix it? Further below is the build.sbt I managed to get working, borrowing from Holden's build.sbt.
// RUN time exception
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.types.StructField.<init>(Ljava/lang/String;Lorg/apache/spark/sql/catalyst/types/DataType;Z)V
    at org.elasticsearch.spark.sql.MappingUtils$.org$elasticsearch$spark$sql$MappingUtils$$convertField(MappingUtils.scala:75)
    at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
    at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.elasticsearch.spark.sql.MappingUtils$.convertToStruct(MappingUtils.scala:54)
    at org.elasticsearch.spark.sql.MappingUtils$.discoverMapping(MappingUtils.scala:47)
    at org.elasticsearch.spark.sql.EsSparkSQL$.esRDD(EsSparkSQL.scala:27)
    at org.elasticsearch.spark.sql.EsSparkSQL$.esRDD(EsSparkSQL.scala:24)
    at org.elasticsearch.spark.sql.package$SQLContextFunctions.esRDD(package.scala:17)
    at testesspark.TestEsSpark$.main(TestEsSpark.scala:41) // <---- PROBLEM HERE points to the esRDD line in the code above
    at testesspark.TestEsSpark.main(TestEsSpark.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

// build.sbt
import sbtassembly.PathList

name := "testesspark"

version := "0.1.1"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.0"
    exclude("org.eclipse.jetty.orbit", "javax.servlet")
    exclude("org.eclipse.jetty.orbit", "javax.transaction")
    exclude("org.eclipse.jetty.orbit", "javax.mail")
    exclude("org.eclipse.jetty.orbit", "javax.activation")
    exclude("commons-beanutils", "commons-beanutils-core")
    exclude("commons-collections", "commons-collections")
    exclude("com.esotericsoftware.minlog", "minlog")
    exclude("org.slf4j", "jcl-over-slf4j")
    exclude("org.apache.hadoop", "hadoop-yarn-api")
    exclude("org.slf4j", "slf4j-api"),
  "org.apache.spark" %% "spark-sql" % "1.2.0"
    exclude("org.eclipse.jetty.orbit", "javax.servlet")
    exclude("org.eclipse.jetty.orbit", "javax.transaction")
    exclude("org.eclipse.jetty.orbit", "javax.mail")
    exclude("org.eclipse.jetty.orbit", "javax.activation")
    exclude("commons-beanutils", "commons-beanutils-core")
    exclude("commons-collections", "commons-collections")
    exclude("com.esotericsoftware.minlog", "minlog")
    exclude("org.slf4j", "jcl-over-slf4j")
    exclude("org.slf4j", "slf4j-api"),
  "org.elasticsearch" %% "elasticsearch-spark" % "2.1.0.Beta3"
)

resolvers ++= Seq(
  Resolver.sonatypeRepo("snapshots"),
  Resolver.sonatypeRepo("public"),
  "conjars.org" at "http://conjars.org/repo",
  "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
  "Spray Repository" at "http://repo.spray.cc/",
  "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
  "Akka Repository" at "http://repo.akka.io/releases/",
  "Twitter4J Repository" at "http://twitter4j.org/maven2/",
  "Apache HBase" at "https://repository.apache.org/content/repositories/releases",
  "Twitter Maven Repo" at "http://maven.twttr.com/",
  "scala-tools" at "https://oss.sonatype.org/content/groups/scala-tools",
  "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/",
  "Second Typesafe repo" at "http://repo.typesafe.com/typesafe/maven-releases/"
)

assemblyMergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
  case m if m.startsWith("META-INF")              => MergeStrategy.discard
  case PathList("javax", "servlet", xs @ _*)      => MergeStrategy.first
  case PathList("org", "apache", xs @ _*)         => MergeStrategy.first
  case PathList("org", "jboss", xs @ _*)          => MergeStrategy.first
  case "about.html"                               => MergeStrategy.rename
  case "reference.conf"                           => MergeStrategy.concat
  case _                                          => MergeStrategy.first
}

mainClass in assembly := Some("testesspark.TestEsSpark")
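The NoSuchMethodError above means the es-spark jar was compiled against a StructField constructor signature that is simply absent from the Spark version on the runtime classpath — the check happens at call time, not at assembly time. The same kind of constructor lookup can be done explicitly with reflection; here is a self-contained sketch of that check (ProbeCtor and hasCtor are hypothetical names, and java.lang.String stands in for StructField so the example runs without Spark on the classpath):

```scala
// Sketch: probe at runtime whether a class exposes a constructor with a
// given parameter list -- the same resolution that fails with
// NoSuchMethodError when es-spark Beta3 calls the old
// StructField.<init>(String, DataType, Boolean) against Spark 1.2.
object ProbeCtor {
  def hasCtor(className: String, paramTypes: Class[_]*): Boolean =
    try {
      // getConstructor throws NoSuchMethodException if no public
      // constructor matches exactly these parameter types.
      Class.forName(className).getConstructor(paramTypes: _*)
      true
    } catch {
      case _: NoSuchMethodException  => false
      case _: ClassNotFoundException => false
    }

  def main(args: Array[String]): Unit = {
    // String(char[]) exists; String(int) does not.
    println(hasCtor("java.lang.String", classOf[Array[Char]])) // true
    println(hasCtor("java.lang.String", classOf[Int]))         // false
  }
}
```

Running a probe like this against org.apache.spark.sql.catalyst.types.StructField inside the assembled jar would confirm the signature mismatch before ever hitting esRDD.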
--
Costin

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org