An update on this:

The exception mentioned in item 2 of my original post was caused by the ES 
instance being down (for some reason I failed to realise that).
That said, I am still having trouble with item 1. The following 
questions came up:

1. Is there a correlation between the number of shards/replicas on the 
ES instance and the number of shard-splits that are created for the query 
request?
2. If the ES instance has a single shard and a fairly large number of 
documents, would the performance be slower?
3. Are there any network latency issues? (I am able to query the instance 
using the Sense/head plugins, and the response time is not bad: 
approximately 28 ms.)


The reason for question 1 is the following log output:

 6738 [main] INFO  org.elasticsearch.hadoop.mr.EsInputFormat - Created [2] 
shard-splits
6780 [main] INFO  org.apache.spark.SparkContext - Starting job: count at 
ElasticSparkTest1.scala:59
6801 [sparkDriver-akka.actor.default-dispatcher-5] INFO 
 org.apache.spark.scheduler.DAGScheduler - Got job 0 (count at 
ElasticSparkTest1.scala:59) with 2 output partitions (allowLocal=false)
6802 [sparkDriver-akka.actor.default-dispatcher-5] INFO 
 org.apache.spark.scheduler.DAGScheduler - Final stage: Stage 0(count at 
ElasticSparkTest1.scala:59)
6802 [sparkDriver-akka.actor.default-dispatcher-5] INFO 
 org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List()
6808 [sparkDriver-akka.actor.default-dispatcher-5] INFO 
 org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
6818 [sparkDriver-akka.actor.default-dispatcher-5] INFO 
 org.apache.spark.scheduler.DAGScheduler - Submitting Stage 0 
(NewHadoopRDD[0] at newAPIHadoopRDD at ElasticSparkTest1.scala:57), which 
has no missing parents
6853 [sparkDriver-akka.actor.default-dispatcher-5] INFO 
 org.apache.spark.storage.MemoryStore - ensureFreeSpace(1568) called with 
curMem=34372, maxMem=503344005
6854 [sparkDriver-akka.actor.default-dispatcher-5] INFO 
 org.apache.spark.storage.MemoryStore - Block broadcast_1 stored as values 
in memory (estimated size 1568.0 B, free 480.0 MB)
6870 [sparkDriver-akka.actor.default-dispatcher-5] INFO 
 org.apache.spark.scheduler.DAGScheduler - Submitting 2 missing tasks from 
Stage 0 (NewHadoopRDD[0] at newAPIHadoopRDD at ElasticSparkTest1.scala:57)
6872 [sparkDriver-akka.actor.default-dispatcher-5] INFO 
 org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 0.0 with 2 
tasks
6912 [sparkDriver-akka.actor.default-dispatcher-2] INFO 
 org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 0.0 
(TID 0, localhost, ANY, 18521 bytes)
6917 [sparkDriver-akka.actor.default-dispatcher-2] INFO 
 org.apache.spark.scheduler.TaskSetManager - Starting task 1.0 in stage 0.0 
(TID 1, localhost, ANY, 18521 bytes)
6923 [Executor task launch worker-0] INFO 
 org.apache.spark.executor.Executor - Running task 0.0 in stage 0.0 (TID 0)
6923 [Executor task launch worker-1] INFO 
 org.apache.spark.executor.Executor - Running task 1.0 in stage 0.0 (TID 1)
6958 [Executor task launch worker-0] INFO 
 org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit 
[node=[ZIbTPE4FSxigrYkomftWQw/Strobe|192.189.224.80:9600],shard=1]
6958 [Executor task launch worker-1] INFO 
 org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit 
[node=[ZIbTPE4FSxigrYkomftWQw/Strobe|192.189.224.80:9600],shard=0]
6998 [Executor task launch worker-0] WARN 
 org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
6998 [Executor task launch worker-1] WARN 
 org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...

I noticed only two shard-splits being created. 

On the other hand, when I run the application against localhost with 
default settings, this is what I get:
4960 [main] INFO  org.elasticsearch.hadoop.mr.EsInputFormat - Created [5] 
shard-splits
5002 [main] INFO  org.apache.spark.SparkContext - Starting job: count at 
ElasticSparkTest1.scala:59
5022 [sparkDriver-akka.actor.default-dispatcher-5] INFO 
 org.apache.spark.scheduler.DAGScheduler - Got job 0 (count at 
ElasticSparkTest1.scala:59) with 5 output partitions (allowLocal=false)
5023 [sparkDriver-akka.actor.default-dispatcher-5] INFO 
 org.apache.spark.scheduler.DAGScheduler - Final stage: Stage 0(count at 
ElasticSparkTest1.scala:59)
5023 [sparkDriver-akka.actor.default-dispatcher-5] INFO 
 org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List()
5030 [sparkDriver-akka.actor.default-dispatcher-5] INFO 
 org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
5040 [sparkDriver-akka.actor.default-dispatcher-5] INFO 
 org.apache.spark.scheduler.DAGScheduler - Submitting Stage 0 
(NewHadoopRDD[0] at newAPIHadoopRDD at ElasticSparkTest1.scala:57), which 
has no missing parents
5075 [sparkDriver-akka.actor.default-dispatcher-5] INFO 
 org.apache.spark.storage.MemoryStore - ensureFreeSpace(1568) called with 
curMem=34340, maxMem=511377408
5076 [sparkDriver-akka.actor.default-dispatcher-5] INFO 
 org.apache.spark.storage.MemoryStore - Block broadcast_1 stored as values 
in memory (estimated size 1568.0 B, free 487.7 MB)
5092 [sparkDriver-akka.actor.default-dispatcher-5] INFO 
 org.apache.spark.scheduler.DAGScheduler - Submitting 5 missing tasks from 
Stage 0 (NewHadoopRDD[0] at newAPIHadoopRDD at ElasticSparkTest1.scala:57)
5094 [sparkDriver-akka.actor.default-dispatcher-5] INFO 
 org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 0.0 with 5 
tasks
5133 [sparkDriver-akka.actor.default-dispatcher-4] INFO 
 org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 0.0 
(TID 0, localhost, ANY, 16090 bytes)
5138 [sparkDriver-akka.actor.default-dispatcher-4] INFO 
 org.apache.spark.scheduler.TaskSetManager - Starting task 1.0 in stage 0.0 
(TID 1, localhost, ANY, 16090 bytes)
5140 [sparkDriver-akka.actor.default-dispatcher-4] INFO 
 org.apache.spark.scheduler.TaskSetManager - Starting task 2.0 in stage 0.0 
(TID 2, localhost, ANY, 16090 bytes)
5141 [sparkDriver-akka.actor.default-dispatcher-4] INFO 
 org.apache.spark.scheduler.TaskSetManager - Starting task 3.0 in stage 0.0 
(TID 3, localhost, ANY, 16090 bytes)
5142 [sparkDriver-akka.actor.default-dispatcher-4] INFO 
 org.apache.spark.scheduler.TaskSetManager - Starting task 4.0 in stage 0.0 
(TID 4, localhost, ANY, 16090 bytes)
5149 [Executor task launch worker-1] INFO 
 org.apache.spark.executor.Executor - Running task 1.0 in stage 0.0 (TID 1)
5149 [Executor task launch worker-2] INFO 
 org.apache.spark.executor.Executor - Running task 2.0 in stage 0.0 (TID 2)
5149 [Executor task launch worker-4] INFO 
 org.apache.spark.executor.Executor - Running task 4.0 in stage 0.0 (TID 4)
5149 [Executor task launch worker-0] INFO 
 org.apache.spark.executor.Executor - Running task 0.0 in stage 0.0 (TID 0)
5149 [Executor task launch worker-3] INFO 
 org.apache.spark.executor.Executor - Running task 3.0 in stage 0.0 (TID 3)
5186 [Executor task launch worker-4] INFO 
 org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit 
[node=[KsYIAPqDSDafYjZJPh4CeQ/Silver Surfer|10.208.8.28:9200],shard=3]
5186 [Executor task launch worker-2] INFO 
 org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit 
[node=[KsYIAPqDSDafYjZJPh4CeQ/Silver Surfer|10.208.8.28:9200],shard=2]
5186 [Executor task launch worker-1] INFO 
 org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit 
[node=[KsYIAPqDSDafYjZJPh4CeQ/Silver Surfer|10.208.8.28:9200],shard=1]
5186 [Executor task launch worker-3] INFO 
 org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit 
[node=[KsYIAPqDSDafYjZJPh4CeQ/Silver Surfer|10.208.8.28:9200],shard=4]
5186 [Executor task launch worker-0] INFO 
 org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit 
[node=[KsYIAPqDSDafYjZJPh4CeQ/Silver Surfer|10.208.8.28:9200],shard=0]
5234 [Executor task launch worker-2] WARN 
 org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
5234 [Executor task launch worker-4] WARN 
 org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
5234 [Executor task launch worker-1] WARN 
 org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
5236 [Executor task launch worker-3] WARN 
 org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
5236 [Executor task launch worker-0] WARN 
 org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
7390 [Executor task launch worker-1] INFO 
 org.apache.spark.executor.Executor - Finished task 1.0 in stage 0.0 (TID 
1). 1652 bytes result sent to driver
7390 [Executor task launch worker-0] INFO 
 org.apache.spark.executor.Executor - Finished task 0.0 in stage 0.0 (TID 
0). 1652 bytes result sent to driver
7390 [Executor task launch worker-4] INFO 
 org.apache.spark.executor.Executor - Finished task 4.0 in stage 0.0 (TID 
4). 1652 bytes result sent to driver
7390 [Executor task launch worker-2] INFO 
 org.apache.spark.executor.Executor - Finished task 2.0 in stage 0.0 (TID 
2). 1652 bytes result sent to driver
7390 [Executor task launch worker-3] INFO 
 org.apache.spark.executor.Executor - Finished task 3.0 in stage 0.0 (TID 
3). 1652 bytes result sent to driver
7410 [Result resolver thread-3] INFO 
 org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 0.0 
(TID 0) in 2276 ms on localhost (1/5)
7417 [Result resolver thread-0] INFO 
 org.apache.spark.scheduler.TaskSetManager - Finished task 4.0 in stage 0.0 
(TID 4) in 2269 ms on localhost (2/5)
7424 [Result resolver thread-1] INFO 
 org.apache.spark.scheduler.TaskSetManager - Finished task 3.0 in stage 0.0 
(TID 3) in 2277 ms on localhost (3/5)
7430 [Result resolver thread-2] INFO 
 org.apache.spark.scheduler.TaskSetManager - Finished task 2.0 in stage 0.0 
(TID 2) in 2285 ms on localhost (4/5)
7437 [Result resolver thread-3] INFO 
 org.apache.spark.scheduler.TaskSetManager - Finished task 1.0 in stage 0.0 
(TID 1) in 2293 ms on localhost (5/5)
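
To make the comparison between the two runs concrete, here is a minimal 
sketch in plain Scala (no Spark or ES needed) that pulls the shard-split 
count and the task count out of log lines shaped like the ones above. The 
object name and both helper methods are hypothetical and exist only for 
illustration; the sample lines are the remote-run entries with the mail 
line wrapping undone.

```scala
// Hypothetical helper, for illustration only: extracts counts from log
// lines shaped like the es-hadoop / Spark output quoted in this mail.
object SplitLogCheck {
  // Unanchored so the pattern may match anywhere inside a log line.
  private val SplitsPattern = """Created \[(\d+)\] shard-splits""".r.unanchored
  private val TasksPattern  = """Adding task set \S+ with (\d+) tasks""".r.unanchored

  // Number of shard-splits reported by EsInputFormat, if the line has one.
  def splitCount(line: String): Option[Int] = line match {
    case SplitsPattern(n) => Some(n.toInt)
    case _                => None
  }

  // Number of tasks Spark scheduled for the stage, if the line has one.
  def taskCount(line: String): Option[Int] = line match {
    case TasksPattern(n) => Some(n.toInt)
    case _               => None
  }

  def main(args: Array[String]): Unit = {
    // Two entries from the remote run, re-joined onto single lines.
    val remoteLog = Seq(
      "6738 [main] INFO  org.elasticsearch.hadoop.mr.EsInputFormat - Created [2] shard-splits",
      "6872 [sparkDriver-akka.actor.default-dispatcher-5] INFO  org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 0.0 with 2 tasks"
    )
    val splits = remoteLog.flatMap(l => splitCount(l).toList).headOption
    val tasks  = remoteLog.flatMap(l => taskCount(l).toList).headOption
    println(s"splits=$splits tasks=$tasks")
  }
}
```

In both runs above the two numbers agree (2 and 2 for the remote instance, 
5 and 5 for localhost), which is what prompted question 1.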


Thanks for the help


Ramdev




On Wednesday, 22 October 2014 11:58:02 UTC-5, Ramdev Wudali wrote:
>
> Hi:
>    I have a very simple application that queries an ES instance and 
> returns the count of documents found by the query. I am using the Spark 
> interface because I intend to run ML algorithms on the result set. With 
> that said, here are the problems I face:
>
> 1. If I set up the Configuration (to use with newAPIHadoopRDD) or JobConf 
> (to use with hadoopRDD) against a remote ES instance like so:
>
>
> This is using the newAPIHadoopRDD interface:
>
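>   import org.apache.hadoop.conf.Configuration
>   import org.apache.hadoop.io.{MapWritable, Text}
>   import org.apache.spark.{SparkConf, SparkContext}
>   import org.apache.spark.serializer.KryoSerializer
>   import org.elasticsearch.hadoop.mr.EsInputFormat
>
>   val sparkConf = new 
> SparkConf().setMaster("local[2]").setAppName("TestESSpark")
>   sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
>   val sc = new SparkContext(sparkConf)
>
>   val conf = new Configuration   // change to new JobConf for the old API
>   conf.set("es.nodes", "remote.server:port")
>   conf.set("es.resource", "index/type")
>   conf.set("es.query", "{\"query\":{\"match_all\":{}}}")
>   // change to hadoopRDD for the old API
>   val esRDD = 
> sc.newAPIHadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]], classOf[Text], classOf[MapWritable])
>
>   // count triggers the actual query against ES
>   val docCount = esRDD.count
>   println(docCount)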
>
>
> The application just hangs at the println (basically while executing the 
> search, or so I think).
>
>
> 2. If I use localhost instead of "remote.server:port" for the es.nodes, 
> the application throws an exception :
> Exception in thread "main" 
> org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection 
> error (check network and/or proxy settings)- all nodes failed; tried 
> [[localhost:9200]] 
> at 
> org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:123)
> at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:303)
> at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:287)
> at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:291)
> at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:118)
> at 
> org.elasticsearch.hadoop.rest.RestClient.discoverNodes(RestClient.java:100)
> at 
> org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:57)
> at 
> org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:220)
> at 
> org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:406)
> at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:179)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1135)
> at org.apache.spark.rdd.RDD.count(RDD.scala:904)
> at 
> trgr.rd.newsplus.pairgen.ElasticSparkTest1$.main(ElasticSparkTest1.scala:59)
> at trgr.rd.newsplus.pairgen.ElasticSparkTest1.main(ElasticSparkTest1.scala)
>
>
> I am using the 2.1.0.Beta2 version of the elasticsearch-hadoop library 
> and running it against a local ES instance (version 1.3.2) and a remote 
> ES instance (version 1.0.0).
>
> Any insight as to what I might be missing or doing wrong?
>
> Thanks
>
> Ramdev
>
>
