Ramdev,
Do your Spark nodes all have access to all of the ES nodes on port 9200?
The connector discovers the other nodes in the ES cluster and will query
them directly, not just the node configured in the JobConf. This bit me in
my tests, as I was pointing Spark at a load balancer and hadn't given the
Spark nodes direct access to the ES nodes.
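
One quick way to rule this out is to test plain TCP reachability from each Spark node to every ES node. The sketch below is only an illustration using the JDK socket API; the names EsReachability, canConnect and unreachable are made up for this example and are not part of elasticsearch-hadoop. Note that the remote node in the quoted log advertises itself on port 9600 rather than 9200, so check whichever address and port the cluster actually publishes. A successful connect only shows the port is open, not that ES is healthy.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

// Hypothetical helper, not part of the elasticsearch-hadoop connector.
object EsReachability {

  /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
  def canConnect(host: String, port: Int, timeoutMs: Int = 2000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Covers refused connections, timeouts and unresolvable host names.
      case _: IOException => false
    } finally {
      Try(socket.close())
    }
  }

  /** Returns the (host, port) pairs that could not be reached. */
  def unreachable(nodes: Seq[(String, Int)], timeoutMs: Int = 2000): Seq[(String, Int)] =
    nodes.filterNot { case (host, port) => canConnect(host, port, timeoutMs) }
}
```

Calling EsReachability.unreachable(Seq(("192.189.224.80", 9600))) on each Spark node, with the list extended to every node in the cluster, should return an empty sequence if the nodes are directly reachable. If it does not, the job will likely stall the same way mine did behind the load balancer.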
Also, ES 1.0.0 is getting a bit long in the tooth. I'm not sure that's a
tested combination for the connector. I've only personally used the
connector against the 1.3.x train.
David
On Wednesday, October 22, 2014 1:17:57 PM UTC-7, Ramdev Wudali wrote:
>
> An update on this:
>
> The exception mentioned in item 2 of my original post was due to the ES
> instance being down (and for some reason I failed to realise that).
> That said, I am still having trouble with item 1. The following
> questions came up:
>
> 1. Is there a correlation between the number of shards/replicas on the
> ES instance and the number of shard-splits that are created for the query
> request?
> 2. If the ES instance has a single shard and a fairly large number
> of documents, would the performance be slower?
> 3. Are there any network latency issues? (I am able to query the instance
> using the sense/head plugins, and the response time is not bad; it is
> approximately 28 ms.)
>
>
> The reason for question 1 is the following log output:
>
> 6738 [main] INFO org.elasticsearch.hadoop.mr.EsInputFormat - Created [2]
> shard-splits
> 6780 [main] INFO org.apache.spark.SparkContext - Starting job: count at
> ElasticSparkTest1.scala:59
> 6801 [sparkDriver-akka.actor.default-dispatcher-5] INFO
> org.apache.spark.scheduler.DAGScheduler - Got job 0 (count at
> ElasticSparkTest1.scala:59) with 2 output partitions (allowLocal=false)
> 6802 [sparkDriver-akka.actor.default-dispatcher-5] INFO
> org.apache.spark.scheduler.DAGScheduler - Final stage: Stage 0(count at
> ElasticSparkTest1.scala:59)
> 6802 [sparkDriver-akka.actor.default-dispatcher-5] INFO
> org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List()
> 6808 [sparkDriver-akka.actor.default-dispatcher-5] INFO
> org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
> 6818 [sparkDriver-akka.actor.default-dispatcher-5] INFO
> org.apache.spark.scheduler.DAGScheduler - Submitting Stage 0
> (NewHadoopRDD[0] at newAPIHadoopRDD at ElasticSparkTest1.scala:57), which
> has no missing parents
> 6853 [sparkDriver-akka.actor.default-dispatcher-5] INFO
> org.apache.spark.storage.MemoryStore - ensureFreeSpace(1568) called with
> curMem=34372, maxMem=503344005
> 6854 [sparkDriver-akka.actor.default-dispatcher-5] INFO
> org.apache.spark.storage.MemoryStore - Block broadcast_1 stored as values
> in memory (estimated size 1568.0 B, free 480.0 MB)
> 6870 [sparkDriver-akka.actor.default-dispatcher-5] INFO
> org.apache.spark.scheduler.DAGScheduler - Submitting 2 missing tasks from
> Stage 0 (NewHadoopRDD[0] at newAPIHadoopRDD at ElasticSparkTest1.scala:57)
> 6872 [sparkDriver-akka.actor.default-dispatcher-5] INFO
> org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 0.0 with 2
> tasks
> 6912 [sparkDriver-akka.actor.default-dispatcher-2] INFO
> org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 0.0
> (TID 0, localhost, ANY, 18521 bytes)
> 6917 [sparkDriver-akka.actor.default-dispatcher-2] INFO
> org.apache.spark.scheduler.TaskSetManager - Starting task 1.0 in stage 0.0
> (TID 1, localhost, ANY, 18521 bytes)
> 6923 [Executor task launch worker-0] INFO
> org.apache.spark.executor.Executor - Running task 0.0 in stage 0.0 (TID 0)
> 6923 [Executor task launch worker-1] INFO
> org.apache.spark.executor.Executor - Running task 1.0 in stage 0.0 (TID 1)
> 6958 [Executor task launch worker-0] INFO
> org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
> [node=[ZIbTPE4FSxigrYkomftWQw/Strobe|192.189.224.80:9600],shard=1]
> 6958 [Executor task launch worker-1] INFO
> org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
> [node=[ZIbTPE4FSxigrYkomftWQw/Strobe|192.189.224.80:9600],shard=0]
> 6998 [Executor task launch worker-0] WARN
> org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
> 6998 [Executor task launch worker-1] WARN
> org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
>
> I noticed only two shard-splits being created.
>
> On the other hand, when I run the application against localhost with
> default settings, this is what I get:
> 4960 [main] INFO org.elasticsearch.hadoop.mr.EsInputFormat - Created [5]
> shard-splits
> 5002 [main] INFO org.apache.spark.SparkContext - Starting job: count at
> ElasticSparkTest1.scala:59
> 5022 [sparkDriver-akka.actor.default-dispatcher-5] INFO
> org.apache.spark.scheduler.DAGScheduler - Got job 0 (count at
> ElasticSparkTest1.scala:59) with 5 output partitions (allowLocal=false)
> 5023 [sparkDriver-akka.actor.default-dispatcher-5] INFO
> org.apache.spark.scheduler.DAGScheduler - Final stage: Stage 0(count at
> ElasticSparkTest1.scala:59)
> 5023 [sparkDriver-akka.actor.default-dispatcher-5] INFO
> org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List()
> 5030 [sparkDriver-akka.actor.default-dispatcher-5] INFO
> org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
> 5040 [sparkDriver-akka.actor.default-dispatcher-5] INFO
> org.apache.spark.scheduler.DAGScheduler - Submitting Stage 0
> (NewHadoopRDD[0] at newAPIHadoopRDD at ElasticSparkTest1.scala:57), which
> has no missing parents
> 5075 [sparkDriver-akka.actor.default-dispatcher-5] INFO
> org.apache.spark.storage.MemoryStore - ensureFreeSpace(1568) called with
> curMem=34340, maxMem=511377408
> 5076 [sparkDriver-akka.actor.default-dispatcher-5] INFO
> org.apache.spark.storage.MemoryStore - Block broadcast_1 stored as values
> in memory (estimated size 1568.0 B, free 487.7 MB)
> 5092 [sparkDriver-akka.actor.default-dispatcher-5] INFO
> org.apache.spark.scheduler.DAGScheduler - Submitting 5 missing tasks from
> Stage 0 (NewHadoopRDD[0] at newAPIHadoopRDD at ElasticSparkTest1.scala:57)
> 5094 [sparkDriver-akka.actor.default-dispatcher-5] INFO
> org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 0.0 with 5
> tasks
> 5133 [sparkDriver-akka.actor.default-dispatcher-4] INFO
> org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 0.0
> (TID 0, localhost, ANY, 16090 bytes)
> 5138 [sparkDriver-akka.actor.default-dispatcher-4] INFO
> org.apache.spark.scheduler.TaskSetManager - Starting task 1.0 in stage 0.0
> (TID 1, localhost, ANY, 16090 bytes)
> 5140 [sparkDriver-akka.actor.default-dispatcher-4] INFO
> org.apache.spark.scheduler.TaskSetManager - Starting task 2.0 in stage 0.0
> (TID 2, localhost, ANY, 16090 bytes)
> 5141 [sparkDriver-akka.actor.default-dispatcher-4] INFO
> org.apache.spark.scheduler.TaskSetManager - Starting task 3.0 in stage 0.0
> (TID 3, localhost, ANY, 16090 bytes)
> 5142 [sparkDriver-akka.actor.default-dispatcher-4] INFO
> org.apache.spark.scheduler.TaskSetManager - Starting task 4.0 in stage 0.0
> (TID 4, localhost, ANY, 16090 bytes)
> 5149 [Executor task launch worker-1] INFO
> org.apache.spark.executor.Executor - Running task 1.0 in stage 0.0 (TID 1)
> 5149 [Executor task launch worker-2] INFO
> org.apache.spark.executor.Executor - Running task 2.0 in stage 0.0 (TID 2)
> 5149 [Executor task launch worker-4] INFO
> org.apache.spark.executor.Executor - Running task 4.0 in stage 0.0 (TID 4)
> 5149 [Executor task launch worker-0] INFO
> org.apache.spark.executor.Executor - Running task 0.0 in stage 0.0 (TID 0)
> 5149 [Executor task launch worker-3] INFO
> org.apache.spark.executor.Executor - Running task 3.0 in stage 0.0 (TID 3)
> 5186 [Executor task launch worker-4] INFO
> org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
> [node=[KsYIAPqDSDafYjZJPh4CeQ/Silver Surfer|10.208.8.28:9200],shard=3]
> 5186 [Executor task launch worker-2] INFO
> org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
> [node=[KsYIAPqDSDafYjZJPh4CeQ/Silver Surfer|10.208.8.28:9200],shard=2]
> 5186 [Executor task launch worker-1] INFO
> org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
> [node=[KsYIAPqDSDafYjZJPh4CeQ/Silver Surfer|10.208.8.28:9200],shard=1]
> 5186 [Executor task launch worker-3] INFO
> org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
> [node=[KsYIAPqDSDafYjZJPh4CeQ/Silver Surfer|10.208.8.28:9200],shard=4]
> 5186 [Executor task launch worker-0] INFO
> org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
> [node=[KsYIAPqDSDafYjZJPh4CeQ/Silver Surfer|10.208.8.28:9200],shard=0]
> 5234 [Executor task launch worker-2] WARN
> org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
> 5234 [Executor task launch worker-4] WARN
> org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
> 5234 [Executor task launch worker-1] WARN
> org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
> 5236 [Executor task launch worker-3] WARN
> org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
> 5236 [Executor task launch worker-0] WARN
> org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
> 7390 [Executor task launch worker-1] INFO
> org.apache.spark.executor.Executor - Finished task 1.0 in stage 0.0 (TID
> 1). 1652 bytes result sent to driver
> 7390 [Executor task launch worker-0] INFO
> org.apache.spark.executor.Executor - Finished task 0.0 in stage 0.0 (TID
> 0). 1652 bytes result sent to driver
> 7390 [Executor task launch worker-4] INFO
> org.apache.spark.executor.Executor - Finished task 4.0 in stage 0.0 (TID
> 4). 1652 bytes result sent to driver
> 7390 [Executor task launch worker-2] INFO
> org.apache.spark.executor.Executor - Finished task 2.0 in stage 0.0 (TID
> 2). 1652 bytes result sent to driver
> 7390 [Executor task launch worker-3] INFO
> org.apache.spark.executor.Executor - Finished task 3.0 in stage 0.0 (TID
> 3). 1652 bytes result sent to driver
> 7410 [Result resolver thread-3] INFO
> org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 0.0
> (TID 0) in 2276 ms on localhost (1/5)
> 7417 [Result resolver thread-0] INFO
> org.apache.spark.scheduler.TaskSetManager - Finished task 4.0 in stage 0.0
> (TID 4) in 2269 ms on localhost (2/5)
> 7424 [Result resolver thread-1] INFO
> org.apache.spark.scheduler.TaskSetManager - Finished task 3.0 in stage 0.0
> (TID 3) in 2277 ms on localhost (3/5)
> 7430 [Result resolver thread-2] INFO
> org.apache.spark.scheduler.TaskSetManager - Finished task 2.0 in stage 0.0
> (TID 2) in 2285 ms on localhost (4/5)
> 7437 [Result resolver thread-3] INFO
> org.apache.spark.scheduler.TaskSetManager - Finished task 1.0 in stage 0.0
> (TID 1) in 2293 ms on localhost (5/5)
>
>
> Thanks for the help
>
>
> Ramdev
>
>
>
>
> On Wednesday, 22 October 2014 11:58:02 UTC-5, Ramdev Wudali wrote:
>>
>> Hi:
>> I have a very simple application that queries an ES instance and
>> returns the count of documents found by the query. I am using the Spark
>> interface because I intend to run ML algorithms on the result set. With
>> that said, here are the problems I face:
>>
>> 1. If I set up the Configuration (to use with newAPIHadoopRDD) or JobConf
>> (to use with hadoopRDD) using a remote ES instance like so:
>>
>>
>> This is using the newAPIHadoopRDD interface
>>
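>> import org.apache.hadoop.conf.Configuration
>> import org.apache.hadoop.io.{MapWritable, Text}
>> import org.apache.spark.{SparkConf, SparkContext}
>> import org.apache.spark.serializer.KryoSerializer
>> import org.elasticsearch.hadoop.mr.EsInputFormat
>>
>> val sparkConf = new
>> SparkConf().setMaster("local[2]").setAppName("TestESSpark")
>> sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
>> val sc = new SparkContext(sparkConf)
>>
>> val conf = new Configuration // change to new JobConf for the old API
>> conf.set("es.nodes", "remote.server:port")
>> conf.set("es.resource", "index/type")
>> conf.set("es.query", "{\"query\":{\"match_all\":{}}}")
>> // change to hadoopRDD for the old API
>> val esRDD =
>> sc.newAPIHadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]], classOf[Text], classOf[MapWritable])
>>
>> // count forces the query to run against ES
>> val docCount = esRDD.count
>> println(docCount)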
>>
>>
>> The application just hangs before the println prints anything
>> (basically while executing the search, or so I think).
>>
>>
>> 2. If I use localhost instead of "remote.server:port" for the es.nodes,
>> the application throws an exception :
>> Exception in thread "main"
>> org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection
>> error (check network and/or proxy settings)- all nodes failed; tried
>> [[localhost:9200]]
>> at
>> org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:123)
>> at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:303)
>> at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:287)
>> at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:291)
>> at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:118)
>> at
>> org.elasticsearch.hadoop.rest.RestClient.discoverNodes(RestClient.java:100)
>> at
>> org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:57)
>> at
>> org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:220)
>> at
>> org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:406)
>> at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:179)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1135)
>> at org.apache.spark.rdd.RDD.count(RDD.scala:904)
>> at
>> trgr.rd.newsplus.pairgen.ElasticSparkTest1$.main(ElasticSparkTest1.scala:59)
>> at
>> trgr.rd.newsplus.pairgen.ElasticSparkTest1.main(ElasticSparkTest1.scala)
>>
>>
>> I am using version 2.1.0.Beta2 of the elasticsearch-hadoop library,
>> running it against a local ES instance (version 1.3.2) and a remote ES
>> instance (version 1.0.0).
>>
>> Any insight as to what I might be missing or doing wrong?
>>
>> Thanks
>>
>> Ramdev
>>
>>