Re: Discontinuous offsets in Kafka cause Spark Streaming failures
We appear to be kindred spirits; I've recently run into the same issue. Are you running compacted topics? I've hit this on non-compacted topics as well; it happens rarely but is still a pain. You might check out this patch and the related Spark Streaming Kafka ticket:

https://github.com/apache/spark/compare/master...koeninger:SPARK-17147
https://issues.apache.org/jira/browse/SPARK-17147

I'll be testing the patch on a somewhat large-scale stream processor tomorrow. CCing: Cody Koeninger

Best,
Justin

> On Jan 23, 2018, at 10:48 PM, namesuperwood <namesuperw...@gmail.com> wrote:
>
> Hi all,
>
> kafka version: kafka_2.11-0.11.0.2
> spark version: 2.0.1
>
> A topic-partition "adn-tracking,15" in Kafka has earliest offset 1255644602 and latest offset 1271253441. While starting a Spark Streaming job to process data from the topic, we got an exception with "Got wrong record even after seeking to offset 1266921577". [ (earliest offset) 1255644602 < 1266921577 < 1271253441 (latest offset) ]
>
> Finally, I found the following source code in the class CachedKafkaConsumer from spark-streaming. This is obviously because the offset returned by the consumer's poll and the offset the consumer seeks to are not equal.
>
> Here is the CachedKafkaConsumer.scala code:
>
> def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
>   logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset")
>   if (offset != nextOffset) {
>     logInfo(s"Initial fetch for $groupId $topic $partition $offset")
>     seek(offset)
>     poll(timeout)
>   }
>
>   if (!buffer.hasNext()) { poll(timeout) }
>   assert(buffer.hasNext(),
>     s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
>   var record = buffer.next()
>
>   if (record.offset != offset) {
>     logInfo(s"Buffer miss for $groupId $topic $partition $offset")
>     seek(offset)
>     poll(timeout)
>     assert(buffer.hasNext(),
>       s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
>     record = buffer.next()
>     assert(record.offset == offset,
>       s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
>   }
>
>   nextOffset = offset + 1
>   record
> }
>
> I reproduced this problem and found that offsets within one topic-partition are not contiguous in Kafka. I think this is a bug that needs to be fixed. I implemented a simple project that uses a consumer to seek to offset 1266921577, but it returned offset 1266921578.
> Seeking to 1266921576, however, returned exactly 1266921576.
>
> Here is the code:
>
> public class ConsumerDemo {
>   public static void main(String[] argv) {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "172.31.29.31:9091");
>     props.put("group.id", "consumer-tutorial-demo");
>     props.put("key.deserializer", StringDeserializer.class.getName());
>     props.put("value.deserializer", StringDeserializer.class.getName());
>     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
>     TopicPartition tp = new TopicPartition("adn-tracking-click", 15);
>     Collection<TopicPartition> collection = new ArrayList<TopicPartition>();
>     collection.add(tp);
>     consumer.assign(collection);
>     consumer.seek(tp, 1266921576);
>     ConsumerRecords<String, String> consumerRecords = consumer.poll(1);
>     List<ConsumerRecord<String, String>> listR = consumerRecords.records(tp);
>     Iterator<ConsumerRecord<String, String>> iter = listR.iterator();
>     ConsumerRecord<String, String> record = iter.next();
>     System.out.println(" the next record " + record.offset() + " record topic " + record.topic());
>   }
> }
>
> wood.super
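For context, the patch linked above essentially relaxes the exact-offset assertion in CachedKafkaConsumer.get so that gaps in the log are skipped instead of being fatal. A rough sketch of the idea, written against the get method quoted above and reusing its seek/poll/buffer helpers (my own illustration, not the actual patch code):

// Sketch only: accept the first record at or after the requested offset
// instead of asserting an exact match, so offset gaps (e.g. compacted or
// deleted records) advance the cursor rather than killing the task.
def getAtLeast(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
  if (offset != nextOffset) {
    seek(offset)
    poll(timeout)
  }
  if (!buffer.hasNext()) { poll(timeout) }
  assert(buffer.hasNext(),
    s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
  var record = buffer.next()
  // Skip forward past any gap in the log.
  while (record.offset < offset && buffer.hasNext()) {
    record = buffer.next()
  }
  assert(record.offset >= offset,
    s"Polled records for $groupId $topic $partition all precede requested offset $offset")
  nextOffset = record.offset + 1
  record
}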
Re: "Got wrong record after seeking to offset" issue
Yeah, I saw that after I sent that e-mail out. I actually remembered another ticket that I had commented on, which turned out to be unrelated to the issue I was seeing at the time. It may be related to the current issue: https://issues.apache.org/jira/browse/SPARK-17147

We are compacting topics, but only the offsets topics. We just updated our message version to 0.10 today, as our last non-Spark project (Storm based) was brought up to 0.11.

Justin

> On Jan 18, 2018, at 1:39 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
> https://kafka.apache.org/documentation/#compaction
>
> On Thu, Jan 18, 2018 at 1:17 AM, Justin Miller <justin.mil...@protectwise.com> wrote:
>> By compacted do you mean compression? If so, then we did recently turn on lz4 compression. If there's another meaning and there's a command I can run to check compaction, I'm happy to give that a shot too.
>>
>> I'll try consuming from the failed offset if/when the problem manifests itself again.
>>
>> Thanks!
>> Justin
>>
>> On Wednesday, January 17, 2018, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>> That means the consumer on the executor tried to seek to the specified offset, but the message that was returned did not have a matching offset. If the executor can't get the messages the driver told it to get, something's generally wrong.
>>>
>>> What happens when you try to consume the particular failing offset from another (e.g. command-line) consumer?
>>>
>>> Is the topic in question compacted?
>>>
>>> On Tue, Jan 16, 2018 at 11:10 PM, Justin Miller <justin.mil...@protectwise.com> wrote:
>>>> Greetings all,
>>>>
>>>> I've recently started hitting the following error in Spark Streaming on Kafka. Adjusting maxRetries and spark.streaming.kafka.consumer.poll.ms even to five minutes doesn't seem to be helping. The problem only manifested in the last few days; restarting with a new consumer group seems to remedy the issue for a few hours (< retention, which is 12 hours).
>>>>
>>>> Error:
>>>> Caused by: java.lang.AssertionError: assertion failed: Got wrong record for spark-executor- 76 even after seeking to offset 1759148155
>>>>   at scala.Predef$.assert(Predef.scala:170)
>>>>   at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:85)
>>>>   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
>>>>   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>>>>   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>>>>
>>>> I guess my questions are: why is that assertion a job killer vs. a warning, and is there anything I can tweak settings-wise that may keep it at bay?
>>>>
>>>> I wouldn't be surprised if this issue were exacerbated by the volume we do on Kafka topics (~150k/sec on the persister that's crashing).
>>>>
>>>> Thank you!
>>>> Justin
Re: "Got wrong record after seeking to offset" issue
By compacted do you mean compression? If so, then we did recently turn on lz4 compression. If there's another meaning and there's a command I can run to check compaction, I'm happy to give that a shot too.

I'll try consuming from the failed offset if/when the problem manifests itself again.

Thanks!
Justin

On Wednesday, January 17, 2018, Cody Koeninger <c...@koeninger.org> wrote:
> That means the consumer on the executor tried to seek to the specified offset, but the message that was returned did not have a matching offset. If the executor can't get the messages the driver told it to get, something's generally wrong.
>
> What happens when you try to consume the particular failing offset from another (e.g. command-line) consumer?
>
> Is the topic in question compacted?
>
> On Tue, Jan 16, 2018 at 11:10 PM, Justin Miller <justin.mil...@protectwise.com> wrote:
>> Greetings all,
>>
>> I've recently started hitting the following error in Spark Streaming on Kafka. Adjusting maxRetries and spark.streaming.kafka.consumer.poll.ms even to five minutes doesn't seem to be helping. The problem only manifested in the last few days; restarting with a new consumer group seems to remedy the issue for a few hours (< retention, which is 12 hours).
>>
>> Error:
>> Caused by: java.lang.AssertionError: assertion failed: Got wrong record for spark-executor- 76 even after seeking to offset 1759148155
>>   at scala.Predef$.assert(Predef.scala:170)
>>   at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:85)
>>   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
>>   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>>   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>>
>> I guess my questions are: why is that assertion a job killer vs. a warning, and is there anything I can tweak settings-wise that may keep it at bay?
>>
>> I wouldn't be surprised if this issue were exacerbated by the volume we do on Kafka topics (~150k/sec on the persister that's crashing).
>>
>> Thank you!
>> Justin
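For later readers wondering how to check whether a topic is compacted: one option is to read the topic's cleanup.policy (compact, vs. the default delete). A sketch using the Kafka 0.11+ AdminClient; the broker address and topic name are placeholders:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.common.config.ConfigResource

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // placeholder broker

val admin = AdminClient.create(props)
val resource = new ConfigResource(ConfigResource.Type.TOPIC, "adn-tracking") // placeholder topic

// cleanup.policy=compact indicates a compacted topic; delete is the default.
val config = admin.describeConfigs(Collections.singleton(resource)).all().get().get(resource)
println(config.get("cleanup.policy").value())
admin.close()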
"Got wrong record after seeking to offset" issue
Greetings all,

I've recently started hitting the following error in Spark Streaming on Kafka. Adjusting maxRetries and spark.streaming.kafka.consumer.poll.ms even to five minutes doesn't seem to be helping. The problem only manifested in the last few days; restarting with a new consumer group seems to remedy the issue for a few hours (< retention, which is 12 hours).

Error:
Caused by: java.lang.AssertionError: assertion failed: Got wrong record for spark-executor- 76 even after seeking to offset 1759148155
  at scala.Predef$.assert(Predef.scala:170)
  at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:85)
  at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
  at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)

I guess my questions are: why is that assertion a job killer vs. a warning, and is there anything I can tweak settings-wise that may keep it at bay?

I wouldn't be surprised if this issue were exacerbated by the volume we do on Kafka topics (~150k/sec on the persister that's crashing).

Thank you!
Justin
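For reference, the poll timeout mentioned above is a plain Spark conf; a minimal sketch of setting it programmatically (the value is illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kafka-stream")
  // Illustrative value: the executor-side consumer poll timeout the
  // poster mentions raising to five minutes.
  .set("spark.streaming.kafka.consumer.poll.ms", "300000")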
Forcing either Hive or Spark SQL representation for metastore
Hello,

I was wondering if there were a way to force one representation or another for the Hive metastore. Some of our data can't be parsed with the Hive method, so it switches over to the Spark SQL method, leaving some of our data stored in Spark SQL format and some in Hive format. It'd be nice if we could force it to use the Spark SQL format, as changing the underlying data would be difficult.

Some have (Spark SQL):

17/05/18 21:50:29 WARN HiveExternalCatalog: Could not persist `default`.`tablenamehere` in a Hive compatible way. Persisting it into Hive metastore in Spark SQL specific format.

Some have (Hive):

17/05/18 21:52:27 INFO CatalystSqlParser: Parsing command: int
17/05/18 21:52:27 INFO CatalystSqlParser: Parsing command: int
17/05/18 21:52:27 INFO CatalystSqlParser: Parsing command: int
17/05/18 21:52:27 INFO CatalystSqlParser: Parsing command: int
17/05/18 21:52:27 INFO CatalystSqlParser: Parsing command: bigint
17/05/18 21:52:27 INFO CatalystSqlParser: Parsing command: struct<srcMac:binary,dstMac:binary,srcIp:binary,dstIp:binary,srcPort:int,dstPort:int,proto:string,layer3Proto:int,layer4Proto:int>

Another question about persisting to S3: I'm getting the following for all:

Caused by: MetaException(message:java.io.IOException: Got exception: java.io.IOException /username/sys_encrypted/staging/raw/updatedTimeYear=2017/updatedTimeMonth=5/updatedTimeDay=16/updatedTimeHour=23 doesn't exist)

Thanks!
Justin
Is there a way to tell if a receiver is a Reliable Receiver?
I can't seem to find anywhere that would let a user know if the receiver they are using is reliable or not. Even better would be a list of known reliable receivers. Are any of these things possible? Or do you just have to research your receiver beforehand?
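For background, the distinction comes down to how the receiver calls store(): a reliable receiver uses the blocking, multi-record store(...) and acknowledges the source only after it returns, so records can't be lost between the source and Spark. A minimal sketch of that shape (the AckableSource trait is a hypothetical stand-in for a source that supports acknowledgement):

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical source client that supports acknowledgement.
trait AckableSource {
  def nextBatch(): ArrayBuffer[String]
  def ack(batch: ArrayBuffer[String]): Unit
  def close(): Unit
}

class AckingReceiver(source: AckableSource)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    new Thread("acking-receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          val batch = source.nextBatch()
          store(batch)      // blocking variant: returns once Spark has stored the block
          source.ack(batch) // safe to ack only after store() succeeds
        }
      }
    }.start()
  }

  override def onStop(): Unit = source.close()
}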
Avro/Parquet GenericFixed decimal is not read into Spark correctly
All,

Before creating a JIRA for this, I wanted to get a sense as to whether it would be shot down or not. Take the following code:

spark-shell --packages org.apache.avro:avro:1.8.1

import org.apache.avro.{Conversions, LogicalTypes, Schema}
import java.math.BigDecimal

val dc = new Conversions.DecimalConversion()
val javaBD = BigDecimal.valueOf(643.85924958)
val schema = Schema.parse(
  "{\"type\":\"record\",\"name\":\"Header\",\"namespace\":\"org.apache.avro.file\",\"fields\":[" +
  "{\"name\":\"COLUMN\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"COLUMN\"," +
  "\"size\":19,\"precision\":17,\"scale\":8,\"logicalType\":\"decimal\"}]}]}")
val schemaDec = schema.getField("COLUMN").schema()
val fieldSchema =
  if (schemaDec.getType() == Schema.Type.UNION) schemaDec.getTypes.get(1) else schemaDec
val converted = dc.toFixed(javaBD, fieldSchema,
  LogicalTypes.decimal(javaBD.precision, javaBD.scale))
sqlContext.createDataFrame(List(("value", converted)))

and you'll get this error:

java.lang.UnsupportedOperationException: Schema for type org.apache.avro.generic.GenericFixed is not supported

However, if you write out a parquet file using the AvroParquetWriter and the above GenericFixed value (converted), then read it in via the DataFrameReader, the decimal value that is retrieved is not accurate (i.e. 643... above is read back as -0.5...).

Even if not supported, is there any way to at least have it throw an UnsupportedOperationException, as it does when you try to do it directly, as compared to reading it in from a file?
Spark Streaming Kafka Job has strange behavior for certain tasks
executor 1) (85/96)
17/04/05 16:48:44 INFO TaskSetManager: Finished task 75.0 in stage 7.0 (TID 672) in 310995 ms on ip-172-20-213-64.us-west-2.compute.internal (executor 29) (86/96)
17/04/05 16:48:44 INFO TaskSetManager: Finished task 1.0 in stage 7.0 (TID 715) in 311159 ms on ip-172-20-212-43.us-west-2.compute.internal (executor 84) (87/96)
17/04/05 16:48:44 INFO TaskSetManager: Finished task 4.0 in stage 7.0 (TID 677) in 311443 ms on ip-172-20-220-110.us-west-2.compute.internal (executor 3) (88/96)
17/04/05 16:48:45 INFO TaskSetManager: Finished task 73.0 in stage 7.0 (TID 690) in 311523 ms on ip-172-20-218-229.us-west-2.compute.internal (executor 76) (89/96)
17/04/05 16:48:45 INFO TaskSetManager: Finished task 84.0 in stage 7.0 (TID 686) in 311554 ms on ip-172-20-208-230.us-west-2.compute.internal (executor 60) (90/96)
17/04/05 16:48:45 INFO TaskSetManager: Finished task 44.0 in stage 7.0 (TID 692) in 312165 ms on ip-172-20-208-230.us-west-2.compute.internal (executor 4) (91/96)
17/04/05 16:48:45 INFO TaskSetManager: Finished task 63.0 in stage 7.0 (TID 762) in 312299 ms on ip-172-20-211-125.us-west-2.compute.internal (executor 79) (92/96)
17/04/05 16:48:46 INFO TaskSetManager: Finished task 94.0 in stage 7.0 (TID 724) in 313148 ms on ip-172-20-219-239.us-west-2.compute.internal (executor 5) (93/96)
17/04/05 16:48:46 INFO TaskSetManager: Finished task 18.0 in stage 7.0 (TID 717) in 313332 ms on ip-172-20-213-64.us-west-2.compute.internal (executor 15) (94/96)
17/04/05 16:48:48 INFO TaskSetManager: Finished task 56.0 in stage 7.0 (TID 731) in 314838 ms on ip-172-20-217-201.us-west-2.compute.internal (executor 95) (95/96)
17/04/05 16:48:52 INFO TaskSetManager: Finished task 74.0 in stage 7.0 (TID 704) in 318573 ms on ip-172-20-217-201.us-west-2.compute.internal (executor 53) (96/96)

Thanks,
Justin
SparkStreaming getActiveOrCreate
The docs on getActiveOrCreate make it seem that you'll get an already-started context:

> Either return the "active" StreamingContext (that is, started but not stopped), or create a new StreamingContext that is

However, as far as I can tell from the code, it is strictly dependent on the implementation of the create function, and per the tests it is even expected that the create function will return a non-started context. So this makes for slightly awkward code that must check the state before starting or not. Was this considered when this was added? I couldn't find anything explicit.

-Justin Pihony
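For concreteness, a minimal sketch of the state check described above:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

// Placeholder factory returning a configured, not-yet-started context.
def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("example")
  new StreamingContext(conf, Seconds(30))
}

val ssc = StreamingContext.getActiveOrCreate(() => createContext())

// getActiveOrCreate may hand back either an active context or a fresh,
// non-started one, so guard the start() call on its state.
if (ssc.getState() != StreamingContextState.ACTIVE) {
  ssc.start()
}
ssc.awaitTermination()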
Re: How to gracefully handle Kafka OffsetOutOfRangeException
I've created a ticket here: https://issues.apache.org/jira/browse/SPARK-19888

Thanks,
Justin

> On Mar 10, 2017, at 1:14 PM, Michael Armbrust <mich...@databricks.com> wrote:
>
> If you have a reproduction you should open a JIRA. It would be great if there is a fix. I'm just saying I know a similar issue does not exist in Structured Streaming.
>
> On Fri, Mar 10, 2017 at 7:46 AM, Justin Miller <justin.mil...@protectwise.com> wrote:
>> Hi Michael,
>>
>> I'm experiencing a similar issue. Will this not be fixed in Spark Streaming?
>>
>> Best,
>> Justin
>>
>>> On Mar 10, 2017, at 8:34 AM, Michael Armbrust <mich...@databricks.com> wrote:
>>>
>>> One option here would be to try Structured Streaming. We've added an option "failOnDataLoss" that will cause Spark to just skip ahead when this exception is encountered (it's off by default though, so you don't silently miss data).
>>>
>>> On Fri, Mar 18, 2016 at 4:16 AM, Ramkumar Venkataraman <ram.the.m...@gmail.com> wrote:
>>> I am using Spark Streaming and reading data from Kafka using KafkaUtils.createDirectStream. I have "auto.offset.reset" set to smallest.
>>>
>>> But in some Kafka partitions, I get kafka.common.OffsetOutOfRangeException and my Spark job crashes.
>>>
>>> I want to understand if there is a graceful way to handle this failure and not kill the job. I want to keep ignoring these exceptions, as some other partitions are fine and I am okay with data loss.
>>>
>>> Is there any way to handle this and not have my Spark job crash? I have no option of increasing the Kafka retention period.
>>>
>>> I tried to have the DStream returned by createDirectStream() wrapped in a Try construct, but since the exception happens in the executor, the Try construct didn't take effect. Do you have any ideas of how to handle this?
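For anyone landing on this thread later, the Structured Streaming option Michael mentions is set on the Kafka source; a minimal sketch (broker and topic values are placeholders):

// Placeholders: adjust bootstrap servers and topic to your setup.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  // false skips over lost/out-of-range offsets instead of failing the
  // query; the default, true, fails fast so data loss is never silent.
  .option("failOnDataLoss", "false")
  .load()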
Re: How to gracefully handle Kafka OffsetOutOfRangeException
Hi Michael,

I'm experiencing a similar issue. Will this not be fixed in Spark Streaming?

Best,
Justin

> On Mar 10, 2017, at 8:34 AM, Michael Armbrust <mich...@databricks.com> wrote:
>
> One option here would be to try Structured Streaming. We've added an option "failOnDataLoss" that will cause Spark to just skip ahead when this exception is encountered (it's off by default though, so you don't silently miss data).
>
> On Fri, Mar 18, 2016 at 4:16 AM, Ramkumar Venkataraman <ram.the.m...@gmail.com> wrote:
>> I am using Spark Streaming and reading data from Kafka using KafkaUtils.createDirectStream. I have "auto.offset.reset" set to smallest.
>>
>> But in some Kafka partitions, I get kafka.common.OffsetOutOfRangeException and my Spark job crashes.
>>
>> I want to understand if there is a graceful way to handle this failure and not kill the job. I want to keep ignoring these exceptions, as some other partitions are fine and I am okay with data loss.
>>
>> Is there any way to handle this and not have my Spark job crash? I have no option of increasing the Kafka retention period.
>>
>> I tried to have the DStream returned by createDirectStream() wrapped in a Try construct, but since the exception happens in the executor, the Try construct didn't take effect. Do you have any ideas of how to handle this?
Re: Jar not in shell classpath in Windows 10
I've verified this is that issue, so please disregard.

On Wed, Mar 1, 2017 at 1:07 AM, Justin Pihony <justin.pih...@gmail.com> wrote:
> As soon as I posted this I found https://issues.apache.org/jira/browse/SPARK-18648, which seems to be the issue. I'm looking at it deeper now.
>
> On Wed, Mar 1, 2017 at 1:05 AM, Justin Pihony <justin.pih...@gmail.com> wrote:
>> Run spark-shell --packages datastax:spark-cassandra-connector:2.0.0-RC1-s_2.11 and then try to import anything under com.datastax. I have checked that the jar is listed among the classpaths, and it is, albeit behind a Spark URL. I'm wondering if added jars fail on Windows due to this server abstraction? I can't find anything on this already, so maybe it's somehow an environment thing. Has anybody encountered this?
Re: Jar not in shell classpath in Windows 10
As soon as I posted this I found https://issues.apache.org/jira/browse/SPARK-18648, which seems to be the issue. I'm looking at it deeper now.

On Wed, Mar 1, 2017 at 1:05 AM, Justin Pihony <justin.pih...@gmail.com> wrote:
> Run spark-shell --packages datastax:spark-cassandra-connector:2.0.0-RC1-s_2.11 and then try to import anything under com.datastax. I have checked that the jar is listed among the classpaths, and it is, albeit behind a Spark URL. I'm wondering if added jars fail on Windows due to this server abstraction? I can't find anything on this already, so maybe it's somehow an environment thing. Has anybody encountered this?
Jar not in shell classpath in Windows 10
Run spark-shell --packages datastax:spark-cassandra-connector:2.0.0-RC1-s_2.11 and then try to import anything under com.datastax. I have checked that the jar is listed among the classpaths, and it is, albeit behind a Spark URL. I'm wondering if added jars fail on Windows due to this server abstraction? I can't find anything on this already, so maybe it's somehow an environment thing. Has anybody encountered this?
Is there a list of missing optimizations for typed functions?
I was curious if there was introspection of certain typed functions, so I ran the following two queries:

ds.where($"col" > 1).explain
ds.filter(_.col > 1).explain

and found that the typed function does NOT result in a PushedFilter. I imagine this is due to a limited view of the function, so I have two questions really:

1.) Is there a list of the methods that lose some of the optimizations that you get from non-functional methods? Is it any method that accepts a generic function?

2.) Is there any work to attempt reflection and gain some of these optimizations back? I couldn't find anything in JIRA.

Thanks,
Justin Pihony
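For anyone wanting to reproduce this, a self-contained sketch (the Parquet path is a placeholder; pushdown is only visible against sources that support it, such as Parquet):

import org.apache.spark.sql.SparkSession

case class Rec(col: Int)

val spark = SparkSession.builder().appName("pushdown").getOrCreate()
import spark.implicits._

val ds = spark.read.parquet("/tmp/recs").as[Rec] // placeholder path

// Column-based predicate: the physical plan should show the predicate
// under PushedFilters for the Parquet scan.
ds.where($"col" > 1).explain()

// Typed predicate: the lambda is opaque to the optimizer, so rows are
// deserialized first and filtered afterwards; no pushdown.
ds.filter(_.col > 1).explain()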
Re: Is there any scheduled release date for Spark 2.1.0?
Interesting, because a bug that seemed to be fixed in 2.1.0-SNAPSHOT doesn't appear to be fixed in 2.1.0 stable (it centered around a null-pointer exception during code gen). It seems to be fixed in 2.1.1-SNAPSHOT, but I can try stable again.

> On Dec 28, 2016, at 1:38 PM, Mark Hamstra <m...@clearstorydata.com> wrote:
>
> A SNAPSHOT build is not a stable artifact, but rather floats to the top of commits that are intended for the next release. So, 2.1.1-SNAPSHOT comes after the 2.1.0 release and contains any code, at the time that the artifact was built, that was committed to the branch-2.1 maintenance branch and is, therefore, intended for the eventual 2.1.1 maintenance release. Once a release is tagged and stable artifacts for it can be built, there is no purpose for a SNAPSHOT of that release -- e.g. there is no longer any purpose for a 2.1.0-SNAPSHOT release; if you want 2.1.0, then you should be using stable artifacts now, not SNAPSHOTs.
>
> The existence of a SNAPSHOT doesn't imply anything about the release date of the associated finished version. Rather, it only indicates a name that is attached to all of the code that is currently intended for the associated release number.
>
> On Wed, Dec 28, 2016 at 3:09 PM, Justin Miller <justin.mil...@protectwise.com> wrote:
>> It looks like the jars for 2.1.0-SNAPSHOT are gone?
>>
>> https://repository.apache.org/content/groups/snapshots/org/apache/spark/spark-sql_2.11/2.1.0-SNAPSHOT/
>>
>> Also:
>>
>> 2.1.0-SNAPSHOT/   Fri Dec 23 16:31:42 UTC 2016
>> 2.1.1-SNAPSHOT/   Wed Dec 28 20:01:10 UTC 2016
>> 2.2.0-SNAPSHOT/   Wed Dec 28 19:12:38 UTC 2016
>>
>> What's with 2.1.1-SNAPSHOT? Is that version about to be released as well?
>>
>> Thanks!
>> Justin
>>
>>> On Dec 28, 2016, at 12:53 PM, Mark Hamstra <m...@clearstorydata.com> wrote:
>>>
>>> The v2.1.0 tag is there: https://github.com/apache/spark/tree/v2.1.0
>>>
>>> On Wed, Dec 28, 2016 at 2:04 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>> seems like the artifacts are on maven central but the website is not yet updated.
>>>
>>> strangely the tag v2.1.0 is not yet available on github. i assume it's equal to v2.1.0-rc5
>>>
>>> On Fri, Dec 23, 2016 at 10:52 AM, Justin Miller <justin.mil...@protectwise.com> wrote:
>>> I'm curious about this as well. Seems like the vote passed.
Re: Is there any scheduled release date for Spark 2.1.0?
It looks like the jars for 2.1.0-SNAPSHOT are gone?

https://repository.apache.org/content/groups/snapshots/org/apache/spark/spark-sql_2.11/2.1.0-SNAPSHOT/

Also:

2.1.0-SNAPSHOT/   Fri Dec 23 16:31:42 UTC 2016
2.1.1-SNAPSHOT/   Wed Dec 28 20:01:10 UTC 2016
2.2.0-SNAPSHOT/   Wed Dec 28 19:12:38 UTC 2016

What's with 2.1.1-SNAPSHOT? Is that version about to be released as well?

Thanks!
Justin

> On Dec 28, 2016, at 12:53 PM, Mark Hamstra <m...@clearstorydata.com> wrote:
>
> The v2.1.0 tag is there: https://github.com/apache/spark/tree/v2.1.0
>
> On Wed, Dec 28, 2016 at 2:04 PM, Koert Kuipers <ko...@tresata.com> wrote:
>> seems like the artifacts are on maven central but the website is not yet updated.
>>
>> strangely the tag v2.1.0 is not yet available on github. i assume it's equal to v2.1.0-rc5
>>
>> On Fri, Dec 23, 2016 at 10:52 AM, Justin Miller <justin.mil...@protectwise.com> wrote:
>>> I'm curious about this as well. Seems like the vote passed.
Re: Is there any scheduled release date for Spark 2.1.0?
I'm curious about this as well. Seems like the vote passed.

> On Dec 23, 2016, at 2:00 AM, Aseem Bansal <asmbans...@gmail.com> wrote:
K-Means seems biased to one center
(Cross-post with http://stackoverflow.com/questions/32936380/k-means-clustering-is-biased-to-one-center)

I have a corpus of wiki pages (baseball, hockey, music, football) which I'm running through TF-IDF and then through k-means. After a couple of issues to start (you can see my previous questions), I'm finally getting a KMeansModel... but when I try to predict, I keep getting the same center. Is this because of the small dataset, or because I'm comparing multi-word documents against a query with a smaller number of words (1-20)? Or is there something else I'm doing wrong? See the below code:

// Preprocessing of data includes splitting into words
// and removing words with only 1 or 2 characters
val corpus: RDD[Seq[String]]

val hashingTF = new HashingTF(10)
val tf = hashingTF.transform(corpus)

val idf = new IDF().fit(tf)
val tfidf = idf.transform(tf).cache

val kMeansModel = KMeans.train(tfidf, 3, 10)

val queryTf = hashingTF.transform(List("music"))
val queryTfidf = idf.transform(queryTf)
kMeansModel.predict(queryTfidf) // Always the same, no matter the term supplied
save checkpoint during dataframe row iteration
Good morning,

I have a typical iterator loop on a DataFrame loaded from a Parquet data source:

val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
val sc = new JavaSparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val parquetDataFrame = sqlContext.read.parquet(parquetFilename.getAbsolutePath)
parquetDataFrame.foreachPartition { rowIterator =>
  rowIterator.foreach { row =>
    // ... do work
  }
}

My use case is quite simple: I would like to save a checkpoint during processing, and if the driver program fails, skip over the initial records in the Parquet file and continue from the checkpoint. This would be analogous to storing a loop iterator value in a standard C++/Java for loop.

My question is: are there any guarantees about the ordering of rows in the "foreach" closure? Even if there are no guarantees in general (i.e., for a DataFrame from any source), considering that the data frame is created from a Parquet file, are there any guarantees? Is it possible to implement my use case?

Thanks for your time.
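There's no ordering guarantee to lean on here, but if the rows carry (or can be given) a monotonically comparable key, one workaround is to persist a high-water mark and filter on restart. A rough sketch, assuming a numeric "id" column exists and using placeholder checkpoint helpers:

import org.apache.spark.sql.functions.{col, max}

// Placeholder helpers: persist a single Long high-water mark somewhere
// durable (a small file on HDFS/S3, a database row, etc.).
def loadCheckpoint(): Long = 0L
def saveCheckpoint(mark: Long): Unit = ()

val df = sqlContext.read.parquet(parquetFilename.getAbsolutePath)
  .filter(col("id") > loadCheckpoint()) // assumes an "id" column exists

df.foreachPartition { rowIterator =>
  rowIterator.foreach { row =>
    // ... do work (should be idempotent if finer-grained recovery matters)
  }
}

// Advance the mark only after the whole pass succeeds; this gives
// run-level rather than row-level checkpointing.
saveCheckpoint(df.agg(max(col("id"))).first().getLong(0))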
Re: Is MLBase dead?
To take a stab at my own answer: MLBase is now fully integrated into MLlib. MLI/MLlib are the mllib algorithms, and MLO is the ml pipelines?

On Mon, Sep 28, 2015 at 10:19 PM, Justin Pihony <justin.pih...@gmail.com> wrote:
> As in, is MLBase (MLO/MLI/MLlib) now simply org.apache.spark.mllib and org.apache.spark.ml? I cannot find anything official, and the last updates seem to be a year or two old.
Is MLBase dead?
As in, is MLBase (MLO/MLI/MLlib) now simply org.apache.spark.mllib and org.apache.spark.ml? I cannot find anything official, and the last updates seem to be a year or two old.
Re: How to access Spark UI through AWS
I figured it all out after this: http://apache-spark-user-list.1001560.n3.nabble.com/WebUI-on-yarn-through-ssh-tunnel-affected-by-AmIpfilter-td21540.html

The short of it is that I needed to set SPARK_PUBLIC_DNS (not DNS_HOME) = ec2_publicdns. Then the YARN proxy gets in the way, so I needed to go to http://ec2_publicdns:20888/proxy/applicationid/jobs (9046 is the older EMR port). Or, as Jonathan said, the Spark history server works once a job is completed.

On Tue, Aug 25, 2015 at 5:26 PM, Justin Pihony justin.pih...@gmail.com wrote:
> OK, I figured out the horrid look also: the href of all of the styles is prefixed with the proxy data. So, ultimately, if I can fix the proxy issues with the links, then I can fix the look also.
>
> On Tue, Aug 25, 2015 at 5:17 PM, Justin Pihony justin.pih...@gmail.com wrote:
>> SUCCESS! I set SPARK_DNS_HOME=ec2_publicdns, which makes it possible to access the Spark UI directly. The application proxy was still getting in the way with how it creates the URL, so I manually filled in the /stage?id=#&attempt=# and that worked. I'm still having trouble with the CSS, as the UI looks horrid, but I'll tackle that next :)
>>
>> On Tue, Aug 25, 2015 at 4:31 PM, Justin Pihony justin.pih...@gmail.com wrote:
>>> Thanks. I just tried and am still having trouble. It seems to still be using the private address even if I try going through the resource manager.
>>>
>>> On Tue, Aug 25, 2015 at 12:34 PM, Kelly, Jonathan jonat...@amazon.com wrote:
>>>> I'm not sure why the UI appears broken like that either and haven't investigated it myself yet, but if you instead go to the YARN ResourceManager UI (port 8088 if you are using emr-4.x; port 9026 for 3.x, I believe), then you should be able to click on the ApplicationMaster link (or the History link for completed applications) to get to the Spark UI from there. The ApplicationMaster link will use the YARN Proxy Service (port 20888 on emr-4.x; not sure about 3.x) to proxy through to the Spark application's UI, regardless of what port it's running on. For completed applications, the History link will send you directly to the Spark History Server UI on port 18080.
>>>>
>>>> Hope that helps!
>>>> ~ Jonathan
>>>>
>>>> On 8/24/15, 10:51 PM, Justin Pihony justin.pih...@gmail.com wrote:
>>>>> I am using the steps from this article https://aws.amazon.com/articles/Elastic-MapReduce/4926593393724923 to get Spark up and running on EMR through YARN. Once up and running, I ssh in and cd to the Spark bin and run spark-shell --master yarn. Once this spins up, I can see that the UI is started on the internal IP at port 4040. If I hit the public DNS at 4040 with dynamic port tunneling and FoxyProxy, then I get a crude UI (the CSS seems broken), but the proxy continuously redirects me to the main page, so I cannot drill into anything. So I tried static tunneling, but can't seem to get through.
>>>>>
>>>>> So, how can I access the Spark UI when running a spark-shell in AWS YARN?
Re: How to access Spark UI through AWS
Thanks. I just tried and am still having trouble. It seems to still be using the private address even if I try going through the resource manager.

On Tue, Aug 25, 2015 at 12:34 PM, Kelly, Jonathan jonat...@amazon.com wrote:
> I'm not sure why the UI appears broken like that either and haven't investigated it myself yet, but if you instead go to the YARN ResourceManager UI (port 8088 if you are using emr-4.x; port 9026 for 3.x, I believe), then you should be able to click on the ApplicationMaster link (or the History link for completed applications) to get to the Spark UI from there. The ApplicationMaster link will use the YARN Proxy Service (port 20888 on emr-4.x; not sure about 3.x) to proxy through to the Spark application's UI, regardless of what port it's running on. For completed applications, the History link will send you directly to the Spark History Server UI on port 18080.
>
> Hope that helps!
> ~ Jonathan
>
> On 8/24/15, 10:51 PM, Justin Pihony justin.pih...@gmail.com wrote:
>> I am using the steps from this article https://aws.amazon.com/articles/Elastic-MapReduce/4926593393724923 to get Spark up and running on EMR through YARN. Once up and running, I ssh in and cd to the Spark bin and run spark-shell --master yarn. Once this spins up, I can see that the UI is started on the internal IP at port 4040. If I hit the public DNS at 4040 with dynamic port tunneling and FoxyProxy, then I get a crude UI (the CSS seems broken), but the proxy continuously redirects me to the main page, so I cannot drill into anything. So I tried static tunneling, but can't seem to get through.
>>
>> So, how can I access the Spark UI when running a spark-shell in AWS YARN?
Re: How to access Spark UI through AWS
OK, I figured out the horrid look also: the href of all of the styles is prefixed with the proxy data. So, ultimately, if I can fix the proxy issues with the links, then I can fix the look also.

On Tue, Aug 25, 2015 at 5:17 PM, Justin Pihony justin.pih...@gmail.com wrote:
> SUCCESS! I set SPARK_DNS_HOME=ec2_publicdns, which makes it possible to access the Spark UI directly. The application proxy was still getting in the way with how it creates the URL, so I manually filled in the /stage?id=#&attempt=# and that worked. I'm still having trouble with the CSS, as the UI looks horrid, but I'll tackle that next :)
>
> On Tue, Aug 25, 2015 at 4:31 PM, Justin Pihony justin.pih...@gmail.com wrote:
>> Thanks. I just tried and am still having trouble. It seems to still be using the private address even if I try going through the resource manager.
>>
>> On Tue, Aug 25, 2015 at 12:34 PM, Kelly, Jonathan jonat...@amazon.com wrote:
>>> I'm not sure why the UI appears broken like that either and haven't investigated it myself yet, but if you instead go to the YARN ResourceManager UI (port 8088 if you are using emr-4.x; port 9026 for 3.x, I believe), then you should be able to click on the ApplicationMaster link (or the History link for completed applications) to get to the Spark UI from there. The ApplicationMaster link will use the YARN Proxy Service (port 20888 on emr-4.x; not sure about 3.x) to proxy through to the Spark application's UI, regardless of what port it's running on. For completed applications, the History link will send you directly to the Spark History Server UI on port 18080.
>>>
>>> Hope that helps!
>>> ~ Jonathan
>>>
>>> On 8/24/15, 10:51 PM, Justin Pihony justin.pih...@gmail.com wrote:
>>>> I am using the steps from this article https://aws.amazon.com/articles/Elastic-MapReduce/4926593393724923 to get Spark up and running on EMR through YARN. Once up and running, I ssh in and cd to the Spark bin and run spark-shell --master yarn. Once this spins up, I can see that the UI is started on the internal IP at port 4040. If I hit the public DNS at 4040 with dynamic port tunneling and FoxyProxy, then I get a crude UI (the CSS seems broken), but the proxy continuously redirects me to the main page, so I cannot drill into anything. So I tried static tunneling, but can't seem to get through.
>>>>
>>>> So, how can I access the Spark UI when running a spark-shell in AWS YARN?
Re: How to access Spark UI through AWS
SUCCESS! I set SPARK_DNS_HOME=ec2_publicdns, which makes it possible to access the Spark UI directly. The application proxy was still getting in the way with how it creates the URL, so I manually filled in the /stage?id=#&attempt=# and that worked. I'm still having trouble with the CSS, as the UI looks horrid, but I'll tackle that next :)

On Tue, Aug 25, 2015 at 4:31 PM, Justin Pihony justin.pih...@gmail.com wrote:
> Thanks. I just tried and am still having trouble. It seems to still be using the private address even if I try going through the resource manager.
>
> On Tue, Aug 25, 2015 at 12:34 PM, Kelly, Jonathan jonat...@amazon.com wrote:
>> I'm not sure why the UI appears broken like that either and haven't investigated it myself yet, but if you instead go to the YARN ResourceManager UI (port 8088 if you are using emr-4.x; port 9026 for 3.x, I believe), then you should be able to click on the ApplicationMaster link (or the History link for completed applications) to get to the Spark UI from there. The ApplicationMaster link will use the YARN Proxy Service (port 20888 on emr-4.x; not sure about 3.x) to proxy through to the Spark application's UI, regardless of what port it's running on. For completed applications, the History link will send you directly to the Spark History Server UI on port 18080.
>>
>> Hope that helps!
>> ~ Jonathan
>>
>> On 8/24/15, 10:51 PM, Justin Pihony justin.pih...@gmail.com wrote:
>>> I am using the steps from this article https://aws.amazon.com/articles/Elastic-MapReduce/4926593393724923 to get Spark up and running on EMR through YARN. Once up and running, I ssh in and cd to the Spark bin and run spark-shell --master yarn. Once this spins up, I can see that the UI is started on the internal IP at port 4040. If I hit the public DNS at 4040 with dynamic port tunneling and FoxyProxy, then I get a crude UI (the CSS seems broken), but the proxy continuously redirects me to the main page, so I cannot drill into anything. So I tried static tunneling, but can't seem to get through.
>>>
>>> So, how can I access the Spark UI when running a spark-shell in AWS YARN?
Re: Got wrong md5sum for boto
Additional info... if I use an online md5sum check, then it matches. So it's either Windows or Python (using 2.7.10).

On Mon, Aug 24, 2015 at 11:54 AM, Justin Pihony justin.pih...@gmail.com wrote:
> When running the spark_ec2.py script, I'm getting a wrong md5sum. I've now seen this on two different machines. I am running on Windows, but I would imagine that shouldn't affect the md5. Is this a boto problem, a Python problem, or a Spark problem?
Re: Got wrong md5sum for boto
I found this solution: https://stackoverflow.com/questions/3390484/python-hashlib-md5-differs-between-linux-windows

Does anybody see a reason why I shouldn't put in a PR to make this change?

FROM:
with open(tgz_file_path) as tar:
TO:
with open(tgz_file_path, "rb") as tar:

(On Windows, opening the file in text mode translates line endings, which changes the bytes fed to the hash; opening in binary mode avoids that.)

On Mon, Aug 24, 2015 at 11:58 AM, Justin Pihony justin.pih...@gmail.com wrote:
> Additional info... if I use an online md5sum check, then it matches. So it's either Windows or Python (using 2.7.10).
>
> On Mon, Aug 24, 2015 at 11:54 AM, Justin Pihony justin.pih...@gmail.com wrote:
>> When running the spark_ec2.py script, I'm getting a wrong md5sum. I've now seen this on two different machines. I am running on Windows, but I would imagine that shouldn't affect the md5. Is this a boto problem, a Python problem, or a Spark problem?
Got wrong md5sum for boto
When running the spark_ec2.py script, I'm getting a wrong md5sum. I've now seen this on two different machines. I am running on Windows, but I would imagine that shouldn't affect the md5. Is this a boto problem, a Python problem, or a Spark problem?
How to access Spark UI through AWS
I am using the steps from this article https://aws.amazon.com/articles/Elastic-MapReduce/4926593393724923 to get Spark up and running on EMR through YARN. Once up and running, I ssh in, cd to the Spark bin, and run spark-shell --master yarn. Once this spins up, I can see that the UI is started on the internal IP at port 4040. If I hit the public DNS at 4040 with dynamic port tunneling and FoxyProxy, then I get a crude UI (the CSS seems broken), but the proxy continuously redirects me to the main page, so I cannot drill into anything. So I tried static tunneling, but can't seem to get through.

So, how can I access the Spark UI when running a spark-shell in AWS YARN?
Windowed stream operations -- These are too lazy for some use cases
We are aggregating real-time logs of events and want to use windows of 30 minutes. However, since the computation doesn't start until 30 minutes have passed, there is a ton of data built up that processing could've already started on. When it comes time to actually process the data, there is too much for our cluster to handle at once.

The basic idea is this:

val mergedMain = mergedStream
  .flatMap(r => ...) // denormalize data for this particular output stream
  .reduceByKey((x: Array[Int], y: Array[Int]) => sumAggregates(x, y)) // this would sum over the batches
  .reduceByKeyAndWindow((x: Array[Int], y: Array[Int]) => sumAggregates(x, y), 180, 180) // sum over the windows
  .map(rec => ...) // convert data to other format
  .foreachRDD { (rdd, time) =>
    rdd.saveAsTextFile(...) // save to text files
  }

I would want the batches to be reduced as soon as they arrive (the first reduceByKey), since there isn't any reason to wait. Instead, all of the unprocessed data has to be processed at the same time (this data is being heavily denormalized in some cases, and so generates a bunch of additional data).

Thanks for any help.
Re: Windowed stream operations -- These are too lazy for some use cases
I tried something like that. When I tried just doing count() on the DStream, it didn't seem like it was actually forcing the computation. What (sort of) worked was doing a foreachRDD((rdd) => rdd.count()), or doing a print() on the DStream. The only problem was this seemed to add a lot of processing overhead. I couldn't figure out exactly why, but it seemed to have something to do with foreachRDD only being executed on the driver.

On Thu, Aug 20, 2015 at 1:39 PM, Iulian Dragoș iulian.dra...@typesafe.com wrote:
> On Thu, Aug 20, 2015 at 6:58 PM, Justin Grimes jgri...@adzerk.com wrote:
>> We are aggregating real-time logs of events and want to use windows of 30 minutes. However, since the computation doesn't start until 30 minutes have passed, there is a ton of data built up that processing could've already started on. When it comes time to actually process the data, there is too much for our cluster to handle at once.
>>
>> The basic idea is this:
>>
>> val mergedMain = mergedStream
>>   .flatMap(r => ...) // denormalize data for this particular output stream
>>   .reduceByKey((x: Array[Int], y: Array[Int]) => sumAggregates(x, y)) // this would sum over the batches
>
> Could you add a dummy action at this point?
>
> val firstStep = mergedStream
>   .flatMap(r => ...) // denormalize data for this particular output stream
>   .reduceByKey((x: Array[Int], y: Array[Int]) => sumAggregates(x, y)) // this would sum over the batches
>   .persist() // this will be reused in windowing operations
>
> firstStep.count() // just to trigger computation
>
> firstStep
>   .reduceByKeyAndWindow((x: Array[Int], y: Array[Int]) => sumAggregates(x, y), 180, 180) // sum over the windows
>   .map(rec => ...) // convert data to other format
>   .foreachRDD { (rdd, time) =>
>     rdd.saveAsTextFile(...) // save to text files
>   }
>
>> .reduceByKeyAndWindow((x: Array[Int], y: Array[Int]) => sumAggregates(x, y), 180, 180) // sum over the windows
>> .map(rec => ...) // convert data to other format
>> .foreachRDD { (rdd, time) =>
>>   rdd.saveAsTextFile(...) // save to text files
>> }
>>
>> I would want the batches to be reduced as soon as they arrive (the first reduceByKey), since there isn't any reason to wait. Instead, all of the unprocessed data has to be processed at the same time (this data is being heavily denormalized in some cases, and so generates a bunch of additional data).
>>
>> Thanks for any help.
>
> --
> Iulian Dragos
> Reactive Apps on the JVM
> www.typesafe.com
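For later readers, a lighter-weight dummy action than count() (a sketch): touch each partition so the persisted RDD materializes eagerly without shipping anything to the driver.

firstStep.foreachRDD { rdd =>
  // Forces computation (and caching) of every partition on the executors;
  // no results are collected back to the driver.
  rdd.foreachPartition(_ => ())
}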
Spark Python process
I have a Spark job that's running on a 10-node cluster, and the Python process on all the nodes is pegged at 100%. I was wondering which parts of a Spark script run in the Python process and which get passed to the Java processes? Is there any documentation on this?

Thanks,
Justin
NaiveBayes for MLPipeline is absent
Hello,

Currently, there is no NaiveBayes implementation for ML Pipelines. I couldn't find the JIRA ticket related to it either (or maybe I missed it). Is there a plan to implement it? If no one has the bandwidth, I can work on it.

Thanks,
Justin
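For reference, the RDD-based API does have Naive Bayes in the meantime; a minimal sketch (the toy data is illustrative):

import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Toy training data standing in for real labeled vectors.
val training = sc.parallelize(Seq(
  LabeledPoint(0.0, Vectors.dense(1.0, 0.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 1.0))
))

val model = NaiveBayes.train(training, lambda = 1.0) // additive smoothing
model.predict(Vectors.dense(0.0, 1.0))               // => 1.0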
Re: Inconsistent behavior with Dataframe Timestamp between 1.3.1 and 1.4.0
Done. https://issues.apache.org/jira/browse/SPARK-8420

Thanks,
Justin

On Wed, Jun 17, 2015 at 4:06 PM, Xiangrui Meng men...@gmail.com wrote:
> That sounds like a bug. Could you create a JIRA and ping Yin Huai (cc'ed)? It would be great if there is a fix. -Xiangrui
>
> On Wed, May 27, 2015 at 12:57 AM, Justin Yip yipjus...@prediction.io wrote:
>> Hello,
>>
>> I am trying out 1.4.0 and noticed there are some differences in behavior with Timestamp between 1.3.1 and 1.4.0.
>>
>> In 1.3.1, I can compare a Timestamp with a string:
>>
>> scala> val df = sqlContext.createDataFrame(Seq((1, Timestamp.valueOf("2015-01-01 00:00:00")), (2, Timestamp.valueOf("2014-01-01 00:00:00"))))
>> ...
>> scala> df.filter($"_2" <= "2014-06-01").show
>> ...
>> _1 _2
>> 2  2014-01-01 00:00:...
>>
>> However, in 1.4.0, the filter is always false:
>>
>> scala> val df = sqlContext.createDataFrame(Seq((1, Timestamp.valueOf("2015-01-01 00:00:00")), (2, Timestamp.valueOf("2014-01-01 00:00:00"))))
>> df: org.apache.spark.sql.DataFrame = [_1: int, _2: timestamp]
>>
>> scala> df.filter($"_2" <= "2014-06-01").show
>> +--+--+
>> |_1|_2|
>> +--+--+
>> +--+--+
>>
>> Not sure if that is intended, but I cannot find any doc mentioning these inconsistencies.
>>
>> Thanks.
>> Justin
NullPointerException with functions.rand()
Hello,

I am using 1.4.0 and found the following weird behavior.

This case works fine:

scala> sc.parallelize(Seq((1, 2), (3, 100))).toDF.withColumn("index", rand(30)).show()
+--+---+-------------------+
|_1| _2|              index|
+--+---+-------------------+
| 1|  2| 0.6662967911724369|
| 3|100|0.35734504984676396|
+--+---+-------------------+

However, when I use sqlContext.createDataFrame instead, I get an NPE:

scala> sqlContext.createDataFrame(Seq((1, 2), (3, 100))).withColumn("index", rand(30)).show()
java.lang.NullPointerException
  at org.apache.spark.sql.catalyst.expressions.RDG.rng$lzycompute(random.scala:39)
  at org.apache.spark.sql.catalyst.expressions.RDG.rng(random.scala:39)
  ...

Does anyone know why?

Thanks.
Justin
Why the default Params.copy doesn't work for Model.copy?
Hello,

I have a question about the Spark 1.4 ml library. In the copy function, it is stated that the default implementation of Params.copy doesn't work for models: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/Model.scala#L49

As a result, some feature-generation transformers like StringIndexerModel cannot be used in a Pipeline. Maybe due to my limited knowledge of the ML pipeline: can anyone give me some hints as to why Model.copy behaves differently from other Params?

Thanks!
Justin
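For reference, the usual pattern in a concrete Model is to override copy explicitly, since a generic copy can't reproduce a model's constructor arguments or its parent. A rough sketch against the 1.4-era API (MyModel and its labels field are illustrative; the transform logic is stubbed out):

import org.apache.spark.ml.Model
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

// `labels` is a constructor argument that a default copy can't rebuild,
// which is why models must override copy themselves.
class MyModel(override val uid: String, val labels: Array[String])
  extends Model[MyModel] {

  override def copy(extra: ParamMap): MyModel = {
    val copied = new MyModel(uid, labels)
    copyValues(copied, extra).setParent(parent)
  }

  // Stubs: a real model would implement its transformation here.
  override def transform(dataset: DataFrame): DataFrame = dataset
  override def transformSchema(schema: StructType): StructType = schema
}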
Setting S3 output file grantees for spark output files
Hi all,

I'm running Spark on AWS EMR and I'm having some issues getting the correct permissions on the output files using rdd.saveAsTextFile('file_dir_name'). In Hive, I would add a line at the beginning of the script with set fs.s3.canned.acl=BucketOwnerFullControl, and that would set the correct grantees for the files.

For Spark, I tried adding the permissions as a --conf option:

hadoop jar /mnt/var/lib/hadoop/steps/s-3HIRLHJJXV3SJ/script-runner.jar \
  /home/hadoop/spark/bin/spark-submit --deploy-mode cluster --master yarn-cluster \
  --conf spark.driver.extraJavaOptions=-Dfs.s3.canned.acl=BucketOwnerFullControl \
  hdfs:///user/hadoop/spark.py

But the permissions do not get set properly on the output files. What is the proper way to pass in 'fs.s3.canned.acl=BucketOwnerFullControl' or any of the S3 canned permissions to the Spark job?

Thanks in advance
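One thing worth trying (a sketch, not verified on EMR): extraJavaOptions only sets a JVM system property on the driver, which the Hadoop S3 filesystem doesn't read; Spark does, however, copy any conf key prefixed with spark.hadoop. into the Hadoop Configuration used for saves, on the executors as well as the driver. Shown in Scala; the same key should also work as --conf spark.hadoop.fs.s3.canned.acl=BucketOwnerFullControl on spark-submit.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("s3-acl-example")
  // spark.hadoop.* entries are copied into the job's Hadoop Configuration,
  // so the canned ACL reaches the S3 output path on every executor.
  .set("spark.hadoop.fs.s3.canned.acl", "BucketOwnerFullControl")
val sc = new SparkContext(conf)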
Python Image Library and Spark
Hi all, I'm playing around with manipulating images via Python and want to utilize Spark for scalability. That said, I'm just learning Spark and my Python is a bit rusty (been doing PHP coding for the last few years). I think I have most of the process figured out. However, the script fails on larger images and Spark is sending out the following warning for smaller images: Stage 0 contains a task of very large size (1151 KB). The maximum recommended task size is 100 KB. My code is as follows:

import Image
from pyspark import SparkContext

if __name__ == "__main__":
    imageFile = "sample.jpg"
    outFile = "sample.gray.jpg"
    sc = SparkContext(appName="Grayscale")
    im = Image.open(imageFile)
    # Create an RDD for the data from the image file
    img_data = sc.parallelize(list(im.getdata()))
    # Create an RDD for the grayscale value
    gValue = img_data.map(lambda x: int(x[0]*0.21 + x[1]*0.72 + x[2]*0.07))
    # Put our grayscale value into the RGB channels
    grayscale = gValue.map(lambda x: (x, x, x))
    # Save the output in a new image.
    im.putdata(grayscale.collect())
    im.save(outFile)

Obviously, something is amiss. However, I can't figure out where I'm off track with this. Any help is appreciated! Thanks in advance!!!
Inconsistent behavior with Dataframe Timestamp between 1.3.1 and 1.4.0
Hello, I am trying out 1.4.0 and notice there are some differences in behavior with Timestamp between 1.3.1 and 1.4.0. In 1.3.1, I can compare a Timestamp with a string:

scala> val df = sqlContext.createDataFrame(Seq((1, Timestamp.valueOf("2015-01-01 00:00:00")), (2, Timestamp.valueOf("2014-01-01 00:00:00" ...
scala> df.filter($"_2" <= "2014-06-01").show
...
_1 _2
2  2014-01-01 00:00:...

However, in 1.4.0, the filter is always false:

scala> val df = sqlContext.createDataFrame(Seq((1, Timestamp.valueOf("2015-01-01 00:00:00")), (2, Timestamp.valueOf("2014-01-01 00:00:00"
df: org.apache.spark.sql.DataFrame = [_1: int, _2: timestamp]
scala> df.filter($"_2" <= "2014-06-01").show
+--+--+
|_1|_2|
+--+--+
+--+--+

Not sure if that is intended, but I cannot find any doc mentioning these inconsistencies. Thanks. Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Inconsistent-behavior-with-Dataframe-Timestamp-between-1-3-1-and-1-4-0-tp23045.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
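A possible workaround while this is sorted out (a hedged sketch, not from the thread; assumes the spark-shell implicits for $ are in scope):

import java.sql.Timestamp
import org.apache.spark.sql.functions.lit

// Compare against an actual Timestamp literal so no implicit
// string-to-timestamp coercion is involved.
df.filter($"_2" <= lit(Timestamp.valueOf("2014-06-01 00:00:00"))).show()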
Re: Accumulators in Spark Streaming on UI
You need to make sure to name the accumulator. On Tue, May 26, 2015 at 2:23 PM, Snehal Nagmote nagmote.sne...@gmail.com wrote: Hello all, I have an accumulator in a Spark Streaming application which counts the number of events received from Kafka. From the documentation, it seems the Spark UI has support to display it, but I am unable to see it on the UI. I am using Spark 1.3.1. Do I need to call any method (print), or am I missing something? Thanks in advance, Snehal
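For reference, a minimal sketch of what naming looks like (hedged; stream stands in for the Kafka DStream from the application above):

// A named accumulator is what the UI can display; anonymous ones are not shown.
val eventCount = sc.accumulator(0L, "Kafka events received")
stream.foreachRDD { rdd => eventCount += rdd.count() }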
Building scaladoc using build/sbt unidoc failure
Hello, I am trying to build scala doc from the 1.4 branch. But it failed due to [error] (sql/compile:compile) java.lang.AssertionError: assertion failed: List(object package$DebugNode, object package$DebugNode) I followed the instruction on github https://github.com/apache/spark/tree/branch-1.4/docs and used the following command: $ build/sbt unidoc Please see attachment for detailed error. Did I miss anything? Thanks. Justin unidoc_error.txt (30K) http://apache-spark-user-list.1001560.n3.nabble.com/attachment/23044/0/unidoc_error.txt -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Building-scaladoc-using-build-sbt-unidoc-failure-tp23044.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Why is RDD to PairRDDFunctions only via implicits?
The (crude) proof of concept seems to work:

class RDD[V](value: List[V]) {
  def doStuff = println("I'm doing stuff")
}

object RDD {
  implicit def toPair[V](x: RDD[V]) = new PairRDD(List((1, 2)))
}

class PairRDD[K, V](value: List[(K, V)]) extends RDD(value) {
  def doPairs = println("I'm using pairs")
}

class Context {
  def parallelize[K, V](x: List[(K, V)]) = new PairRDD(x)
  def parallelize[V](x: List[V]) = new RDD(x)
}

On Fri, May 22, 2015 at 2:44 PM, Reynold Xin r...@databricks.com wrote: I'm not sure if it is possible to overload the map function twice, once for just KV pairs, and another for K and V separately. On Fri, May 22, 2015 at 10:26 AM, Justin Pihony justin.pih...@gmail.com wrote: This ticket https://issues.apache.org/jira/browse/SPARK-4397 improved the RDD API, but it could be even more discoverable if made available via the API directly. I assume this was originally an omission that now needs to be kept for backwards compatibility, but would any of the repo owners be open to making this more discoverable to the point of API docs and tab completion (while keeping both binary and source compatibility)? class PairRDD extends RDD { pair methods } RDD { def map[K: ClassTag, V: ClassTag](f: T => (K, V)): PairRDD[K, V] } As long as the implicits remain, then compatibility remains, but now it is explicit in the docs on how to get a PairRDD and in tab completion. Thoughts? Justin Pihony
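A quick usage check of the sketch above (the names come from the proof of concept, not from Spark itself):

new Context().parallelize(List((1, "a"))).doPairs  // overload resolution picks the pair variant
new RDD(List(1, 2, 3)).doPairs                     // still compiles, via the implicit toPair conversion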
Why is RDD to PairRDDFunctions only via implicits?
This ticket https://issues.apache.org/jira/browse/SPARK-4397 improved the RDD API, but it could be even more discoverable if made available via the API directly. I assume this was originally an omission that now needs to be kept for backwards compatibility, but would any of the repo owners be open to making this more discoverable to the point of API docs and tab completion (while keeping both binary and source compatibility)? class PairRDD extends RDD { pair methods } RDD { def map[K: ClassTag, V: ClassTag](f: T => (K, V)): PairRDD[K, V] } As long as the implicits remain, then compatibility remains, but now it is explicit in the docs on how to get a PairRDD and in tab completion. Thoughts? Justin Pihony
Re: User Defined Type (UDT)
Xiangrui, is there a timeline for when UDTs will become a public API? I'm currently using them to support Java 8's ZonedDateTime. On Tue, May 19, 2015 at 3:14 PM Xiangrui Meng men...@gmail.com wrote: (Note that UDT is not a public API yet.) On Thu, May 7, 2015 at 7:11 AM, wjur wojtek.jurc...@gmail.com wrote: Hi all! I'm using Spark 1.3.0 and I'm struggling with the definition of a new type for a project I'm working on. I've created a case class Person(name: String) and now I'm trying to make Spark able to serialize and deserialize the defined type. I made a couple of attempts but none of them worked 100% (there were issues either in serialization or deserialization). This is my class and the corresponding UDT.

@SQLUserDefinedType(udt = classOf[PersonUDT])
case class Person(name: String)

class PersonUDT extends UserDefinedType[Person] {
  override def sqlType: DataType = StructType(Seq(StructField("name", StringType)))

  override def serialize(obj: Any): Seq[Any] = {
    // This should return a Row instance instead of Seq[Any], because the sqlType is a struct type.
    obj match {
      case c: Person => Seq(c.name)
    }
  }

  override def userClass: Class[Person] = classOf[Person]

  override def deserialize(datum: Any): Person = {
    datum match {
      case values: Seq[_] =>
        assert(values.length == 1)
        Person(values.head.asInstanceOf[String])
      case values: util.ArrayList[_] =>
        Person(values.get(0).asInstanceOf[String])
    }
  }

  // In some other attempt I was creating RDD of Seq with manually serialized data and
  // I had to override equals because two DFs with the same type weren't actually equal
  // StructField(person,...types.PersonUDT@a096ac3)
  // StructField(person,...types.PersonUDT@613fd937)
  def canEqual(other: Any): Boolean = other.isInstanceOf[PersonUDT]

  override def equals(other: Any): Boolean = other match {
    case that: PersonUDT => true
    case _ => false
  }

  override def hashCode(): Int = 1
}

This is how I create the RDD of Person and then try to create a DataFrame:

val rdd = sparkContext.parallelize((1 to 100).map(i => Person(i.toString)))
val sparkDataFrame = sqlContext.createDataFrame(rdd)

The second line throws an exception: java.lang.ClassCastException: types.PersonUDT cannot be cast to org.apache.spark.sql.types.StructType at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:316) I looked into the code in SQLContext.scala and it seems that the code requires the UDT to extend StructType, but in fact it extends UserDefinedType, which extends DataType directly. I'm not sure whether it is a bug or I just don't know how to use UDTs. Do you have any suggestions on how to solve this? I based my UDT on ExamplePointUDT but it seems to be incorrect. Is there a working example for UDT? Thank you for the reply in advance! wjur -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/User-Defined-Type-UDT-tp22796.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark logo license
Thanks! On Wed, May 20, 2015 at 12:41 AM, Matei Zaharia matei.zaha...@gmail.com wrote: Check out Apache's trademark guidelines here: http://www.apache.org/foundation/marks/ Matei On May 20, 2015, at 12:02 AM, Justin Pihony justin.pih...@gmail.com wrote: What is the license on using the spark logo. Is it free to be used for displaying commercially? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-logo-license-tp22952.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark logo license
What is the license on using the Spark logo? Is it free to be used for displaying commercially? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-logo-license-tp22952.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Windows DOS bug in windows-utils.cmd
When running something like this: spark-shell --jars foo.jar,bar.jar this keeps failing to include the tail of the jars list. Digging into the launch scripts I found that the comma makes it so that the list was sent as separate parameters. So, to keep things together, I tried spark-shell --jars "foo.jar,bar.jar" But, this still failed as the quotes carried over into some of the string checks and resulted in invalid character errors. So, I am curious if anybody sees a problem with making a PR to fix the script from:

...
if "x%2"=="x" (
  echo "%1" requires an argument. >&2
  exit /b 1
)
set SUBMISSION_OPTS=%SUBMISSION_OPTS% %1 %2
...

TO:

...
if "x%~2"=="x" (
  echo "%1" requires an argument. >&2
  exit /b 1
)
set SUBMISSION_OPTS=%SUBMISSION_OPTS% %1 %~2
...

The only difference is the use of the tilde to remove any surrounding quotes if there are some. I figured I would ask here first to vet any unforeseen bugs this might cause in other systems. As far as I know this should be harmless and only make it so that comma-separated lists will work in DOS. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Windows-DOS-bug-in-windows-utils-cmd-tp22946.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
TwitterUtils on Windows
I am trying to print a basic twitter stream and receiving the following error: 15/05/18 22:03:14 INFO Executor: Fetching http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar with timestamp 1432000973058 15/05/18 22:03:14 INFO Utils: Fetching http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar to C:\Users\Justin\AppData\Local\Temp\spark-4a37d3e9-34a2-40d4-b09b-6399931f527d\userFiles-65ee748e-4721-4e16-9fe6-65933651fec1\fetchFileTemp8970201232303518432.tmp 15/05/18 22:03:14 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.NullPointerException at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012) at org.apache.hadoop.util.Shell.runCommand(Shell.java:482) at org.apache.hadoop.util.Shell.run(Shell.java:455) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715) at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:873) at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853) at org.apache.spark.util.Utils$.fetchFile(Utils.scala:443) at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:374) at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:366) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:366) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:184) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:744)

Code is:

spark-shell --jars \Spark\lib\spark-streaming-twitter_2.10-1.3.1.jar,\Spark\lib\twitter4j-async-3.0.3.jar,\Spark\lib\twitter4j-core-3.0.3.jar,\Spark\lib\twitter4j-media-support-3.0.3.jar,\Spark\lib\twitter4j-stream-3.0.3.jar

import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming._
System.setProperty("twitter4j.oauth.consumerKey", "*")
System.setProperty("twitter4j.oauth.consumerSecret", "*")
System.setProperty("twitter4j.oauth.accessToken", "*")
System.setProperty("twitter4j.oauth.accessTokenSecret", "*")
val ssc = new StreamingContext(sc, Seconds(10))
val stream = TwitterUtils.createStream(ssc, None)
stream.print
ssc.start

This seems to be happening at FileUtil.chmod(targetFile.getAbsolutePath, "a+x") but I'm not sure why... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/TwitterUtils-on-Windows-tp22939.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: TwitterUtils on Windows
I think I found the answer - http://apache-spark-user-list.1001560.n3.nabble.com/Error-while-running-example-scala-application-using-spark-submit-td10056.html Do I have no way of running this in Windows locally? On Mon, May 18, 2015 at 10:44 PM, Justin Pihony justin.pih...@gmail.com wrote: I'm not 100% sure that is causing a problem, though. The stream still starts, but is giving blank output. I checked the environment variables in the ui and it is running local[*], so there should be no bottleneck there. On Mon, May 18, 2015 at 10:08 PM, Justin Pihony justin.pih...@gmail.com wrote: I am trying to print a basic twitter stream and receiving the following error: 15/05/18 22:03:14 INFO Executor: Fetching http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar with timestamp 1432000973058 15/05/18 22:03:14 INFO Utils: Fetching http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar to C:\Users\Justin\AppData\Local\Temp\spark-4a37d3 e9-34a2-40d4-b09b-6399931f527d\userFiles-65ee748e-4721-4e16-9fe6-65933651fec1\fetchFileTemp8970201232303518432.tmp 15/05/18 22:03:14 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.NullPointerException at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012) at org.apache.hadoop.util.Shell.runCommand(Shell.java:482) at org.apache.hadoop.util.Shell.run(Shell.java:455) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.ja va:715) at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:873) at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853) at org.apache.spark.util.Utils$.fetchFile(Utils.scala:443) at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:374) at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:366) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.executor.Executor.org $apache$spark$executor$Executor$$updateDependencies(Executor.scala:366) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:184) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:744) Code is: spark-shell --jars \Spark\lib\spark-streaming-twitter_2.10-1.3.1.jar,\Spark\lib\twitter4j-async-3.0.3.jar,\Spark\lib\twitter4j-core-3.0.3.jar,\Spark\lib\twitter4j-media-support-3.0.3.jar,\Spark\lib\twitter4j-stream-3.0.3.jar import org.apache.spark.streaming.twitter._ import org.apache.spark.streaming._ System.setProperty(twitter4j.oauth.consumerKey,*) System.setProperty(twitter4j.oauth.consumerSecret,*) System.setProperty(twitter4j.oauth.accessToken,*) System.setProperty(twitter4j.oauth.accessTokenSecret,*) val ssc = new StreamingContext(sc, Seconds(10)) val stream = TwitterUtils.createStream(ssc, None) stream.print ssc.start This seems to be happening at FileUtil.chmod(targetFile.getAbsolutePath, a+x) but Im not sure why... 
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/TwitterUtils-on-Windows-tp22939.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
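For anyone else hitting this on Windows: a hedged note, not from the thread itself. The NullPointerException in Shell.runCommand is typically Hadoop failing to find its native Windows helpers, and pointing hadoop.home.dir (or the HADOOP_HOME environment variable) at a directory containing bin\winutils.exe usually gets past it. The C:\hadoop path below is only an assumption:

// Set before the SparkContext is created (or set HADOOP_HOME in the environment):
System.setProperty("hadoop.home.dir", "C:\\hadoop")  // assumes winutils.exe lives in C:\hadoop\bin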
Re: TwitterUtils on Windows
I'm not 100% sure that is causing a problem, though. The stream still starts, but is giving blank output. I checked the environment variables in the ui and it is running local[*], so there should be no bottleneck there. On Mon, May 18, 2015 at 10:08 PM, Justin Pihony justin.pih...@gmail.com wrote: I am trying to print a basic twitter stream and receiving the following error: 15/05/18 22:03:14 INFO Executor: Fetching http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar with timestamp 1432000973058 15/05/18 22:03:14 INFO Utils: Fetching http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar to C:\Users\Justin\AppData\Local\Temp\spark-4a37d3 e9-34a2-40d4-b09b-6399931f527d\userFiles-65ee748e-4721-4e16-9fe6-65933651fec1\fetchFileTemp8970201232303518432.tmp 15/05/18 22:03:14 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.NullPointerException at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012) at org.apache.hadoop.util.Shell.runCommand(Shell.java:482) at org.apache.hadoop.util.Shell.run(Shell.java:455) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.ja va:715) at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:873) at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853) at org.apache.spark.util.Utils$.fetchFile(Utils.scala:443) at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:374) at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:366) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.executor.Executor.org $apache$spark$executor$Executor$$updateDependencies(Executor.scala:366) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:184) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:744) Code is: spark-shell --jars \Spark\lib\spark-streaming-twitter_2.10-1.3.1.jar,\Spark\lib\twitter4j-async-3.0.3.jar,\Spark\lib\twitter4j-core-3.0.3.jar,\Spark\lib\twitter4j-media-support-3.0.3.jar,\Spark\lib\twitter4j-stream-3.0.3.jar import org.apache.spark.streaming.twitter._ import org.apache.spark.streaming._ System.setProperty(twitter4j.oauth.consumerKey,*) System.setProperty(twitter4j.oauth.consumerSecret,*) System.setProperty(twitter4j.oauth.accessToken,*) System.setProperty(twitter4j.oauth.accessTokenSecret,*) val ssc = new StreamingContext(sc, Seconds(10)) val stream = TwitterUtils.createStream(ssc, None) stream.print ssc.start This seems to be happening at FileUtil.chmod(targetFile.getAbsolutePath, a+x) but Im not sure why... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/TwitterUtils-on-Windows-tp22939.html Sent from the Apache Spark User List mailing list archive at Nabble.com. 
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Trying to understand sc.textFile better
All, I am trying to understand the textFile method deeply, but I think my lack of deep Hadoop knowledge is holding me back here. Let me lay out my understanding and maybe you can correct anything that is incorrect. When sc.textFile(path) is called, then defaultMinPartitions is used, which is really just math.min(taskScheduler.defaultParallelism, 2). Let's assume we are using the SparkDeploySchedulerBackend and this is conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)). So, now let's say the default is 2. Going back to textFile, this is passed in to HadoopRDD. The true size is determined in getPartitions() using inputFormat.getSplits(jobConf, minPartitions). But, from what I can find, the partitions parameter is merely a hint and is in fact mostly ignored, so you will probably get the total number of blocks. OK, this fits with expectations. However, what if the default is not used and you provide a partition size that is larger than the block size? If my research is right and the getSplits call simply ignores this parameter, then wouldn't the provided min end up being ignored and you would still just get the block size? Thanks, Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Trying-to-understand-sc-textFile-better-tp22924.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
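A quick empirical check along these lines (a sketch; the HDFS path is assumed):

val defaultRdd = sc.textFile("hdfs:///data/big.txt")
val hinted = sc.textFile("hdfs:///data/big.txt", 100)  // minPartitions is only a lower-bound hint
println(s"default: ${defaultRdd.partitions.length}, hinted: ${hinted.partitions.length}")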
Getting the best parameter set back from CrossValidatorModel
Hello, I am using MLPipeline. I would like to extract the best parameters found by CrossValidator, but I cannot find much documentation about how to do it. Can anyone give me some pointers? Thanks. Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Getting-the-best-parameter-set-back-from-CrossValidatorModel-tp22915.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
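One hedged starting point (Spark 1.4-era ml API; cv and training are assumed to be the CrossValidator and DataFrame from the surrounding program, and whether the tuned values show up this way may depend on the version):

val cvModel = cv.fit(training)   // cvModel: CrossValidatorModel
val best = cvModel.bestModel     // the winning model across the parameter grid
println(best.extractParamMap())  // inspect the parameters it carries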
Best practice to avoid ambiguous columns in DataFrame.join
Hello, I would like to know if there are recommended ways of preventing ambiguous columns when joining dataframes. When we join dataframes, it usually happens that we join on columns with identical names. I could rename the columns on the right data frame, as described in the following code. Is there a better way to achieve this?

scala> val df = sqlContext.createDataFrame(Seq((1, "a"), (2, "b"), (3, "b"), (4, "b")))
df: org.apache.spark.sql.DataFrame = [_1: int, _2: string]
scala> val df2 = sqlContext.createDataFrame(Seq((1, 10), (2, 20), (3, 30), (4, 40)))
df2: org.apache.spark.sql.DataFrame = [_1: int, _2: int]
scala> df.join(df2.withColumnRenamed("_1", "right_key"), $"_1" === $"right_key").printSchema

Thanks. Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-to-avoid-ambiguous-columns-in-DataFrame-join-tp22907.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Custom Aggregate Function for DataFrame
Hi Ayan, I have a DF constructed from the following case class Event:

case class State(attr1: String, ...)
case class Event(userId: String, time: Long, state: State)

I would like to generate a DF which contains the latest state of each userId. I could first compute the latest time of each user and join it back to the original data frame, but that involves two shuffles. Hence I would like to see if there are ways to improve the performance. Thanks. Justin On Fri, May 15, 2015 at 6:32 AM, ayan guha guha.a...@gmail.com wrote: can you kindly elaborate on this? it should be possible to write udafs in similar lines of sum/min etc. On Fri, May 15, 2015 at 5:49 AM, Justin Yip yipjus...@prediction.io wrote: Hello, May I know if there is a way to implement aggregate functions for grouped data in DataFrame? I dug into the doc but didn't find any, apart from the UDF functions which apply on a Row. Maybe I have missed something. Thanks. Justin -- View this message in context: Custom Aggregate Function for DataFrame http://apache-spark-user-list.1001560.n3.nabble.com/Custom-Aggregate-Function-for-DataFrame-tp22893.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com. -- Best Regards, Ayan Guha
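If the two shuffles are the concern, a hedged sketch on the underlying RDD (assumes events: RDD[Event]; grouped-data UDAFs aren't available in this version of the DataFrame API):

val latestByUser = events
  .map(e => e.userId -> e)
  .reduceByKey((a, b) => if (a.time >= b.time) a else b)  // keep whichever event is later

This shuffles once and keeps a single event per user, at the cost of dropping down from the DataFrame API.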
Re: Best practice to avoid ambiguous columns in DataFrame.join
Thanks Michael, This is very helpful. I have a follow-up question related to NaFunctions. Usually after a left outer join, we get lots of null values and we need to handle them before further processing. I have the following piece of code; the _1 column is duplicated and crashes the .na.fill function. From your answer, it appears that Spark 1.4 resolves this issue, as only a single _1 column is outputted. Do you know if there is a good workaround for Spark 1.3?

scala> df3.show
_1 a
1 a
2 b
3 b
4 b

scala> df4.show
_1 b
1 10
2 null
3 3
4 0

scala> df3.join(df4, df3("_1") === df4("_1")).na.fill(-999)
org.apache.spark.sql.AnalysisException: Reference '_1' is ambiguous, could be: _1#33, _1#31.; at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:229) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:128) at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:161) ...

Thanks! Justin On Fri, May 15, 2015 at 3:55 PM, Michael Armbrust mich...@databricks.com wrote: There are several ways to solve this ambiguity:

*1. use the DataFrames to get the attribute so it's already resolved and not just a string we need to map to a DataFrame.*
df.join(df2, df("_1") === df2("_1"))

*2. Use aliases*
df.as('a).join(df2.as('b), $"a._1" === $"b._1")

*3. rename the columns as you suggested.*
df.join(df2.withColumnRenamed("_1", "right_key"), $"_1" === $"right_key").printSchema

*4. (Spark 1.4 only) use def join(right: DataFrame, usingColumn: String): DataFrame*
df.join(df1, "_1")

This has the added benefit of only outputting a single _1 column. On Fri, May 15, 2015 at 3:44 PM, Justin Yip yipjus...@prediction.io wrote: Hello, I would like to know if there are recommended ways of preventing ambiguous columns when joining dataframes. When we join dataframes, it usually happens that we join on columns with identical names. I could rename the columns on the right data frame, as described in the following code. Is there a better way to achieve this?

scala> val df = sqlContext.createDataFrame(Seq((1, "a"), (2, "b"), (3, "b"), (4, "b")))
df: org.apache.spark.sql.DataFrame = [_1: int, _2: string]
scala> val df2 = sqlContext.createDataFrame(Seq((1, 10), (2, 20), (3, 30), (4, 40)))
df2: org.apache.spark.sql.DataFrame = [_1: int, _2: int]
scala> df.join(df2.withColumnRenamed("_1", "right_key"), $"_1" === $"right_key").printSchema

Thanks. Justin -- View this message in context: Best practice to avoid ambiguous columns in DataFrame.join http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-to-avoid-ambiguous-columns-in-DataFrame-join-tp22907.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Custom Aggregate Function for DataFrame
Hello, May I know if there is a way to implement aggregate functions for grouped data in DataFrame? I dug into the doc but didn't find any, apart from the UDF functions which apply on a Row. Maybe I have missed something. Thanks. Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Custom-Aggregate-Function-for-DataFrame-tp22893.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Creating StructType with DataFrame.withColumn
Hello, I would like to add a StructType column to a DataFrame. What would be the best way to do it? I am not sure if it is possible using withColumn. A possible way is to convert the dataframe into an RDD[Row], add the struct, and then convert it back to a dataframe. But that seems overkill. I guess I may have missed something crucial. Can anyone give me some pointers? Thanks. Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Creating-StructType-with-DataFrame-withColumn-tp22715.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
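One hedged possibility, assuming a version that has functions.struct (it appears in 1.4 but not 1.3) and existing columns named a and b:

import org.apache.spark.sql.functions.struct
// Builds a nested struct column out of existing flat columns.
val withNested = df.withColumn("nested", struct($"a", $"b"))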
Re: casting timestamp into long fail in Spark 1.3.1
After some trial and error, using DataType solves the problem: df.withColumn("millis", $"eventTime".cast(org.apache.spark.sql.types.LongType) * 1000) Justin On Thu, Apr 30, 2015 at 3:41 PM, Justin Yip yipjus...@prediction.io wrote: Hello, I was able to cast a timestamp into long using df.withColumn("millis", $"eventTime".cast("long") * 1000) in Spark 1.3.0. However, this statement returns a failure with Spark 1.3.1. I got the following exception: Exception in thread main org.apache.spark.sql.types.DataTypeException: Unsupported dataType: long. If you have a struct and a field name of it has any special characters, please use backticks (`) to quote that field name, e.g. `x+y`. Please note that backtick itself is not supported in a field name. at org.apache.spark.sql.types.DataTypeParser$class.toDataType(DataTypeParser.scala:95) at org.apache.spark.sql.types.DataTypeParser$$anon$1.toDataType(DataTypeParser.scala:107) at org.apache.spark.sql.types.DataTypeParser$.apply(DataTypeParser.scala:111) at org.apache.spark.sql.Column.cast(Column.scala:636) Is there any change in the casting logic which may lead to this failure? Thanks. Justin -- View this message in context: casting timestamp into long fail in Spark 1.3.1 http://apache-spark-user-list.1001560.n3.nabble.com/casting-timestamp-into-long-fail-in-Spark-1-3-1-tp22727.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
casting timestamp into long fail in Spark 1.3.1
Hello, I was able to cast a timestamp into long using df.withColumn("millis", $"eventTime".cast("long") * 1000) in Spark 1.3.0. However, this statement returns a failure with Spark 1.3.1. I got the following exception: Exception in thread main org.apache.spark.sql.types.DataTypeException: Unsupported dataType: long. If you have a struct and a field name of it has any special characters, please use backticks (`) to quote that field name, e.g. `x+y`. Please note that backtick itself is not supported in a field name. at org.apache.spark.sql.types.DataTypeParser$class.toDataType(DataTypeParser.scala:95) at org.apache.spark.sql.types.DataTypeParser$$anon$1.toDataType(DataTypeParser.scala:107) at org.apache.spark.sql.types.DataTypeParser$.apply(DataTypeParser.scala:111) at org.apache.spark.sql.Column.cast(Column.scala:636) Is there any change in the casting logic which may lead to this failure? Thanks. Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/casting-timestamp-into-long-fail-in-Spark-1-3-1-tp22727.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Powered By Spark
Hi, Would you mind adding our company to the Powered By Spark page? organization name: atp URL: https://atp.io a list of which Spark components you are using: SparkSQL, MLLib, Databricks Cloud and a short description of your use case: Predictive models and learning algorithms to improve the relevance of programmatic marketing Thanks! Justin Justin Barton CTO +1 (718) 404 9272 +44 203 290 9272 atp.io | jus...@atp.io | find us https://atp.io/find-us
Column renaming after DataFrame.groupBy
Hello, I would like to rename a column after aggregation. In the following code, the column name is SUM(_1#179); is there a way to rename it to a friendlier name?

scala> val d = sqlContext.createDataFrame(Seq((1, 2), (1, 3), (2, 10)))
scala> d.groupBy("_1").sum().printSchema
root
|-- _1: integer (nullable = false)
|-- SUM(_1#179): long (nullable = true)
|-- SUM(_2#180): long (nullable = true)

Thanks. Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Column-renaming-after-DataFrame-groupBy-tp22586.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
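A sketch of one way around it: alias the aggregate explicitly instead of renaming afterwards (total_2 is just an illustrative name):

import org.apache.spark.sql.functions.sum
d.groupBy("_1").agg(sum("_2").as("total_2")).printSchema()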
Re: Expected behavior for DataFrame.unionAll
That explains it. Thanks Reynold. Justin On Mon, Apr 13, 2015 at 11:26 PM, Reynold Xin r...@databricks.com wrote: I think what happened was applying the narrowest possible type. Type widening is required, and as a result, the narrowest type is string between a string and an int. https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala#L144 On Tue, Apr 7, 2015 at 5:00 PM, Justin Yip yipjus...@prediction.io wrote: Hello, I am experimenting with DataFrame. I tried to construct two DataFrames with: 1. case class A(a: Int, b: String) scala adf.printSchema() root |-- a: integer (nullable = false) |-- b: string (nullable = true) 2. case class B(a: String, c: Int) scala bdf.printSchema() root |-- a: string (nullable = true) |-- c: integer (nullable = false) Then I unioned the these two DataFrame with the unionAll function, and I get the following schema. It is kind of a mixture of A and B. scala val udf = adf.unionAll(bdf) scala udf.printSchema() root |-- a: string (nullable = false) |-- b: string (nullable = true) The unionAll documentation says it behaves like the SQL UNION ALL function. However, unioning incompatible types is not well defined for SQL. Is there any expected behavior for unioning incompatible data frames? Thanks. Justin
Catching executor exception from executor in driver
Hello, I would like to know if there is a way of catching an exception thrown from an executor in the driver program. Here is an example:

try {
  val x = sc.parallelize(Seq(1, 2, 3)).map(e => e / 0).collect
} catch {
  case e: SparkException => {
    println(s"ERROR: $e")
    println(s"CAUSE: ${e.getCause}")
  }
}

Output: ERROR: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 4 times, most recent failure: Lost task 1.3 in stage 1.0 (TID 15, pio1.c.ace-lotus-714.internal): java.lang.ArithmeticException: / by zero at $line71.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply$mcII$sp(console:51) ... CAUSE: null The exception cause is a null value. Is there any way that I can catch the ArithmeticException? Thanks Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Catching-executor-exception-from-executor-in-driver-tp22495.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
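One fragile but workable sketch, given that the executor-side exception's class name is embedded in the SparkException message in these versions (hedged; this is not a guaranteed API contract):

try {
  sc.parallelize(Seq(1, 2, 3)).map(_ / 0).collect()
} catch {
  case e: org.apache.spark.SparkException
      if e.getMessage.contains("java.lang.ArithmeticException") =>
    println("caught a division by zero from an executor")
}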
Fwd: Expected behavior for DataFrame.unionAll
Hello, I am experimenting with DataFrame. I tried to construct two DataFrames with: 1. case class A(a: Int, b: String) scala> adf.printSchema() root |-- a: integer (nullable = false) |-- b: string (nullable = true) 2. case class B(a: String, c: Int) scala> bdf.printSchema() root |-- a: string (nullable = true) |-- c: integer (nullable = false) Then I unioned these two DataFrames with the unionAll function, and I got the following schema. It is kind of a mixture of A and B. scala> val udf = adf.unionAll(bdf) scala> udf.printSchema() root |-- a: string (nullable = false) |-- b: string (nullable = true) The unionAll documentation says it behaves like the SQL UNION ALL function. However, unioning incompatible types is not well defined for SQL. Is there any expected behavior for unioning incompatible data frames? Thanks. Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Expected-behavior-for-DataFrame-unionAll-tp22487.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to use Joda Time with Spark SQL?
Cheng, this is great info. I have a follow up question. There are a few very common data types (i.e. Joda DateTime) that is not directly supported by SparkSQL. Do you know if there are any plans for accommodating some common data types in SparkSQL? They don't need to be a first class datatype, but if they are available as UDT and provided by the SparkSQL library, that will make DataFrame users' life easier. Justin On Sat, Apr 11, 2015 at 5:41 AM, Cheng Lian lian.cs@gmail.com wrote: One possible approach can be defining a UDT (user-defined type) for Joda time. A UDT maps an arbitrary type to and from Spark SQL data types. You may check the ExamplePointUDT [1] for more details. [1]: https://github.com/apache/spark/blob/694aef0d71d2683eaf63cbd1d8e95c 2da423b72e/sql/core/src/main/scala/org/apache/spark/sql/ test/ExamplePointUDT.scala On 4/8/15 6:09 AM, adamgerst wrote: I've been using Joda Time in all my spark jobs (by using the nscala-time package) and have not run into any issues until I started trying to use spark sql. When I try to convert a case class that has a com.github.nscala_time.time.Imports.DateTime object in it, an exception is thrown for with a MatchError My assumption is that this is because the basic types of spark sql are java.sql.Timestamp and java.sql.Date and therefor spark doesn't know what to do about the DateTime value. How can I get around this? I would prefer not to have to change my code to make the values be Timestamps but I'm concerned that might be the only way. Would something like implicit conversions work here? It seems that even if I specify the schema manually then I would still have the issue since you have to specify the column type which has to be of type org.apache.spark.sql.types.DataType -- View this message in context: http://apache-spark-user-list. 1001560.n3.nabble.com/How-to-use-Joda-Time-with-Spark-SQL-tp22415.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
The $ notation for DataFrame Column
Hello, The DataFrame documentation always uses $columnX to annotates a column. But I cannot find much information about it. Maybe I have missed something. Can anyone point me to the doc about the $, if there is any? Thanks. Justin
DataFrame column name restriction
Hello, Are there any restrictions on the column name? I tried to use ".", but sqlContext.sql cannot find the column. I would guess that "." is tricky as this affects accessing StructType, but are there any more restrictions on column names?

scala> case class A(a: Int)
defined class A
scala> sqlContext.createDataFrame(Seq(A(10), A(20))).withColumn("b.b", $"a" + 1)
res19: org.apache.spark.sql.DataFrame = [a: int, b.b: int]
scala> res19.registerTempTable("res19")
scala> res19.select("a")
res24: org.apache.spark.sql.DataFrame = [a: int]
scala> res19.select("a", "b.b")
org.apache.spark.sql.AnalysisException: cannot resolve 'b.b' given input columns a, b.b; at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)

Thanks. Justin
Re: DataFrame groupBy MapType
Thanks Michael. Will submit a ticket. Justin On Mon, Apr 6, 2015 at 1:53 PM, Michael Armbrust mich...@databricks.com wrote: I'll add that I don't think there is a convenient way to do this in the Column API ATM, but would welcome a JIRA for adding it :) On Mon, Apr 6, 2015 at 1:45 PM, Michael Armbrust mich...@databricks.com wrote: In HiveQL, you should be able to express this as: SELECT ... FROM table GROUP BY m['SomeKey'] On Sat, Apr 4, 2015 at 5:25 PM, Justin Yip yipjus...@prediction.io wrote: Hello, I have a case class like this: case class A( m: Map[Long, Long], ... ) and constructed a DataFrame from Seq[A]. I would like to perform a groupBy on A.m(SomeKey). I can implement a UDF, create a new Column then invoke a groupBy on the new Column. But is it the idiomatic way of doing such operation? Can't find much info about operating MapType on Column in the doc. Thanks ahead! Justin
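A hedged side note beyond the thread: later releases grew direct Column support for this. Assuming a 1.4-era getItem that accepts a map key, and df built from Seq[A] as above:

val someKey = 42L  // hypothetical stand-in for the "SomeKey" referenced above (the map keys are Longs)
df.groupBy(df("m").getItem(someKey)).count()  // group on a single key of the MapType column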
Expected behavior for DataFrame.unionAll
Hello, I am experimenting with DataFrame. I tried to construct two DataFrames with: 1. case class A(a: Int, b: String) scala> adf.printSchema() root |-- a: integer (nullable = false) |-- b: string (nullable = true) 2. case class B(a: String, c: Int) scala> bdf.printSchema() root |-- a: string (nullable = true) |-- c: integer (nullable = false) Then I unioned these two DataFrames with the unionAll function, and I got the following schema. It is kind of a mixture of A and B. scala> val udf = adf.unionAll(bdf) scala> udf.printSchema() root |-- a: string (nullable = false) |-- b: string (nullable = true) The unionAll documentation says it behaves like the SQL UNION ALL function. However, unioning incompatible types is not well defined for SQL. Is there any expected behavior for unioning incompatible data frames? Thanks. Justin
DataFrame degraded performance after DataFrame.cache
Hello, I have a parquet file of around 55M rows (~1G on disk). Performing a simple grouping operation is pretty efficient (I get results within 10 seconds). However, after calling DataFrame.cache, I observe a significant performance degradation: the same operation now takes 3+ minutes. My hunch is that DataFrame cannot leverage its columnar format after persisting in memory, but I cannot find anywhere in the doc mentioning this. Did I miss anything? Thanks! Justin
Re: DataFrame degraded performance after DataFrame.cache
The schema has a StructType. Justin On Tue, Apr 7, 2015 at 6:58 PM, Yin Huai yh...@databricks.com wrote: Hi Justin, Does the schema of your data have any decimal, array, map, or struct type? Thanks, Yin On Tue, Apr 7, 2015 at 6:31 PM, Justin Yip yipjus...@prediction.io wrote: Hello, I have a parquet file of around 55M rows (~ 1G on disk). Performing simple grouping operation is pretty efficient (I get results within 10 seconds). However, after called DataFrame.cache, I observe a significant performance degrade, the same operation now takes 3+ minutes. My hunch is that DataFrame cannot leverage its columnar format after persisting in memory. But cannot find anywhere from the doc mentioning this. Did I miss anything? Thanks! Justin
Re: DataFrame degraded performance after DataFrame.cache
Thanks for the explanation Yin. Justin On Tue, Apr 7, 2015 at 7:36 PM, Yin Huai yh...@databricks.com wrote: I think the slowness is caused by the way that we serialize/deserialize the value of a complex type. I have opened https://issues.apache.org/jira/browse/SPARK-6759 to track the improvement. On Tue, Apr 7, 2015 at 6:59 PM, Justin Yip yipjus...@prediction.io wrote: The schema has a StructType. Justin On Tue, Apr 7, 2015 at 6:58 PM, Yin Huai yh...@databricks.com wrote: Hi Justin, Does the schema of your data have any decimal, array, map, or struct type? Thanks, Yin On Tue, Apr 7, 2015 at 6:31 PM, Justin Yip yipjus...@prediction.io wrote: Hello, I have a parquet file of around 55M rows (~ 1G on disk). Performing simple grouping operation is pretty efficient (I get results within 10 seconds). However, after called DataFrame.cache, I observe a significant performance degrade, the same operation now takes 3+ minutes. My hunch is that DataFrame cannot leverage its columnar format after persisting in memory. But cannot find anywhere from the doc mentioning this. Did I miss anything? Thanks! Justin
DataFrame groupBy MapType
Hello, I have a case class like this: case class A( m: Map[Long, Long], ... ) and constructed a DataFrame from Seq[A]. I would like to perform a groupBy on A.m(SomeKey). I can implement a UDF, create a new Column then invoke a groupBy on the new Column. But is it the idiomatic way of doing such operation? Can't find much info about operating MapType on Column in the doc. Thanks ahead! Justin
Re: MLlib: save models to HDFS?
Hello Zhou, You can look at the recommendation template http://templates.prediction.io/PredictionIO/template-scala-parallel-recommendation of PredictionIO. PredictionIO is built on top of Spark, and this template illustrates how you can save the ALS model to HDFS and then reload it later. Justin On Fri, Apr 3, 2015 at 9:16 AM, S. Zhou myx...@yahoo.com.invalid wrote: I am new to MLlib so I have a basic question: is it possible to save MLlib models (particularly CF models) to HDFS and then reload them later? If yes, could u share some sample code (I could not find it in the MLlib tutorial). Thanks!
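The gist of what that template does, as a hedged sketch (paths are assumed, and MatrixFactorizationModel's constructor visibility varies across versions, so check yours):

// Save the factor RDDs (the rank must be persisted separately, e.g. in a metadata file).
model.userFeatures.saveAsObjectFile("hdfs:///models/als/userFeatures")
model.productFeatures.saveAsObjectFile("hdfs:///models/als/productFeatures")

// ... later, rebuild the model from the persisted factors:
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
val userF = sc.objectFile[(Int, Array[Double])]("hdfs:///models/als/userFeatures")
val prodF = sc.objectFile[(Int, Array[Double])]("hdfs:///models/als/productFeatures")
val restored = new MatrixFactorizationModel(rank, userF, prodF)  // rank: the value used at training time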
Re: StackOverflow Problem with 1.3 mllib ALS
Thanks Xiangrui, I used 80 iterations to demonstrate the marginal diminishing return in prediction quality :) Justin On Apr 2, 2015 00:16, Xiangrui Meng men...@gmail.com wrote: I think before 1.3 you also get stackoverflow problem in ~35 iterations. In 1.3.x, please use setCheckpointInterval to solve this problem, which is available in the current master and 1.3.1 (to be released soon). Btw, do you find 80 iterations are needed for convergence? -Xiangrui On Wed, Apr 1, 2015 at 11:54 PM, Justin Yip yipjus...@prediction.io wrote: Hello, I have been using MLlib's ALS in 1.2 and it works quite well. I have just upgraded to 1.3 and I encountered a stack overflow problem. After some digging, I realized that at iteration ~35, I will get the overflow problem. However, I can get at least 80 iterations with ALS in 1.2. Is there any change to the ALS algorithm? And are there any ways to achieve more iterations? Thanks. Justin - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
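For reference, a hedged sketch of the checkpoint-interval fix Xiangrui mentions (1.3.1+; the checkpoint directory is an assumed path, and ratings: RDD[Rating] is assumed):

import org.apache.spark.mllib.recommendation.ALS
sc.setCheckpointDir("hdfs:///tmp/als-checkpoints")  // must be set for checkpointing to kick in
val model = new ALS()
  .setRank(10)
  .setIterations(80)
  .setCheckpointInterval(10)  // truncate the lineage every 10 iterations
  .run(ratings)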
StackOverflow Problem with 1.3 mllib ALS
Hello, I have been using MLlib's ALS in 1.2 and it works quite well. I have just upgraded to 1.3 and I encountered a stack overflow problem. After some digging, I realized that at iteration ~35, I will get the overflow problem. However, I can get at least 80 iterations with ALS in 1.2. Is there any change to the ALS algorithm? And are there any ways to achieve more iterations? Thanks. Justin
Catching InvalidClassException in sc.objectFile
Hello, I have persisted an RDD[T] to disk through saveAsObjectFile. Then I changed the implementation of T. When I read the file with sc.objectFile using the new binary, I got a java.io.InvalidClassException, which is expected. I tried to catch this error via SparkException in the driver program. However, both getCause() and getSuppressed() are empty. What is the recommended way of catching this exception? Thanks. Justin
Did DataFrames break basic SQLContext?
I started to play with 1.3.0 and found that there are a lot of breaking changes. Previously, I could do the following:

case class Foo(x: Int)
val rdd = sc.parallelize(List(Foo(1)))
import sqlContext._
rdd.registerTempTable("foo")

Now, I am not able to directly use my RDD object and have it implicitly become a DataFrame. It can be used as a DataFrameHolder, of which I could write:

rdd.toDF.registerTempTable("foo")

But, that is kind of a pain in comparison. The other problem for me is that I keep getting a SQLException: java.sql.SQLException: Failed to start database 'metastore_db' with class loader sun.misc.Launcher$AppClassLoader@10393e97, see the next exception for details. This seems to be a dependency on Hive, when previously (1.2.0) there was no such dependency. I can open tickets for these, but wanted to ask here first... maybe I am doing something wrong? Thanks, Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Did-DataFrames-break-basic-SQLContext-tp22120.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Did DataFrames break basic SQLContext?
It appears that the metastore_db problem is related to https://issues.apache.org/jira/browse/SPARK-4758. I had another shell open that was stuck. This is probably a bug, though?

import sqlContext.implicits._
case class Foo(x: Int)
val rdd = sc.parallelize(List(Foo(1)))
rdd.toDF

results in a frozen shell after this line: INFO MetaStoreDirectSql: MySQL check failed, assuming we are not on mysql: Lexical error at line 1, column 5. Encountered: "@" (64), after : "". which locks the internally created metastore_db On Wed, Mar 18, 2015 at 11:20 AM, Justin Pihony justin.pih...@gmail.com wrote: I started to play with 1.3.0 and found that there are a lot of breaking changes. Previously, I could do the following: case class Foo(x: Int) val rdd = sc.parallelize(List(Foo(1))) import sqlContext._ rdd.registerTempTable("foo") Now, I am not able to directly use my RDD object and have it implicitly become a DataFrame. It can be used as a DataFrameHolder, of which I could write: rdd.toDF.registerTempTable("foo") But, that is kind of a pain in comparison. The other problem for me is that I keep getting a SQLException: java.sql.SQLException: Failed to start database 'metastore_db' with class loader sun.misc.Launcher$AppClassLoader@10393e97, see the next exception for details. This seems to be a dependency on Hive, when previously (1.2.0) there was no such dependency. I can open tickets for these, but wanted to ask here first... maybe I am doing something wrong? Thanks, Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Did-DataFrames-break-basic-SQLContext-tp22120.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Bug in Streaming files?
All, Looking into this StackOverflow question https://stackoverflow.com/questions/29022379/spark-streaming-hdfs/29036469 it appears that there is a bug when utilizing the newFilesOnly parameter in FileInputDStream. Before creating a ticket, I wanted to verify it here. The gist is that this code is wrong: val modTimeIgnoreThreshold = math.max( initialModTimeIgnoreThreshold, // initial threshold based on newFilesOnly setting currentTime - durationToRemember.milliseconds // trailing end of the remember window ) The problem is that if you set newFilesOnly to false, then the initialModTimeIgnoreThreshold is always 0. This makes it always dropped out of the max operation. So, the best you get is files that were put in the directory (duration) from the start. Is this a bug or expected behavior; it seems like a bug to me. If I am correct, this appears to be a bigger fix than just using min as it would break other functionality. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Streaming-files-tp22051.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
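To make the arithmetic concrete, a worked illustration of the claim with made-up numbers:

// newFilesOnly = false  =>  initialModTimeIgnoreThreshold = 0
val initialModTimeIgnoreThreshold = 0L
val currentTime = 100000L
val durationToRemember = 60000L
val modTimeIgnoreThreshold = math.max(
  initialModTimeIgnoreThreshold,
  currentTime - durationToRemember)  // = 40000, so the 0 threshold never wins the max

Files older than the trailing edge of the remember window are therefore dropped even though newFilesOnly = false asked for them.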
SparkSQL JSON array support
Are there any plans for supporting JSON arrays more fully? Take for example:

val myJson = sqlContext.jsonRDD(sc.parallelize(List("""{"foo":[{"bar":1},{"baz":2}]}""")))
myJson.registerTempTable("JsonTest")

I would like a way to pull out parts of the array data based on a key:

sql("SELECT foo['bar'] FROM JsonTest")  // projects only the object with bar, the rest would be null

I could even work around this if there was some way to access the key name from the SchemaRDD:

myJson.filter(x => x(0).asInstanceOf[Seq[Row]].exists(y => y.key == "bar"))
      .map(x => x(0).asInstanceOf[Seq[Row]].filter(y => y.key == "bar"))
// This does the same as above, except also filtering out those without a bar key

This is the closest suggestion I could find thus far, https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView which still does not solve the problem of pulling out the keys. I tried with a UDF also, but could not currently make that work either. If there isn't anything in the works, then would it be appropriate to create a ticket for this? Thanks, Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-JSON-array-support-tp21939.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark SQL Static Analysis
Thanks! On Wed, Mar 4, 2015 at 3:58 PM, Michael Armbrust mich...@databricks.com wrote: It is somewhat out of data, but here is what we have so far: https://github.com/marmbrus/sql-typed On Wed, Mar 4, 2015 at 12:53 PM, Justin Pihony justin.pih...@gmail.com wrote: I am pretty sure that I saw a presentation where SparkSQL could be executed with static analysis, however I cannot find the presentation now, nor can I find any documentation or research papers on the topic. So, I am curious if there is indeed any work going on for this topic. The two things I would be interested in would be to be able to gain compile time safety, as well as gain the ability to work on my data as a type instead of a row (ie, result.map(x=x.Age) instead of having to use Row.get) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Static-Analysis-tp21918.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark SQL Static Analysis
I am pretty sure that I saw a presentation where SparkSQL could be executed with static analysis, however I cannot find the presentation now, nor can I find any documentation or research papers on the topic. So, I am curious if there is indeed any work going on for this topic. The two things I would be interested in would be to gain compile-time safety, as well as the ability to work on my data as a type instead of a row (i.e., result.map(x => x.Age) instead of having to use Row.get). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Static-Analysis-tp21918.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
SQLContext.applySchema strictness
Per the documentation: "It is important to make sure that the structure of every Row of the provided RDD matches the provided schema. Otherwise, there will be runtime exception." However, it appears that this is not being enforced.

import org.apache.spark.sql._
val sqlContext = new SQLContext(sc)
val struct = StructType(List(StructField("test", BooleanType, true)))
val myData = sc.parallelize(List(Row(0), Row(true), Row("stuff")))
val schemaData = sqlContext.applySchema(myData, struct)  // No error
schemaData.collect()(0).getBoolean(0)  // Only now will I receive an error

Is this expected or a bug? Thanks, Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SQLContext-applySchema-strictness-tp21650.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: SQLContext.applySchema strictness
OK, but what about on an action, like collect()? Shouldn't it be able to determine the correctness at that time? On Fri, Feb 13, 2015 at 4:49 PM, Yin Huai yh...@databricks.com wrote: Hi Justin, It is expected. We do not check whether the provided schema matches the rows, since all rows would need to be scanned to give a correct answer. Thanks, Yin On Fri, Feb 13, 2015 at 1:33 PM, Justin Pihony justin.pih...@gmail.com wrote: Per the documentation: "It is important to make sure that the structure of every Row of the provided RDD matches the provided schema. Otherwise, there will be runtime exception." However, it appears that this is not being enforced:

    import org.apache.spark.sql._

    val sqlContext = new SQLContext(sc)
    val struct = StructType(List(StructField("test", BooleanType, true)))
    val myData = sc.parallelize(List(Row(0), Row(true), Row("stuff")))
    val schemaData = sqlContext.applySchema(myData, struct) // No error
    schemaData.collect()(0).getBoolean(0) // Only now will I receive an error

Is this expected or a bug? Thanks, Justin
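If paying for a full scan up front is acceptable, a hedged sketch of validating rows eagerly yourself before applySchema, written for the single-Boolean schema from the example (myData and struct are carried over from above):

    // Validates each row as it is computed; rows still stream lazily, but a
    // bad row now fails at scan time rather than later inside getBoolean.
    val validated = myData.map { r =>
      require(r(0).isInstanceOf[Boolean], s"row does not match schema: $r")
      r
    }
    val schemaData = sqlContext.applySchema(validated, struct)
    schemaData.collect() // fails here, on the first mismatching row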
Aggregate order semantics when spilling
Hi, I am trying to aggregate by key based on a timestamp, and I believe that spilling to disk is changing the order of the data fed into the combiner. I have some time-series data of the form (key, date, other data):

Partition 1
(A, 2, ...) (B, 4, ...) (A, 1, ...) (A, 3, ...) (B, 6, ...)

which I then partition by key, then sort within each partition:

Partition 1
(A, 1, ...) (A, 2, ...) (A, 3, ...) (A, 4, ...)

Partition 2
(B, 4, ...) (B, 6, ...)

If I run a combineByKey with the same partitioner, then the items for each key will be fed into the ExternalAppendOnlyMap in the correct order. However, if I spill, then the time slices are spilled to disk as multiple partial combiners. When it's time to merge the spilled combiners for each key, the combiners are combined in the wrong order. For example, if during a groupByKey, [(A, 1, ...), (A, 2, ...)] and [(A, 3, ...), (A, 4, ...)] are spilled separately, it's possible that the combiners are combined in the wrong order, like [(A, 3, ...), (A, 4, ...), (A, 1, ...), (A, 2, ...)], which invalidates the invariant that all the values for A are passed in order to the combiners. I'm not an expert, but I suspect that this is because we use a heap ordered by key when iterating, which doesn't retain the order of the spilled combiners. Perhaps we can order our mergeHeap by (hash_key, spill_index), where spill_index is incremented each time we spill? This would mean that we would pop and merge the combiners of each key in order, resulting in [(A, 1, ...), (A, 2, ...), (A, 3, ...), (A, 4, ...)]. Thanks in advance for the help! If there is a way to do this already in Spark 1.2, can someone point it out to me? Best, Justin
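To make the setup concrete, a minimal sketch of the partition-by-key / sort-by-(key, date) arrangement described above, using Spark 1.2-era APIs; the data, partition count, and list-based combiner are illustrative, and as this thread establishes, the final concatenation order is only reliable when nothing spills:

    import org.apache.spark.Partitioner

    // Partition on the key alone, whether it arrives bare or as (key, date),
    // so the sort stage and the combine stage agree on placement.
    class KeyPartitioner(n: Int) extends Partitioner {
      def numPartitions: Int = n
      def getPartition(key: Any): Int = key match {
        case (k: String, _) => math.abs(k.hashCode) % n
        case k: String      => math.abs(k.hashCode) % n
      }
    }

    val data = sc.parallelize(Seq(("A", 2, "x"), ("B", 4, "y"), ("A", 1, "z"), ("A", 3, "w")))
    val part = new KeyPartitioner(4)

    // Secondary sort: one shuffle, ordered by (key, date) within partitions.
    val sorted = data
      .map { case (k, date, v) => ((k, date), v) }
      .repartitionAndSortWithinPartitions(part)

    // Order-sensitive combine: list concatenation preserves arrival order,
    // which is exactly the invariant that out-of-order spill merging breaks.
    val combined = sorted
      .map { case ((k, date), v) => (k, List((date, v))) }
      .reduceByKey(part, _ ::: _)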
Re: Aggregate order semantics when spilling
Hi Andrew, Thanks for your response! For our use case, we aren't actually grouping, but rather updating running aggregates. I just picked grouping because it made the example easier to write out. However, when we merge combiners, the combiners being merged must hold data that was adjacent in the original partition. I feel that requiring groupByKey/cogroup to insert values into the correct place is quite expensive, and may not be possible for combiners that are trying to collapse down the data while assuming order. Would it be really expensive or perilous to the API if we just had combineByKey merge combiners in the same order as the data slices they represent? I have a very simple prototype that adds this additional semantic (it's only a 16 line diff): https://github.com/justinuang/spark/commit/b92ee6a6dbf70207eca68296289cb62c3cea76b8 It looks like the additional comparison adds only trivial runtime cost, and this doesn't break any backcompat. Thanks, Justin On Tue, Jan 20, 2015 at 8:03 PM, Andrew Or and...@databricks.com wrote: Hi Justin, I believe the intended semantics of groupByKey or cogroup is that the ordering *within a key* is not preserved if you spill. In fact, the test cases for the ExternalAppendOnlyMap only assert that the Set representation of the results is as expected (see this line https://github.com/apache/spark/blob/9d9294aebf7208a76f43d8fc5a0489a83d7215f4/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala#L265). This is because these Spark primitives literally just group the values by a key but do not provide any ordering guarantees. However, if ordering within a key is a requirement for your application, then you may need to write your own PairRDDFunction that calls combineByKey. You can model your method after groupByKey, but change the combiner function slightly to take ordering into account. This may add some overhead to your application since you need to insert every value in the appropriate place, but since you're spilling anyway the overhead will likely be shadowed by disk I/O. Let me know if that works. -Andrew 2015-01-20 9:18 GMT-08:00 Justin Uang justin.u...@gmail.com: Hi, I am trying to aggregate by key based on a timestamp, and I believe that spilling to disk is changing the order of the data fed into the combiner. I have some time-series data of the form (key, date, other data):

Partition 1
(A, 2, ...) (B, 4, ...) (A, 1, ...) (A, 3, ...) (B, 6, ...)

which I then partition by key, then sort within each partition:

Partition 1
(A, 1, ...) (A, 2, ...) (A, 3, ...) (A, 4, ...)

Partition 2
(B, 4, ...) (B, 6, ...)

If I run a combineByKey with the same partitioner, then the items for each key will be fed into the ExternalAppendOnlyMap in the correct order. However, if I spill, then the time slices are spilled to disk as multiple partial combiners. When it's time to merge the spilled combiners for each key, the combiners are combined in the wrong order. For example, if during a groupByKey, [(A, 1, ...), (A, 2, ...)] and [(A, 3, ...), (A, 4, ...)] are spilled separately, it's possible that the combiners are combined in the wrong order, like [(A, 3, ...), (A, 4, ...), (A, 1, ...), (A, 2, ...)], which invalidates the invariant that all the values for A are passed in order to the combiners. I'm not an expert, but I suspect that this is because we use a heap ordered by key when iterating, which doesn't retain the order of the spilled combiners.
Perhaps we can order our mergeHeap by (hash_key, spill_index), where spill_index is incremented each time we spill? This would mean that we would pop and merge the combiners of each key in order, resulting in [(A, 1, ...), (A, 2, ...), (A, 3, ...), (A, 4, ...)]. Thanks in advance for the help! If there is a way to do this already in Spark 1.2, can someone point it out to me? Best, Justin
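To render the proposed fix in miniature: ordering the merge heap by (key hash, spill index) instead of key hash alone would replay each key's spilled slices in the order they were written. The names below are illustrative stand-ins, not Spark's actual internals:

    // Illustrative stand-in for a spilled stream's entry in the merge heap.
    case class SpilledStream(keyHash: Int, spillIndex: Int)

    // Earlier spills of the same key sort first, so their partial combiners
    // are merged back in the order the data was originally seen.
    implicit val mergeOrder: Ordering[SpilledStream] =
      Ordering.by(s => (s.keyHash, s.spillIndex))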
Accumulator value in Spark UI
Hello, The accumulator documentation says that if an accumulator is named, it will be displayed in the web UI. However, I cannot find it anywhere. Do I need to specify anything in the Spark UI config? Thanks. Justin
Re: Accumulator value in Spark UI
Found it. Thanks, Patrick. Justin On Wed, Jan 14, 2015 at 10:38 PM, Patrick Wendell pwend...@gmail.com wrote: It should appear in the page for any stage in which accumulators are updated. On Wed, Jan 14, 2015 at 6:46 PM, Justin Yip yipjus...@prediction.io wrote: Hello, The accumulator documentation says that if an accumulator is named, it will be displayed in the web UI. However, I cannot find it anywhere. Do I need to specify anything in the Spark UI config? Thanks. Justin
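For anyone else looking, a minimal sketch of a named accumulator; the name is the second argument, and it is what the stage details page displays:

    // Named accumulators show up on the details page of any stage
    // whose tasks update them.
    val errorCount = sc.accumulator(0, "errorCount")
    sc.parallelize(1 to 100).foreach { i =>
      if (i % 10 == 0) errorCount += 1
    }
    println(errorCount.value) // 10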
Re: How to create an empty RDD with a given type?
Xuelin, There is a function called emptyRDD on the Spark context (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext) which serves this purpose. Justin On Mon, Jan 12, 2015 at 9:50 PM, Xuelin Cao xuelincao2...@gmail.com wrote: Hi, I'd like to create a transform function that converts RDD[String] to RDD[Int]. Occasionally, the input RDD could be an empty RDD. I just want to directly create an empty RDD[Int] if the input RDD is empty, and I don't want to return None as the result. Is there an easy way to do that?
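A minimal sketch of the call mentioned above. Note also that mapping an empty RDD[String] already yields an empty RDD[Int], so an explicit emptyRDD is only needed when there is no input RDD to start from:

    // An RDD[Int] with no partitions and no elements.
    val empty = sc.emptyRDD[Int]
    empty.count() // 0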
IOException: exception in uploadSinglePart
Spark 1.1.0, running on an AWS EMR cluster using yarn-client as master. I'm getting the following error when I attempt to save an RDD to S3. I've narrowed it down to a single partition that is ~150 MB in size (versus the other partitions, which are closer to 1 MB). I am able to work around this by saving to an HDFS file first, then using the hadoop distcp command to get the data up onto S3, but it seems that Spark should be able to handle this directly. I've even tried writing to HDFS, then doing something like the following to get it up on S3:

    sc.textFile(hdfsPath).saveAsTextFile(s3Path)

Here's the exception:

java.io.IOException: exception in uploadSinglePart
    com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.uploadSinglePart(MultipartUploadOutputStream.java:164)
    com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.close(MultipartUploadOutputStream.java:220)
    org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:105)
    org.apache.hadoop.io.compress.CompressorStream.close(CompressorStream.java:106)
    java.io.FilterOutputStream.close(FilterOutputStream.java:160)
    org.apache.hadoop.mapred.TextOutputFormat$LineRecordWriter.close(TextOutputFormat.java:109)
    org.apache.spark.SparkHadoopWriter.close(SparkHadoopWriter.scala:101)
    org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:994)
    org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:979)
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    org.apache.spark.scheduler.Task.run(Task.scala:54)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Any ideas of what the underlying issue could be here? It feels like a timeout issue, but that's just a guess. Thanks, Justin
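Not a root-cause fix, but given that the single ~150 MB partition is the trigger while the ~1 MB partitions succeed, one hedged workaround is to even out partition sizes before the save; the partition count below is illustrative, and rdd/s3Path stand in for the RDD and path from the report:

    // Shuffles once so no single output part is disproportionately large,
    // which may keep each upload below whatever limit uploadSinglePart hits.
    val evened = rdd.repartition(200)
    evened.saveAsTextFile(s3Path)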
MLLib sample data format
Hello, I am looking at a couple of the MLlib data files in https://github.com/apache/spark/tree/master/data/mllib, but I cannot find any explanation of these files. Does anyone know if they are documented? Thanks. Justin
Re: MLLib sample data format
Hi Shuo, Yes. I was reading the guide as well as the sample code. For example, in http://spark.apache.org/docs/latest/mllib-linear-methods.html#linear-support-vector-machine-svm, nowhere in the github repository can I find the file referenced by sc.textFile("mllib/data/ridge-data/lpsa.data"). Thanks. Justin On Sun, Jun 22, 2014 at 2:40 PM, Shuo Xiang shuoxiang...@gmail.com wrote: Hi, you might find http://spark.apache.org/docs/latest/mllib-guide.html helpful. On Sun, Jun 22, 2014 at 2:35 PM, Justin Yip yipjus...@gmail.com wrote: Hello, I am looking at a couple of the MLlib data files in https://github.com/apache/spark/tree/master/data/mllib, but I cannot find any explanation of these files. Does anyone know if they are documented? Thanks. Justin
Re: MLLib sample data format
I see. That's good. Thanks. Justin On Sun, Jun 22, 2014 at 4:59 PM, Evan Sparks evan.spa...@gmail.com wrote: Oh, and the MovieLens one is userid::movieid::rating - Evan On Jun 22, 2014, at 3:35 PM, Justin Yip yipjus...@gmail.com wrote: Hello, I am looking at a couple of the MLlib data files in https://github.com/apache/spark/tree/master/data/mllib, but I cannot find any explanation of these files. Does anyone know if they are documented? Thanks. Justin
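For anyone wiring that format up, a hedged sketch of parsing userid::movieid::rating lines into MLlib's Rating objects; the file path is illustrative:

    import org.apache.spark.mllib.recommendation.Rating

    // Each line: userid::movieid::rating
    val ratings = sc.textFile("data/mllib/sample_movielens_data.txt").map { line =>
      val Array(user, movie, rating) = line.split("::")
      Rating(user.toInt, movie.toInt, rating.toDouble)
    }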
MLLib : Decision Tree with minimum points per node
Hello, I have been playing around with MLlib's decision tree library. It is working great, thanks. I have a question regarding overfitting. It appears that the current implementation doesn't allow the user to specify the minimum number of samples per node. This results in some nodes containing only a very few samples, which potentially leads to overfitting. I would like to know if there is a workaround or any other way to prevent overfitting. Or will the decision tree support min-samples-per-node in a future release? Thanks. Justin
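For later readers: newer MLlib releases added exactly this knob as minInstancesPerNode on the tree Strategy. A hedged sketch against the Spark 1.2-era API; constructor parameter names can vary between releases, the parameter values are illustrative, and trainingData stands in for an RDD[LabeledPoint]:

    import org.apache.spark.mllib.tree.DecisionTree
    import org.apache.spark.mllib.tree.configuration.{Algo, Strategy}
    import org.apache.spark.mllib.tree.impurity.Gini

    // minInstancesPerNode rejects a split if either child would receive
    // fewer than the given number of training samples.
    val strategy = new Strategy(
      algo = Algo.Classification,
      impurity = Gini,
      maxDepth = 5,
      minInstancesPerNode = 10)
    val model = DecisionTree.train(trainingData, strategy)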
KyroException: Unable to find class
Hello, I have been using Externalizer from Chill as a serialization wrapper. It appears to me that Spark has some conflict between its classloader and Chill's. I have the following (simplified) program:

    import java.io._
    import com.twitter.chill.Externalizer

    class X(val i: Int) { override def toString() = s"X(i=$i)" }

    object SimpleApp {
      def main(args: Array[String]) {
        val bos = new ByteArrayOutputStream(10)
        val oos = new ObjectOutputStream(bos)
        oos.writeObject(Externalizer(new X(10)))
        oos.close()
        val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
        val y = ois.readObject.asInstanceOf[Externalizer[X]]
        println(y.get)
      }
    }

When I run it as a normal program (i.e. sbt run), it runs fine. But when I run it with spark-submit (i.e. spark-submit --verbose --class SimpleApp --master local[4] target/scala-2.10/simple-project_2.10-1.0.jar), the program fails at the ois.readObject call with an error that Kryo cannot find class X:

Exception in thread "main" com.esotericsoftware.kryo.KryoException: Unable to find class: X
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)

I guess the issue is that Spark has a magic classloader, and Kryo fails to see the same classpaths. Is there any way to remedy this issue? Thanks. Justin
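One workaround that sometimes helps with Spark's separate user-code classloader, assuming Chill's Externalizer resolves class names through the thread context classloader (an assumption, not something verified here): point the context classloader at the one that loaded the application classes before deserializing:

    // Spark loads application jars in a child classloader; class-name lookup
    // during deserialization may consult the thread context classloader,
    // which does not necessarily see those jars.
    val previous = Thread.currentThread.getContextClassLoader
    try {
      Thread.currentThread.setContextClassLoader(getClass.getClassLoader)
      val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
      println(ois.readObject.asInstanceOf[Externalizer[X]].get)
    } finally {
      Thread.currentThread.setContextClassLoader(previous)
    }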