An update on this:
The exception mentioned in item 2 of my original post was due to the ES
instance being down (and for some reason I failed to realise that).
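For anyone else who hits this, a quick sanity check that the instance is
actually reachable (before suspecting the connector) would have saved me
some time. A minimal sketch; the host and port are placeholders for your
own instance:

import scala.io.Source

// A running ES node answers on its HTTP root with a small JSON blob
// (node name, version, etc.); an exception here means it is down.
val resp = Source.fromURL("http://localhost:9200").mkString
println(resp)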
That said, I am still having trouble with item 1. The following
questions came up:
1. Is there a correlation between the number of shards/replicas on the
ES instance and the number of shard-splits that are created for the
query request?
2. If the index is on a single shard and has a fairly large number of
documents, would the performance be slower?
3. Are there any network latency issues? (I am able to query the
instance using the sense/head plugins, and the response time is not bad;
it's approximately 28 ms. A quick timing sketch is below.)
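For question 3, this is roughly how that timing could be measured from
the driver JVM itself rather than through the plugins. A rough sketch;
the host, port, and index/type are placeholders, and it assumes the
_count endpoint that ES 1.x exposes:

import scala.io.Source

// Time a single _count round trip against the remote instance.
val t0 = System.nanoTime()
val body = Source.fromURL("http://remote.server:9200/index/type/_count").mkString
val elapsedMs = (System.nanoTime() - t0) / 1e6
println(s"count response: $body in $elapsedMs ms")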
The reason for question 1 is the following:
6738 [main] INFO org.elasticsearch.hadoop.mr.EsInputFormat - Created [2]
shard-splits
6780 [main] INFO org.apache.spark.SparkContext - Starting job: count at
ElasticSparkTest1.scala:59
6801 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Got job 0 (count at
ElasticSparkTest1.scala:59) with 2 output partitions (allowLocal=false)
6802 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Final stage: Stage 0(count at
ElasticSparkTest1.scala:59)
6802 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List()
6808 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
6818 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Submitting Stage 0
(NewHadoopRDD[0] at newAPIHadoopRDD at ElasticSparkTest1.scala:57), which
has no missing parents
6853 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.storage.MemoryStore - ensureFreeSpace(1568) called with
curMem=34372, maxMem=503344005
6854 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.storage.MemoryStore - Block broadcast_1 stored as values
in memory (estimated size 1568.0 B, free 480.0 MB)
6870 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Submitting 2 missing tasks from
Stage 0 (NewHadoopRDD[0] at newAPIHadoopRDD at ElasticSparkTest1.scala:57)
6872 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 0.0 with 2
tasks
6912 [sparkDriver-akka.actor.default-dispatcher-2] INFO
org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 0.0
(TID 0, localhost, ANY, 18521 bytes)
6917 [sparkDriver-akka.actor.default-dispatcher-2] INFO
org.apache.spark.scheduler.TaskSetManager - Starting task 1.0 in stage 0.0
(TID 1, localhost, ANY, 18521 bytes)
6923 [Executor task launch worker-0] INFO
org.apache.spark.executor.Executor - Running task 0.0 in stage 0.0 (TID 0)
6923 [Executor task launch worker-1] INFO
org.apache.spark.executor.Executor - Running task 1.0 in stage 0.0 (TID 1)
6958 [Executor task launch worker-0] INFO
org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
[node=[ZIbTPE4FSxigrYkomftWQw/Strobe|192.189.224.80:9600],shard=1]
6958 [Executor task launch worker-1] INFO
org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
[node=[ZIbTPE4FSxigrYkomftWQw/Strobe|192.189.224.80:9600],shard=0]
6998 [Executor task launch worker-0] WARN
org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
6998 [Executor task launch worker-1] WARN
org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
I noticed only two shard-splits being created.
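To check whether those two splits simply mirror the index's shard count,
the shards can be listed directly. A minimal sketch; host, port, and
index name are placeholders, and it assumes the _cat API available from
ES 1.0 onward:

import scala.io.Source

// Prints one line per shard (primaries and replicas); the number of
// primaries is what to compare against the [2] shard-splits above.
val shards = Source.fromURL("http://remote.server:9200/_cat/shards/index").mkString
println(shards)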
On the other hand, when I run the application against localhost with
default settings, this is what I get:
4960 [main] INFO org.elasticsearch.hadoop.mr.EsInputFormat - Created [5]
shard-splits
5002 [main] INFO org.apache.spark.SparkContext - Starting job: count at
ElasticSparkTest1.scala:59
5022 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Got job 0 (count at
ElasticSparkTest1.scala:59) with 5 output partitions (allowLocal=false)
5023 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Final stage: Stage 0(count at
ElasticSparkTest1.scala:59)
5023 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List()
5030 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
5040 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Submitting Stage 0
(NewHadoopRDD[0] at newAPIHadoopRDD at ElasticSparkTest1.scala:57), which
has no missing parents
5075 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.storage.MemoryStore - ensureFreeSpace(1568) called with
curMem=34340, maxMem=511377408
5076 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.storage.MemoryStore - Block broadcast_1 stored as values
in memory (estimated size 1568.0 B, free 487.7 MB)
5092 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Submitting 5 missing tasks from
Stage 0 (NewHadoopRDD[0] at newAPIHadoopRDD at ElasticSparkTest1.scala:57)
5094 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 0.0 with 5
tasks
5133 [sparkDriver-akka.actor.default-dispatcher-4] INFO
org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 0.0
(TID 0, localhost, ANY, 16090 bytes)
5138 [sparkDriver-akka.actor.default-dispatcher-4] INFO
org.apache.spark.scheduler.TaskSetManager - Starting task 1.0 in stage 0.0
(TID 1, localhost, ANY, 16090 bytes)
5140 [sparkDriver-akka.actor.default-dispatcher-4] INFO
org.apache.spark.scheduler.TaskSetManager - Starting task 2.0 in stage 0.0
(TID 2, localhost, ANY, 16090 bytes)
5141 [sparkDriver-akka.actor.default-dispatcher-4] INFO
org.apache.spark.scheduler.TaskSetManager - Starting task 3.0 in stage 0.0
(TID 3, localhost, ANY, 16090 bytes)
5142 [sparkDriver-akka.actor.default-dispatcher-4] INFO
org.apache.spark.scheduler.TaskSetManager - Starting task 4.0 in stage 0.0
(TID 4, localhost, ANY, 16090 bytes)
5149 [Executor task launch worker-1] INFO
org.apache.spark.executor.Executor - Running task 1.0 in stage 0.0 (TID 1)
5149 [Executor task launch worker-2] INFO
org.apache.spark.executor.Executor - Running task 2.0 in stage 0.0 (TID 2)
5149 [Executor task launch worker-4] INFO
org.apache.spark.executor.Executor - Running task 4.0 in stage 0.0 (TID 4)
5149 [Executor task launch worker-0] INFO
org.apache.spark.executor.Executor - Running task 0.0 in stage 0.0 (TID 0)
5149 [Executor task launch worker-3] INFO
org.apache.spark.executor.Executor - Running task 3.0 in stage 0.0 (TID 3)
5186 [Executor task launch worker-4] INFO
org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
[node=[KsYIAPqDSDafYjZJPh4CeQ/Silver Surfer|10.208.8.28:9200],shard=3]
5186 [Executor task launch worker-2] INFO
org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
[node=[KsYIAPqDSDafYjZJPh4CeQ/Silver Surfer|10.208.8.28:9200],shard=2]
5186 [Executor task launch worker-1] INFO
org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
[node=[KsYIAPqDSDafYjZJPh4CeQ/Silver Surfer|10.208.8.28:9200],shard=1]
5186 [Executor task launch worker-3] INFO
org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
[node=[KsYIAPqDSDafYjZJPh4CeQ/Silver Surfer|10.208.8.28:9200],shard=4]
5186 [Executor task launch worker-0] INFO
org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
[node=[KsYIAPqDSDafYjZJPh4CeQ/Silver Surfer|10.208.8.28:9200],shard=0]
5234 [Executor task launch worker-2] WARN
org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
5234 [Executor task launch worker-4] WARN
org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
5234 [Executor task launch worker-1] WARN
org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
5236 [Executor task launch worker-3] WARN
org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
5236 [Executor task launch worker-0] WARN
org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
7390 [Executor task launch worker-1] INFO
org.apache.spark.executor.Executor - Finished task 1.0 in stage 0.0 (TID
1). 1652 bytes result sent to driver
7390 [Executor task launch worker-0] INFO
org.apache.spark.executor.Executor - Finished task 0.0 in stage 0.0 (TID
0). 1652 bytes result sent to driver
7390 [Executor task launch worker-4] INFO
org.apache.spark.executor.Executor - Finished task 4.0 in stage 0.0 (TID
4). 1652 bytes result sent to driver
7390 [Executor task launch worker-2] INFO
org.apache.spark.executor.Executor - Finished task 2.0 in stage 0.0 (TID
2). 1652 bytes result sent to driver
7390 [Executor task launch worker-3] INFO
org.apache.spark.executor.Executor - Finished task 3.0 in stage 0.0 (TID
3). 1652 bytes result sent to driver
7410 [Result resolver thread-3] INFO
org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 0.0
(TID 0) in 2276 ms on localhost (1/5)
7417 [Result resolver thread-0] INFO
org.apache.spark.scheduler.TaskSetManager - Finished task 4.0 in stage 0.0
(TID 4) in 2269 ms on localhost (2/5)
7424 [Result resolver thread-1] INFO
org.apache.spark.scheduler.TaskSetManager - Finished task 3.0 in stage 0.0
(TID 3) in 2277 ms on localhost (3/5)
7430 [Result resolver thread-2] INFO
org.apache.spark.scheduler.TaskSetManager - Finished task 2.0 in stage 0.0
(TID 2) in 2285 ms on localhost (4/5)
7437 [Result resolver thread-3] INFO
org.apache.spark.scheduler.TaskSetManager - Finished task 1.0 in stage 0.0
(TID 1) in 2293 ms on localhost (5/5)
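The five splits here line up with the five primary shards an ES 1.x
index gets by default, which suggests es-hadoop creates one shard-split
per primary shard. A small sketch to confirm this from the Spark side,
reusing the esRDD from the snippet in my original post (quoted below):

// The RDD partition count should equal the number of shard-splits
// logged by EsInputFormat: 5 here, 2 against the remote index.
println(s"partitions = ${esRDD.partitions.length}")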
Thanks for the help
Ramdev
On Wednesday, 22 October 2014 11:58:02 UTC-5, Ramdev Wudali wrote:
>
> Hi:
> I have a very simple application that queries an ES instance and
> returns the count of documents found by the query. I am using the Spark
> interface as I intend to run ML algorithms on the result set. With that
> said, here are the problems I face:
>
> 1. If I set up the Configuration (to use with newAPIHadoopRDD) or JobConf
> (to use with hadoopRDD) against a remote ES instance, like so:
>
>
> This is using the newAPIHadoopRDD interface:
>
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.io.{MapWritable, Text}
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.serializer.KryoSerializer
> import org.elasticsearch.hadoop.mr.EsInputFormat
>
> val sparkConf = new SparkConf().setMaster("local[2]")
>   .setAppName("TestESSpark")
> sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
> val sc = new SparkContext(sparkConf)
>
> val conf = new Configuration // change to new JobConf for the old API
> conf.set("es.nodes", "remote.server:port")
> conf.set("es.resource", "index/type")
> conf.set("es.query", "{\"query\":{\"match_all\":{}}}")
> val esRDD = sc.newAPIHadoopRDD(conf,
>   classOf[EsInputFormat[Text, MapWritable]],
>   classOf[Text], classOf[MapWritable])
>
> // change to hadoopRDD for the old API
> val docCount = esRDD.count
> println(docCount)
>
>
> The application just hangs at the println (basically while executing
> the search, or so I think).
>
>
> 2. If I use localhost instead of "remote.server:port" for es.nodes,
> the application throws an exception:
> Exception in thread "main"
> org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection
> error (check network and/or proxy settings)- all nodes failed; tried
> [[localhost:9200]]
> at
> org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:123)
> at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:303)
> at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:287)
> at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:291)
> at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:118)
> at
> org.elasticsearch.hadoop.rest.RestClient.discoverNodes(RestClient.java:100)
> at
> org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:57)
> at
> org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:220)
> at
> org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:406)
> at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:179)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1135)
> at org.apache.spark.rdd.RDD.count(RDD.scala:904)
> at
> trgr.rd.newsplus.pairgen.ElasticSparkTest1$.main(ElasticSparkTest1.scala:59)
> at trgr.rd.newsplus.pairgen.ElasticSparkTest1.main(ElasticSparkTest1.scala)
>
>
> I am using version 2.1.0.Beta2 of the elasticsearch-hadoop library,
> running against a local ES instance (version 1.3.2) and a remote ES
> instance (version 1.0.0).
>
> Any insight as to what I might be missing/doing wrong?
>
> Thanks
>
> Ramdev
>
>