Option Encoder
Is there a proper way to make or get an Encoder for Option in Spark 2.0? There isn't one by default and while ExpressionEncoder from catalyst will work, it is private and unsupported. -- *Richard Marscher* Senior Software Engineer Localytics Localytics.com <http://localytics.com/> | Our Blog <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | Facebook <http://facebook.com/localytics> | LinkedIn <http://www.linkedin.com/company/1148792?trk=tyah>
Re: Dataset - reduceByKey
There certainly are some gaps between the richness of the RDD API and the Dataset API. I'm also migrating from RDD to Dataset and ran into reduceByKey and join scenarios. On the spark-dev list, one person was discussing reduceByKey being sub-optimal at the moment, which spawned this JIRA: https://issues.apache.org/jira/browse/SPARK-15598. You might be able to get by with groupBy().reduce() for now, though check the performance. As for join, the approach would be to use the joinWith function on Dataset, although IMO the API isn't as sugary as the RDD one; I've been discussing that in a separate thread as well. I can't find a web link for it, but the thread subject is "Dataset Outer Join vs RDD Outer Join".

On Tue, Jun 7, 2016 at 2:40 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
> It would also be nice if there was a better example of joining two Datasets. I am looking at the documentation here: http://spark.apache.org/docs/latest/sql-programming-guide.html. It seems a little bit sparse - is there a better documentation source?
>
> Regards,
>
> Bryan Jeffrey
>
> On Tue, Jun 7, 2016 at 2:32 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
>
>> Hello.
>>
>> I am looking at the option of moving RDD based operations to Dataset based operations. We are calling 'reduceByKey' on some pair RDDs we have. What would the equivalent be in the Dataset interface - I do not see a simple reduceByKey replacement.
>>
>> Regards,
>>
>> Bryan Jeffrey

--
*Richard Marscher*
Senior Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | Facebook <http://facebook.com/localytics> | LinkedIn <http://www.linkedin.com/company/1148792?trk=tyah>
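As a rough sketch of the groupBy-then-reduce shape of that workaround, here is the same idea over plain Scala collections standing in for a Dataset (the helper name and collection types are illustrative only, not the exact Spark 2.0 API surface):

```scala
object ReduceByKeySketch {
  // Stand-in for RDD-style reduceByKey: group the pairs by key, then
  // reduce the values within each group with the supplied function.
  def reduceByKey[K, V](pairs: Seq[(K, V)])(f: (V, V) => V): Map[K, V] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }

  def main(args: Array[String]): Unit = {
    val pairs = Seq(("a", 1), ("b", 2), ("b", 6))
    // Sum the values per key, as reduceByKey(_ + _) would.
    println(reduceByKey(pairs)(_ + _)) // a -> 1, b -> 8 (map order may vary)
  }
}
```

As the thread notes, a grouping-based version materializes whole groups before reducing, so its performance should be checked against the workload.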
Re: Dataset Outer Join vs RDD Outer Join
For anyone following along the chain went private for a bit, but there were still issues with the bytecode generation in the 2.0-preview so this JIRA was created: https://issues.apache.org/jira/browse/SPARK-15786 On Mon, Jun 6, 2016 at 1:11 PM, Michael Armbrust <mich...@databricks.com> wrote: > That kind of stuff is likely fixed in 2.0. If you can get a reproduction > working there it would be very helpful if you could open a JIRA. > > On Mon, Jun 6, 2016 at 7:37 AM, Richard Marscher <rmarsc...@localytics.com > > wrote: > >> A quick unit test attempt didn't get far replacing map with as[], I'm >> only working against 1.6.1 at the moment though, I was going to try 2.0 but >> I'm having a hard time building a working spark-sql jar from source, the >> only ones I've managed to make are intended for the full assembly fat jar. >> >> >> Example of the error from calling joinWith as left_outer and then >> .as[(Option[T], U]) where T and U are Int and Int. >> >> [info] newinstance(class scala.Tuple2,decodeusingserializer(input[0, >> StructType(StructField(_1,IntegerType,true), >> StructField(_2,IntegerType,true))],scala.Option,true),decodeusingserializer(input[1, >> StructType(StructField(_1,IntegerType,true), >> StructField(_2,IntegerType,true))],scala.Option,true),false,ObjectType(class >> scala.Tuple2),None) >> [info] :- decodeusingserializer(input[0, >> StructType(StructField(_1,IntegerType,true), >> StructField(_2,IntegerType,true))],scala.Option,true) >> [info] : +- input[0, StructType(StructField(_1,IntegerType,true), >> StructField(_2,IntegerType,true))] >> [info] +- decodeusingserializer(input[1, >> StructType(StructField(_1,IntegerType,true), >> StructField(_2,IntegerType,true))],scala.Option,true) >> [info]+- input[1, StructType(StructField(_1,IntegerType,true), >> StructField(_2,IntegerType,true))] >> >> Cause: java.util.concurrent.ExecutionException: java.lang.Exception: >> failed to compile: org.codehaus.commons.compiler.CompileException: File >> 
'generated.java', Line 32, Column 60: No applicable constructor/method >> found for actual parameters "org.apache.spark.sql.catalyst.InternalRow"; >> candidates are: "public static java.nio.ByteBuffer >> java.nio.ByteBuffer.wrap(byte[])", "public static java.nio.ByteBuffer >> java.nio.ByteBuffer.wrap(byte[], int, int)" >> >> The generated code is passing InternalRow objects into the ByteBuffer >> >> Starting from two Datasets of types Dataset[(Int, Int)] with expression >> $"left._1" === $"right._1". I'll have to spend some time getting a better >> understanding of this analysis phase, but hopefully I can come up with >> something. >> >> On Wed, Jun 1, 2016 at 3:43 PM, Michael Armbrust <mich...@databricks.com> >> wrote: >> >>> Option should place nicely with encoders, but its always possible there >>> are bugs. I think those function signatures are slightly more expensive >>> (one extra object allocation) and its not as java friendly so we probably >>> don't want them to be the default. >>> >>> That said, I would like to enable that kind of sugar while still taking >>> advantage of all the optimizations going on under the covers. Can you get >>> it to work if you use `as[...]` instead of `map`? >>> >>> On Wed, Jun 1, 2016 at 11:59 AM, Richard Marscher < >>> rmarsc...@localytics.com> wrote: >>> >>>> Ah thanks, I missed seeing the PR for >>>> https://issues.apache.org/jira/browse/SPARK-15441. If the rows became >>>> null objects then I can implement methods that will map those back to >>>> results that align closer to the RDD interface. >>>> >>>> As a follow on, I'm curious about thoughts regarding enriching the >>>> Dataset join interface versus a package or users sugaring for themselves. I >>>> haven't considered the implications of what the optimizations datasets, >>>> tungsten, and/or bytecode gen can do now regarding joins so I may be >>>> missing a critical benefit there around say avoiding Options in favor of >>>> nulls. 
If nothing else, I guess Option doesn't have a first-class Encoder >>>> or DataType yet, and maybe for good reasons.
>>>>
>>>> I did find the RDD join interface elegant, though. In the ideal world an API comparable to the following would be nice: https://gist.github.com/rmarsch/3ea78b3a9a8a0e83ce162ed947fcab06
>>>>
>>>> On Wed, Jun 1,
Re: Dataset Outer Join vs RDD Outer Join
A quick unit test attempt didn't get far replacing map with as[]. I'm only working against 1.6.1 at the moment though; I was going to try 2.0, but I'm having a hard time building a working spark-sql jar from source, as the only ones I've managed to make are intended for the full assembly fat jar.

Example of the error from calling joinWith as left_outer and then .as[(Option[T], U)] where T and U are Int and Int:

[info] newinstance(class scala.Tuple2,decodeusingserializer(input[0, StructType(StructField(_1,IntegerType,true), StructField(_2,IntegerType,true))],scala.Option,true),decodeusingserializer(input[1, StructType(StructField(_1,IntegerType,true), StructField(_2,IntegerType,true))],scala.Option,true),false,ObjectType(class scala.Tuple2),None)
[info] :- decodeusingserializer(input[0, StructType(StructField(_1,IntegerType,true), StructField(_2,IntegerType,true))],scala.Option,true)
[info] :  +- input[0, StructType(StructField(_1,IntegerType,true), StructField(_2,IntegerType,true))]
[info] +- decodeusingserializer(input[1, StructType(StructField(_1,IntegerType,true), StructField(_2,IntegerType,true))],scala.Option,true)
[info]    +- input[1, StructType(StructField(_1,IntegerType,true), StructField(_2,IntegerType,true))]

Cause: java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 32, Column 60: No applicable constructor/method found for actual parameters "org.apache.spark.sql.catalyst.InternalRow"; candidates are: "public static java.nio.ByteBuffer java.nio.ByteBuffer.wrap(byte[])", "public static java.nio.ByteBuffer java.nio.ByteBuffer.wrap(byte[], int, int)"

The generated code is passing InternalRow objects into the ByteBuffer.

Starting from two Datasets of type Dataset[(Int, Int)] with expression $"left._1" === $"right._1". I'll have to spend some time getting a better understanding of this analysis phase, but hopefully I can come up with something.
On Wed, Jun 1, 2016 at 3:43 PM, Michael Armbrust <mich...@databricks.com> wrote: > Option should place nicely with encoders, but its always possible there > are bugs. I think those function signatures are slightly more expensive > (one extra object allocation) and its not as java friendly so we probably > don't want them to be the default. > > That said, I would like to enable that kind of sugar while still taking > advantage of all the optimizations going on under the covers. Can you get > it to work if you use `as[...]` instead of `map`? > > On Wed, Jun 1, 2016 at 11:59 AM, Richard Marscher < > rmarsc...@localytics.com> wrote: > >> Ah thanks, I missed seeing the PR for >> https://issues.apache.org/jira/browse/SPARK-15441. If the rows became >> null objects then I can implement methods that will map those back to >> results that align closer to the RDD interface. >> >> As a follow on, I'm curious about thoughts regarding enriching the >> Dataset join interface versus a package or users sugaring for themselves. I >> haven't considered the implications of what the optimizations datasets, >> tungsten, and/or bytecode gen can do now regarding joins so I may be >> missing a critical benefit there around say avoiding Options in favor of >> nulls. If nothing else, I guess Option doesn't have a first class Encoder >> or DataType yet and maybe for good reasons. >> >> I did find the RDD join interface elegant, though. In the ideal world an >> API comparable the following would be nice: >> https://gist.github.com/rmarsch/3ea78b3a9a8a0e83ce162ed947fcab06 >> >> >> On Wed, Jun 1, 2016 at 1:42 PM, Michael Armbrust <mich...@databricks.com> >> wrote: >> >>> Thanks for the feedback. 
I think this will address at least some of the >>> problems you are describing: https://github.com/apache/spark/pull/13425 >>> >>> On Wed, Jun 1, 2016 at 9:58 AM, Richard Marscher < >>> rmarsc...@localytics.com> wrote: >>> >>>> Hi, >>>> >>>> I've been working on transitioning from RDD to Datasets in our codebase >>>> in anticipation of being able to leverage features of 2.0. >>>> >>>> I'm having a lot of difficulties with the impedance mismatches between >>>> how outer joins worked with RDD versus Dataset. The Dataset joins feel like >>>> a big step backwards IMO. With RDD, leftOuterJoin would give you Option >>>> types of the results from the right side of the join. This follows >>>> idiomatic Scala avoiding nulls and was easy to work with. >>>> >>>> Now with Dataset there is only joinWith where you specify the join >>>> type, but it l
Re: Dataset Outer Join vs RDD Outer Join
Ah thanks, I missed seeing the PR for https://issues.apache.org/jira/browse/SPARK-15441. If the rows become null objects, then I can implement methods that map those back to results that align closer to the RDD interface.

As a follow-on, I'm curious about thoughts on enriching the Dataset join interface versus a package, or users adding the sugar themselves. I haven't considered the implications of what the optimizations in Datasets, Tungsten, and/or bytecode gen can do now regarding joins, so I may be missing a critical benefit there around, say, avoiding Options in favor of nulls. If nothing else, I guess Option doesn't have a first-class Encoder or DataType yet, and maybe for good reasons.

I did find the RDD join interface elegant, though. In the ideal world an API comparable to the following would be nice: https://gist.github.com/rmarsch/3ea78b3a9a8a0e83ce162ed947fcab06

On Wed, Jun 1, 2016 at 1:42 PM, Michael Armbrust <mich...@databricks.com> wrote:
> Thanks for the feedback. I think this will address at least some of the problems you are describing: https://github.com/apache/spark/pull/13425
>
> On Wed, Jun 1, 2016 at 9:58 AM, Richard Marscher <rmarsc...@localytics.com> wrote:
>
>> Hi,
>>
>> I've been working on transitioning from RDD to Datasets in our codebase in anticipation of being able to leverage features of 2.0.
>>
>> I'm having a lot of difficulties with the impedance mismatches between how outer joins worked with RDD versus Dataset. The Dataset joins feel like a big step backwards IMO. With RDD, leftOuterJoin would give you Option types of the results from the right side of the join. This follows idiomatic Scala avoiding nulls and was easy to work with.
>>
>> Now with Dataset there is only joinWith where you specify the join type, but it lost all the semantics of identifying missing data from outer joins.
>> I can write some enriched methods on Dataset with an implicit class to >> abstract messiness away if Dataset nulled out all mismatching data from an >> outer join, however the problem goes even further in that the values aren't >> always null. Integer, for example, defaults to -1 instead of null. Now it's >> completely ambiguous what data in the join was actually there versus >> populated via this atypical semantic. >> >> Are there additional options available to work around this issue? I can >> convert to RDD and back to Dataset but that's less than ideal. >> >> Thanks, >> -- >> *Richard Marscher* >> Senior Software Engineer >> Localytics >> Localytics.com <http://localytics.com/> | Our Blog >> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | >> Facebook <http://facebook.com/localytics> | LinkedIn >> <http://www.linkedin.com/company/1148792?trk=tyah> >> > > -- *Richard Marscher* Senior Software Engineer Localytics Localytics.com <http://localytics.com/> | Our Blog <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | Facebook <http://facebook.com/localytics> | LinkedIn <http://www.linkedin.com/company/1148792?trk=tyah>
Dataset Outer Join vs RDD Outer Join
Hi, I've been working on transitioning from RDD to Datasets in our codebase in anticipation of being able to leverage features of 2.0. I'm having a lot of difficulties with the impedance mismatches between how outer joins worked with RDD versus Dataset. The Dataset joins feel like a big step backwards IMO. With RDD, leftOuterJoin would give you Option types of the results from the right side of the join. This follows idiomatic Scala avoiding nulls and was easy to work with. Now with Dataset there is only joinWith where you specify the join type, but it lost all the semantics of identifying missing data from outer joins. I can write some enriched methods on Dataset with an implicit class to abstract messiness away if Dataset nulled out all mismatching data from an outer join, however the problem goes even further in that the values aren't always null. Integer, for example, defaults to -1 instead of null. Now it's completely ambiguous what data in the join was actually there versus populated via this atypical semantic. Are there additional options available to work around this issue? I can convert to RDD and back to Dataset but that's less than ideal. Thanks, -- *Richard Marscher* Senior Software Engineer Localytics Localytics.com <http://localytics.com/> | Our Blog <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | Facebook <http://facebook.com/localytics> | LinkedIn <http://www.linkedin.com/company/1148792?trk=tyah>
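If the engine did null out the mismatched rows, the enrichment idea described in this message is essentially a map over the join output that wraps the right side in Option. A minimal plain-Scala sketch of that shape (the `wrapRight` helper and tuple layout are illustrative only, not Spark API; it assumes a missing right row really is null rather than a sentinel like -1):

```scala
object OuterJoinOptionSketch {
  // Wrap the possibly-null right side of a left outer join in Option,
  // restoring the RDD-style (L, Option[R]) shape. Option(null) is None,
  // so missing rows become None and present rows become Some(value).
  def wrapRight[L, R](joined: Seq[(L, R)]): Seq[(L, Option[R])] =
    joined.map { case (l, r) => (l, Option(r)) }

  def main(args: Array[String]): Unit = {
    // java.lang.Integer is used so the missing right side can be null.
    val joined: Seq[(String, Integer)] = Seq(("a", 1), ("b", null))
    println(wrapRight(joined)) // List((a,Some(1)), (b,None))
  }
}
```

Note this only works if null is the one unambiguous marker for a missing row; with a sentinel such as -1 for Integer, as described above, the mapping cannot be recovered.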
Re: Local Mode: Executor thread leak?
Alright, I was able to work through the problem. The owning thread was one from the executor task launch worker, which, at least in local mode, runs the task and the related user code of the task. After judiciously naming every thread in the pools in the user code (with a custom ThreadFactory), I was able to trace the leak to a couple of thread pools that were not shut down properly, by noticing the named threads accumulating in thread dumps of the JVM process.

On Mon, Dec 7, 2015 at 6:41 PM, Richard Marscher <rmarsc...@localytics.com> wrote:
> Thanks for the response.
>
> The version is Spark 1.5.2.
>
> Some examples of the thread names:
>
> pool-1061-thread-1
> pool-1059-thread-1
> pool-1638-thread-1
>
> There become hundreds then thousands of these stranded in WAITING.
>
> I added logging to try to track the lifecycle of the thread pool in Executor as mentioned before. Here is an excerpt, but every seems fine there. Every executor that starts is also shut down and it seems like it shuts down fine.
>
> 15/12/07 23:30:21 WARN o.a.s.e.Executor: Threads finished in executor driver. pool shut down java.util.concurrent.ThreadPoolExecutor@e5d036b[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
> 15/12/07 23:30:28 WARN o.a.s.e.Executor: Executor driver created, thread pool: java.util.concurrent.ThreadPoolExecutor@3bc41ae3[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
> 15/12/07 23:31:06 WARN o.a.s.e.Executor: Threads finished in executor driver. pool shut down java.util.concurrent.ThreadPoolExecutor@3bc41ae3[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 36]
> 15/12/07 23:31:11 WARN o.a.s.e.Executor: Executor driver created, thread pool: java.util.concurrent.ThreadPoolExecutor@6e85ece4[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
> 15/12/07 23:34:35 WARN o.a.s.e.Executor: Threads finished in executor driver. pool shut down java.util.concurrent.ThreadPoolExecutor@6e85ece4[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 288]
>
> Also here is an example thread dump of such a thread:
>
> "pool-493-thread-1" prio=10 tid=0x7f0e60612800 nid=0x18c4 waiting on condition [0x7f0c33c3e000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x7f10b3e8fb60> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
>         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>         at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
> On Mon, Dec 7, 2015 at 6:23 PM, Shixiong Zhu <zsxw...@gmail.com> wrote:
>
>> Which version are you using? Could you post these thread names here?
>>
>> Best Regards,
>> Shixiong Zhu
>>
>> 2015-12-07 14:30 GMT-08:00 Richard Marscher <rmarsc...@localytics.com>:
>>
>>> Hi,
>>>
>>> I've been running benchmarks against Spark in local mode in a long running process. I'm seeing threads leaking each time it runs a job. It doesn't matter if I recycle SparkContext constantly or have 1 context stay alive for the entire application lifetime.
>>>
>>> I see a huge accumulation ongoing of "pool--thread-1" threads with the creating thread "Executor task launch worker-xx" where x's are numbers. The number of leaks per launch worker varies but usually 1 to a few.
>>>
>>> Searching the Spark code the pool is created in the Executor class. It is `.shutdown()` in the stop for the executor. I've wired up logging and also tried shutdownNow() and awaitForTermination on the pools. Every seems okay there for every Executor that is called with `stop()` but I'm still not sure yet if every Executor is called as such, which I am looking into now.
>>>
>>> What I'm curious to know is if anyone has seen a similar issue?
>>>
>>> --
>>> *Richard Marscher*
>>> Software Engineer
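The thread-naming trick used to trace the leak above can be done with a small custom ThreadFactory, so leaked threads show up identifiably in thread dumps instead of as the anonymous "pool-N-thread-1" default (a minimal sketch; the "db-writer" prefix is illustrative):

```scala
import java.util.concurrent.{Callable, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

// A ThreadFactory that gives every thread it creates a descriptive,
// numbered name, making a leaking pool easy to spot in jstack output.
class NamedThreadFactory(prefix: String) extends ThreadFactory {
  private val counter = new AtomicInteger(0)
  override def newThread(r: Runnable): Thread =
    new Thread(r, s"$prefix-${counter.incrementAndGet()}")
}

object NamedThreadFactoryDemo {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(2, new NamedThreadFactory("db-writer"))
    val task: Callable[String] = () => Thread.currentThread.getName
    println(pool.submit(task).get()) // prints a descriptive name, e.g. db-writer-1
    pool.shutdown() // the eventual fix above: always shut pools down when finished
  }
}
```

With names like these, a thread dump immediately points back at the pool that was never shut down, rather than an anonymous `pool-1061-thread-1`.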
Local Mode: Executor thread leak?
Hi,

I've been running benchmarks against Spark in local mode in a long-running process. I'm seeing threads leak each time it runs a job. It doesn't matter if I recycle SparkContext constantly or have one context stay alive for the entire application lifetime.

I see a huge ongoing accumulation of "pool--thread-1" threads, with the creating thread being "Executor task launch worker-xx" where the x's are numbers. The number of leaks per launch worker varies, but is usually 1 to a few.

Searching the Spark code, the pool is created in the Executor class, and `.shutdown()` is called on it in the executor's stop(). I've wired up logging and also tried shutdownNow() and awaitTermination() on the pools. Everything seems okay there for every Executor whose `stop()` is called, but I'm still not sure yet whether every Executor is stopped as such, which I am looking into now.

What I'm curious to know is if anyone has seen a similar issue?

--
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | Facebook <http://facebook.com/localytics> | LinkedIn <http://www.linkedin.com/company/1148792?trk=tyah>
Re: Local Mode: Executor thread leak?
Thanks for the response.

The version is Spark 1.5.2.

Some examples of the thread names:

pool-1061-thread-1
pool-1059-thread-1
pool-1638-thread-1

Hundreds and then thousands of these accumulate, stranded in WAITING.

I added logging to try to track the lifecycle of the thread pool in Executor as mentioned before. Here is an excerpt, but everything seems fine there. Every executor that starts is also shut down, and it seems like it shuts down fine.

15/12/07 23:30:21 WARN o.a.s.e.Executor: Threads finished in executor driver. pool shut down java.util.concurrent.ThreadPoolExecutor@e5d036b[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
15/12/07 23:30:28 WARN o.a.s.e.Executor: Executor driver created, thread pool: java.util.concurrent.ThreadPoolExecutor@3bc41ae3[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
15/12/07 23:31:06 WARN o.a.s.e.Executor: Threads finished in executor driver. pool shut down java.util.concurrent.ThreadPoolExecutor@3bc41ae3[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 36]
15/12/07 23:31:11 WARN o.a.s.e.Executor: Executor driver created, thread pool: java.util.concurrent.ThreadPoolExecutor@6e85ece4[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
15/12/07 23:34:35 WARN o.a.s.e.Executor: Threads finished in executor driver. pool shut down java.util.concurrent.ThreadPoolExecutor@6e85ece4[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 288]

Also here is an example thread dump of such a thread:

"pool-493-thread-1" prio=10 tid=0x7f0e60612800 nid=0x18c4 waiting on condition [0x7f0c33c3e000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x7f10b3e8fb60> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

On Mon, Dec 7, 2015 at 6:23 PM, Shixiong Zhu <zsxw...@gmail.com> wrote:
> Which version are you using? Could you post these thread names here?
>
> Best Regards,
> Shixiong Zhu
>
> 2015-12-07 14:30 GMT-08:00 Richard Marscher <rmarsc...@localytics.com>:
>
>> Hi,
>>
>> I've been running benchmarks against Spark in local mode in a long running process. I'm seeing threads leaking each time it runs a job. It doesn't matter if I recycle SparkContext constantly or have 1 context stay alive for the entire application lifetime.
>>
>> I see a huge accumulation ongoing of "pool--thread-1" threads with the creating thread "Executor task launch worker-xx" where x's are numbers. The number of leaks per launch worker varies but usually 1 to a few.
>>
>> Searching the Spark code the pool is created in the Executor class. It is `.shutdown()` in the stop for the executor. I've wired up logging and also tried shutdownNow() and awaitForTermination on the pools. Every seems okay there for every Executor that is called with `stop()` but I'm still not sure yet if every Executor is called as such, which I am looking into now.
>>
>> What I'm curious to know is if anyone has seen a similar issue?
>>
>> --
>> *Richard Marscher*
>> Software Engineer
>> Localytics
>> Localytics.com <http://localytics.com/> | Our Blog <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | Facebook <http://facebook.com/localytics> | LinkedIn <http://www.linkedin.com/company/1148792?trk=tyah>

--
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | Facebook <http://facebook.com/localytics> | LinkedIn <http://www.linkedin.com/company/1148792?trk=tyah>
Local mode: Stages hang for minutes
Hi,

I'm doing some testing of workloads using local mode on a server. I see weird behavior where a job is submitted to the application and it just hangs for several minutes doing nothing. The stages are submitted as pending, and in the application UI the stage view claims no tasks have been submitted. Then after a few minutes things suddenly start and run smoothly.

I'm running against tiny data sets, on the order of 10s to low 100s of items in the RDD. I've been attaching with JProfiler and doing thread and heap dumps, but nothing really stands out as to why Spark seems to periodically pause for such a long time.

Has anyone else seen similar behavior, or is anyone aware of some quirk of local mode that could cause this kind of blocking?

--
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | Facebook <http://facebook.com/localytics> | LinkedIn <http://www.linkedin.com/company/1148792?trk=tyah>
Re: Local mode: Stages hang for minutes
I should add that the pauses are not from GC; also, in tracing the CPU call tree in the JVM, it seems like nothing is doing any work, it just seems to be idling or blocking.

On Thu, Dec 3, 2015 at 11:24 AM, Richard Marscher <rmarsc...@localytics.com> wrote:
> Hi,
>
> I'm doing some testing of workloads using local mode on a server. I see weird behavior where a job is submitted to the application and it just hangs for several minutes doing nothing. The stages are submitted as pending and in the application UI the stage view claims no tasks have been submitted. Suddenly after a few minutes things suddenly start and run smoothly.
>
> I'm running against tiny data sets the size of 10s to low 100s of items in the RDD. I've been attaching with JProfiler, doing thread and heap dumps but nothing is really standing out as to why Spark seems to periodically pause for such a long time.
>
> Has anyone else seen similar behavior or aware of some quirk of local mode that could cause this kind of blocking?
>
> --
> *Richard Marscher*
> Software Engineer
> Localytics
> Localytics.com <http://localytics.com/> | Our Blog <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | Facebook <http://facebook.com/localytics> | LinkedIn <http://www.linkedin.com/company/1148792?trk=tyah>

--
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | Facebook <http://facebook.com/localytics> | LinkedIn <http://www.linkedin.com/company/1148792?trk=tyah>
Re: Local mode: Stages hang for minutes
Ended up realizing I was only looking at the call tree for running threads. After looking at blocked threads, I saw that it was spending hundreds of compute-hours blocking on jets3t calls to S3. It was listing over likely thousands, if not hundreds of thousands, of S3 files accumulated over many rounds of load testing. Cleaning out the files fixed the issue, and I'm pretty sure it's already well known that the underlying s3n filesystem doesn't handle wildcard traversal of a large S3 file tree via the sparkContext.textFile function well.

On Thu, Dec 3, 2015 at 12:57 PM, Ali Tajeldin EDU <alitedu1...@gmail.com> wrote:
> You can try to run "jstack" a couple of times while the app is hung to look for patterns for where the app is hung.
> --
> Ali
>
> On Dec 3, 2015, at 8:27 AM, Richard Marscher <rmarsc...@localytics.com> wrote:
>
> I should add that the pauses are not from GC and also in tracing the CPU call tree in the JVM it seems like nothing is doing any work, just seems to be idling or blocking.
>
> On Thu, Dec 3, 2015 at 11:24 AM, Richard Marscher <rmarsc...@localytics.com> wrote:
>
>> Hi,
>>
>> I'm doing some testing of workloads using local mode on a server. I see weird behavior where a job is submitted to the application and it just hangs for several minutes doing nothing. The stages are submitted as pending and in the application UI the stage view claims no tasks have been submitted. Suddenly after a few minutes things suddenly start and run smoothly.
>>
>> I'm running against tiny data sets the size of 10s to low 100s of items in the RDD. I've been attaching with JProfiler, doing thread and heap dumps but nothing is really standing out as to why Spark seems to periodically pause for such a long time.
>>
>> Has anyone else seen similar behavior or aware of some quirk of local mode that could cause this kind of blocking?
>> >> -- >> *Richard Marscher* >> Software Engineer >> Localytics >> Localytics.com <http://localytics.com/> | Our Blog >> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | >> Facebook <http://facebook.com/localytics> | LinkedIn >> <http://www.linkedin.com/company/1148792?trk=tyah> >> > > > > -- > *Richard Marscher* > Software Engineer > Localytics > Localytics.com <http://localytics.com/> | Our Blog > <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | > Facebook <http://facebook.com/localytics> | LinkedIn > <http://www.linkedin.com/company/1148792?trk=tyah> > > > -- *Richard Marscher* Software Engineer Localytics Localytics.com <http://localytics.com/> | Our Blog <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | Facebook <http://facebook.com/localytics> | LinkedIn <http://www.linkedin.com/company/1148792?trk=tyah>
Re: New to Spark - Paritioning Question
Ah, I see. In that case, the groupByKey function does guarantee every key is on exactly one partition, matched with the aggregated data. This can be improved depending on what you want to do afterwards. groupByKey only aggregates the data after shipping it across the cluster, while reduceByKey will do aggregation on each node first, then ship those results to the final node and partition to finalize the aggregation there.

So say Node 1 has pairs (a, 1), (b, 2), (b, 6) and Node 2 has pairs (a, 2), (a, 3), (b, 4). groupByKey would send both the a pairs and the b pairs across the network. If you did reduceByKey with sum as the aggregate, you'd expect it to ship (b, 8) from Node 1 and (a, 5) from Node 2, since it does the local aggregation first.

You are correct that doing something with expensive side effects, like writing to a database (connections and network + I/O), is best done with the mapPartitions or foreachPartition type of functions on RDD, so you can share a database connection and also potentially do things like batch statements.

On Tue, Sep 8, 2015 at 7:37 PM, Mike Wright <mwri...@snl.com> wrote:
> Thanks for the response!
>
> Well, in retrospect each partition doesn't need to be restricted to a single key. But, I cannot have values associated with a key span partitions since they all need to be processed together for a key to facilitate cumulative calcs. So provided an individual key has all its values in a single partition, I'm OK.
>
> Additionally, the values will be written to the database, and from what I have read doing this at the partition level is the best compromise between 1) writing the calculated values for each key (lots of connects/disconnects) and 2) collecting them all at the end and writing them all at once.
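The shipping difference in that Node 1 / Node 2 example can be simulated with plain Scala collections (Seqs standing in for per-node data; nothing here is Spark API, just the arithmetic of the map-side combine):

```scala
object MapSideCombineSketch {
  // The per-node combine step that reduceByKey performs before any data
  // crosses the network, and that groupByKey skips entirely.
  def localCombine[K, V](node: Seq[(K, V)])(f: (V, V) => V): Map[K, V] =
    node.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).reduce(f) }

  def main(args: Array[String]): Unit = {
    val node1 = Seq(("a", 1), ("b", 2), ("b", 6))
    val node2 = Seq(("a", 2), ("a", 3), ("b", 4))
    // groupByKey would ship all 6 raw pairs; reduceByKey ships only the
    // 4 locally combined pairs: (a,1),(b,8) from node1 and (a,5),(b,4) from node2.
    println(localCombine(node1)(_ + _))
    println(localCombine(node2)(_ + _))
  }
}
```

The savings grow with the number of duplicate keys per node, which is why the combine-then-shuffle order matters for skewed or high-cardinality aggregations.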
> > I am using a groupBy against the filtered RDD the get the grouping I want, > but apparently this may not be the most efficient way, and it seems that > everything is always in a single partition under this scenario. > > > ___ > > *Mike Wright* > Principal Architect, Software Engineering > > SNL Financial LC > 434-951-7816 *p* > 434-244-4466 *f* > 540-470-0119 *m* > > mwri...@snl.com > > On Tue, Sep 8, 2015 at 5:38 PM, Richard Marscher <rmarsc...@localytics.com > > wrote: > >> That seems like it could work, although I don't think `partitionByKey` is >> a thing, at least for RDD. You might be able to merge step #2 and step #3 >> into one step by using the `reduceByKey` function signature that takes in a >> Partitioner implementation. >> >> def reduceByKey(partitioner: Partitioner >> <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/Partitioner.html> >> , func: (V, V) ⇒ V): RDD >> <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html> >> [(K, V)] >> >> Merge the values for each key using an associative reduce function. This >> will also perform the merging locally on each mapper before sending results >> to a reducer, similarly to a "combiner" in MapReduce. >> >> The tricky part might be getting the partitioner to know about the number >> of partitions, which I think it needs to know upfront in `abstract def >> numPartitions: Int`. The `HashPartitioner` for example takes in the >> number as a constructor argument, maybe you could use that with an upper >> bound size if you don't mind empty partitions. Otherwise you might have to >> mess around to extract the exact number of keys if it's not readily >> available. >> >> Aside: what is the requirement to have each partition only contain the >> data related to one key? >> >> On Fri, Sep 4, 2015 at 11:06 AM, mmike87 <mwri...@snl.com> wrote: >> >>> Hello, I am new to Apache Spark and this is my company's first Spark >>> project. 
>>> Essentially, we are calculating models dealing with Mining data using >>> Spark. >>> >>> I am holding all the source data in a persisted RDD that we will refresh >>> periodically. When a "scenario" is passed to the Spark job (we're using >>> Job >>> Server) the persisted RDD is filtered to the relevant mines. For >>> example, we >>> may want all mines in Chile and the 1990-2015 data for each. >>> >>> Many of the calculations are cumulative, that is when we apply user-input >>> "adjustment factors" to a value, we also need the "flexed" value we >>> calculated for that mine previously. >>> >>> To ensure that this works, the idea is to: >>> >>> 1) Filter the superset to relevant mines (done) >>> 2) Group the subset by the
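The shipping difference described above can be sketched with plain Scala collections (a model of the semantics only, no Spark required; `localCombine` is an illustrative stand-in for reduceByKey's map-side combine):

```scala
// Model of the two nodes' key-value pairs from the example above.
val node1 = Seq("a" -> 1, "b" -> 2, "b" -> 6)
val node2 = Seq("a" -> 2, "a" -> 3, "b" -> 4)

// groupByKey-style: every pair crosses the network (6 records shipped).
val shippedByGroup = node1 ++ node2

// reduceByKey-style: each node pre-aggregates locally before shipping.
def localCombine(pairs: Seq[(String, Int)]): Seq[(String, Int)] =
  pairs.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }.toSeq

// Only 4 records cross: (a,1),(b,8) from node 1 and (a,5),(b,4) from node 2.
val shippedByReduce = localCombine(node1) ++ localCombine(node2)

// The final merge on the reducer side is the same either way.
val finalResult = localCombine(shippedByGroup).toMap
```

Either route yields the same totals; reduceByKey simply moves less data to get there.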
Re: What should be the optimal value for spark.sql.shuffle.partition?
I see you reposted with more details: "I have 2 TB of skewed data to process and then convert rdd into dataframe and use it as table in hiveContext.sql(). I am using 60 executors with 20 GB memory and 4 cores." If I'm reading that correctly, you have 2TB of data and 1.2TB of memory in the cluster. I think that's a fundamental problem up front. If the data is skewed, then that will be even worse for doing aggregation. To start, I think the data either needs to be broken down or the cluster upgraded, unfortunately. On Wed, Sep 9, 2015 at 5:41 PM, Richard Marscher <rmarsc...@localytics.com> wrote: > Do you have any details about the cluster you are running this against? > The memory per executor/node, number of executors, and such? Even at a > shuffle setting of 1000 that would be roughly 1GB per partition assuming > the 1TB of data includes overheads in the JVM. Maybe try another order of > magnitude higher for number of shuffle partitions and see where that gets > you? > > On Tue, Sep 1, 2015 at 12:11 PM, unk1102 <umesh.ka...@gmail.com> wrote: > >> Hi I am using Spark SQL, actually hiveContext.sql(), which uses group by >> queries, and I am running into OOM issues. So I am thinking of increasing the >> value of >> spark.sql.shuffle.partitions from the 200 default to 1000, but it is not >> helping. >> Please correct me if I am wrong: these partitions will share the data shuffle >> load, >> so the more partitions, the less data each has to hold. Please guide, I am new to Spark. >> I >> am using Spark 1.4.0 and I have around 1TB of uncompressed data to process >> using hiveContext.sql() group by queries. >> >> >> >> -- >> View this message in context: >> http://apache-spark-user-list.1001560.n3.nabble.com/What-should-be-the-optimal-value-for-spark-sql-shuffle-partition-tp24543.html >> Sent from the Apache Spark User List mailing list archive at Nabble.com. 
>> >> --------- >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org >> >> > > > -- > *Richard Marscher* > Software Engineer > Localytics > Localytics.com <http://localytics.com/> | Our Blog > <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | > Facebook <http://facebook.com/localytics> | LinkedIn > <http://www.linkedin.com/company/1148792?trk=tyah> > -- *Richard Marscher* Software Engineer Localytics Localytics.com <http://localytics.com/> | Our Blog <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | Facebook <http://facebook.com/localytics> | LinkedIn <http://www.linkedin.com/company/1148792?trk=tyah>
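To make the suggestion concrete, a minimal sketch of raising the setting (Spark 1.x API on the classpath; the values are illustrative placeholders to tune, not recommendations from the thread):

```scala
// Sketch only: assumes a Spark 1.x application, not runnable standalone.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val conf = new SparkConf()
  .setAppName("shuffle-tuning")
  // Default is 200; for ~1TB shuffled, try successively larger values
  // (1000, 10000, ...) so each partition holds a manageable slice.
  .set("spark.sql.shuffle.partitions", "10000")

val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)

// The setting can also be changed per-session before the group-by query:
hiveContext.sql("SET spark.sql.shuffle.partitions=10000")
```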
Re: spark.shuffle.spill=false ignored?
Hi Eric, I just wanted to do a sanity check, do you know what paths it is trying to write to? I ask because even without spilling, shuffles always write to disk first before transferring data across the network. I had at one point encountered this myself where we accidentally had /tmp mounted on a tiny disk and kept running out of disk on shuffles even though we also don't spill. You may have already considered or ruled this out though. On Thu, Sep 3, 2015 at 12:56 PM, Eric Walker <eric.wal...@gmail.com> wrote: > Hi, > > I am using Spark 1.3.1 on EMR with lots of memory. I have attempted to > run a large pyspark job several times, specifying > `spark.shuffle.spill=false` in different ways. It seems that the setting > is ignored, at least partially, and some of the tasks start spilling large > amounts of data to disk. The job has been fast enough in the past, but > once it starts spilling to disk it lands on Miller's planet [1]. > > Is this expected behavior? Is it a misconfiguration on my part, e.g., > could there be an incompatible setting that is overriding > `spark.shuffle.spill=false`? Is it something that goes back to Spark > 1.3.1? Is it something that goes back to EMR? When I've allowed the job > to continue on for a while, I've started to see Kryo stack traces in the > tasks that are spilling to disk. The stack traces mention there not being > enough disk space, although a `df` shows plenty of space (perhaps after the > fact, when temporary files have been cleaned up). > > Has anyone run into something like this before? I would be happy to see > OOM errors, because that would be consistent with one understanding of what > might be going on, but I haven't yet. 
> > Eric > > > [1] https://www.youtube.com/watch?v=v7OVqXm7_Pk=active > -- *Richard Marscher* Software Engineer Localytics Localytics.com <http://localytics.com/> | Our Blog <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | Facebook <http://facebook.com/localytics> | LinkedIn <http://www.linkedin.com/company/1148792?trk=tyah>
Re: Why is huge data shuffling in Spark when using union()/coalesce(1,false) on DataFrame?
Hi, what is the reasoning behind the use of `coalesce(1,false)`? This is saying to aggregate all data into a single partition, which must fit in memory on one node in the Spark cluster. If the cluster has more than one node it must shuffle to move the data. It doesn't seem like the following map or union necessitate coalesce, but the use case is not clear to me. On Fri, Sep 4, 2015 at 12:29 PM, unk1102 <umesh.ka...@gmail.com> wrote: > Hi I have a Spark job which does some processing on ORC data and stores back > ORC data using the DataFrameWriter save() API introduced in Spark 1.4.0. I have > the following piece of code which is using heavy shuffle memory. How do I > optimize the code below? Is there anything wrong with it? It is working fine as > expected, only causing slowness because of GC pauses, and it shuffles lots of > data > so it is hitting memory issues. Please guide, I am new to Spark. Thanks in > advance. > > JavaRDD<Row> updatedDsqlRDD = orderedFrame.toJavaRDD().coalesce(1, > false).map(new Function<Row, Row>() { >@Override >public Row call(Row row) throws Exception { > List rowAsList; > Row row1 = null; > if (row != null) { > rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq())); > row1 = RowFactory.create(rowAsList.toArray()); > } > return row1; >} > }).union(modifiedRDD); > DataFrame updatedDataFrame = > hiveContext.createDataFrame(updatedDsqlRDD, renamedSourceFrame.schema()); > > updatedDataFrame.write().mode(SaveMode.Append).format("orc").partitionBy("entity", > "date").save("baseTable"); > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Why-is-huge-data-shuffling-in-Spark-when-using-union-coalesce-1-false-on-DataFrame-tp24581.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. 
> > --------- > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > -- *Richard Marscher* Software Engineer Localytics Localytics.com <http://localytics.com/> | Our Blog <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | Facebook <http://facebook.com/localytics> | LinkedIn <http://www.linkedin.com/company/1148792?trk=tyah>
Re: New to Spark - Paritioning Question
That seems like it could work, although I don't think `partitionByKey` is a thing, at least for RDD. You might be able to merge step #2 and step #3 into one step by using the `reduceByKey` function signature that takes in a Partitioner implementation. def reduceByKey(partitioner: Partitioner <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/Partitioner.html> , func: (V, V) ⇒ V): RDD <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html> [(K, V)] Merge the values for each key using an associative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce. The tricky part might be getting the partitioner to know about the number of partitions, which I think it needs to know upfront in `abstract def numPartitions: Int`. The `HashPartitioner` for example takes in the number as a constructor argument, maybe you could use that with an upper bound size if you don't mind empty partitions. Otherwise you might have to mess around to extract the exact number of keys if it's not readily available. Aside: what is the requirement to have each partition only contain the data related to one key? On Fri, Sep 4, 2015 at 11:06 AM, mmike87 <mwri...@snl.com> wrote: > Hello, I am new to Apache Spark and this is my company's first Spark > project. > Essentially, we are calculating models dealing with Mining data using > Spark. > > I am holding all the source data in a persisted RDD that we will refresh > periodically. When a "scenario" is passed to the Spark job (we're using Job > Server) the persisted RDD is filtered to the relevant mines. For example, > we > may want all mines in Chile and the 1990-2015 data for each. > > Many of the calculations are cumulative, that is when we apply user-input > "adjustment factors" to a value, we also need the "flexed" value we > calculated for that mine previously. 
> > To ensure that this works, the idea is to: > > 1) Filter the superset to relevant mines (done) > 2) Group the subset by the unique identifier for the mine. So, a group may > be all the rows for mine "A" for 1990-2015 > 3) I then want to ensure that the RDD is partitioned by the Mine Identifier > (an Integer). > > It's step 3 that is confusing me. I suspect it's very easy ... do I simply > use PartitionByKey? > > We're using Java if that makes any difference. > > Thanks! > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/New-to-Spark-Paritioning-Question-tp24580.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > ----- > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > -- *Richard Marscher* Software Engineer Localytics Localytics.com <http://localytics.com/> | Our Blog <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | Facebook <http://facebook.com/localytics> | LinkedIn <http://www.linkedin.com/company/1148792?trk=tyah>
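The routing guarantee a HashPartitioner gives can be sketched in plain Scala (this mirrors, rather than imports, Spark's non-negative-mod placement logic; `partitionFor` is an illustrative name):

```scala
// Plain-Scala model of HashPartitioner placement: every pair with the
// same key lands in the same partition, which satisfies the "all values
// for one mine together" requirement without one-partition-per-key.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

def partitionFor(key: Any, numPartitions: Int): Int =
  nonNegativeMod(key.hashCode, numPartitions)

// With reduceByKey(new HashPartitioner(n), func), records are routed by
// this rule and also combined map-side first.
val mineIds = Seq("mineA", "mineB", "mineA", "mineC")
val n = 4
val placements = mineIds.map(id => id -> partitionFor(id, n))
```

Note that distinct keys may share a partition; the guarantee is only that one key never spans two.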
Re: Removing empty partitions before we write to HDFS
Not that I'm aware of. We ran into a similar issue where we didn't want to keep accumulating all these empty part files in storage on S3 or HDFS. There didn't seem to be any performance-free way to do it with an RDD, so we just run a non-Spark post-batch operation to delete empty files from the write path. On Thu, Aug 6, 2015 at 3:33 PM, Patanachai Tangchaisin <patanac...@ipsy.com> wrote: Currently, I use rdd.isEmpty() Thanks, Patanachai On 08/06/2015 12:02 PM, gpatcham wrote: Is there a way to filter out empty partitions before I write to HDFS other than using repartition and coalesce? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Removing-empty-partitions-before-we-write-to-HDFS-tp24156.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Patanachai - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- *Richard Marscher* Software Engineer Localytics Localytics.com <http://localytics.com/> | Our Blog <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | Facebook <http://facebook.com/localytics> | LinkedIn <http://www.linkedin.com/company/1148792?trk=tyah>
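The idea can be modeled in plain Scala by treating an RDD's partitions as a sequence of sequences; the catch, as noted above, is that realizing this filter on a real RDD still requires a repartition/coalesce (a shuffle), which is why a post-write cleanup of empty part files can be the cheaper option:

```scala
// Plain-Scala model only: each inner Vector stands for one partition,
// and each empty partition would otherwise produce an empty part file.
val partitions: Vector[Vector[Int]] =
  Vector(Vector(1, 2), Vector(), Vector(3), Vector())

// Dropping the empties is trivially a filter in this model...
val nonEmpty = partitions.filter(_.nonEmpty)
// ...but Spark can't drop partitions in place; it must re-shuffle data
// into a new partitioning to realize the same effect.
```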
Re: Does RDD.cartesian involve shuffling?
Yes it does; in fact, it's probably going to be one of the more expensive shuffles you could trigger. On Mon, Aug 3, 2015 at 12:56 PM, Meihua Wu <rotationsymmetr...@gmail.com> wrote: Does RDD.cartesian involve shuffling? Thanks! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- *Richard Marscher* Software Engineer Localytics Localytics.com <http://localytics.com/> | Our Blog <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | Facebook <http://facebook.com/localytics> | LinkedIn <http://www.linkedin.com/company/1148792?trk=tyah>
Re: Repartition question
Hi, it is possible to control the number of partitions for the RDD without calling repartition by setting the max split size for the Hadoop input format used. Tracing through the code, XmlInputFormat extends FileInputFormat, which determines the number of splits (which NewHadoopRDD uses to determine the number of partitions: https://github.com/apache/spark/blob/branch-1.3/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L95) with a few configs: https://github.com/apache/hadoop/blob/branch-2.3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java#L200 . public static final String SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize"; public static final String SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize"; If you are setting SparkConf fields, prefix the keys with `spark.hadoop.` and they will end up on the Hadoop conf used for the above values. On Tue, Aug 4, 2015 at 12:31 AM, Naveen Madhire <vmadh...@umail.iu.edu> wrote: Hi All, I am running the Wikipedia parsing example present in the Advanced Analytics with Spark book. https://github.com/sryza/aas/blob/d3f62ef3ed43a59140f4ae8afbe2ef81fc643ef2/ch06-lsa/src/main/scala/com/cloudera/datascience/lsa/ParseWikipedia.scala#l112 The partitions of the RDD returned by the readFile function (mentioned above) are of 32MB size. So if my file size is 100 MB, the RDD is getting created with 4 partitions of approx 32MB size. I am running this in standalone Spark cluster mode; everything is working fine, I am only a little confused about the number of partitions and their size. I want to increase the number of partitions for the RDD to make use of the cluster. Is calling repartition() after this the only option, or can I pass something in the above method to have more partitions of the RDD? Please let me know. Thanks. 
-- *Richard Marscher* Software Engineer Localytics Localytics.com http://localytics.com/ | Our Blog http://localytics.com/blog | Twitter http://twitter.com/localytics | Facebook http://facebook.com/localytics | LinkedIn http://www.linkedin.com/company/1148792?trk=tyah
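A minimal sketch of the split-size approach (Spark 1.x API on the classpath; the 10MB cap is an illustrative value, not from the thread):

```scala
// Sketch only: assumes a Spark 1.x application, not runnable standalone.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("split-tuning")
  // The "spark.hadoop." prefix copies the key onto the Hadoop
  // Configuration that FileInputFormat reads. Capping splits at ~10MB
  // would turn a 100MB file into ~10 partitions instead of 4.
  .set("spark.hadoop.mapreduce.input.fileinputformat.split.maxsize",
       (10 * 1024 * 1024).toString)

val sc = new SparkContext(conf)
```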
Re: Does RDD.cartesian involve shuffling?
That is the only alternative I'm aware of. If either A or B is small enough to broadcast, then you'd at least be doing the cartesian products all locally without needing to also transmit and shuffle A. Unless Spark somehow optimizes cartesian product and only transfers the smaller RDD across the network in the shuffle, but I don't have reason to believe that's true. I'd try the cartesian first if you haven't tried at all, just to make sure it actually is too slow before getting tricky with the broadcast. On Tue, Aug 4, 2015 at 12:25 PM, Meihua Wu <rotationsymmetr...@gmail.com> wrote: Thanks, Richard! I basically have two RDD's: A and B; and I need to compute a value for every pair of (a, b) for a in A and b in B. My first thought is cartesian, but it involves an expensive shuffle. Any alternatives? How about I convert B to an array and broadcast it to every node (assuming B is relatively small and fits)? On Tue, Aug 4, 2015 at 8:23 AM, Richard Marscher <rmarsc...@localytics.com> wrote: Yes it does; in fact, it's probably going to be one of the more expensive shuffles you could trigger. On Mon, Aug 3, 2015 at 12:56 PM, Meihua Wu <rotationsymmetr...@gmail.com> wrote: Does RDD.cartesian involve shuffling? Thanks! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- *Richard Marscher* Software Engineer Localytics Localytics.com <http://localytics.com/> | Our Blog <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | Facebook <http://facebook.com/localytics> | LinkedIn <http://www.linkedin.com/company/1148792?trk=tyah>
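The broadcast alternative can be modeled in plain Scala (`score` is a placeholder for the pairwise computation; in Spark, `bSmall` would come from `sc.broadcast(...).value` inside the flatMap so every node holds a local copy):

```scala
// Plain-Scala model of the broadcast approach: B is small enough to ship
// whole to every node, so each element of A is paired with all of B
// locally, with no shuffle of the large side A at all.
val a = Seq(1, 2, 3)        // the large side; stays where it is
val bSmall = Array(10, 20)  // the small side, "broadcast" everywhere

def score(x: Int, y: Int): Int = x * y  // placeholder pairwise computation

// In Spark: a.flatMap(x => bBroadcast.value.map(y => ((x, y), score(x, y))))
val pairsScored = a.flatMap(x => bSmall.map(y => (x, y) -> score(x, y)))
```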
Re: How to increase parallelism of a Spark cluster?
) is to use DefaultHttpClient (from HttpComponents) whose settings are as follows: - Version: HttpVersion.HTTP_1_1 - ContentCharset: HTTP.DEFAULT_CONTENT_CHARSET - NoTcpDelay: true - SocketBufferSize: 8192 - UserAgent: Apache-HttpClient/release (java 1.5) In addition, the Solr code sets the following additional config parameters on the DefaultHttpClient. params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 128); params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 32); params.set(HttpClientUtil.PROP_FOLLOW_REDIRECTS, followRedirects); Since all my connections are coming out of 2 worker boxes, it looks like I could get 32x2 = 64 clients hitting Solr, right? @Steve: Thanks for the link to the HttpClient config. I was thinking about using a thread pool (or better using a PoolingHttpClientManager per the docs), but it probably won't help since its still being fed one request at a time. @Abhishek: my observations agree with what you said. In the past I have had success with repartition to reduce the partition size especially when groupBy operations were involved. But I believe an executor should be able to handle multiple tasks in parallel from what I understand about Akka on which Spark is built - the worker is essentially an ActorSystem which can contain multiple Actors, each actor works on a queue of tasks. Within an Actor everything is sequential, but the ActorSystem is responsible for farming out tasks it gets to each of its Actors. Although it is possible I could be generalizing incorrectly from my limited experience with Akka. Thanks again for all your help. Please let me know if something jumps out and/or if there is some configuration I should check. -sujit On Sun, Aug 2, 2015 at 6:13 PM, Abhishek R. Singh abhis...@tetrationanalytics.com wrote: I don't know if (your assertion/expectation that) workers will process things (multiple partitions) in parallel is really valid. 
Or if having more partitions than workers will necessarily help (unless you are memory bound - so partitions is essentially helping your work size rather than execution parallelism). [Disclaimer: I am no authority on Spark, but wanted to throw my spin based my own understanding]. Nothing official about it :) -abhishek- On Jul 31, 2015, at 1:03 PM, Sujit Pal <sujitatgt...@gmail.com> wrote: Hello, I am trying to run a Spark job that hits an external webservice to get back some information. The cluster is 1 master + 4 workers, each worker has 60GB RAM and 4 CPUs. The external webservice is a standalone Solr server, and is accessed using code similar to that shown below. def getResults(keyValues: Iterator[(String, Array[String])]): Iterator[(String, String)] = { val solr = new HttpSolrClient() initializeSolrParameters(solr) keyValues.map(keyValue => (keyValue._1, process(solr, keyValue))) } myRDD.repartition(10) .mapPartitions(keyValues => getResults(keyValues)) The mapPartitions does some initialization to the SolrJ client per partition and then hits it for each record in the partition via the getResults() call. I repartitioned in the hope that this will result in 10 clients hitting Solr simultaneously (I would like to go upto maybe 30-40 simultaneous clients if I can). However, I counted the number of open connections using netstat -anp | grep :8983.*ESTABLISHED in a loop on the Solr box and observed that Solr has a constant 4 clients (ie, equal to the number of workers) over the lifetime of the run. My observation leads me to believe that each worker processes a single stream of work sequentially. However, from what I understand about how Spark works, each worker should be able to process a number of tasks in parallel, and that repartition() is a hint for it to do so. Is there some SparkConf environment variable I should set to increase parallelism in these workers, or should I just configure a cluster with multiple workers per machine? 
Or is there something I am doing wrong? Thank you in advance for any pointers you can provide. -sujit -- *Richard Marscher* Software Engineer Localytics Localytics.com http://localytics.com/ | Our Blog http://localytics.com/blog | Twitter http://twitter.com/localytics | Facebook http://facebook.com/localytics | LinkedIn http://www.linkedin.com/company/1148792?trk=tyah
Re: user threads in executors
You can certainly create threads in a map transformation. We do this to do concurrent DB lookups during one stage for example. I would recommend, however, that you switch to mapPartitions from map as this allows you to create a fixed size thread pool to share across items on a partition as opposed to spawning a future per record in the RDD for example. On Tue, Jul 21, 2015 at 4:11 AM, Shushant Arora <shushantaror...@gmail.com> wrote: Hi Can I create user threads in executors? I have a streaming app where after processing I have a requirement to push events to an external system. Each post request costs ~90-100 ms. To make the posts parallel, I cannot use the same thread because that is limited by the no of cores available in the system; can I use user threads in a Spark app? I tried to create 2 threads in a map task and it worked. Is there any upper limit on the no of user threads in a Spark executor? Is it a good idea to create user threads in a Spark map task? Thanks -- *Richard Marscher* Software Engineer Localytics Localytics.com <http://localytics.com/> | Our Blog <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | Facebook <http://facebook.com/localytics> | LinkedIn <http://www.linkedin.com/company/1148792?trk=tyah>
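A sketch of the fixed-pool-per-partition pattern in plain JVM Scala (`post` stands in for the external ~90-100 ms call; in Spark this function body would run inside mapPartitions, and a real implementation would return a lazy Iterator rather than a materialized List):

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

// Placeholder for the external POST; illustrative only.
def post(event: Int): String = s"posted-$event"

// One fixed-size pool per partition, shared across all of its records,
// instead of one thread (or future) per record.
def processPartition(records: Iterator[Int], poolSize: Int): List[String] = {
  val pool = Executors.newFixedThreadPool(poolSize)
  try {
    // Submit every record's call, then wait on each result in order.
    val futures = records.map { r =>
      pool.submit(new Callable[String] { def call(): String = post(r) })
    }.toList
    futures.map(_.get())
  } finally {
    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.MINUTES)
  }
}

// Locally, this mimics what Spark would run for one partition:
val results = processPartition(Iterator(1, 2, 3), poolSize = 2)
```

With poolSize capped per partition, total concurrency is bounded by partitions x poolSize rather than by record count.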
Re: Apache Spark : Custom function for reduceByKey - missing arguments for method
Did you try it by adding the `_` after the method names to partially apply them? Scala is saying that it's trying to immediately apply those methods but can't find arguments, while you instead are trying to pass them along as functions (which they aren't). Here is a link to a stackoverflow answer that should help clarify: http://stackoverflow.com/a/19720808/72401. I think there are two solutions: turn getMax and getMin into functions by using val, a la: val getMax: (DoubleDimension, DoubleDimension) => DoubleDimension = { (a, b) => if (a > b) a else b } val getMin: (DoubleDimension, DoubleDimension) => DoubleDimension = { (a, b) => if (a < b) a else b } or just partially apply them: maxVector = attribMap.reduceByKey( getMax _ ) minVector = attribMap.reduceByKey( getMin _ ) On Thu, Jul 9, 2015 at 9:09 PM, ameyamm <ameya.malond...@outlook.com> wrote: I am trying to normalize a dataset (convert values for all attributes in the vector to the 0-1 range). I created an RDD of tuples (attrib-name, attrib-value) for all the records in the dataset as follows: val attribMap : RDD[(String, DoubleDimension)] = contactDataset.flatMap( contact => { List( ("dage", contact.dage match { case Some(value) => DoubleDimension(value) ; case None => null }), ("dancstry1", contact.dancstry1 match { case Some(value) => DoubleDimension(value) ; case None => null }), ("dancstry2", contact.dancstry2 match { case Some(value) => DoubleDimension(value) ; case None => null }), ("ddepart", contact.ddepart match { case Some(value) => DoubleDimension(value) ; case None => null }), ("dhispanic", contact.dhispanic match { case Some(value) => DoubleDimension(value) ; case None => null }), ("dhour89", contact.dhour89 match { case Some(value) => DoubleDimension(value) ; case None => null }) ) } ) Here, contactDataset is of the type RDD[Contact]. The fields of the Contact class are of type Option[Long]. DoubleDimension is a simple wrapper over the Double datatype. It extends the Ordered trait and implements the corresponding compare method and equals method. 
To obtain the max and min attribute vectors for computing the normalized values: maxVector = attribMap.reduceByKey( getMax ) minVector = attribMap.reduceByKey( getMin ) The implementation of getMax and getMin is as follows: def getMax( a : DoubleDimension, b : DoubleDimension ) : DoubleDimension = { if (a > b) a else b } def getMin( a : DoubleDimension, b : DoubleDimension ) : DoubleDimension = { if (a < b) a else b } I get a compile error at the calls to the methods getMax and getMin stating: [ERROR] .../com/ameyamm/input_generator/DatasetReader.scala:117: error: missing arguments for method getMax in class DatasetReader; [ERROR] follow this method with '_' if you want to treat it as a partially applied function [ERROR] maxVector = attribMap.reduceByKey( getMax ) [ERROR] .../com/ameyamm/input_generator/DatasetReader.scala:118: error: missing arguments for method getMin in class DatasetReader; [ERROR] follow this method with '_' if you want to treat it as a partially applied function [ERROR] minVector = attribMap.reduceByKey( getMin ) I am not sure what I am doing wrong here. My RDD is an RDD of pairs and, as per my knowledge, I can pass any method to it as long as the function is of the type f : (V, V) => V. I am really stuck here. Please help. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-Custom-function-for-reduceByKey-missing-arguments-for-method-tp23756.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- -- *Richard Marscher* Software Engineer Localytics Localytics.com <http://localytics.com/> | Our Blog <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | Facebook <http://facebook.com/localytics> | LinkedIn <http://www.linkedin.com/company/1148792?trk=tyah>
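A minimal, Spark-free reproduction of the error and both fixes (`reduce` here is an illustrative stand-in for reduceByKey's (V, V) => V contract):

```scala
// Simplified stand-in for the wrapper type from the thread.
case class DoubleDimension(v: Double)

// A method, not a function value: needs eta-expansion to be passed along.
def getMax(a: DoubleDimension, b: DoubleDimension): DoubleDimension =
  if (a.v > b.v) a else b

// Stand-in for reduceByKey: takes a (V, V) => V function value.
def reduce(pairs: Seq[DoubleDimension],
           f: (DoubleDimension, DoubleDimension) => DoubleDimension): DoubleDimension =
  pairs.reduce(f)

// Fix 1: partially apply the method at the call site with `_`.
val viaMethod = reduce(Seq(DoubleDimension(1), DoubleDimension(3)), getMax _)

// Fix 2: define it as a function value in the first place.
val getMin: (DoubleDimension, DoubleDimension) => DoubleDimension =
  (a, b) => if (a.v < b.v) a else b

val viaVal = reduce(Seq(DoubleDimension(1), DoubleDimension(3)), getMin)
```

Writing `reduce(..., getMax)` without the `_` reproduces the "missing arguments for method" error in Scala 2.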
Re: Unit tests of spark application
Unless you had something specific in mind, it should be as simple as creating a SparkContext object using a master of local[2] in your tests On Fri, Jul 10, 2015 at 1:41 PM, Naveen Madhire vmadh...@umail.iu.edu wrote: Hi, I want to write junit test cases in scala for testing spark application. Is there any guide or link which I can refer. Thank you very much. -Naveen -- -- *Richard Marscher* Software Engineer Localytics Localytics.com http://localytics.com/ | Our Blog http://localytics.com/blog | Twitter http://twitter.com/localytics | Facebook http://facebook.com/localytics | LinkedIn http://www.linkedin.com/company/1148792?trk=tyah
Re: Spark serialization in closure
Reading that article and applying it to your observations of what happens at runtime: shouldn't the closure require serializing testing? The foo singleton object is a member of testing, and then you call this foo value in the closure func and further in the foreachPartition closure. So following that article, Scala will attempt to serialize the containing object/class testing to get the foo instance. On Thu, Jul 9, 2015 at 4:11 PM, Chen Song <chen.song...@gmail.com> wrote: Repost the code example, object testing extends Serializable { object foo { val v = 42 } val list = List(1,2,3) val rdd = sc.parallelize(list) def func = { val after = rdd.foreachPartition { it => println(foo.v) } } } On Thu, Jul 9, 2015 at 4:09 PM, Chen Song <chen.song...@gmail.com> wrote: Thanks Erik. I saw the document too. That is why I am confused because as per the article, it should be good as long as *foo *is serializable. However, what I have seen is that it would work if *testing* is serializable, even if foo is not serializable, as shown below. I don't know if there is something specific to Spark. For example, the code example below works. object testing extends Serializable { object foo { val v = 42 } val list = List(1,2,3) val rdd = sc.parallelize(list) def func = { val after = rdd.foreachPartition { it => println(foo.v) } } } On Thu, Jul 9, 2015 at 12:11 PM, Erik Erlandson <e...@redhat.com> wrote: I think you have stumbled across this idiosyncrasy: http://erikerlandson.github.io/blog/2015/03/31/hygienic-closures-for-scala-function-serialization/ - Original Message - I am not sure this is more of a question for Spark or just Scala but I am posting my question here. The code snippet below shows an example of passing a reference to a closure in the rdd.foreachPartition method. 
``` object testing { object foo extends Serializable { val v = 42 } val list = List(1,2,3) val rdd = sc.parallelize(list) def func = { val after = rdd.foreachPartition { it => println(foo.v) } } } ``` When running this code, I got an exception ``` Caused by: java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$ Serialization stack: - object not serializable (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$, value: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$@10b7e824) - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$) - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1, function1) ``` It looks like Spark needs to serialize the `testing` object. Why is it serializing testing even though I only pass foo (another serializable object) in the closure? A more general question is, how can I prevent Spark from serializing the parent class where the RDD is defined, while still supporting functions defined in other classes? -- Chen Song
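A common way around the idiosyncrasy described in this thread is to copy the value into a local val before the closure, so the closure captures only that local and not the enclosing `testing` object. A minimal sketch using the thread's names (the SparkContext wiring is assumed):

```scala
// Sketch of the usual workaround. Only `localV` is captured by the
// closure, so Spark never tries to serialize the enclosing `testing`
// object; `foo` itself no longer needs to be serializable either.
object testing {
  object foo { val v = 42 }
  val list = List(1, 2, 3)

  def func(sc: org.apache.spark.SparkContext): Unit = {
    val rdd = sc.parallelize(list)
    val localV = foo.v // copy into a local val before the closure
    rdd.foreachPartition { it => println(localV) }
  }
}
```

Alternatively, moving `foo` out of `testing` to the top level makes the reference a static access and avoids pulling the parent in at all.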
Re: Create RDD from output of unix command
As a distributed data processing engine, Spark should be fine with millions of lines. It's built with the idea of massive data sets in mind. Do you have more details on how you anticipate the output of a unix command interacting with a running Spark application? Do you expect Spark to be continuously running and somehow observe unix command outputs? Or are you thinking more along the lines of running a unix command with output and then taking whatever format that is and running a spark job against it? If it's the latter, it should be as simple as writing the command output to a file and then loading the file into an RDD in Spark. On Wed, Jul 8, 2015 at 2:02 PM, foobar heath...@fb.com wrote: What's the best practice of creating RDD from some external unix command output? I assume if the output size is large (say millions of lines), creating RDD from an array of all lines is not a good idea? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Create-RDD-from-output-of-unix-command-tp23723.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
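A minimal sketch of the batch-style approach (the command and paths are illustrative): capture the command's stdout on the driver and parallelize it.

```scala
import scala.sys.process._
import org.apache.spark.{SparkConf, SparkContext}

object CommandToRdd {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("cmd-to-rdd").setMaster("local[2]"))

    // Run the command on the driver and capture its stdout. Fine for
    // modest output; for millions of lines, write the output to a
    // file (or HDFS) first and load it with sc.textFile instead.
    val output = Seq("ls", "/tmp").!!
    val rdd = sc.parallelize(output.split("\n").toSeq)
    println(rdd.count())

    sc.stop()
  }
}
```

For the other direction, RDD.pipe(command) runs a command once per partition, feeding partition elements to its stdin and collecting stdout lines as a new RDD.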
Re: Best practice for using singletons on workers (seems unanswered) ?
Ah, I see this is streaming. I haven't any practical experience with that side of Spark. But the foreachPartition idea is a good approach. I've used that pattern extensively, even though not for singletons, but just to create non-serializable objects like API and DB clients on the executor side. I think it's the most straightforward approach to dealing with any non-serializable object you need. I don't entirely follow what over-network data shuffling effects you are alluding to (maybe more specific to streaming?). On Wed, Jul 8, 2015 at 9:41 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: My singletons do in fact stick around. They're one per worker, looks like. So with 4 workers running on the box, we're creating one singleton per worker process/jvm, which seems OK. Still curious about foreachPartition vs. foreachRDD though... On Tue, Jul 7, 2015 at 11:27 AM, Richard Marscher rmarsc...@localytics.com wrote: Would it be possible to have a wrapper class that just represents a reference to a singleton holding the 3rd party object? It could proxy over calls to the singleton object which will instantiate a private instance of the 3rd party object lazily? I think something like this might work if the workers have the singleton object in their classpath. here's a rough sketch of what I was thinking: object ThirdPartySingleton { private lazy val thirdPartyObj = ... def someProxyFunction() = thirdPartyObj.() } class ThirdPartyReference extends Serializable { def someProxyFunction() = ThirdPartySingleton.someProxyFunction() } also found this SO post: http://stackoverflow.com/questions/26369916/what-is-the-right-way-to-have-a-static-object-on-all-workers On Tue, Jul 7, 2015 at 11:04 AM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, I am seeing a lot of posts on singletons vs. 
broadcast variables, such as * http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-have-some-singleton-per-worker-tt20277.html * http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tt11048.html#a21219 What's the best approach to instantiate an object once and have it be reused by the worker(s). E.g. I have an object that loads some static state such as e.g. a dictionary/map, is a part of 3rd party API and is not serializable. I can't seem to get it to be a singleton on the worker side as the JVM appears to be wiped on every request so I get a new instance. So the singleton doesn't stick. Is there an approach where I could have this object or a wrapper of it be a broadcast var? Can Kryo get me there? would that basically mean writing a custom serializer? However, the 3rd party object may have a bunch of member vars hanging off it, so serializing it properly may be non-trivial... Any pointers/hints greatly appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-for-using-singletons-on-workers-seems-unanswered-tp23692.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Best practice for using singletons on workers (seems unanswered) ?
Would it be possible to have a wrapper class that just represents a reference to a singleton holding the 3rd party object? It could proxy over calls to the singleton object which will instantiate a private instance of the 3rd party object lazily? I think something like this might work if the workers have the singleton object in their classpath. here's a rough sketch of what I was thinking: object ThirdPartySingleton { private lazy val thirdPartyObj = ... def someProxyFunction() = thirdPartyObj.() } class ThirdPartyReference extends Serializable { def someProxyFunction() = ThirdPartySingleton.someProxyFunction() } also found this SO post: http://stackoverflow.com/questions/26369916/what-is-the-right-way-to-have-a-static-object-on-all-workers On Tue, Jul 7, 2015 at 11:04 AM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, I am seeing a lot of posts on singletons vs. broadcast variables, such as * http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-have-some-singleton-per-worker-tt20277.html * http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tt11048.html#a21219 What's the best approach to instantiate an object once and have it be reused by the worker(s). E.g. I have an object that loads some static state such as e.g. a dictionary/map, is a part of 3rd party API and is not serializable. I can't seem to get it to be a singleton on the worker side as the JVM appears to be wiped on every request so I get a new instance. So the singleton doesn't stick. Is there an approach where I could have this object or a wrapper of it be a broadcast var? Can Kryo get me there? would that basically mean writing a custom serializer? However, the 3rd party object may have a bunch of member vars hanging off it, so serializing it properly may be non-trivial... Any pointers/hints greatly appreciated. 
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-for-using-singletons-on-workers-seems-unanswered-tp23692.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
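Filling in the elided pieces of the rough sketch from this thread — `HeavyClient` and its `lookup` method are hypothetical stand-ins for the non-serializable 3rd-party object:

```scala
// Hypothetical non-serializable 3rd-party class.
class HeavyClient { def lookup(key: String): Int = key.length }

object ThirdPartySingleton {
  // Instantiated lazily, once per executor JVM; never serialized.
  private lazy val thirdPartyObj = new HeavyClient
  def someProxyFunction(key: String): Int = thirdPartyObj.lookup(key)
}

// Only this tiny handle is shipped with the closure.
class ThirdPartyReference extends Serializable {
  def someProxyFunction(key: String): Int =
    ThirdPartySingleton.someProxyFunction(key)
}

// Usage on an RDD[String]:
//   val ref = new ThirdPartyReference
//   rdd.map(k => ref.someProxyFunction(k))
```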
Re: How to create empty RDD
This should work: val output: RDD[(DetailInputRecord, VISummary)] = sc.parallelize(Seq.empty[(DetailInputRecord, VISummary)]) On Mon, Jul 6, 2015 at 5:11 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I need to return an empty RDD of type val output: RDD[(DetailInputRecord, VISummary)] This does not work val output: RDD[(DetailInputRecord, VISummary)] = new RDD() as RDD is abstract class. How do I create an empty RDD? -- Deepak
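As an aside, SparkContext also exposes an emptyRDD helper in the 1.x API that avoids the Seq.empty detour:

```scala
// Equivalent sketch using SparkContext.emptyRDD (types from the
// question; DetailInputRecord and VISummary are the asker's classes).
val output: RDD[(DetailInputRecord, VISummary)] =
  sc.emptyRDD[(DetailInputRecord, VISummary)]
```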
Re: Spark driver hangs on start of job
Ah I see, glad that simple patch works for your problem. That seems to be a different underlying problem than we have been experiencing. In our case, the executors are failing properly, it's just that none of the new ones ever escape the same exact issue. So we start a death spiral of thousands of failed executors, none of which can connect with the driver. Meanwhile, the driver just sits there in the zombie state doing nothing while it waits for executors to respond. In that light, my solution is geared towards solving things on the driver side gracefully. On Thu, Jul 2, 2015 at 4:37 AM, Sjoerd Mulder sjoerdmul...@gmail.com wrote: Hi Richard, I have actually applied the following fix to our 1.4.0 version and this seems to resolve the zombies :) https://github.com/apache/spark/pull/7077/files Sjoerd 2015-06-26 20:08 GMT+02:00 Richard Marscher rmarsc...@localytics.com: Hi, we are on 1.3.1 right now so in case there are differences in the Spark files I'll walk through the logic of what we did and post a couple gists at the end. We haven't committed to forking Spark for our own deployments yet, so right now we shadow some Spark classes in our application code with our versions of the classes. Keep in mind I am not a Spark committer so the following is a best-effort approach that is working for us. It may be that someone more knowledgeable about the Spark codebase will see a pitfall in my solution or a better solution. -- First, we'll start with the root issue in TaskSchedulerImpl. You will find the code that prints the "Initial job has not accepted any resources" warning inside the submitTasks function. Spark creates a separate thread that checks some conditions every STARVATION_TIMEOUT milliseconds until the submitted task set has been launched. It only posts the warn logging here and does nothing. I will come back to this part of the code in a moment.
The code that determines when the hasLaunchedTask flag gets set (and thus closes out the starvation thread and the task set is being worked on by the cluster) is within the resourceOffers function. The various Spark scheduler backend classes will periodically call this function in TaskSchedulerImpl until cluster resources have been assigned to the task set. To start signaling the zombied scenario, I created a new flag: @volatile private var hasZombied = false. In our experience we always get the resources in resourceOffers before the starvation thread runs; otherwise we have always hit the zombie scenario when resources weren't allocated yet. So I added a conditional before the if (tasks.size > 0) { hasLaunchedTask = true } block. The conditional checks if (!hasLaunchedTask && hasZombied) { dagScheduler.ourCustomFunction() }. I'll explain that DAGScheduler call in a moment. The last detail here is to add code inside the starvation thread block after it posts the warning log. Set hasZombied to true and then call this.cancel() to stop the starvation thread from continuing to run. With this we now have all the steps needed inside TaskSchedulerImpl to start signaling out the zombied condition. Back to the custom function. DAGScheduler has references to the appropriate Spark listeners that can propagate errors to the task set and, more importantly, back to your application code. If you look at the DAGScheduler class, you will find a function called cleanUpAfterSchedulerStop(). This function does everything we want, except it is hard-coded to a specific exception: val error = new SparkException(...). What I did was copy this and make another function that returns a custom Exception I created to signal the zombie, something like SparkTaskResourceAllocationZombieError. Now you call this function within the conditional block in TaskSchedulerImpl.resourceOffers and you should see your exception propagating out to your application code so you can take appropriate actions.
In our case, we are submitting Spark applications programmatically from a Scala application service on an EC2 instance to a Spark Standalone cluster in EC2. Whenever we see this error, the application service EC2 instance is unable to get resources from the cluster even when attempting subsequent Spark applications for a long period of time (it eventually recovers hours or days later but that is not useful for us). So in our case we need to reschedule the failed Spark application on another EC2 application instance and shut down this current EC2 instance because it can no longer get cluster resources. Your use case may be different, but at least action can be taken at an application level. Here is some source code, you should be able to locate most of my additions to the code by searching for comments starting with // Localytics Code TaskSchedulerImpl gist: https://gist.github.com/rmarsch/e5d298e582ab75957957 DAGScheduler gist: https://gist.github.com/rmarsch/ae8f5bb03b11e8d4f8f7 Regards, Richard
Re: Applying functions over certain count of tuples .
Hi, not sure what the context is but I think you can do something similar with mapPartitions: rdd.mapPartitions { iterator => iterator.grouped(5).map { tupleGroup => emitOneRddForGroup(tupleGroup) } } The edge case is when the final grouping doesn't have exactly 5 items, if that matters. On Mon, Jun 29, 2015 at 3:57 PM, anshu shukla anshushuk...@gmail.com wrote: I want to apply some logic on the basis of a FIXED count of the number of tuples in each RDD. *suppose emit one rdd for every 5 tuples of the previous RDD.* -- Thanks Regards, Anshu Shukla
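Handling that edge case explicitly, a sketch — `processGroup` is a hypothetical stand-in for the per-group logic:

```scala
// Groups of exactly 5 are processed; the short tail group, if any,
// is dropped here -- pad or process it instead if that fits better.
val processed = rdd.mapPartitions { iterator =>
  iterator.grouped(5).collect {
    case group if group.size == 5 => processGroup(group)
  }
}
```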
Re: Spark driver hangs on start of job
We've seen this issue as well in production. We also aren't sure what causes it, but have just recently shaded some of the Spark code in TaskSchedulerImpl that we use to effectively bubble up an exception from Spark instead of going zombie in this situation. If you are interested I can go into more detail about that. Otherwise I'm also keen to find out more on how this might be happening. On Fri, Jun 26, 2015 at 8:28 AM, Sjoerd Mulder sjoerdmul...@gmail.com wrote: Hi, I have a really annoying issue that I cannot replicate consistently, still it happens every +- 100 submissions. (it's a job that's running every 3 minutes). Already reported an issue for this: https://issues.apache.org/jira/browse/SPARK-8592 Here are the Thread dumps of the Driver and the Executor: https://docs.google.com/document/d/1x7ZwUzlvRqeJQ12FoGhpLV1zqDAmVsaF2HYhzkPNBKQ Any direction I should look into? Spark 1.4.0 Java 1.8.0_45 (Oracle Corporation) Scala 2.11.6 I already tried to resolve the NPE by not logging the ActorRef. This makes the NPE go away :) But the root cause lies deeper I expect, since the driver then still hangs with the *WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources* messages. But there are enough resources available in the cluster, it has plenty of CPU and Memory left. Logs from Driver: 15/06/26 11:58:19 INFO Remoting: Starting remoting 15/06/26 11:58:19 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@172.17.0.123:51415] 15/06/26 11:58:19 INFO Utils: Successfully started service 'sparkDriver' on port 51415.
15/06/26 11:58:20 INFO SparkEnv: Registering MapOutputTracker 15/06/26 11:58:20 INFO SparkEnv: Registering BlockManagerMaster 15/06/26 11:58:20 INFO DiskBlockManager: Created local directory at /tmp/spark-ff1f5a88-4e1d-4fe0-9c54-890e4174f02a/blockmgr-92b1e974-53bb-45a3-b918-916759e14630 15/06/26 11:58:20 INFO MemoryStore: MemoryStore started with capacity 265.1 MB 15/06/26 11:58:20 INFO HttpFileServer: HTTP File server directory is /tmp/spark-ff1f5a88-4e1d-4fe0-9c54-890e4174f02a/httpd-f5894293-33aa-4eaa-9740-4a36c054b6c8 15/06/26 11:58:20 INFO HttpServer: Starting HTTP Server 15/06/26 11:58:20 INFO Utils: Successfully started service 'HTTP file server' on port 33176. 15/06/26 11:58:20 INFO SparkEnv: Registering OutputCommitCoordinator 15/06/26 11:58:20 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/06/26 11:58:20 INFO SparkUI: Started SparkUI at http://172.17.0.123:4040 15/06/26 11:58:20 INFO SparkContext: Added JAR file:/opt/jar/spark/spark-job-1.0-SNAPSHOT.jar at http://172.17.0.123:33176/jars/spark-job-1.0-SNAPSHOT.jar with timestamp 1435319900257 15/06/26 11:58:20 INFO AppClient$ClientActor: Connecting to master akka.tcp://sparkMaster@172.17.42.1:7077/user/Master... 
15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150626115820-0917 15/06/26 11:58:20 INFO AppClient$ClientActor: Executor added: app-20150626115820-0917/0 on worker-20150625133752-10.0.7.171-47050 ( 10.0.7.171:47050) with 1 cores 15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150626115820-0917/0 on hostPort 10.0.7.171:47050 with 1 cores, 2.0 GB RAM 15/06/26 11:58:20 INFO TaskSchedulerImpl: Starting speculative execution thread 15/06/26 11:58:20 INFO AppClient$ClientActor: Executor updated: app-20150626115820-0917/0 is now LOADING 15/06/26 11:58:20 INFO AppClient$ClientActor: Executor updated: app-20150626115820-0917/0 is now RUNNING 15/06/26 11:58:20 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 52000. 15/06/26 11:58:20 INFO NettyBlockTransferService: Server created on 52000 15/06/26 11:58:20 INFO BlockManagerMaster: Trying to register BlockManager 15/06/26 11:58:20 INFO BlockManagerMasterEndpoint: Registering block manager 172.17.0.123:52000 with 265.1 MB RAM, BlockManagerId(driver, 172.17.0.123, 52000) 15/06/26 11:58:20 INFO BlockManagerMaster: Registered BlockManager 15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 15/06/26 11:58:24 INFO Exchange: Using SparkSqlSerializer2. 15/06/26 11:58:24 INFO Exchange: Using SparkSqlSerializer2. 
15/06/26 11:58:24 INFO SparkContext: Starting job: map at SparkProductEventAggregator.scala:144 15/06/26 11:58:24 INFO Version: Elasticsearch Hadoop v2.1.0.rc1 [5cc3f53084] 15/06/26 11:58:24 INFO ScalaEsRowRDD: Reading from [675d42c8-9823-4d3c-8e86-5aa611d38770/events] 15/06/26 11:58:24 INFO ScalaEsRowRDD: Discovered mapping {675d42c8-9823-4d3c-8e86-5aa611d38770=[REMOVED]} for [675d42c8-9823-4d3c-8e86-5aa611d38770/events] 15/06/26 11:58:24 INFO DAGScheduler: Registering RDD 5 (map at SparkProductEventAggregator.scala:144)
Re: Spark driver hangs on start of job
Hi, we are on 1.3.1 right now so in case there are differences in the Spark files I'll walk through the logic of what we did and post a couple gists at the end. We haven't committed to forking Spark for our own deployments yet, so right now we shadow some Spark classes in our application code with our versions of the classes. Keep in mind I am not a Spark committer so the following is a best-effort approach that is working for us. It may be that someone more knowledgeable about the Spark codebase will see a pitfall in my solution or a better solution. -- First, we'll start with the root issue in TaskSchedulerImpl. You will find the code that prints the "Initial job has not accepted any resources" warning inside the submitTasks function. Spark creates a separate thread that checks some conditions every STARVATION_TIMEOUT milliseconds until the submitted task set has been launched. It only posts the warn logging here and does nothing. I will come back to this part of the code in a moment. The code that determines when the hasLaunchedTask flag gets set (and thus closes out the starvation thread and the task set is being worked on by the cluster) is within the resourceOffers function. The various Spark scheduler backend classes will periodically call this function in TaskSchedulerImpl until cluster resources have been assigned to the task set. To start signaling the zombied scenario, I created a new flag: @volatile private var hasZombied = false. In our experience we always get the resources in resourceOffers before the starvation thread runs; otherwise we have always hit the zombie scenario when resources weren't allocated yet. So I added a conditional before the if (tasks.size > 0) { hasLaunchedTask = true } block. The conditional checks if (!hasLaunchedTask && hasZombied) { dagScheduler.ourCustomFunction() }. I'll explain that DAGScheduler call in a moment. The last detail here is to add code inside the starvation thread block after it posts the warning log.
Set hasZombied to true and then call this.cancel() to stop the starvation thread from continuing to run. With this we now have all the steps needed inside TaskSchedulerImpl to start signaling out the zombied condition. Back to the custom function. DAGScheduler has reference to the appropriate Spark listeners that can propagate errors to the task set and more importantly back to your application code. If you look at DAGScheduler class, you will find a function called cleanUpAfterSchedulerStop(). This function does everything we want, except it is hard coded to a specific exception val error = new SparkException(...). What I did was copy this and made another function that returned a custom Exception I created that I use to signal the zombie, something like SparkTaskResourceAllocationZombieError. Now you call this function within the conditional block in TaskSchedulerImpl.resourceOffers and you should see your exception propagating out to your application code so you can take appropriate actions. In our case, we are submitting Spark applications programmatically from a Scala application service on an EC2 instance to a Spark Standalone cluster in EC2. Whenever we see this error, the application service EC2 instance is unable to get resources from the cluster even when attempting subsequent Spark applications for a long period of time (it eventually recovers hours or days later but that is not useful for us). So in our case we need to reschedule the failed Spark application on another EC2 application instance and shut down this current EC2 instance because it can no longer get cluster resources. Your use case may be different, but at least action can be taken at an application level. 
Here is some source code, you should be able to locate most of my additions to the code by searching for comments starting with // Localytics Code TaskSchedulerImpl gist: https://gist.github.com/rmarsch/e5d298e582ab75957957 DAGScheduler gist: https://gist.github.com/rmarsch/ae8f5bb03b11e8d4f8f7 Regards, Richard On Fri, Jun 26, 2015 at 12:08 PM, Sjoerd Mulder sjoerdmul...@gmail.com wrote: Hi Richard, I would like to see how we can get a workaround to get out of the Zombie situation since were planning for production :) If you could share the workaround or point directions that would be great! Sjoerd 2015-06-26 16:53 GMT+02:00 Richard Marscher rmarsc...@localytics.com: We've seen this issue as well in production. We also aren't sure what causes it, but have just recently shaded some of the Spark code in TaskSchedulerImpl that we use to effectively bubble up an exception from Spark instead of zombie in this situation. If you are interested I can go into more detail about that. Otherwise I'm also keen to find out more on how this might be happening. On Fri, Jun 26, 2015 at 8:28 AM, Sjoerd Mulder sjoerdmul...@gmail.com wrote: Hi, I have a really annoying issue that i cannot replicate consistently, still it happens every +- 100 submissions. (it's a job
Re: Limitations using SparkContext
Hi, can you detail the symptom further? Was it that only 12 requests were serviced and the other 440 timed out? I don't think that Spark is well suited for this kind of workload, or at least the way it is being represented. How long does a single request take Spark to complete? Even with fair scheduling, you will only be able to have a fixed number of tasks running on Spark at once. Usually this is bounded by the max cores setting in configuration. Since you mention local as a comparison point I get the impression you are running Spark Standalone for cluster. The implication, if this is reflective of your current setup, is that you aren't going to get much concurrency for separate spray requests. Let's say your max cores is 16 and your number of tasks/partitions per stage of your Spark DAG is 8. Then at any given time only 2 requests can be serviced. It may also be the case that with fair scheduling a single request gets pre-empted after completing one stage of the DAG and has to wait to continue instead of proceeding directly to the next stage. This hypothesis would also support the observation that local is no better than cluster, because you probably have even fewer concurrent Spark tasks available on the single local machine. spark.cores.max (default: not set): When running on a standalone deploy cluster https://spark.apache.org/docs/1.3.1/spark-standalone.html or a Mesos cluster in coarse-grained sharing mode https://spark.apache.org/docs/1.3.1/running-on-mesos.html#mesos-run-modes, the maximum amount of CPU cores to request for the application from across the cluster (not from each machine). If not set, the default will be spark.deploy.defaultCores on Spark's standalone cluster manager, or infinite (all available cores) on Mesos. On Tue, Jun 23, 2015 at 12:44 PM, daunnc dau...@gmail.com wrote: So the situation is following: got a spray server, with a spark context available (fair scheduling in a cluster mode, via spark-submit).
There are some http urls, which call spark rdd, and collect information from accumulo / hdfs / etc (using rdd). Noticed that there is a sort of limitation on requests: wrk -t8 -c50 -d30s http://localhost:/…/; Running 30s test @ http://localhost:/…/ 8 threads and 50 connections Thread Stats Avg Stdev Max +/- Stdev Latency 1.03s 523.30ms 1.70s 50.00% Req/Sec 6.05 5.49 20.00 71.58% 452 requests in 30.04s, 234.39KB read Socket errors: connect 0, read 0, write 0, timeout 440 So this happens on making some calls with spark rdd (not depending on the called function), and in the browser you can see ERR_EMPTY_RESPONSE Now the solution was to use cache, but I want to know about this limitation, or maybe some settings. This error happens in local mode and in cluster mode, so I guess it does not depend on that. P.S. logs are clear (or simply I don't know where to look, but stdout of a spark-submit in client mode is clear). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Limitations-using-SparkContext-tp23452.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
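The max-cores arithmetic from the reply above, spelled out (the numbers are the hypothetical ones used there):

```scala
// With spark.cores.max = 16 and 8 tasks per stage, at most
// 16 / 8 = 2 requests can hold their full complement of task
// slots at once; the remaining requests queue behind them.
val maxCores = 16
val tasksPerStage = 8
val concurrentRequests = maxCores / tasksPerStage
println(concurrentRequests) // prints 2
```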
Re: How Spark Execute chaining vs no chaining statements
There should be no difference assuming you don't use the intermediately stored rdd values you are creating for anything else (rdd1, rdd2). The first example still creates these intermediate rdd objects, you are just using them implicitly and not storing the value. It's also worth pointing out that Spark is able to pipeline operations together into stages. That is, it should effectively translate something like map(f1).map(f2).map(f3) to map(f1 -> f2 -> f3), in pseudocode if you will. Here is a more detailed explanation from one of the committers on SO: http://stackoverflow.com/questions/19340808/spark-single-pipelined-scala-command-better-than-separate-commands On Tue, Jun 23, 2015 at 5:17 PM, Ashish Soni asoni.le...@gmail.com wrote: Hi All, What is the difference between the below in terms of execution on the cluster with 1 or more worker nodes: rdd.map(...).map(...)...map(..) vs val rdd1 = rdd.map(...) val rdd2 = rdd1.map(...) val rdd3 = rdd2.map(...) Thanks, Ashish
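Concretely, with f1, f2, f3 as placeholder functions:

```scala
// Both spellings define the same lineage; nothing runs until an
// action is called, and Spark pipelines the three maps into a
// single stage.
val chained = rdd.map(f1).map(f2).map(f3)

val rdd1 = rdd.map(f1)
val rdd2 = rdd1.map(f2)
val rdd3 = rdd2.map(f3)
// chained and rdd3 execute identically: each element flows through
// f1, then f2, then f3 in one pass, with no intermediate collections
// materialized.
```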
Re: Multiple executors writing file using java filewriter
Is spoutLog just a non-spark file writer? If you run that in the map call on a cluster it's going to be writing in the filesystem of the executor it's being run on. I'm not sure if that's what you intended. On Mon, Jun 22, 2015 at 1:35 PM, anshu shukla anshushuk...@gmail.com wrote: Running perfectly in local system but not writing to file in cluster mode. ANY suggestions please .. //msgid is long counter JavaDStream<String> newinputStream = inputStream.map(new Function<String, String>() { @Override public String call(String v1) throws Exception { String s1 = msgId + "@" + v1; System.out.println(s1); msgId++; try { *//filewriter logic spoutlog.batchLogwriter(System.currentTimeMillis(), "spout-MSGID," + msgeditor.getMessageId(s1));* } catch (Exception e) { System.out.println("exception is here"); e.printStackTrace(); throw e; } System.out.println("msgid," + msgId); return msgeditor.addMessageId(v1, msgId); } }); -- Thanks Regards, Anshu Shukla On Mon, Jun 22, 2015 at 10:50 PM, anshu shukla anshushuk...@gmail.com wrote: Can not we write some data to a txt file in parallel with multiple executors running in parallel ?? -- Thanks Regards, Anshu Shukla
Re: Executor memory allocations
It would be the 40%, although it's probably better to think of it as shuffle vs. data cache, with the remainder going to tasks. The comments for the shuffle memory fraction configuration clarify that it takes memory at the expense of the storage/data cache fraction: spark.shuffle.memoryFraction (default 0.2): Fraction of Java heap to use for aggregation and cogroups during shuffles, if spark.shuffle.spill is true. At any given time, the collective size of all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will begin to spill to disk. If spills are often, consider increasing this value at the expense of spark.storage.memoryFraction. On Wed, Jun 17, 2015 at 6:02 PM, Corey Nolet cjno...@gmail.com wrote: So I've seen in the documentation that (after the overhead memory is subtracted), the memory allocations of each executor are as follows (assume default settings): 60% for cache 40% for tasks to process data Reading about how Spark implements shuffling, I've also seen it say 20% of executor memory is utilized for shuffles Does this 20% cut into the 40% for tasks to process data or the 60% for the data cache?
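A worked example under those 1.x defaults (the 10 GB executor heap is hypothetical, and the safety fractions Spark also applies are ignored for simplicity):

```scala
val heapGb = 10.0
val storageGb = heapGb * 0.6                 // spark.storage.memoryFraction
val shuffleGb = heapGb * 0.2                 // spark.shuffle.memoryFraction
val taskGb = heapGb - storageGb - shuffleGb  // left for task processing
// storage 6.0 GB, shuffle 2.0 GB, tasks 2.0 GB: the shuffle share
// comes out of the non-storage 40%, not the 60% data cache.
```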
Re: append file on hdfs
Hi, if you now want to write 1 file per partition, that's actually built into Spark as *saveAsTextFile*(*path*): Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file. On Wed, Jun 10, 2015 at 4:44 AM, Pa Rö paul.roewer1...@googlemail.com wrote: hi, i have an idea to solve my problem, i want to write one file for each spark partition, but i don't know how to get the actual partition suffix/ID in my call function? points.foreachPartition( new VoidFunction<Iterator<Tuple2<Integer, GeoTimeDataTupel>>>() { private static final long serialVersionUID = -7210897529331503565L; public void call(Iterator<Tuple2<Integer, GeoTimeDataTupel>> entry) throws Exception { while(entry.hasNext()) { Tuple2<Integer, GeoTimeDataTupel> temp = entry.next(); try { FileSystem fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")), new Configuration()); Path pt = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results"); } catch(Exception e) { e.printStackTrace(); } } } } ); 2015-06-09 15:34 GMT+02:00 Pa Rö paul.roewer1...@googlemail.com: hi community, i want to append results to one file. if i work local my function builds all right, if i run this on a yarn cluster, i lose some rows.
here is my function to write:

points.foreach( new VoidFunction<Tuple2<Integer, GeoTimeDataTupel>>() { private static final long serialVersionUID = 2459995649387229261L; public void call(Tuple2<Integer, GeoTimeDataTupel> entry) throws Exception { try { FileSystem fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")), new Configuration()); Path pt = new Path(fs.getHomeDirectory()+pro.getProperty("spark.output")+"/results"); if(fs.exists(pt)) { FSDataInputStream in = fs.open(pt); Path pt_temp = new Path(fs.getHomeDirectory()+pro.getProperty("spark.output")+"/results_temp"); backup(fs.getConf(), fs, in, pt_temp); in.close(); FSDataOutputStream out = fs.create(pt, true); FSDataInputStream backup = fs.open(pt_temp); int offset = 0; int bufferSize = 4096; int result = 0; byte[] buffer = new byte[bufferSize]; // pre read a part of content from input stream result = backup.read(offset, buffer, 0, bufferSize); // loop read input stream until it does not fill whole size of buffer while (result == bufferSize) { out.write(buffer); // read next segment from input stream by moving the offset pointer offset += bufferSize; result = backup.read(offset, buffer, 0, bufferSize); } if (result > 0 && result < bufferSize) { for (int i = 0; i < result; i++) { out.write(buffer[i]); } } out.writeBytes("Cluster: "+entry._1+", Point: "+entry._2.toString()+"\n"); out.close(); } else { BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(pt))); bw.write("Cluster: "+entry._1+", Point: "+entry._2.toString()+"\n"); bw.close(); } } catch (Exception e) { e.printStackTrace(); } }

public void backup(Configuration conf, FileSystem fs, FSDataInputStream sourceContent, Path pt_temp) throws Exception { FSDataOutputStream out = fs.create(pt_temp, true); IOUtils.copyBytes(sourceContent, out, 4096, false); out.close(); }

where is my fault? or is there a function to write (append) to the hadoop hdfs? best regards, paul
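As an aside, the read loop above is fragile: checking `result == bufferSize` and writing the whole buffer only works if every read before end-of-file completely fills the buffer, and a short read mid-stream would end the copy early. A local-filesystem sketch of the same copy-then-rewrite pattern with a bounded read loop instead (plain java.nio streams standing in for FSDataInputStream/FSDataOutputStream; file names are illustrative):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class CopyThenAppend {
    // Emulate "append" on a store that only supports full rewrites:
    // back up the old contents, stream them into the new file, then
    // write the extra line.
    public static void appendLine(Path file, String line) throws IOException {
        Path tmp = Files.createTempFile("results", ".bak");
        if (Files.exists(file)) {
            Files.copy(file, tmp, StandardCopyOption.REPLACE_EXISTING); // backup
        }
        try (InputStream in = Files.newInputStream(tmp);
             OutputStream out = Files.newOutputStream(file)) {
            byte[] buffer = new byte[4096];
            int n;
            // Copy until read() signals end of stream, writing only the
            // bytes actually read -- this handles short reads correctly.
            while ((n = in.read(buffer)) > 0) {
                out.write(buffer, 0, n);
            }
            out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```

Even with the loop fixed, a per-element foreach like this still races when tasks run concurrently (or speculatively) on a cluster, which is why saveAsTextFile with one output file per partition is the safer route.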
Re: spark eventLog and history server
Hi, I don't have a complete answer to your questions but: Removing the suffix does not solve the problem - unfortunately this is true, the master web UI only tries to build out a Spark UI from the event logs once, at the time the context is closed. If the event logs are in-progress at this time, then you basically missed the opportunity. Does it mean I don't need to start history server if I only use spark in standalone mode? - Yes, you don't need to start the history server. On Mon, Jun 8, 2015 at 7:57 PM, Du Li l...@yahoo-inc.com.invalid wrote: Event log is enabled in my spark streaming app. My code runs in standalone mode and the spark version is 1.3.1. I periodically stop and restart the streaming context by calling ssc.stop(). However, from the web UI, when clicking on a past job, it says the job is still in progress and does not show the event log. The event log files have suffix .inprogress. Removing the suffix does not solve the problem. Do I need to do anything here in order to view the event logs of finished jobs? Or do I need to stop ssc differently? In addition, the documentation seems to suggest history server is used for Mesos or YARN mode. Does it mean I don't need to start history server if I only use spark in standalone mode? Thanks, Du
FileOutputCommitter deadlock 1.3.1
Hi, we've been seeing occasional issues in production with the FileOutputCommitter reaching a deadlock situation. We are writing our data to S3 and currently have speculation enabled. What we see is that Spark gets a file-not-found error trying to access a temporary part file that it wrote (it seems to be the part-#2 file every time?), so the task fails. But the file actually exists in S3, so subsequent speculations and task retries all fail because the committer tells them the file exists. This persists until human intervention kills the application. Usually rerunning the application succeeds on the next try, so it is not deterministic with the dataset or anything. It seems like there isn't a good story yet for file writing and speculation (https://issues.apache.org/jira/browse/SPARK-4879), although our error here seems worse than the reports in that issue, since I believe ours deadlocks and those don't? Has anyone else observed deadlocking like this? Thanks, Richard
Re: Deduping events using Spark
I think if you create a bidirectional mapping from AnalyticsEvent to another type that would wrap it and use the nonce as its equality, you could then do something like reduceByKey to group by nonce and map back to AnalyticsEvent after. On Thu, Jun 4, 2015 at 1:10 PM, lbierman leebier...@gmail.com wrote: I'm still a bit new to Spark and am struggling to figure out the best way to dedupe my events. I load my Avro files from HDFS and then I want to dedupe events that have the same nonce. For example, my code so far:

JavaRDD<AnalyticsEvent> events = ((JavaRDD<AvroKey<AnalyticsEvent>>) context.newAPIHadoopRDD( context.hadoopConfiguration(), AvroKeyInputFormat.class, AvroKey.class, NullWritable.class ).keys()) .map(event -> AnalyticsEvent.newBuilder(event.datum()).build()) .filter(key -> { return Optional.ofNullable(key.getStepEventKey()).isPresent(); })

Now I want to get back an RDD of AnalyticsEvents that are unique. So I basically want to do: if AnalyticsEvent.getNonce() == AnalyticsEvent2.getNonce(), only return 1 of them. I'm not sure how to do this? If I do reduceByKey, it reduces by AnalyticsEvent, not by the values inside? Any guidance would be much appreciated on how I can walk this list of events and only return a filtered version of unique nonces. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Deduping-events-using-Spark-tp23153.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
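To make the wrapper idea concrete in a single JVM: mapping each event to a (nonce, event) pair and keeping one element per key is exactly what reduceByKey((a, b) -> a) would do on those pairs. A small sketch, with a generic extractor standing in for a hypothetical AnalyticsEvent.getNonce():

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class DedupeByNonce {
    // Keep the first event seen for each nonce -- the local analogue of
    // mapping to (nonce, event) pairs and reduceByKey((a, b) -> a) in Spark.
    public static <T> List<T> dedupeBy(List<T> events, Function<T, String> nonceOf) {
        Map<String, T> byNonce = new LinkedHashMap<>();
        for (T e : events) {
            byNonce.putIfAbsent(nonceOf.apply(e), e); // later duplicates ignored
        }
        return new ArrayList<>(byNonce.values());
    }
}
```

In Spark the equivalent would be mapToPair(e -> new Tuple2<>(e.getNonce(), e)).reduceByKey((a, b) -> a).values(), with the caveat that which duplicate survives per key is arbitrary there.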
Re: Scaling spark jobs returning large amount of data
It is possible to start multiple concurrent drivers, Spark dynamically allocates ports per spark application on driver, master, and workers from a port range. When you collect results back to the driver, they do not go through the master. The master is mostly there as a coordinator between the driver and the cluster of worker nodes, but otherwise the workers and driver communicate directly for the underlying workload. A spark application relates to one instance of a SparkContext programmatically or to one call to one of the spark submit scripts. Assuming you don't have dynamic resource allocation setup, each application takes a fixed amount of the cluster resources to run. So as long as you subdivide your cluster resources properly you can run multiple concurrent applications against it. We are doing this in production presently. Alternately, as Igor suggests, you can share a spark application and launch different jobs within it. They will share the resources allocated to the application in this case. An effect of this is you will only have a finite amount of concurrent spark tasks (roughly translates to 1 task can execute 1 partition of a job at a time). If you launch multiple independent jobs within the same application you will likely want to enable fair job scheduling, otherwise stages between independent jobs will run in a FIFO order instead of interleaving execution. Hope this helps, Richard On Thu, Jun 4, 2015 at 11:20 AM, Igor Berman igor.ber...@gmail.com wrote: Hi, as far as I understand you shouldn't send data to driver. 
Suppose you have a file in hdfs/s3 or cassandra partitioning: you should create your job such that every executor/worker of spark will handle part of your input, transform and filter it, and at the end write back to cassandra as output (once again, every executor/core inside a worker will write part of the output; in your case they will write part of the report). In general I find that submitting multiple jobs in the same spark context (aka driver) is more performant (you don't pay startup/shutdown time); for this, some use a rest server for submitting jobs to a long-running spark context (driver). I'm not sure you can run multiple concurrent drivers because of ports. On 4 June 2015 at 17:30, Giuseppe Sarno giuseppesa...@fico.com wrote: Hello, I am relatively new to spark and I am currently trying to understand how to scale large numbers of jobs with spark. I understand that spark architecture is split into “Driver”, “Master” and “Workers”. Master has a standby node in case of failure and workers can scale out. All the examples I have seen show Spark being able to distribute the load to the workers and return small amounts of data to the Driver. In my case I would like to explore the scenario where I need to generate a large report on data stored on Cassandra and understand how Spark architecture will handle this case when multiple report jobs are running in parallel. According to this presentation https://trongkhoanguyenblog.wordpress.com/2015/01/07/understand-the-spark-deployment-modes/ responses from workers go through the Master and finally to the Driver. Does this mean that the Driver and/or Master is a single point for all the responses coming back from workers? Is it possible to start multiple concurrent Drivers? Regards, Giuseppe. Fair Isaac Services Limited (Co. No. 01998476) and Fair Isaac (Adeptra) Limited (Co. No. 03295455) are registered in England and Wales and have a registered office address of Cottons Centre, 5th Floor, Hays Lane, London, SE1 2QP.
Re: Application is always in process when I check out logs of completed application
I had the same issue a couple days ago. It's a bug in 1.3.0 that is fixed in 1.3.1 and up: https://issues.apache.org/jira/browse/SPARK-6036 The ordering is coded such that the event logs are moved from in-progress to complete after the Master tries to build the history page for the logs. The only reason it even works on occasion in 1.3.0 is because the Master part is run asynchronously while the event log status change is synchronous, so the Master part can on some occasions be executed afterwards as a race condition. On Wed, Jun 3, 2015 at 2:17 AM, ayan guha guha.a...@gmail.com wrote: Have you done sc.stop() ? :) On 3 Jun 2015 14:05, amghost zhengweita...@outlook.com wrote: I run a spark application in a spark standalone cluster with client deploy mode. I want to check out the logs of my finished application, but I always get a page telling me Application history not found - Application xxx is still in process. I am pretty sure that the application has indeed completed because I can see it in the Completed Applications list shown by the Spark WebUI, and I have also found the log file with suffix .inprogress in the directory set by spark.eventLog.dir in my spark-default.conf Oh, BTW, I am using spark 1.3.0 So, is there anything I missed? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Application-is-always-in-process-when-I-check-out-logs-of-completed-application-tp23123.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Client
I think the short answer to the question is: no, there is no alternate API that will not use the System.exit calls. You can craft a workaround like is being suggested in this thread. For comparison, we are doing programmatic submission of applications in a long-running client application. To get around these issues we make a shadowed version of some of the Spark code in our application to remove the System.exit calls, so instead exceptions bubble up to our application. On Wed, Jun 3, 2015 at 7:19 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you try this? Create an sbt project like:

// Create your context
val sconf = new SparkConf().setAppName("Sigmoid").setMaster("spark://sigmoid:7077")
val sc = new SparkContext(sconf)
// Do some computations
sc.parallelize(1 to 1).take(10).foreach(println)
// Now return the exit status
System.exit(<some number>)

Now, make your workflow manager trigger *sbt run* on the project instead of using spark-submit. Thanks Best Regards On Wed, Jun 3, 2015 at 2:18 PM, pavan kumar Kolamuri pavan.kolam...@gmail.com wrote: Hi akhil, sorry, i may not be conveying the question properly. Actually we are looking to launch a spark job from a long-running workflow manager, which invokes the spark client via SparkSubmit. Unfortunately the client, upon successful completion of the application, exits with a System.exit(0), or System.exit(NON_ZERO) when there is a failure. Question is: is there an alternate api through which a spark application can be launched which can return an exit status back to the caller, as opposed to initiating a JVM halt. On Wed, Jun 3, 2015 at 12:58 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Run it as a standalone application. Create an sbt project and do sbt run? Thanks Best Regards On Wed, Jun 3, 2015 at 11:36 AM, pavan kumar Kolamuri pavan.kolam...@gmail.com wrote: Hi guys, i am new to spark. I am using sparksubmit to submit spark jobs. But for my use case i don't want it to exit with System.exit.
Is there any other spark client which is API friendly, other than SparkSubmit, which doesn't exit with System.exit? Please correct me if i am missing something. Thanks in advance -- Regards Pavan Kumar Kolamuri -- Regards Pavan Kumar Kolamuri
Re: flatMap output on disk / flatMap memory overhead
Are you sure it's memory related? What is the disk utilization and IO performance on the workers? The error you posted looks to be related to shuffle trying to obtain block data from another worker node and failing to do so in a reasonable amount of time. It may still be memory related, but I'm not sure that other resources are ruled out yet. On Tue, Jun 2, 2015 at 5:10 AM, octavian.ganea octavian.ga...@inf.ethz.ch wrote: I tried using reduceByKey, without success. I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey . However, I got the same error as before, namely the error described here: http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html My task is to count the frequencies of pairs of words that occur in a set of documents at least 5 times. I know that this final output is sparse and should comfortably fit in memory. However, the intermediate pairs that are spilled by flatMap might need to be stored on the disk, but I don't understand why the persist option does not work and my job fails. My code:

rdd.persist(StorageLevel.MEMORY_AND_DISK)
  .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1, word2), 1)
  .reduceByKey((a, b) => (a + b).toShort)
  .filter({ case ((x, y), count) => count >= 5 })

My cluster has 8 nodes, each with 129 GB of RAM and 16 cores per node. One node I keep for the master, 7 nodes for the workers. my conf:

conf.set("spark.cores.max", "128")
conf.set("spark.akka.frameSize", "1024")
conf.set("spark.executor.memory", "115g")
conf.set("spark.shuffle.file.buffer.kb", "1000")

my spark-env.sh:

ulimit -n 20
SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
SPARK_DRIVER_MEMORY=129G

spark version: 1.1.1 Thank you a lot for your help!
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-tp23098p23108.html
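For what it's worth, the shape of that job (flatMap to pairs, reduceByKey, filter by count) can be sketched in a single JVM to check the logic of the final filter; the map of pair-to-count stays small even when the stream of pairs is huge, which is the whole point of reducing by key before filtering. This is an illustrative analogue, not Spark code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PairCounts {
    // Count occurrences of (word1, word2) pairs and keep only the frequent
    // ones -- the local analogue of flatMap -> reduceByKey(_ + _) -> filter.
    public static Map<String, Integer> frequentPairs(List<String[]> pairs, int minCount) {
        Map<String, Integer> counts = new HashMap<>();
        for (String[] p : pairs) {
            String key = p[0] + "," + p[1];          // pair encoded as a string key
            counts.merge(key, 1, Integer::sum);      // the reduceByKey step
        }
        counts.values().removeIf(c -> c < minCount); // filter(count >= minCount)
        return counts;
    }
}
```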
Event Logging to HDFS on Standalone Cluster In Progress
Hi, In Spark 1.3.0 I've enabled event logging to write to an existing HDFS folder on a Standalone cluster. This is generally working; all the logs are being written. However, from the Master Web UI, the vast majority of completed applications are labeled as not having a history: http://xxx.xxx.xxx.xxx:8080/history/not-found/?msg=Application+App+is+still+in+progress.title=Application%20history%20not%20found%20(app-20150601160846-1914) The log does exist though: # hdfs dfs -ls -R /eventLogs/app-20150601160846-1914 -rw-rw 3 user group 1027848 2015-06-01 16:09 /eventLogs/app-20150601160846-1914 and `cat`-ing the file shows it ends with: {"Event":"SparkListenerApplicationEnd","Timestamp":1433174955077} This seems to indicate it saw and logged the application end. Is there a known issue here or a workaround? Looking at the source code I might have expected these files to end in `.inprogress` given the UI error message, but they don't. Thanks, Richard
Re: Event Logging to HDFS on Standalone Cluster In Progress
Ah, apologies, I found an existing issue and a fix has already gone out for this in 1.3.1 and up: https://issues.apache.org/jira/browse/SPARK-6036. On Mon, Jun 1, 2015 at 3:39 PM, Richard Marscher rmarsc...@localytics.com wrote: It looks like it is possibly a race condition between removing the `.inprogress` suffix and building the history UI for the application. `AppClient` sends an `UnregisterApplication(appId)` message to the `Master` actor, which triggers the process to look for the app's eventLogs. If they are suffixed with `.inprogress` then it will not build out the history UI and instead builds the error page I've seen. Tying this together, calling SparkContext.stop() has the following block:

if (_dagScheduler != null) {
  _dagScheduler.stop()
  _dagScheduler = null
}
if (_listenerBusStarted) {
  listenerBus.stop()
  _listenerBusStarted = false
}
_eventLogger.foreach(_.stop())

DagScheduler has a TaskScheduler, which has a SparkDeploySchedulerBackend, which has an AppClient. AppClient sends itself a message to stop itself, and as mentioned above, it then sends a message to the Master where it tries to build the history UI. Meanwhile, EventLoggingListener.stop() is where the `.inprogress` suffix is removed in the file-system. It seems like the race condition of the Akka message passing to trigger the Master's building of the history UI may be the only reason the history UI ever gets properly set up in the first place. Because if the ordering of calls were all strict in the SparkContext.stop method, then you would expect the Master to always see the event logs as in progress. Maybe I have missed something in tracing through the code? Is there a reason that the eventLogger cannot be closed before the dagScheduler? Thanks, Richard On Mon, Jun 1, 2015 at 12:23 PM, Richard Marscher rmarsc...@localytics.com wrote: Hi, In Spark 1.3.0 I've enabled event logging to write to an existing HDFS folder on a Standalone cluster. This is generally working; all the logs are being written.
However, from the Master Web UI, the vast majority of completed applications are labeled as not having a history: http://xxx.xxx.xxx.xxx:8080/history/not-found/?msg=Application+App+is+still+in+progress.title=Application%20history%20not%20found%20(app-20150601160846-1914) The log does exist though: # hdfs dfs -ls -R /eventLogs/app-20150601160846-1914 -rw-rw 3 user group 1027848 2015-06-01 16:09 /eventLogs/app-20150601160846-1914 and `cat`-ing the file shows it ends with: {"Event":"SparkListenerApplicationEnd","Timestamp":1433174955077} This seems to indicate it saw and logged the application end. Is there a known issue here or a workaround? Looking at the source code I might have expected these files to end in `.inprogress` given the UI error message, but they don't. Thanks, Richard
Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond
The doc is a bit confusing IMO, but at least for my application I had to use a fair pool configuration to get my stages to be scheduled with FAIR. On Fri, May 15, 2015 at 2:13 PM, Evo Eftimov evo.efti...@isecc.com wrote: No pools for the moment – for each of the apps using the straightforward way with the spark conf param for scheduling = FAIR Spark is running in a Standalone Mode Are you saying that Configuring Pools is mandatory to get the FAIR scheduling working – from the docs it seemed optional to me *From:* Tathagata Das [mailto:t...@databricks.com] *Sent:* Friday, May 15, 2015 6:45 PM *To:* Evo Eftimov *Cc:* user *Subject:* Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond How are you configuring the fair scheduler pools? On Fri, May 15, 2015 at 8:33 AM, Evo Eftimov evo.efti...@isecc.com wrote: I have run / submitted a few Spark Streaming apps configured with Fair scheduling on Spark Streaming 1.2.0, however they still run in a FIFO mode. Is FAIR scheduling supported at all for Spark Streaming apps and from what release / version - e.g. 1.3.1 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Fair-Scheduler-for-Spark-Streaming-1-2-and-beyond-tp22902.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
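For reference, the pool configuration I mean is a fairscheduler.xml along these lines (the pool name here is arbitrary), referenced via spark.scheduler.allocation.file:

```xml
<!-- fairscheduler.xml: pools for fair job scheduling within one application -->
<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
</allocations>
```

together with spark.scheduler.mode=FAIR in the SparkConf, and sc.setLocalProperty("spark.scheduler.pool", "production") on the thread that submits the jobs you want scheduled in that pool.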
Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond
It's not a Spark Streaming app, so sorry I'm not sure of the answer to that. I would assume it should work. On Fri, May 15, 2015 at 2:22 PM, Evo Eftimov evo.efti...@isecc.com wrote: Ok thanks a lot for clarifying that – btw was your application a Spark Streaming App – I am also looking for confirmation that FAIR scheduling is supported for Spark Streaming Apps *From:* Richard Marscher [mailto:rmarsc...@localytics.com] *Sent:* Friday, May 15, 2015 7:20 PM *To:* Evo Eftimov *Cc:* Tathagata Das; user *Subject:* Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond The doc is a bit confusing IMO, but at least for my application I had to use a fair pool configuration to get my stages to be scheduled with FAIR. On Fri, May 15, 2015 at 2:13 PM, Evo Eftimov evo.efti...@isecc.com wrote: No pools for the moment – for each of the apps using the straightforward way with the spark conf param for scheduling = FAIR Spark is running in a Standalone Mode Are you saying that Configuring Pools is mandatory to get the FAIR scheduling working – from the docs it seemed optional to me *From:* Tathagata Das [mailto:t...@databricks.com] *Sent:* Friday, May 15, 2015 6:45 PM *To:* Evo Eftimov *Cc:* user *Subject:* Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond How are you configuring the fair scheduler pools? On Fri, May 15, 2015 at 8:33 AM, Evo Eftimov evo.efti...@isecc.com wrote: I have run / submitted a few Spark Streaming apps configured with Fair scheduling on Spark Streaming 1.2.0, however they still run in a FIFO mode. Is FAIR scheduling supported at all for Spark Streaming apps and from what release / version - e.g. 1.3.1 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Fair-Scheduler-for-Spark-Streaming-1-2-and-beyond-tp22902.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Looking inside the 'mapPartitions' transformation, some confused observations
I believe the issue in b and c is that you call iter.size, which actually flushes the iterator, so the subsequent attempt to put it into a vector yields 0 items. You could use an ArrayBuilder, for example, and not need to rely on knowing the size of the iterator. On Mon, May 11, 2015 at 2:26 PM, myasuka myas...@live.com wrote: As we all know, a partition in Spark is actually an Iterator[T]. For some purpose, I want to treat each partition not as an Iterator but as one whole object. For example, turn an Iterator[Int] into a breeze.linalg.DenseVector[Int]. Thus I use the 'mapPartitions' API to achieve this; however, during the implementation I observed some confusing behavior. I use Spark 1.3.0 on a 10-executor-node cluster. Below are the different attempts:

import breeze.linalg.DenseVector

val a = sc.parallelize(1 to 100, 10)

val b = a.mapPartitions(iter => { val v = Array.ofDim[Int](iter.size); var ind = 0; while(iter.hasNext) { v(ind) = iter.next; ind += 1 }; println(v.mkString(",")); Iterator.single[DenseVector[Int]](DenseVector(v)) })
b.count()

val c = a.mapPartitions(iter => { val v = Array.ofDim[Int](iter.size); iter.copyToArray(v, 0, 10); println(v.mkString(",")); Iterator.single[DenseVector[Int]](DenseVector(v)) })
c.count()

val d = a.mapPartitions(iter => { val v = iter.toArray; println(v.mkString(",")); Iterator.single[DenseVector[Int]](DenseVector(v)) })
d.count()

I can see the printed output in the executor's stdout. Actually only attempt 'd' satisfies my needs, and the other attempts only get a zero dense vector, which means the assignment from iterator to vector did not happen. Hope for explanations of these observations. Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Looking-inside-the-mapPartitions-transformation-some-confused-observations-tp22850.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
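The zero-vector symptom in attempts 'b' and 'c' can be reproduced without Spark: counting an iterator consumes it, so any later copy loop sees nothing. A minimal Java analogue (Scala's iter.size drains the iterator the same way):

```java
import java.util.Iterator;
import java.util.List;

public class IteratorFlush {
    // Returns how many elements can still be read after the iterator has
    // already been fully counted -- demonstrating why sizing an iterator
    // first leaves nothing behind to copy into the array.
    public static int remainingAfterCount(List<Integer> data) {
        Iterator<Integer> iter = data.iterator();
        int size = 0;
        while (iter.hasNext()) { iter.next(); size++; }      // like iter.size
        int remaining = 0;
        while (iter.hasNext()) { iter.next(); remaining++; } // the "copy" loop
        return remaining;                                    // always 0
    }
}
```

Attempt 'd' works because iter.toArray is the only traversal; it both sizes and fills the array in one pass, which is also what an ArrayBuilder would let you do.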
Re: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_2_piece0
By default you would expect to find the log files for master and workers in the relative `logs` directory from the root of the Spark installation on each of the respective nodes in the cluster. On Thu, May 7, 2015 at 10:27 AM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: > Can you check your local and remote logs? Where are the log files? I see the following in my Driver program logs as well as the Spark UI failed task page: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_2_piece0 of broadcast_2 Here is the detailed stack trace. 15/05/06 10:48:51 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, LAB4-WIN03.pcc.lexisnexis.com): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_2_piece0 of broadcast_2 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1156) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164) at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87) at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Ningjun From: Jonathan Coveney [mailto:jcove...@gmail.com] Sent: Wednesday, May 06, 2015 5:23 PM To: Wang, Ningjun (LNG-NPV) Cc: Ted Yu; user@spark.apache.org Subject: Re: java.io.IOException: org.apache.spark.SparkException:
Failed to get broadcast_2_piece0 Can you check your local and remote logs? 2015-05-06 16:24 GMT-04:00 Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com: This problem happens in Spark 1.3.1. It happens when two jobs are running simultaneously, each in its own Spark Context. I don’t remember seeing this bug in Spark 1.2.0. Is it a new bug introduced in Spark 1.3.1? Ningjun From: Ted Yu [mailto:yuzhih...@gmail.com] Sent: Wednesday, May 06, 2015 11:32 AM To: Wang, Ningjun (LNG-NPV) Cc: user@spark.apache.org Subject: Re: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_2_piece0 Which release of Spark are you using? Thanks On May 6, 2015, at 8:03 AM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: I run a job on a spark standalone cluster and got the exception below. Here is the line of code that causes the problem:

val myRdd: RDD[(String, String, String)] = … // RDD of (docid, category, path)
myRdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
val cats: Array[String] = myRdd.map(t => t._2).distinct().collect() // This line causes the exception

15/05/06 10:48:51 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, LAB4-WIN03.pcc.lexisnexis.com): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_2_piece0 of broadcast_2 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1156) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164) at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87) at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at
org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.spark.SparkException: Failed to get broadcast_2_piece0 of broadcast_2 at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137) at
Re: Spark Job triggers second attempt
Hi, I think you may want to use this setting: `spark.task.maxFailures` (default: 4): the number of individual task failures before giving up on the job. Should be greater than or equal to 1; the number of allowed retries = this value - 1. On Thu, May 7, 2015 at 2:34 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: How can I stop Spark from triggering a second attempt in case the first fails? I do not want to wait for the second attempt to fail again so that I can debug faster. .set("spark.yarn.maxAppAttempts", "0") OR .set("spark.yarn.maxAppAttempts", "1") is not helping. -- Deepak
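A hedged sketch of how those settings are usually passed: `spark.yarn.maxAppAttempts` is read when the application is registered with YARN, so setting it inside the driver after the context already exists can come too late; passing it on the `spark-submit` command line is the safer route (and the value must be at least 1). The class name and jar path below are placeholders.

```shell
# Sketch only: pass the retry limits at submit time so YARN and the
# task scheduler see them before the application starts.
# Class name and jar path are placeholders, not from this thread.
spark-submit \
  --conf spark.yarn.maxAppAttempts=1 \
  --conf spark.task.maxFailures=1 \
  --class com.example.MyJob \
  my-job-assembly.jar
```

With `maxAppAttempts=1` there is a single application attempt, and with `maxFailures=1` a task is not retried, so a failing job surfaces its first error immediately.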
Re: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_2_piece0
I should also add I've recently seen this issue as well when using collect. I believe in my case it was related to heap space on the driver program not being able to handle the returned collection. On Thu, May 7, 2015 at 11:05 AM, Richard Marscher rmarsc...@localytics.com wrote: By default you would expect to find the log files for master and workers in the relative `logs` directory from the root of the Spark installation on each of the respective nodes in the cluster. On Thu, May 7, 2015 at 10:27 AM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: > Can you check your local and remote logs? Where are the log files? I see the following in my Driver program logs as well as the Spark UI failed task page java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_2_piece0 of broadcast_2 Here is the detailed stack trace. 15/05/06 10:48:51 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, LAB4-WIN03.pcc.lexisnexis.com): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_2_piece0 of broadcast_2 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1156) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164) at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87) at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) Ningjun *From:* Jonathan Coveney [mailto:jcove...@gmail.com] *Sent:* Wednesday, May 06, 2015 5:23 PM *To:* Wang, Ningjun (LNG-NPV) *Cc:* Ted Yu; user@spark.apache.org *Subject:* Re: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_2_piece0 Can you check your local and remote logs? 2015-05-06 16:24 GMT-04:00 Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com: This problem happens in Spark 1.3.1. It happens when two jobs are running simultaneously, each in its own Spark Context. I don’t remember seeing this bug in Spark 1.2.0. Is it a new bug introduced in Spark 1.3.1? Ningjun *From:* Ted Yu [mailto:yuzhih...@gmail.com] *Sent:* Wednesday, May 06, 2015 11:32 AM *To:* Wang, Ningjun (LNG-NPV) *Cc:* user@spark.apache.org *Subject:* Re: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_2_piece0 Which release of Spark are you using ? Thanks On May 6, 2015, at 8:03 AM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: I run a job on a spark standalone cluster and got the exception below. Here is the line of code that causes the problem: *val *myRdd: RDD[(String, String, String)] = … *// RDD of (docid, category, path) * myRdd.persist(StorageLevel.*MEMORY_AND_DISK_SER*) *val *cats: Array[String] = myRdd.map(t => t._2).distinct().collect() // This line causes the exception 15/05/06 10:48:51 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, LAB4-WIN03.pcc.lexisnexis.com): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_2_piece0 of broadcast_2 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1156) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164) at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87) at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused
Re: Maximum Core Utilization
Hi, do you have information on how many partitions/tasks the stage/job is running? By default there is 1 core per task, and your number of concurrent tasks may be limiting core utilization. There are a few settings you could play with, assuming your issue is related to the above: spark.default.parallelism spark.cores.max spark.task.cpus On Tue, May 5, 2015 at 3:55 PM, Manu Kaul manohar.k...@gmail.com wrote: Hi All, For a job I am running on Spark with a dataset of say 350,000 lines (not big), I am finding that even though my cluster has a large number of cores available (like 100 cores), the Spark system seems to stop after using just 4 cores and after that the runtime is pretty much a straight line no matter how many more cores are thrown at it. I am wondering if Spark tries to figure out the maximum no. of cores to use based on the size of the dataset? If yes, is there a way to disable this feature and force it to use all the cores available? Thanks, Manu -- The greater danger for most of us lies not in setting our aim too high and falling short; but in setting our aim too low, and achieving our mark. - Michelangelo
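To make the suggestions above concrete, here is a minimal sketch (assuming a standalone cluster and Spark 1.x; the values and the input path are illustrative, not recommendations):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: raise the default task count and the per-application core cap so
// more than a handful of tasks can run concurrently. Values are illustrative.
val conf = new SparkConf()
  .setAppName("core-utilization-test")
  .set("spark.default.parallelism", "100") // default partition count for shuffles
  .set("spark.cores.max", "100")           // cores this application may claim
val sc = new SparkContext(conf)

// Alternatively, repartition the input so its task count matches the
// available cores (placeholder path):
val lines = sc.textFile("hdfs:///path/to/input").repartition(100)
```

If the input file yields only a few partitions (e.g. a small number of HDFS blocks), repartitioning is usually the direct fix, since the number of tasks in a stage, not the dataset size, bounds how many cores a stage can use.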
Re: Long GC pauses with Spark SQL 1.3.0 and billion row tables
Regarding the large GC pauses, assuming you allocated all 100GB of memory per worker you may consider running with less memory on your Worker nodes, or splitting up the available memory on the Worker nodes amongst several worker instances. The JVM's garbage collection starts to become very slow as the memory allocation for the heap becomes large. At 100GB it may not be unusual to see GC take minutes at a time. I believe with Yarn or Standalone clusters you should be able to run multiple smaller JVM instances on your workers so you can still use your cluster resources but also won't have a single JVM allocating an unwieldy amount of memory. On Mon, May 4, 2015 at 2:23 AM, Nick Travers n.e.trav...@gmail.com wrote: Could you be more specific in how this is done? A DataFrame class doesn't have that method. On Sun, May 3, 2015 at 11:07 PM, ayan guha guha.a...@gmail.com wrote: You can use custom partitioner to redistribution using partitionby On 4 May 2015 15:37, Nick Travers n.e.trav...@gmail.com wrote: I'm currently trying to join two large tables (order 1B rows each) using Spark SQL (1.3.0) and am running into long GC pauses which bring the job to a halt. I'm reading in both tables using a HiveContext with the underlying files stored as Parquet Files. I'm using something along the lines of HiveContext.sql(SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1) to set up the join. When I execute this (with an action such as .count) I see the first few stages complete, but the job eventually stalls. The GC counts keep increasing for each executor. Running with 6 workers, each with 2T disk and 100GB RAM. Has anyone else run into this issue? I'm thinking I might be running into issues with the shuffling of the data, but I'm unsure of how to get around this? Is there a way to redistribute the rows based on the join key first, and then do the join? Thanks in advance. 
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Long-GC-pauses-with-Spark-SQL-1-3-0-and-billion-row-tables-tp22750.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
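The multiple-worker-instances suggestion can be sketched as a standalone-mode `conf/spark-env.sh` fragment. The values are illustrative; tune instance count and memory so instances × memory fits each host.

```shell
# conf/spark-env.sh on each worker host (standalone mode): four 25g worker
# JVMs instead of one 100g heap, keeping total memory the same but keeping
# each heap small enough that GC pauses stay manageable. Values illustrative.
export SPARK_WORKER_INSTANCES=4
export SPARK_WORKER_MEMORY=25g
export SPARK_WORKER_CORES=4   # cores per worker instance
```

Each worker instance then hosts its own executor JVMs, so no single JVM has to garbage-collect a 100GB heap.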
Re: Extra stage that executes before triggering computation with an action
I'm not sure, but I wonder whether, because you are using the Spark REPL, it may not be representing what a normal runtime execution would look like and is possibly eagerly running a partial DAG once you define an operation that would cause a shuffle. What happens if you set up the same set of commands [a-e] in a file and use the Spark REPL's `load` or `paste` command to load them all at once? On Wed, Apr 29, 2015 at 2:55 PM, Tom Hubregtsen thubregt...@gmail.com wrote: Thanks for the responses. Try removing toDebugString and see what happens. The toDebugString is performed after [d] (the action), as [e]. By then all stages are already executed. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Extra-stage-that-executes-before-triggering-computation-with-an-action-tp22707p22712.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Scalability of group by
Hi, I can offer a few ideas to investigate regarding your issue here. I've run into resource issues doing shuffle operations with a much smaller dataset than 2B. The data is going to be saved to disk by the BlockManager as part of the shuffle and then redistributed across the cluster as relevant to the group by. So the data is going to be replicated during the operation. I might suggest trying to allocate more memory for your executors in your cluster. You might also want to look into configuring the shuffle behavior more explicitly ( https://spark.apache.org/docs/latest/configuration.html#shuffle-behavior). Check the disk usage on the worker nodes; in our case we actually had little disk space to start with and were running out of temporary space for the shuffle operation. I believe you should also be able to find clearer errors in logs from the worker nodes if you haven't checked yet. On Mon, Apr 27, 2015 at 10:02 PM, Ulanov, Alexander alexander.ula...@hp.com wrote: It works on a smaller dataset of 100 rows. Probably I could find the size when it fails using binary search. However, it would not help me because I need to work with 2B rows. *From:* ayan guha [mailto:guha.a...@gmail.com] *Sent:* Monday, April 27, 2015 6:58 PM *To:* Ulanov, Alexander *Cc:* user@spark.apache.org *Subject:* Re: Scalability of group by Hi Can you test on a smaller dataset to identify if it is cluster issue or scaling issue in spark On 28 Apr 2015 11:30, Ulanov, Alexander alexander.ula...@hp.com wrote: Hi, I am running a group by on a dataset of 2B rows of RDD[Row[id, time, value]] in Spark 1.3 as follows: “select id, time, first(value) from data group by id, time” My cluster is 8 nodes with 16GB RAM and one worker per node. Each executor is allocated 5GB of memory. However, all executors are being lost during the query execution and I get “ExecutorLostFailure”. Could you suggest what might be the reason for it? 
Could it be that “group by” is implemented as RDD.groupBy so it holds the group by result in memory? What is the workaround? Best regards, Alexander
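As a concrete starting point for the suggestions above, a hedged `spark-submit` sketch (the flags are standard Spark 1.x settings, but the values and the scratch path are placeholders to tune for 16GB nodes):

```shell
# Sketch: more executor memory plus an explicit large scratch directory for
# shuffle spill files. Values and paths are placeholders, not recommendations.
spark-submit \
  --conf spark.executor.memory=10g \
  --conf spark.local.dir=/mnt/big-disk/spark-tmp \
  --conf spark.shuffle.consolidateFiles=true \
  your-job.jar
```

Pointing `spark.local.dir` at a large disk matters here because shuffle spill files land there; running out of that temporary space is exactly the failure mode described above.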
Re: Group by order by
It's not related to Spark, but to the concept of what you are trying to do with the data. Grouping by ID means consolidating data for each ID down to 1 row per ID. You can sort by time after this point, yes, but you would need to either take each ID and time value pair OR do some aggregate operation on the time. That's what the error message is explaining. Maybe you can describe what you want your results to look like? Here is some detail about the underlying operations:

Example Data:

ID | Time     | SomeVal
1  | 02-02-15 | 4
1  | 02-03-15 | 5
2  | 02-02-15 | 4
2  | 02-02-15 | 5
2  | 02-05-15 | 2

A. If you do Group By ID, this means 1 row per ID like below:

ID
1
2

To include Time in this projection you need to aggregate it with a function to a single value. Then and only then can you use it in the projection and sort on it.

SELECT id, max(time) FROM sample GROUP BY id SORT BY max(time) desc;

ID | max(time)
2  | 02-05-15
1  | 02-03-15

B. Or if you do Group By ID, time, then you get 1 row per ID and time pair:

ID | Time
1  | 02-02-15
1  | 02-03-15
2  | 02-02-15
2  | 02-05-15

Notice both rows with ID `2` and time `02-02-15` group down to 1 row in the results here. In this case you can sort the results by time without using an aggregate function.

SELECT id, time FROM sample GROUP BY id, time SORT BY time desc;

ID | Time
2  | 02-05-15
1  | 02-03-15
1  | 02-02-15
2  | 02-02-15

On Mon, Apr 27, 2015 at 3:28 PM, Ulanov, Alexander alexander.ula...@hp.com wrote: Hi Richard, There are several values of time per id. Is there a way to perform group by id and sort by time in Spark? Best regards, Alexander *From:* Richard Marscher [mailto:rmarsc...@localytics.com] *Sent:* Monday, April 27, 2015 12:20 PM *To:* Ulanov, Alexander *Cc:* user@spark.apache.org *Subject:* Re: Group by order by Hi, that error seems to indicate the basic query is not properly expressed. If you group by just ID, then that means it would need to aggregate all the time values into one value per ID, so you can't sort by it. 
Thus it tries to suggest an aggregate function for time so you can have 1 value per ID and properly sort it. On Mon, Apr 27, 2015 at 3:07 PM, Ulanov, Alexander alexander.ula...@hp.com wrote: Hi, Could you suggest what is the best way to do “group by x order by y” in Spark? When I try to perform it with Spark SQL I get the following error (Spark 1.3): val results = sqlContext.sql("select * from sample group by id order by time") org.apache.spark.sql.AnalysisException: expression 'time' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() if you don't care which value you get.; at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:37) Is there a way to do it with just RDD? Best regards, Alexander
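To answer the "just RDD" question in spirit: on an RDD of (id, time) pairs you can group by key and then sort each group's times, since no aggregate function is needed once time stays inside its group. Below is a self-contained sketch of the same semantics using plain Scala collections; on an RDD the analogous calls would be `groupByKey()` and `mapValues(...)`.

```scala
// Sketch of "group by id, sort time within each group" with local
// collections, using the example data from the discussion above.
val data = Seq(
  (1, "02-02-15"), (1, "02-03-15"),
  (2, "02-02-15"), (2, "02-02-15"), (2, "02-05-15"))

val grouped: Map[Int, Seq[String]] =
  data.groupBy(_._1)                  // one entry per id
      .mapValues(_.map(_._2).sorted) // sort the times inside each group
      .toMap

// grouped(2) == List("02-02-15", "02-02-15", "02-05-15")
```

Note that `groupByKey` on a real RDD pulls each group onto one executor, so this only works when no single id has an unmanageably large number of rows.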
Re: Group by order by
Hi, that error seems to indicate the basic query is not properly expressed. If you group by just ID, then that means it would need to aggregate all the time values into one value per ID, so you can't sort by it. Thus it tries to suggest an aggregate function for time so you can have 1 value per ID and properly sort it. On Mon, Apr 27, 2015 at 3:07 PM, Ulanov, Alexander alexander.ula...@hp.com wrote: Hi, Could you suggest what is the best way to do “group by x order by y” in Spark? When I try to perform it with Spark SQL I get the following error (Spark 1.3): val results = sqlContext.sql("select * from sample group by id order by time") org.apache.spark.sql.AnalysisException: expression 'time' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() if you don't care which value you get.; at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:37) Is there a way to do it with just RDD? Best regards, Alexander
Re: Instantiating/starting Spark jobs programmatically
Could you possibly describe what you are trying to learn how to do in more detail? Some basics of submitting programmatically:

- Create a SparkContext instance and use that to build your RDDs
- You can only have 1 SparkContext per JVM you are running, so if you need to satisfy concurrent job requests you would need to manage a SparkContext as a shared resource on that server. Keep in mind if something goes wrong with that SparkContext, all running jobs would probably be in a failed state and you'd need to try to get a new SparkContext.
- There are System.exit calls built into Spark as of now that could kill your running JVM. We have shadowed some of the most offensive bits within our own application to work around this. You'd likely want to do that or to do your own Spark fork. For example, if the SparkContext can't connect to your cluster master node when it is created, it will System.exit.
- You'll need to provide all of the relevant classes that your platform uses in the jobs on the classpath of the spark cluster. We do this with a JAR file loaded from S3 dynamically by a SparkContext, but there are other options.

On Mon, Apr 20, 2015 at 10:12 PM, firemonk9 dhiraj.peech...@gmail.com wrote: I have built a data analytics SaaS platform by creating Rest end points and based on the type of job request I would invoke the necessary spark job/jobs and return the results as json(async). I used yarn-client mode to submit the jobs to yarn cluster. hope this helps. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Instantiating-starting-Spark-jobs-programmatically-tp22577p22584.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
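The shared-SparkContext point can be sketched generically. The holder below is illustrative (not from any framework) and is written against an abstract factory, so the same pattern applies to a real `SparkContext`:

```scala
// Sketch: one shared context per JVM behind a synchronized accessor.
// The type parameter C stands in for SparkContext; names are illustrative.
class ContextHolder[C](factory: () => C) {
  @volatile private var ctx: Option[C] = None

  // Lazily create the context on first use, then reuse it for later jobs.
  def get: C = synchronized {
    ctx.getOrElse { val c = factory(); ctx = Some(c); c }
  }

  // If jobs start failing because the context died, drop it so the
  // next request builds a fresh one.
  def reset(): Unit = synchronized { ctx = None }
}
```

With a real context the factory would be something like `() => new SparkContext(conf)`, and `reset()` would also want to call `stop()` on the old context if it is still partially alive.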
Re: Problem with Spark SQL UserDefinedType and sbt assembly
If it fails with sbt-assembly but not without it, then there's always the likelihood of a classpath issue. What dependencies are you rolling up into your assembly jar? On Thu, Apr 16, 2015 at 4:46 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Any ideas ? On Thu, Apr 16, 2015 at 5:04 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, Here is an issue that gets me mad. I wrote a UserDefinedType in order to be able to store a custom type in a parquet file. In my code I just create a DataFrame with my custom data type and write it into a parquet file. When I run my code directly inside IDEA everything works like a charm. But when I create the assembly jar with sbt assembly and run the same code with spark-submit I get the following error : *15/04/16 17:02:17 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)* *java.lang.IllegalArgumentException: Unsupported dataType: {"type":"struct","fields":[{"name":"metadata","type":{"type":"udt","class":"org.apache.spark.vision.types.ImageMetadataUDT","pyClass":null,"sqlType":{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"encoding","type":"string","nullable":true,"metadata":{}},{"name":"cameraId","type":"string","nullable":true,"metadata":{}},{"name":"timestamp","type":"string","nullable":true,"metadata":{}},{"name":"frameId","type":"string","nullable":true,"metadata":{}}]}},"nullable":true,"metadata":{}}]}, [1.1] failure: `TimestampType' expected but `{' found* *{"type":"struct","fields":[{"name":"metadata","type":{"type":"udt","class":"org.apache.spark.vision.types.ImageMetadataUDT","pyClass":null,"sqlType":{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"encoding","type":"string","nullable":true,"metadata":{}},{"name":"cameraId","type":"string","nullable":true,"metadata":{}},{"name":"timestamp","type":"string","nullable":true,"metadata":{}},{"name":"frameId","type":"string","nullable":true,"metadata":{}}]}},"nullable":true,"metadata":{}}]}* *^* *at org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)* *at 
org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98)* *at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)* *at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)* *at scala.util.Try.getOrElse(Try.scala:77)* *at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402)* *at org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:145)* *at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)* *at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)* *at org.apache.spark.sql.parquet.ParquetRelation2.org http://org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:691)* *at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:713)* *at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:713)* *at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)* *at org.apache.spark.scheduler.Task.run(Task.scala:64)* *at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:210)* *at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)* *at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)* *at java.lang.Thread.run(Thread.java:745)*
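One quick way to act on the classpath suggestion above is to list the assembly jar's contents and look for Spark classes that should not have been bundled (here, Spark SQL's type parsers, which the stack trace implicates). The jar path is a placeholder.

```shell
# Sketch: check whether Spark's own classes were rolled into the assembly
# jar alongside the application code. The jar path is a placeholder.
jar tf target/scala-2.10/myapp-assembly.jar | grep 'org/apache/spark/sql' | head
```

If Spark SQL classes show up here, two copies of them (one in the assembly, one in the cluster's Spark distribution) can disagree at runtime even when the build itself succeeds.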
Re: sbt-assembly spark-streaming-kinesis-asl error
Hi, I've gotten an application working with sbt-assembly and Spark, thought I'd present an option. In my experience, trying to bundle any of the Spark libraries in your uber jar is going to be a major pain. There will be a lot of deduplication to work through and even if you resolve them it can be easy to do it incorrectly. I considered it an intractable problem. So the alternative is to not include those jars in your uber jar. For this to work you will need the same libraries on the classpath of your Spark cluster and your driver program (if you are running that as an application and not just using spark-submit). As for your NoClassDefFoundError, you either are missing Joda Time in your runtime classpath or have conflicting versions. It looks like something related to AWS wants to use it. Check your uber jar to see if it's including org/joda/time as well as the classpath of your spark cluster. For example: I use Spark 1.3.0 on Hadoop 1.x, which in the 'lib' directory has an uber jar spark-assembly-1.3.0-hadoop1.0.4.jar. At one point in Spark 1.2 I found a conflict between httpclient versions that my uber jar pulled in for AWS libraries and the one bundled in the spark uber jar. I hand patched the spark uber jar to remove the offending httpclient bytecode to resolve the issue. You may be facing a similar situation. I hope that gives some ideas for resolving your issue. Regards, Rich On Tue, Apr 14, 2015 at 1:14 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hi Vadim, After removing "provided" from "org.apache.spark" %% "spark-streaming-kinesis-asl" I ended up with a huge number of deduplicate errors: https://gist.github.com/trienism/3d6f8d6b7ff5b7cead6a It would be nice if you could share some pieces of your mergeStrategy code for reference. 
Also, after adding "provided" back to spark-streaming-kinesis-asl, when I submit the spark job with the spark-streaming-kinesis-asl jar file sh /usr/lib/spark/bin/spark-submit --verbose --jars lib/spark-streaming-kinesis-asl_2.10-1.2.0.jar --class com.xxx.DataConsumer target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar I still end up with the following error... Exception in thread "main" java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeFormat at com.amazonaws.auth.AWS4Signer.<clinit>(AWS4Signer.java:44) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at java.lang.Class.newInstance(Class.java:379) Has anyone else run into this issue? On Mon, Apr 13, 2015 at 6:46 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I don't believe the Kinesis asl should be provided. I used mergeStrategy successfully to produce an uber jar. Fyi, I've been having trouble consuming data out of Kinesis with Spark with no success :( Would be curious to know if you got it working. Vadim On Apr 13, 2015, at 9:36 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hi All, I am having trouble building a fat jar file through sbt-assembly. 
[warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename' [warn] Merging 'META-INF/NOTICE' with strategy 'rename' [warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename' [warn] Merging 'META-INF/LICENSE' with strategy 'rename' [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard' [warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/log4j/log4j/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/log4j/log4j/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.properties' with strategy 'discard' [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.xml' with strategy 'discard' [warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.properties' with strategy 'discard' [warn] Merging
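A hedged `build.sbt` fragment for the approach discussed in this thread: Spark core/streaming marked `"provided"` so they stay out of the uber jar, while `spark-streaming-kinesis-asl` (which is not part of the cluster's Spark assembly) either stays bundled or is shipped separately with `--jars`. Versions are illustrative.

```scala
// build.sbt sketch (versions illustrative): keep Spark itself out of the
// assembly and bundle only what the cluster does not already provide.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "1.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.2.0" % "provided",
  // Not in the Spark assembly on the cluster, so either leave it bundled
  // here, or mark it provided and pass it via spark-submit --jars instead:
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.2.0"
)
```

Marking the core Spark artifacts provided is what makes most of the merge conflicts in the warning list above disappear, since their transitive dependencies no longer get rolled into the assembly.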