Hi Bin,

Very likely the RedisClientPool is being closed too quickly before map has
a chance to get to it. One way to verify would be to comment out the .close
line and see what happens. FWIW I saw a similar problem writing to Solr
where I put a commit where you have a close, and noticed that the commit
was happening before the actual data insertion (in the .map line) happened
(and no data showing up in the index until the next time I ran the code
:-)).

At the time I got around it by doing a zipWithIndex on the Iterator, then
doing a partial commit every n records, and finally doing a commit from the
driver code. However, this won't work for you, and there is a better way
outlined on this page (look for Tobias Pfeiffer, its the code block
immediately following):

http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/

where you test for hasNext on the iterator and call close if its the last
element, within the scope of the .map call.

-sujit

On Thu, Oct 22, 2015 at 11:32 PM, Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

> Are you sure RedisClientPool is being initialized properly in the
> constructor of RedisCache? Can you please copy paste the code that you use
> to initialize RedisClientPool inside the constructor of RedisCache?
>
> Thanks,
> Aniket
>
> On Fri, Oct 23, 2015 at 11:47 AM Bin Wang <wbi...@gmail.com> wrote:
>
>> BTW, "lines" is a DStream.
>>
>> Bin Wang <wbi...@gmail.com>于2015年10月23日周五 下午2:16写道:
>>
>>> I use mapPartitions to open connections to Redis, I write it like this:
>>>
>>>     val seqs = lines.mapPartitions { lines =>
>>>       val cache = new RedisCache(redisUrl, redisPort)
>>>       val result = lines.map(line => Parser.parseBody(line, cache))
>>>       cache.redisPool.close
>>>       result
>>>     }
>>>
>>> But it seems the pool is closed before I use it. Am I doing anything
>>> wrong? Here is the error:
>>>
>>> java.lang.IllegalStateException: Pool not open
>>>     at 
>>> org.apache.commons.pool.BaseObjectPool.assertOpen(BaseObjectPool.java:140)
>>>     at 
>>> org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:166)
>>>     at com.redis.RedisClientPool.withClient(Pool.scala:34)
>>>     at com.appadhoc.data.cache.RedisCache.getExpId(RedisCache.scala:17)
>>>     at 
>>> com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:29)
>>>     at 
>>> com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:26)
>>>     at 
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>     at 
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>     at scala.collection.immutable.List.foreach(List.scala:318)
>>>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>>     at com.appadhoc.data.parser.Parser$.parseBody(Parser.scala:26)
>>>     at 
>>> com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
>>>     at 
>>> com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
>>>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>     at 
>>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
>>>     at 
>>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
>>>     at 
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>>     at 
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>>

Reply via email to