Re: uncontinuous offset in kafka will cause the spark streaming failure

2018-01-23 Thread Justin Miller
We appear to be kindred spirits; I’ve recently run into the same issue. Are you 
running compacted topics? I’ve run into this issue on non-compacted topics as 
well; it happens rarely but is still a pain. You might check out this patch and 
the related Spark Streaming Kafka ticket:

https://github.com/apache/spark/compare/master...koeninger:SPARK-17147 
<https://github.com/apache/spark/compare/master...koeninger:SPARK-17147>
https://issues.apache.org/jira/browse/SPARK-17147 
<https://issues.apache.org/jira/browse/SPARK-17147>

I’ll be testing out the patch on a somewhat large-scale stream processor tomorrow.

CCing: Cody Koeninger

Best,
Justin

> On Jan 23, 2018, at 10:48 PM, namesuperwood <namesuperw...@gmail.com> wrote:
> 
> Hi all
> 
>   kafka version :  kafka_2.11-0.11.0.2
>spark version :  2.0.1 
> 
>   A topic-partition "adn-tracking,15" in Kafka has earliest offset 
> 1255644602 and latest offset 1271253441.
>   While starting a Spark Streaming job to process the data from the topic, 
> we got an exception: "Got wrong record even after seeking to offset 
> 1266921577".  [ (earliest offset) 1255644602 < 1266921577 < 
> 1271253441 (latest offset) ]
>   Finally, I found the following source code in the class 
> CachedKafkaConsumer from spark-streaming. This is obviously due to the fact 
> that the offset returned by the consumer's poll and the offset the consumer 
> seeks to are not equal.
> 
>   Here is the "CachedKafkaConsumer.scala" code:
> 
>   def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
>     logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset")
>     if (offset != nextOffset) {
>       logInfo(s"Initial fetch for $groupId $topic $partition $offset")
>       seek(offset)
>       poll(timeout)
>     }
> 
>     if (!buffer.hasNext()) { poll(timeout) }
>     assert(buffer.hasNext(),
>       s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
>     var record = buffer.next()
> 
>     if (record.offset != offset) {
>       logInfo(s"Buffer miss for $groupId $topic $partition $offset")
>       seek(offset)
>       poll(timeout)
>       assert(buffer.hasNext(),
>         s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
>       record = buffer.next()
>       assert(record.offset == offset,
>         s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
>     }
> 
>     nextOffset = offset + 1
>     record
>   }
>   I reproduced this problem and found that the offsets for one 
> topic-partition are not contiguous in Kafka. I think this is a bug that needs 
> to be fixed.
>   I implemented a simple project that uses a consumer to seek to offset 
> 1266921577, but it returns the record at offset 1266921578. When seeking to 
> 1266921576, it returns offset 1266921576 exactly.
>   
>   
> 
> 
> Here is the code:
> public class consumerDemo {
>   public static void main(String[] argv) {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "172.31.29.31:9091");
>     props.put("group.id", "consumer-tutorial-demo");
>     props.put("key.deserializer", StringDeserializer.class.getName());
>     props.put("value.deserializer", StringDeserializer.class.getName());
>     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
>     TopicPartition tp = new TopicPartition("adn-tracking-click", 15);
>     Collection<TopicPartition> collection = new ArrayList<TopicPartition>();
>     collection.add(tp);
>     consumer.assign(collection);
>     consumer.seek(tp, 1266921576);
>     ConsumerRecords<String, String> consumerRecords = consumer.poll(1);
>     List<ConsumerRecord<String, String>> listR = consumerRecords.records(tp);
>     Iterator<ConsumerRecord<String, String>> iter = listR.iterator();
>     ConsumerRecord<String, String> record = iter.next();
>     System.out.println(" the next record " + record.offset() + " record topic " + record.topic());
>   }
> }
> 
> wood.super



Re: "Got wrong record after seeking to offset" issue

2018-01-18 Thread Justin Miller
Yeah, I saw that after I sent that e-mail out. I actually remembered another 
ticket that I had commented on that turned out to be unrelated to the issue I 
was seeing at the time. It may be related to the current issue:

https://issues.apache.org/jira/browse/SPARK-17147 
<https://issues.apache.org/jira/browse/SPARK-17147>

We are compacting topics, but only offset topics. We just updated our message 
version to 0.10 today as our last non-Spark project was brought up to 0.11 
(Storm based).
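
For anyone else wanting to check, something along these lines against the Kafka 0.11
AdminClient should print a topic's cleanup.policy (the broker address and topic name
below are placeholders, and I haven't run this exact snippet):

  import java.util.Properties
  import scala.collection.JavaConverters._
  import org.apache.kafka.clients.admin.AdminClient
  import org.apache.kafka.common.config.ConfigResource

  val props = new Properties()
  props.put("bootstrap.servers", "broker1:9092")  // placeholder
  val admin = AdminClient.create(props)
  val resource = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic")  // placeholder
  val configs = admin.describeConfigs(List(resource).asJava).all().get()
  // "compact" means the topic is log-compacted rather than time/size retained
  println(configs.get(resource).get("cleanup.policy").value())
  admin.close()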

Justin

> On Jan 18, 2018, at 1:39 PM, Cody Koeninger <c...@koeninger.org> wrote:
> 
> https://kafka.apache.org/documentation/#compaction
> 
> On Thu, Jan 18, 2018 at 1:17 AM, Justin Miller
> <justin.mil...@protectwise.com> wrote:
>> By compacted do you mean compression? If so then we did recently turn on lz4
>> compression. If there’s another meaning and there’s a command I can run to
>> check compaction, I’m happy to give that a shot too.
>> 
>> I’ll try consuming from the failed offset if/when the problem manifests
>> itself again.
>> 
>> Thanks!
>> Justin
>> 
>> 
>> On Wednesday, January 17, 2018, Cody Koeninger <c...@koeninger.org> wrote:
>>> 
>>> That means the consumer on the executor tried to seek to the specified
>>> offset, but the message that was returned did not have a matching
>>> offset.  If the executor can't get the messages the driver told it to
>>> get, something's generally wrong.
>>> 
>>> What happens when you try to consume the particular failing offset
>>> from another  (e.g. commandline) consumer?
>>> 
>>> Is the topic in question compacted?
>>> 
>>> 
>>> 
>>> On Tue, Jan 16, 2018 at 11:10 PM, Justin Miller
>>> <justin.mil...@protectwise.com> wrote:
>>>> Greetings all,
>>>> 
>>>> I’ve recently started hitting on the following error in Spark Streaming
>>>> in Kafka. Adjusting maxRetries and spark.streaming.kafka.consumer.poll.ms
>>>> even to five minutes doesn’t seem to be helping. The problem only 
>>>> manifested
>>>> in the last few days, restarting with a new consumer group seems to remedy
>>>> the issue for a few hours (< retention, which is 12 hours).
>>>> 
>>>> Error:
>>>> Caused by: java.lang.AssertionError: assertion failed: Got wrong record
>>>> for spark-executor-  76 even after seeking to
>>>> offset 1759148155
>>>>at scala.Predef$.assert(Predef.scala:170)
>>>>at
>>>> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:85)
>>>>at
>>>> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
>>>>at
>>>> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>>>>at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>>>> 
>>>> I guess my questions are, why is that assertion a job killer vs a
>>>> warning and is there anything I can tweak settings wise that may keep it at
>>>> bay.
>>>> 
>>>> I wouldn’t be surprised if this issue were exacerbated by the volume we
>>>> do on Kafka topics (~150k/sec on the persister that’s crashing).
>>>> 
>>>> Thank you!
>>>> Justin
>>>> 
>>>> 
>>>> -
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>> 



Re: "Got wrong record after seeking to offset" issue

2018-01-17 Thread Justin Miller
By compacted do you mean compression? If so then we did recently turn on
lz4 compression. If there’s another meaning and there’s a command I can run
to check compaction, I’m happy to give that a shot too.

I’ll try consuming from the failed offset if/when the problem manifests
itself again.

Thanks!
Justin

On Wednesday, January 17, 2018, Cody Koeninger <c...@koeninger.org> wrote:

> That means the consumer on the executor tried to seek to the specified
> offset, but the message that was returned did not have a matching
> offset.  If the executor can't get the messages the driver told it to
> get, something's generally wrong.
>
> What happens when you try to consume the particular failing offset
> from another  (e.g. commandline) consumer?
>
> Is the topic in question compacted?
>
>
>
> On Tue, Jan 16, 2018 at 11:10 PM, Justin Miller
> <justin.mil...@protectwise.com> wrote:
> > Greetings all,
> >
> > I’ve recently started hitting on the following error in Spark Streaming
> in Kafka. Adjusting maxRetries and spark.streaming.kafka.consumer.poll.ms
> even to five minutes doesn’t seem to be helping. The problem only
> manifested in the last few days, restarting with a new consumer group seems
> to remedy the issue for a few hours (< retention, which is 12 hours).
> >
> > Error:
> > Caused by: java.lang.AssertionError: assertion failed: Got wrong record
> for spark-executor-  76 even after seeking
> to offset 1759148155
> > at scala.Predef$.assert(Predef.scala:170)
> > at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.
> get(CachedKafkaConsumer.scala:85)
> > at org.apache.spark.streaming.kafka010.KafkaRDD$
> KafkaRDDIterator.next(KafkaRDD.scala:223)
> > at org.apache.spark.streaming.kafka010.KafkaRDD$
> KafkaRDDIterator.next(KafkaRDD.scala:189)
> > at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> >
> > I guess my questions are, why is that assertion a job killer vs a
> warning and is there anything I can tweak settings wise that may keep it at
> bay.
> >
> > I wouldn’t be surprised if this issue were exacerbated by the volume we
> do on Kafka topics (~150k/sec on the persister that’s crashing).
> >
> > Thank you!
> > Justin
> >
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>


"Got wrong record after seeking to offset" issue

2018-01-16 Thread Justin Miller
Greetings all,

I’ve recently started hitting the following error in Spark Streaming with 
Kafka. Adjusting maxRetries and spark.streaming.kafka.consumer.poll.ms (even to 
five minutes) doesn’t seem to help. The problem only manifested in the 
last few days; restarting with a new consumer group seems to remedy the issue 
for a few hours (< retention, which is 12 hours).

Error:
Caused by: java.lang.AssertionError: assertion failed: Got wrong record for 
spark-executor-  76 even after seeking to offset 
1759148155
at scala.Predef$.assert(Predef.scala:170)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:85)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)

I guess my questions are: why is that assertion a job killer rather than a warning, and 
is there anything I can tweak settings-wise that might keep it at bay?

I wouldn’t be surprised if this issue were exacerbated by the volume we do on 
Kafka topics (~150k/sec on the persister that’s crashing).

Thank you!
Justin


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Forcing either Hive or Spark SQL representation for metastore

2017-05-18 Thread Justin Miller
Hello,

I was wondering if there is a way to force one representation or the other for 
the Hive metastore. Some of our data can’t be parsed with the Hive method, so it 
switches over to the Spark SQL method, leaving some of our data stored in the Spark 
SQL format and some in the Hive format. It’d be nice if we could force it to use 
the Spark SQL format, as changing the underlying data would be difficult.

Some have (Spark SQL): 

17/05/18 21:50:29 WARN HiveExternalCatalog: Could not persist 
`default`.`tablenamehere` in a Hive compatible way. Persisting it into Hive 
metastore in Spark SQL specific format.

Some have (Hive):

17/05/18 21:52:27 INFO CatalystSqlParser: Parsing command: int
17/05/18 21:52:27 INFO CatalystSqlParser: Parsing command: int
17/05/18 21:52:27 INFO CatalystSqlParser: Parsing command: int
17/05/18 21:52:27 INFO CatalystSqlParser: Parsing command: int
17/05/18 21:52:27 INFO CatalystSqlParser: Parsing command: bigint
17/05/18 21:52:27 INFO CatalystSqlParser: Parsing command: 
struct<srcMac:binary,dstMac:binary,srcIp:binary,dstIp:binary,srcPort:int,dstPort:int,proto:string,layer3Proto:int,layer4Proto:int>

Another question about persisting to S3: I’m getting the following for all of them: 

Caused by: MetaException(message:java.io.IOException: Got exception: 
java.io.IOException 
/username/sys_encrypted/staging/raw/updatedTimeYear=2017/updatedTimeMonth=5/updatedTimeDay=16/updatedTimeHour=23
 doesn't exist)

Thanks!
Justin

Is there a way to tell if a receiver is a Reliable Receiver?

2017-04-17 Thread Justin Pihony
I can't seem to find anywhere that would let a user know if the receiver they
are using is reliable or not. Even better would be a list of known reliable
receivers. Are any of these things possible? Or do you just have to research
your receiver beforehand?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-way-to-tell-if-a-receiver-is-a-Reliable-Receiver-tp28609.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Avro/Parquet GenericFixed decimal is not read into Spark correctly

2017-04-12 Thread Justin Pihony
All,

Before creating a JIRA for this I wanted to get a sense as to whether it
would be shot down or not:

Take the following code:

spark-shell --packages org.apache.avro:avro:1.8.1

import org.apache.avro.{Conversions, LogicalTypes, Schema}
import java.math.BigDecimal

val dc = new Conversions.DecimalConversion()
val javaBD = BigDecimal.valueOf(643.85924958)
val schema = Schema.parse(
  "{\"type\":\"record\",\"name\":\"Header\",\"namespace\":\"org.apache.avro.file\",\"fields\":[" +
  "{\"name\":\"COLUMN\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"COLUMN\"," +
  "\"size\":19,\"precision\":17,\"scale\":8,\"logicalType\":\"decimal\"}]}]}"
)
val schemaDec = schema.getField("COLUMN").schema()
val fieldSchema =
  if (schemaDec.getType() == Schema.Type.UNION) schemaDec.getTypes.get(1) else schemaDec
val converted = dc.toFixed(javaBD, fieldSchema,
  LogicalTypes.decimal(javaBD.precision, javaBD.scale))
sqlContext.createDataFrame(List(("value", converted)))

and you'll get this error:

java.lang.UnsupportedOperationException: Schema for type
org.apache.avro.generic.GenericFixed is not supported

However, if you write out a Parquet file using the AvroParquetWriter and the
above GenericFixed value (converted), then read it in via the
DataFrameReader, the decimal value that is retrieved is not accurate (i.e.
643... above is listed as -0.5...).

Even if not supported, is there any way to at least have it throw an
UnsupportedOperationException, as it does when you try to do it directly (as
compared to reading it in from a file)?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Avro-Parquet-GenericFixed-decimal-is-not-read-into-Spark-correctly-tp28592.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark Streaming Kafka Job has strange behavior for certain tasks

2017-04-05 Thread Justin Miller
executor 1) 
(85/96)
17/04/05 16:48:44 INFO TaskSetManager: Finished task 75.0 in stage 7.0 (TID 
672) in 310995 ms on ip-172-20-213-64.us-west-2.compute.internal (executor 29) 
(86/96)
17/04/05 16:48:44 INFO TaskSetManager: Finished task 1.0 in stage 7.0 (TID 715) 
in 311159 ms on ip-172-20-212-43.us-west-2.compute.internal (executor 84) 
(87/96)
17/04/05 16:48:44 INFO TaskSetManager: Finished task 4.0 in stage 7.0 (TID 677) 
in 311443 ms on ip-172-20-220-110.us-west-2.compute.internal (executor 3) 
(88/96)
17/04/05 16:48:45 INFO TaskSetManager: Finished task 73.0 in stage 7.0 (TID 
690) in 311523 ms on ip-172-20-218-229.us-west-2.compute.internal (executor 76) 
(89/96)
17/04/05 16:48:45 INFO TaskSetManager: Finished task 84.0 in stage 7.0 (TID 
686) in 311554 ms on ip-172-20-208-230.us-west-2.compute.internal (executor 60) 
(90/96)
17/04/05 16:48:45 INFO TaskSetManager: Finished task 44.0 in stage 7.0 (TID 
692) in 312165 ms on ip-172-20-208-230.us-west-2.compute.internal (executor 4) 
(91/96)
17/04/05 16:48:45 INFO TaskSetManager: Finished task 63.0 in stage 7.0 (TID 
762) in 312299 ms on ip-172-20-211-125.us-west-2.compute.internal (executor 79) 
(92/96)
17/04/05 16:48:46 INFO TaskSetManager: Finished task 94.0 in stage 7.0 (TID 
724) in 313148 ms on ip-172-20-219-239.us-west-2.compute.internal (executor 5) 
(93/96)
17/04/05 16:48:46 INFO TaskSetManager: Finished task 18.0 in stage 7.0 (TID 
717) in 313332 ms on ip-172-20-213-64.us-west-2.compute.internal (executor 15) 
(94/96)
17/04/05 16:48:48 INFO TaskSetManager: Finished task 56.0 in stage 7.0 (TID 
731) in 314838 ms on ip-172-20-217-201.us-west-2.compute.internal (executor 95) 
(95/96)
17/04/05 16:48:52 INFO TaskSetManager: Finished task 74.0 in stage 7.0 (TID 
704) in 318573 ms on ip-172-20-217-201.us-west-2.compute.internal (executor 53) 
(96/96)

Thanks,
Justin


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



SparkStreaming getActiveOrCreate

2017-03-18 Thread Justin Pihony
The docs on getActiveOrCreate make it seem that you'll get an already-started
context:

> Either return the "active" StreamingContext (that is, started but not
> stopped), or create a new StreamingContext that is

However, as far as I can tell from the code, it is strictly dependent on the
implementation of the create function, and per the tests it is even
expected that the create function will return a non-started context. So
this makes for a bit of awkward code that must check the state before starting
or not. Was this considered when this was added? I couldn't find anything
explicit.
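
For reference, the awkward pattern I mean looks roughly like this (a sketch against
the 1.x streaming API, where createStreamingContext stands in for the user-supplied
create function):

  import org.apache.spark.streaming.{StreamingContext, StreamingContextState}

  val ssc = StreamingContext.getActiveOrCreate(createStreamingContext)
  // the returned context may or may not already be started, so check before starting
  if (ssc.getState() != StreamingContextState.ACTIVE) {
    ssc.start()
  }
  ssc.awaitTermination()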

-Justin Pihony



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkStreaming-getActiveOrCreate-tp28508.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to gracefully handle Kafka OffsetOutOfRangeException

2017-03-10 Thread Justin Miller
I've created a ticket here: https://issues.apache.org/jira/browse/SPARK-19888 
<https://issues.apache.org/jira/browse/SPARK-19888>
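
For anyone finding this thread later: the "failOnDataLoss" option Michael mentions
below is set on the Kafka source in Structured Streaming. A minimal sketch, with
placeholder broker and topic names, looks something like this:

  val stream = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder
    .option("subscribe", "my-topic")                    // placeholder
    .option("failOnDataLoss", "false")  // skip ahead instead of failing the query on missing offsets
    .load()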

Thanks,
Justin

> On Mar 10, 2017, at 1:14 PM, Michael Armbrust <mich...@databricks.com> wrote:
> 
> If you have a reproduction you should open a JIRA.  It would be great if 
> there is a fix.  I'm just saying I know a similar issue does not exist in 
> structured streaming.
> 
> On Fri, Mar 10, 2017 at 7:46 AM, Justin Miller <justin.mil...@protectwise.com 
> <mailto:justin.mil...@protectwise.com>> wrote:
> Hi Michael,
> 
> I'm experiencing a similar issue. Will this not be fixed in Spark Streaming?
> 
> Best,
> Justin
> 
>> On Mar 10, 2017, at 8:34 AM, Michael Armbrust <mich...@databricks.com 
>> <mailto:mich...@databricks.com>> wrote:
>> 
>> One option here would be to try Structured Streaming.  We've added an option 
>> "failOnDataLoss" that will cause Spark to just skip ahead when this 
>> exception is encountered (it's off by default though so you don't silently 
>> miss data).
>> 
>> On Fri, Mar 18, 2016 at 4:16 AM, Ramkumar Venkataraman 
>> <ram.the.m...@gmail.com <mailto:ram.the.m...@gmail.com>> wrote:
>> I am using Spark streaming and reading data from Kafka using
>> KafkaUtils.createDirectStream. I have the "auto.offset.reset" set to
>> smallest.
>> 
>> But in some Kafka partitions, I get kafka.common.OffsetOutOfRangeException
>> and my spark job crashes.
>> 
>> I want to understand if there is a graceful way to handle this failure and
>> not kill the job. I want to keep ignoring these exceptions, as some other
>> partitions are fine and I am okay with data loss.
>> 
>> Is there any way to handle this and not have my spark job crash? I have no
>> option of increasing the kafka retention period.
>> 
>> I tried to have the DStream returned by createDirectStream() wrapped in a
>> Try construct, but since the exception happens in the executor, the Try
>> construct didn't take effect. Do you have any ideas of how to handle this?
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-gracefully-handle-Kafka-OffsetOutOfRangeException-tp26534.html
>>  
>> <http://apache-spark-user-list.1001560.n3.nabble.com/How-to-gracefully-handle-Kafka-OffsetOutOfRangeException-tp26534.html>
>> Sent from the Apache Spark User List mailing list archive at Nabble.com 
>> <http://nabble.com/>.
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>> <mailto:user-unsubscr...@spark.apache.org>
>> For additional commands, e-mail: user-h...@spark.apache.org 
>> <mailto:user-h...@spark.apache.org>
>> 
>> 
> 
> 



Re: How to gracefully handle Kafka OffsetOutOfRangeException

2017-03-10 Thread Justin Miller
Hi Michael,

I'm experiencing a similar issue. Will this not be fixed in Spark Streaming?

Best,
Justin

> On Mar 10, 2017, at 8:34 AM, Michael Armbrust <mich...@databricks.com> wrote:
> 
> One option here would be to try Structured Streaming.  We've added an option 
> "failOnDataLoss" that will cause Spark to just skip ahead when this 
> exception is encountered (it's off by default though so you don't silently 
> miss data).
> 
> On Fri, Mar 18, 2016 at 4:16 AM, Ramkumar Venkataraman 
> <ram.the.m...@gmail.com <mailto:ram.the.m...@gmail.com>> wrote:
> I am using Spark streaming and reading data from Kafka using
> KafkaUtils.createDirectStream. I have the "auto.offset.reset" set to
> smallest.
> 
> But in some Kafka partitions, I get kafka.common.OffsetOutOfRangeException
> and my spark job crashes.
> 
> I want to understand if there is a graceful way to handle this failure and
> not kill the job. I want to keep ignoring these exceptions, as some other
> partitions are fine and I am okay with data loss.
> 
> Is there any way to handle this and not have my spark job crash? I have no
> option of increasing the kafka retention period.
> 
> I tried to have the DStream returned by createDirectStream() wrapped in a
> Try construct, but since the exception happens in the executor, the Try
> construct didn't take effect. Do you have any ideas of how to handle this?
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-gracefully-handle-Kafka-OffsetOutOfRangeException-tp26534.html
>  
> <http://apache-spark-user-list.1001560.n3.nabble.com/How-to-gracefully-handle-Kafka-OffsetOutOfRangeException-tp26534.html>
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> For additional commands, e-mail: user-h...@spark.apache.org 
> <mailto:user-h...@spark.apache.org>
> 
> 



Re: Jar not in shell classpath in Windows 10

2017-02-28 Thread Justin Pihony
I've verified this is that issue, so please disregard.

On Wed, Mar 1, 2017 at 1:07 AM, Justin Pihony <justin.pih...@gmail.com>
wrote:

> As soon as I posted this I found https://issues.apache.
> org/jira/browse/SPARK-18648 which seems to be the issue. I'm looking at
> it deeper now.
>
> On Wed, Mar 1, 2017 at 1:05 AM, Justin Pihony <justin.pih...@gmail.com>
> wrote:
>
>> Run spark-shell --packages
>> datastax:spark-cassandra-connector:2.0.0-RC1-s_2.11 and then try to do an
>> import of anything com.datastax. I have checked that the jar is listed
>> among
>> the classpaths and it is, albeit behind a spark URL. I'm wondering if
>> added
>> jars fail in windows due to this server abstraction? I can't find anything
>> on this already, so maybe it's somehow an environment thing. Has anybody
>> encountered this?
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Jar-not-in-shell-classpath-in-Windows-
>> 10-tp28442.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: Jar not in shell classpath in Windows 10

2017-02-28 Thread Justin Pihony
As soon as I posted this I found
https://issues.apache.org/jira/browse/SPARK-18648 which seems to be the
issue. I'm looking at it deeper now.

On Wed, Mar 1, 2017 at 1:05 AM, Justin Pihony <justin.pih...@gmail.com>
wrote:

> Run spark-shell --packages
> datastax:spark-cassandra-connector:2.0.0-RC1-s_2.11 and then try to do an
> import of anything com.datastax. I have checked that the jar is listed
> among
> the classpaths and it is, albeit behind a spark URL. I'm wondering if added
> jars fail in windows due to this server abstraction? I can't find anything
> on this already, so maybe it's somehow an environment thing. Has anybody
> encountered this?
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Jar-not-in-shell-classpath-in-Windows-
> 10-tp28442.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Jar not in shell classpath in Windows 10

2017-02-28 Thread Justin Pihony
Run spark-shell --packages
datastax:spark-cassandra-connector:2.0.0-RC1-s_2.11 and then try to import
anything under com.datastax. I have checked that the jar is listed among
the classpaths and it is, albeit behind a spark URL. I'm wondering if added
jars fail in Windows due to this server abstraction? I can't find anything
on this already, so maybe it's somehow an environment thing. Has anybody
encountered this?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Jar-not-in-shell-classpath-in-Windows-10-tp28442.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Is there a list of missing optimizations for typed functions?

2017-02-22 Thread Justin Pihony
I was curious if there was introspection of certain typed functions and ran
the following two queries:

ds.where($"col" > 1).explain
ds.filter(_.col > 1).explain

And found that the typed function does NOT result in a PushedFilter. I
imagine this is due to a limited view of the function, so I have two
questions really:

1.) Is there a list of the methods that lose some of the optimizations that
you get from non-functional methods? Is it any method that accepts a generic
function?
2.) Is there any work to attempt reflection and gain some of these
optimizations back? I couldn't find anything in JIRA.
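
(For context, the comparison was against a parquet-backed Dataset, roughly as
sketched below; the case class, path, and session setup are placeholders.)

  case class Rec(col: Int)
  import spark.implicits._

  val ds = spark.read.parquet("/path/to/recs").as[Rec]
  ds.where($"col" > 1).explain()  // the predicate shows up in PushedFilters for the parquet scan
  ds.filter(_.col > 1).explain()  // the lambda is opaque to the optimizer, so nothing is pushed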

Thanks,
Justin Pihony



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-list-of-missing-optimizations-for-typed-functions-tp28418.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Is there any scheduled release date for Spark 2.1.0?

2016-12-28 Thread Justin Miller
Interesting, because a bug that seemed to be fixed in 2.1.0-SNAPSHOT doesn't 
appear to be fixed in 2.1.0 stable (it centered around a null-pointer exception 
during code gen). It seems to be fixed in 2.1.1-SNAPSHOT, but I can try stable 
again.

> On Dec 28, 2016, at 1:38 PM, Mark Hamstra <m...@clearstorydata.com> wrote:
> 
> A SNAPSHOT build is not a stable artifact, but rather floats to the top of 
> commits that are intended for the next release.  So, 2.1.1-SNAPSHOT comes 
> after the 2.1.0 release and contains any code at the time that the artifact 
> was built that was committed to the branch-2.1 maintenance branch and is, 
> therefore, intended for the eventual 2.1.1 maintenance release.  Once a 
> release is tagged and stable artifacts for it can be built, there is no 
> purpose for s SNAPSHOT of that release -- e.g. there is no longer any purpose 
> for a 2.1.0-SNAPSHOT release; if you want 2.1.0, then you should be using 
> stable artifacts now, not SNAPSHOTs.
> 
> The existence of a SNAPSHOT doesn't imply anything about the release date of 
> the associated finished version.  Rather, it only indicates a name that is 
> attached to all of the code that is currently intended for the associated 
> release number. 
> 
> On Wed, Dec 28, 2016 at 3:09 PM, Justin Miller <justin.mil...@protectwise.com 
> <mailto:justin.mil...@protectwise.com>> wrote:
> It looks like the jars for 2.1.0-SNAPSHOT are gone?
> 
> https://repository.apache.org/content/groups/snapshots/org/apache/spark/spark-sql_2.11/2.1.0-SNAPSHOT/
>  
> <https://repository.apache.org/content/groups/snapshots/org/apache/spark/spark-sql_2.11/2.1.0-SNAPSHOT/>
> 
> Also:
> 
> 2.1.0-SNAPSHOT/ 
> <https://repository.apache.org/content/groups/snapshots/org/apache/spark/spark-sql_2.11/2.1.0-SNAPSHOT/>
>   Fri Dec 23 16:31:42 UTC 2016
> 2.1.1-SNAPSHOT/ 
> <https://repository.apache.org/content/groups/snapshots/org/apache/spark/spark-sql_2.11/2.1.1-SNAPSHOT/>
>   Wed Dec 28 20:01:10 UTC 2016
> 2.2.0-SNAPSHOT/ 
> <https://repository.apache.org/content/groups/snapshots/org/apache/spark/spark-sql_2.11/2.2.0-SNAPSHOT/>
>   Wed Dec 28 19:12:38 UTC 2016 
> 
> What's with 2.1.1-SNAPSHOT? Is that version about to be released as well?
> 
> Thanks!
> Justin
> 
>> On Dec 28, 2016, at 12:53 PM, Mark Hamstra <m...@clearstorydata.com 
>> <mailto:m...@clearstorydata.com>> wrote:
>> 
>> The v2.1.0 tag is there: https://github.com/apache/spark/tree/v2.1.0 
>> <https://github.com/apache/spark/tree/v2.1.0>
>> 
>> On Wed, Dec 28, 2016 at 2:04 PM, Koert Kuipers <ko...@tresata.com 
>> <mailto:ko...@tresata.com>> wrote:
>> seems like the artifacts are on maven central but the website is not yet 
>> updated.
>> 
>> strangely the tag v2.1.0 is not yet available on github. i assume its equal 
>> to v2.1.0-rc5
>> 
>> On Fri, Dec 23, 2016 at 10:52 AM, Justin Miller 
>> <justin.mil...@protectwise.com <mailto:justin.mil...@protectwise.com>> wrote:
>> I'm curious about this as well. Seems like the vote passed.
>> 
>> > On Dec 23, 2016, at 2:00 AM, Aseem Bansal <asmbans...@gmail.com 
>> > <mailto:asmbans...@gmail.com>> wrote:
>> >
>> >
>> 
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>> <mailto:user-unsubscr...@spark.apache.org>
>> 
>> 
>> 
> 
> 



Re: Is there any scheduled release date for Spark 2.1.0?

2016-12-28 Thread Justin Miller
It looks like the jars for 2.1.0-SNAPSHOT are gone?

https://repository.apache.org/content/groups/snapshots/org/apache/spark/spark-sql_2.11/2.1.0-SNAPSHOT/
 
<https://repository.apache.org/content/groups/snapshots/org/apache/spark/spark-sql_2.11/2.1.0-SNAPSHOT/>

Also:

2.1.0-SNAPSHOT/ 
<https://repository.apache.org/content/groups/snapshots/org/apache/spark/spark-sql_2.11/2.1.0-SNAPSHOT/>
Fri Dec 23 16:31:42 UTC 2016
2.1.1-SNAPSHOT/ 
<https://repository.apache.org/content/groups/snapshots/org/apache/spark/spark-sql_2.11/2.1.1-SNAPSHOT/>
Wed Dec 28 20:01:10 UTC 2016
2.2.0-SNAPSHOT/ 
<https://repository.apache.org/content/groups/snapshots/org/apache/spark/spark-sql_2.11/2.2.0-SNAPSHOT/>
Wed Dec 28 19:12:38 UTC 2016 

What's with 2.1.1-SNAPSHOT? Is that version about to be released as well?

Thanks!
Justin

> On Dec 28, 2016, at 12:53 PM, Mark Hamstra <m...@clearstorydata.com> wrote:
> 
> The v2.1.0 tag is there: https://github.com/apache/spark/tree/v2.1.0 
> <https://github.com/apache/spark/tree/v2.1.0>
> 
> On Wed, Dec 28, 2016 at 2:04 PM, Koert Kuipers <ko...@tresata.com 
> <mailto:ko...@tresata.com>> wrote:
> seems like the artifacts are on maven central but the website is not yet 
> updated.
> 
> strangely the tag v2.1.0 is not yet available on github. i assume its equal 
> to v2.1.0-rc5
> 
> On Fri, Dec 23, 2016 at 10:52 AM, Justin Miller 
> <justin.mil...@protectwise.com <mailto:justin.mil...@protectwise.com>> wrote:
> I'm curious about this as well. Seems like the vote passed.
> 
> > On Dec 23, 2016, at 2:00 AM, Aseem Bansal <asmbans...@gmail.com 
> > <mailto:asmbans...@gmail.com>> wrote:
> >
> >
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> 
> 
> 



Re: Is there any scheduled release date for Spark 2.1.0?

2016-12-23 Thread Justin Miller
I'm curious about this as well. Seems like the vote passed. 

> On Dec 23, 2016, at 2:00 AM, Aseem Bansal  wrote:
> 
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



K-Means seems biased to one center

2015-10-05 Thread Justin Pihony
(Cross post with
http://stackoverflow.com/questions/32936380/k-means-clustering-is-biased-to-one-center)


I have a corpus of wiki pages (baseball, hockey, music, football) which I'm
running through tf-idf and then through k-means. After a couple of issues to
start (you can see my previous questions), I'm finally getting a
KMeansModel... but
when I try to predict, I keep getting the same center. Is this because of
the small dataset, or because I'm comparing a multi-word document against a
query with a smaller number of words (1-20)? Or is there something else I'm doing
wrong? See the code below:

//Preprocessing of data includes splitting into words
//and removing words with only 1 or 2 characters
val corpus: RDD[Seq[String]]
val hashingTF = new HashingTF(10)
val tf = hashingTF.transform(corpus)
val idf = new IDF().fit(tf)
val tfidf = idf.transform(tf).cache
val kMeansModel = KMeans.train(tfidf, 3, 10)

val queryTf = hashingTF.transform(List("music"))
val queryTfidf = idf.transform(queryTf)
kMeansModel.predict(queryTfidf) //Always the same, no matter the term supplied
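
One diagnostic I plan to try (a sketch, reusing the tfidf RDD and kMeansModel from
above): count how the training vectors themselves get assigned, to see whether
everything is collapsing into a single cluster or only the short queries are.

  // How many training points fall into each of the 3 clusters?
  val clusterSizes = tfidf.map(v => kMeansModel.predict(v)).countByValue()
  clusterSizes.toSeq.sortBy(_._1).foreach { case (cluster, count) =>
    println(s"cluster $cluster: $count points")
  }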


save checkpoint during dataframe row iteration

2015-10-05 Thread Justin Permar
Good morning,

I have a typical iterator loop on a DataFrame loaded from a parquet data
source:

val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
val sc = new JavaSparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val parquetDataFrame = sqlContext.read.parquet(parquetFilename.getAbsolutePath)
parquetDataFrame.foreachPartition { rowIterator =>
  rowIterator.foreach { row =>
    // ... do work
  }
}

My use case is quite simple: I would like to save a checkpoint during
processing, and if the driver program fails, skip over the initial records
in the parquet file, and continue from the checkpoint. This would be
analogous to storing a loop iterator value in a standard C++/Java for loop.

My question is: are there any guarantees about the ordering of rows in the
"foreach" closure? Even if there are no guarantees in general (i.e., for a
DataFrame from any source), considering that the data frame is created from
a parquet file, are there any guarantees?

Is it possible to implement my use case?
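
For concreteness, the kind of thing I have in mind is sketched below. It leans on
zipWithIndex giving a stable numbering for this parquet-backed DataFrame, which is
exactly the guarantee I'm asking about; checkpointStore is a stand-in for wherever
the last completed index would be kept.

  val lastDone: Long = checkpointStore.readLastCompletedIndex()  // hypothetical helper

  parquetDataFrame.rdd
    .zipWithIndex()                              // (row, index); stability is the open question
    .filter { case (_, idx) => idx > lastDone }  // skip rows already processed before the failure
    .foreachPartition { iter =>
      iter.foreach { case (row, idx) =>
        // ... do work
        checkpointStore.markCompleted(idx)       // hypothetical helper
      }
    }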

Thanks for your time.


Re: Is MLBase dead?

2015-09-28 Thread Justin Pihony
To take a stab at my own answer: MLBase is now fully integrated into MLLib.
MLI/MLLib are the mllib algorithms and MLO is the ml pipelines?

On Mon, Sep 28, 2015 at 10:19 PM, Justin Pihony <justin.pih...@gmail.com>
wrote:

> As in, is MLBase (MLO/MLI/MLlib) now simply org.apache.spark.mllib and
> org.apache.spark.ml? I cannot find anything official, and the last updates
> seem to be a year or two old.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-MLBase-dead-tp24854.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Is MLBase dead?

2015-09-28 Thread Justin Pihony
As in, is MLBase (MLO/MLI/MLlib) now simply org.apache.spark.mllib and
org.apache.spark.ml? I cannot find anything official, and the last updates
seem to be a year or two old.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-MLBase-dead-tp24854.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to access Spark UI through AWS

2015-08-25 Thread Justin Pihony
I figured it all out after this:
http://apache-spark-user-list.1001560.n3.nabble.com/WebUI-on-yarn-through-ssh-tunnel-affected-by-AmIpfilter-td21540.html

The short is that I needed to set

SPARK_PUBLIC_DNS (not DNS_HOME) = ec2_publicdns

then

the YARN proxy gets in the way, so I needed to go to:

http://ec2_publicdns:20888/proxy/applicationid/jobs  (9046 is the older
emr port)

or, as Jonathan said, the spark history server works once a job is
completed.

On Tue, Aug 25, 2015 at 5:26 PM, Justin Pihony justin.pih...@gmail.com
wrote:

 OK, I figured the horrid look alsothe href of all of the styles is
 prefixed with the proxy dataso, ultimately if I can fix the proxy
 issues with the links, then I can fix the look also

 On Tue, Aug 25, 2015 at 5:17 PM, Justin Pihony justin.pih...@gmail.com
 wrote:

 SUCCESS! I set SPARK_DNS_HOME=ec2_publicdns, which makes it available to
 access the spark ui directly. The application proxy was still getting in
 the way by the way it creates the URL, so I manually filled in the
 /stage?id=#attempt=# and that workedI'm still having trouble with the
 css as the UI looks horridbut I'll tackle that next :)

 On Tue, Aug 25, 2015 at 4:31 PM, Justin Pihony justin.pih...@gmail.com
 wrote:

 Thanks. I just tried and still am having trouble. It seems to still be
 using the private address even if I try going through the resource manager.

 On Tue, Aug 25, 2015 at 12:34 PM, Kelly, Jonathan jonat...@amazon.com
 wrote:

 I'm not sure why the UI appears broken like that either and haven't
 investigated it myself yet, but if you instead go to the YARN
 ResourceManager UI (port 8088 if you are using emr-4.x; port 9026 for
 3.x,
 I believe), then you should be able to click on the ApplicationMaster
 link
 (or the History link for completed applications) to get to the Spark UI
 from there. The ApplicationMaster link will use the YARN Proxy Service
 (port 20888 on emr-4.x; not sure about 3.x) to proxy through the Spark
 application's UI, regardless of what port it's running on. For completed
 applications, the History link will send you directly to the Spark
 History
 Server UI on port 18080. Hope that helps!

 ~ Jonathan




 On 8/24/15, 10:51 PM, Justin Pihony justin.pih...@gmail.com wrote:

 I am using the steps from  this article
 https://aws.amazon.com/articles/Elastic-MapReduce/4926593393724923
  to
 get spark up and running on EMR through yarn. Once up and running I
 ssh in
 and cd to the spark bin and run spark-shell --master yarn. Once this
 spins
 up I can see that the UI is started at the internal ip of 4040. If I
 hit
 the
 public dns at 4040 with dynamic port tunneling and foxyproxy then I
 get a
 crude UI (css seems broken), however the proxy continuously redirects
 me
 to
 the main page, so I cannot drill into anything. So, I tried static
 tunneling, but can't seem to get through.
 
 So, how can I access the spark UI when running a spark shell in AWS
 yarn?
 
 
 
 --
 View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-access-Spark-UI
 -through-AWS-tp24436.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 







Re: How to access Spark UI through AWS

2015-08-25 Thread Justin Pihony
Thanks. I just tried and still am having trouble. It seems to still be
using the private address even if I try going through the resource manager.

On Tue, Aug 25, 2015 at 12:34 PM, Kelly, Jonathan jonat...@amazon.com
wrote:

 I'm not sure why the UI appears broken like that either and haven't
 investigated it myself yet, but if you instead go to the YARN
 ResourceManager UI (port 8088 if you are using emr-4.x; port 9026 for 3.x,
 I believe), then you should be able to click on the ApplicationMaster link
 (or the History link for completed applications) to get to the Spark UI
 from there. The ApplicationMaster link will use the YARN Proxy Service
 (port 20888 on emr-4.x; not sure about 3.x) to proxy through the Spark
 application's UI, regardless of what port it's running on. For completed
 applications, the History link will send you directly to the Spark History
 Server UI on port 18080. Hope that helps!

 ~ Jonathan




 On 8/24/15, 10:51 PM, Justin Pihony justin.pih...@gmail.com wrote:

 I am using the steps from  this article
 https://aws.amazon.com/articles/Elastic-MapReduce/4926593393724923   to
 get spark up and running on EMR through yarn. Once up and running I ssh in
 and cd to the spark bin and run spark-shell --master yarn. Once this spins
 up I can see that the UI is started at the internal ip of 4040. If I hit
 the
 public dns at 4040 with dynamic port tunneling and foxyproxy then I get a
 crude UI (css seems broken), however the proxy continuously redirects me
 to
 the main page, so I cannot drill into anything. So, I tried static
 tunneling, but can't seem to get through.
 
 So, how can I access the spark UI when running a spark shell in AWS yarn?
 
 
 
 --
 View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-access-Spark-UI
 -through-AWS-tp24436.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 




Re: How to access Spark UI through AWS

2015-08-25 Thread Justin Pihony
OK, I figured out the horrid look also... the href of all of the styles is
prefixed with the proxy data... so, ultimately, if I can fix the proxy
issues with the links, then I can fix the look also.

On Tue, Aug 25, 2015 at 5:17 PM, Justin Pihony justin.pih...@gmail.com
wrote:

 SUCCESS! I set SPARK_DNS_HOME=ec2_publicdns, which makes it available to
 access the spark ui directly. The application proxy was still getting in
 the way by the way it creates the URL, so I manually filled in the
 /stage?id=#attempt=# and that workedI'm still having trouble with the
 css as the UI looks horridbut I'll tackle that next :)

 On Tue, Aug 25, 2015 at 4:31 PM, Justin Pihony justin.pih...@gmail.com
 wrote:

 Thanks. I just tried and still am having trouble. It seems to still be
 using the private address even if I try going through the resource manager.

 On Tue, Aug 25, 2015 at 12:34 PM, Kelly, Jonathan jonat...@amazon.com
 wrote:

 I'm not sure why the UI appears broken like that either and haven't
 investigated it myself yet, but if you instead go to the YARN
 ResourceManager UI (port 8088 if you are using emr-4.x; port 9026 for
 3.x,
 I believe), then you should be able to click on the ApplicationMaster
 link
 (or the History link for completed applications) to get to the Spark UI
 from there. The ApplicationMaster link will use the YARN Proxy Service
 (port 20888 on emr-4.x; not sure about 3.x) to proxy through the Spark
 application's UI, regardless of what port it's running on. For completed
 applications, the History link will send you directly to the Spark
 History
 Server UI on port 18080. Hope that helps!

 ~ Jonathan




 On 8/24/15, 10:51 PM, Justin Pihony justin.pih...@gmail.com wrote:

 I am using the steps from  this article
 https://aws.amazon.com/articles/Elastic-MapReduce/4926593393724923
  to
 get spark up and running on EMR through yarn. Once up and running I ssh
 in
 and cd to the spark bin and run spark-shell --master yarn. Once this
 spins
 up I can see that the UI is started at the internal ip of 4040. If I hit
 the
 public dns at 4040 with dynamic port tunneling and foxyproxy then I get
 a
 crude UI (css seems broken), however the proxy continuously redirects me
 to
 the main page, so I cannot drill into anything. So, I tried static
 tunneling, but can't seem to get through.
 
 So, how can I access the spark UI when running a spark shell in AWS
 yarn?
 
 
 
 --
 View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-access-Spark-UI
 -through-AWS-tp24436.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 






Re: How to access Spark UI through AWS

2015-08-25 Thread Justin Pihony
SUCCESS! I set SPARK_DNS_HOME=ec2_publicdns, which makes it available to
access the Spark UI directly. The application proxy was still getting in
the way by the way it creates the URL, so I manually filled in the
/stage?id=#attempt=# and that worked... I'm still having trouble with the
CSS as the UI looks horrid... but I'll tackle that next :)

On Tue, Aug 25, 2015 at 4:31 PM, Justin Pihony justin.pih...@gmail.com
wrote:

 Thanks. I just tried and still am having trouble. It seems to still be
 using the private address even if I try going through the resource manager.

 On Tue, Aug 25, 2015 at 12:34 PM, Kelly, Jonathan jonat...@amazon.com
 wrote:

 I'm not sure why the UI appears broken like that either and haven't
 investigated it myself yet, but if you instead go to the YARN
 ResourceManager UI (port 8088 if you are using emr-4.x; port 9026 for 3.x,
 I believe), then you should be able to click on the ApplicationMaster link
 (or the History link for completed applications) to get to the Spark UI
 from there. The ApplicationMaster link will use the YARN Proxy Service
 (port 20888 on emr-4.x; not sure about 3.x) to proxy through the Spark
 application's UI, regardless of what port it's running on. For completed
 applications, the History link will send you directly to the Spark History
 Server UI on port 18080. Hope that helps!

 ~ Jonathan




 On 8/24/15, 10:51 PM, Justin Pihony justin.pih...@gmail.com wrote:

 I am using the steps from  this article
 https://aws.amazon.com/articles/Elastic-MapReduce/4926593393724923
  to
 get spark up and running on EMR through yarn. Once up and running I ssh
 in
 and cd to the spark bin and run spark-shell --master yarn. Once this
 spins
 up I can see that the UI is started at the internal ip of 4040. If I hit
 the
 public dns at 4040 with dynamic port tunneling and foxyproxy then I get a
 crude UI (css seems broken), however the proxy continuously redirects me
 to
 the main page, so I cannot drill into anything. So, I tried static
 tunneling, but can't seem to get through.
 
 So, how can I access the spark UI when running a spark shell in AWS yarn?
 
 
 
 --
 View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-access-Spark-UI
 -through-AWS-tp24436.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 





Re: Got wrong md5sum for boto

2015-08-24 Thread Justin Pihony
Additional info...If I use an online md5sum check then it matches...So,
it's either windows or python (using 2.7.10)

On Mon, Aug 24, 2015 at 11:54 AM, Justin Pihony justin.pih...@gmail.com
wrote:

 When running the spark_ec2.py script, I'm getting a wrong md5sum. I've now
 seen this on two different machines. I am running on windows, but I would
 imagine that shouldn't affect the md5. Is this a boto problem, python
 problem, spark problem?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Got-wrong-md5sum-for-boto-tp24420.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Got wrong md5sum for boto

2015-08-24 Thread Justin Pihony
I found this solution:
https://stackoverflow.com/questions/3390484/python-hashlib-md5-differs-between-linux-windows

Does anybody see a reason why I shouldn't put in a PR to make this change?

FROM:
with open(tgz_file_path) as tar:

TO:
with open(tgz_file_path, "rb") as tar:

On Mon, Aug 24, 2015 at 11:58 AM, Justin Pihony justin.pih...@gmail.com
wrote:

 Additional info...If I use an online md5sum check then it matches...So,
 it's either windows or python (using 2.7.10)

 On Mon, Aug 24, 2015 at 11:54 AM, Justin Pihony justin.pih...@gmail.com
 wrote:

 When running the spark_ec2.py script, I'm getting a wrong md5sum. I've now
 seen this on two different machines. I am running on windows, but I would
 imagine that shouldn't affect the md5. Is this a boto problem, python
 problem, spark problem?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Got-wrong-md5sum-for-boto-tp24420.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Got wrong md5sum for boto

2015-08-24 Thread Justin Pihony
When running the spark_ec2.py script, I'm getting a wrong md5sum. I've now
seen this on two different machines. I am running on windows, but I would
imagine that shouldn't affect the md5. Is this a boto problem, python
problem, spark problem?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Got-wrong-md5sum-for-boto-tp24420.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to access Spark UI through AWS

2015-08-24 Thread Justin Pihony
I am using the steps from this article
https://aws.amazon.com/articles/Elastic-MapReduce/4926593393724923 to
get Spark up and running on EMR through YARN. Once up and running, I ssh in,
cd to the Spark bin, and run spark-shell --master yarn. Once this spins
up I can see that the UI is started at the internal IP on port 4040. If I hit the
public DNS at 4040 with dynamic port tunneling and FoxyProxy, then I get a
crude UI (the CSS seems broken); however, the proxy continuously redirects me to
the main page, so I cannot drill into anything. So, I tried static
tunneling, but can't seem to get through.

So, how can I access the spark UI when running a spark shell in AWS yarn?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-access-Spark-UI-through-AWS-tp24436.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Windowed stream operations -- These are too lazy for some use cases

2015-08-20 Thread Justin Grimes
We are aggregating real time logs of events, and want to do windows of 30
minutes. However, since the computation doesn't start until 30 minutes have
passed, there is a ton of data built up that processing could've already
started on. When it comes time to actually process the data, there is too
much for our cluster to handle at once.

The basic idea is this:

 val mergedMain = mergedStream
   .flatMap(r => ...) // denormalize data for this particular output stream
   .reduceByKey((x: Array[Int], y: Array[Int]) => sumAggregates(x, y)) // this would sum over the batches
   .reduceByKeyAndWindow((x: Array[Int], y: Array[Int]) => sumAggregates(x, y), 180, 180) // sum over the windows
   .map(rec => ...) // convert data to other format
   .foreachRDD { (rdd, time) =>
     rdd.saveAsTextFile(...) // save to text files
   }

I would want the batches to be reduced as soon as they arrive (the first
reduceByKey), since there isn't any reason to wait. Instead all of the
unprocessed data has to be processed at the same time (this data is being
heavily denormalized in some cases, and so generates a bunch of additional
data).

Thanks for any help.


Re: Windowed stream operations -- These are too lazy for some use cases

2015-08-20 Thread Justin Grimes
I tried something like that. When I tried just doing count() on the
DStream, it didn't seem like it was actually forcing the computation.

What (sort of) worked was doing a foreachRDD((rdd) => rdd.count()), or
doing a print() on the DStream. The only problem was this seemed to add a
lot of processing overhead -- I couldn't figure out exactly why, but it
seemed to have something to do with foreachRDD only being executed on the
driver.

On Thu, Aug 20, 2015 at 1:39 PM, Iulian Dragoș iulian.dra...@typesafe.com
wrote:

 On Thu, Aug 20, 2015 at 6:58 PM, Justin Grimes jgri...@adzerk.com wrote:

 We are aggregating real time logs of events, and want to do windows of 30
 minutes. However, since the computation doesn't start until 30 minutes have
 passed, there is a ton of data built up that processing could've already
 started on. When it comes time to actually process the data, there is too
 much for our cluster to handle at once.

 The basic idea is this:

  val mergedMain = mergedStream
    .flatMap(r => ...) // denormalize data for this particular output stream
    .reduceByKey((x: Array[Int], y: Array[Int]) => sumAggregates(x, y)) // this would sum over the batches
 
 Could you add a dummy action at this point?
 
 val firstStep = mergedStream
   .flatMap(r => ...) // denormalize data for this particular output stream
   .reduceByKey((x: Array[Int], y: Array[Int]) => sumAggregates(x, y)) // this would sum over the batches
   .persist() // this will be reused in windowing operations
 
 firstStep.count() // just to trigger computation
 
 firstStep
   .reduceByKeyAndWindow((x: Array[Int], y: Array[Int]) => sumAggregates(x, y), 180, 180) // sum over the windows
   .map(rec => ...) // convert data to other format
   .foreachRDD { (rdd, time) =>
     rdd.saveAsTextFile(...) // save to text files
   }
 
   .reduceByKeyAndWindow((x: Array[Int], y: Array[Int]) => sumAggregates(x, y), 180, 180) // sum over the windows
   .map(rec => ...) // convert data to other format
   .foreachRDD { (rdd, time) =>
     rdd.saveAsTextFile(...) // save to text files
   }

 I would want the batches to be reduced as soon as they arrive (the first
 reduceByKey), since there isn't any reason to wait. Instead all of the
 unprocessed data has to be processed at the same time (this data is being
 heavily denormalized in some cases, and so generates a bunch of additional
 data).

 Thanks for any help.

 ​
 --

 --
 Iulian Dragos

 --
 Reactive Apps on the JVM
 www.typesafe.com




Spark Python process

2015-06-24 Thread Justin Steigel
I have a spark job that's running on a 10 node cluster and the python
process on all the nodes is pegged at 100%.

I was wondering what parts of a spark script are run in the python process
and which get passed to the Java processes?  Is there any documentation on
this?

Thanks,
Justin


NaiveBayes for MLPipeline is absent

2015-06-18 Thread Justin Yip
Hello,

Currently, there is no NaiveBayes implementation for ML pipelines. I couldn't
find the JIRA ticket related to it either (or maybe I missed it).

Is there a plan to implement it? If no one has the bandwidth, I can work on
it.

Thanks.

Justin




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NaiveBayes-for-MLPipeline-is-absent-tp23402.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Inconsistent behavior with Dataframe Timestamp between 1.3.1 and 1.4.0

2015-06-17 Thread Justin Yip
Done.

https://issues.apache.org/jira/browse/SPARK-8420
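
In the meantime, the stopgap I'm going to try is casting the literal explicitly so
the comparison happens on timestamps rather than strings (an untested sketch):

  import org.apache.spark.sql.functions.lit

  df.filter($"_2" <= lit("2014-06-01").cast("timestamp")).show()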

Justin

On Wed, Jun 17, 2015 at 4:06 PM, Xiangrui Meng men...@gmail.com wrote:

 That sounds like a bug. Could you create a JIRA and ping Yin Huai
 (cc'ed). -Xiangrui

 On Wed, May 27, 2015 at 12:57 AM, Justin Yip yipjus...@prediction.io
 wrote:
  Hello,
 
  I am trying out 1.4.0 and notice there are some differences in behavior
 with
  Timestamp between 1.3.1 and 1.4.0.
 
  In 1.3.1, I can compare a Timestamp with a string.
  scala> val df = sqlContext.createDataFrame(Seq((1,
  Timestamp.valueOf("2015-01-01 00:00:00")), (2,
  Timestamp.valueOf("2014-01-01 00:00:00"))))
  ...
  scala> df.filter($"_2" <= "2014-06-01").show
  ...
  _1 _2
  2  2014-01-01 00:00:...
 
  However, in 1.4.0, the filter is always false:
  scala> val df = sqlContext.createDataFrame(Seq((1,
  Timestamp.valueOf("2015-01-01 00:00:00")), (2,
  Timestamp.valueOf("2014-01-01 00:00:00"))))
  df: org.apache.spark.sql.DataFrame = [_1: int, _2: timestamp]
 
  scala> df.filter($"_2" <= "2014-06-01").show
  +--+--+
  |_1|_2|
  +--+--+
  +--+--+
 
  Not sure if that is intended, but I cannot find any doc mentioning these
  inconsistencies.
 
  Thanks.
 
  Justin
 
  
  View this message in context: Inconsistent behavior with Dataframe
 Timestamp
  between 1.3.1 and 1.4.0
  Sent from the Apache Spark User List mailing list archive at Nabble.com.



NullPointerException with functions.rand()

2015-06-10 Thread Justin Yip
Hello,

I am using 1.4.0 and found the following weird behavior.

This case works fine:

scala> sc.parallelize(Seq((1,2), (3, 100))).toDF.withColumn("index",
rand(30)).show()
+--+---+---+
|_1| _2|  index|
+--+---+---+
| 1|  2| 0.6662967911724369|
| 3|100|0.35734504984676396|
+--+---+---+

However, when I use sqlContext.createDataFrame instead, I get a NPE:

scala> sqlContext.createDataFrame(Seq((1,2), (3, 100))).withColumn("index",
rand(30)).show()
java.lang.NullPointerException
at
org.apache.spark.sql.catalyst.expressions.RDG.rng$lzycompute(random.scala:39)
at org.apache.spark.sql.catalyst.expressions.RDG.rng(random.scala:39)
..


Does any one know why?

Thanks.

Justin




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-with-functions-rand-tp23267.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Why the default Params.copy doesn't work for Model.copy?

2015-06-04 Thread Justin Yip
Hello,

I have a question about the Spark 1.4 ml library. In the copy function, it is
stated that the default implementation of Params doesn't work for models. (
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/Model.scala#L49
)

As a result, some feature generation transformer like StringIndexerModel
cannot be used in Pipeline.

Maybe due to my limited knowledge of the ML pipeline, but can anyone give me
some hints on why Model.copy behaves differently from other Params?

Thanks!

Justin




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-the-default-Params-copy-doesn-t-work-for-Model-copy-tp23169.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Setting S3 output file grantees for spark output files

2015-06-04 Thread Justin Steigel
Hi all,

I'm running Spark on AWS EMR and I'm having some issues getting the correct
permissions on the output files using
rdd.saveAsTextFile('file_dir_name').  In hive, I would add a line in the
beginning of the script with

set fs.s3.canned.acl=BucketOwnerFullControl

and that would set the correct grantees for the files. For Spark, I tried
adding the permissions as a --conf option:

hadoop jar /mnt/var/lib/hadoop/steps/s-3HIRLHJJXV3SJ/script-runner.jar \
/home/hadoop/spark/bin/spark-submit --deploy-mode cluster --master
yarn-cluster \
--conf spark.driver.extraJavaOptions
-Dfs.s3.canned.acl=BucketOwnerFullControl \
hdfs:///user/hadoop/spark.py

But the permissions do not get set properly on the output files. What is
the proper way to pass in the 'fs.s3.canned.acl=BucketOwnerFullControl' or
any of the S3 canned permissions to the spark job?

Thanks in advance
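
(A possible alternative, not confirmed in this thread: Spark forwards any
spark.hadoop.* property to the underlying Hadoop Configuration, so the canned
ACL can be passed that way rather than as a driver JVM option.)

hadoop jar /mnt/var/lib/hadoop/steps/s-3HIRLHJJXV3SJ/script-runner.jar \
/home/hadoop/spark/bin/spark-submit --deploy-mode cluster --master
yarn-cluster \
--conf spark.hadoop.fs.s3.canned.acl=BucketOwnerFullControl \
hdfs:///user/hadoop/spark.py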


Python Image Library and Spark

2015-06-03 Thread Justin Spargur
Hi all,

 I'm playing around with manipulating images via Python and want to
utilize Spark for scalability. That said, I'm just learning Spark and my
Python is a bit rusty (been doing PHP coding for the last few years). I
think I have most of the process figured out. However, the script fails on
larger images and Spark is sending out the following warning for smaller
images:

Stage 0 contains a task of very large size (1151 KB). The maximum
recommended task size is 100 KB.

My code is as follows:

import Image
from pyspark import SparkContext

if __name__ == "__main__":

    imageFile = "sample.jpg"
    outFile   = "sample.gray.jpg"

    sc = SparkContext(appName="Grayscale")
    im = Image.open(imageFile)

    # Create an RDD for the data from the image file
    img_data = sc.parallelize( list(im.getdata()) )

    # Create an RDD for the grayscale value
    gValue = img_data.map( lambda x: int(x[0]*0.21 + x[1]*0.72 + x[2]*0.07) )

    # Put our grayscale value into the RGB channels
    grayscale = gValue.map( lambda x: (x,x,x) )

    # Save the output in a new image.
    im.putdata( grayscale.collect() )

    im.save(outFile)

Obviously, something is amiss. However, I can't figure out where I'm off
track with this. Any help is appreciated! Thanks in advance!!!


Inconsistent behavior with Dataframe Timestamp between 1.3.1 and 1.4.0

2015-05-27 Thread Justin Yip
Hello,

I am trying out 1.4.0 and notice there are some differences in behavior
with Timestamp between 1.3.1 and 1.4.0.

In 1.3.1, I can compare a Timestamp with a string.
scala> val df = sqlContext.createDataFrame(Seq((1,
Timestamp.valueOf("2015-01-01 00:00:00")), (2,
Timestamp.valueOf("2014-01-01 00:00:00"))))
...
scala> df.filter($"_2" <= "2014-06-01").show
...
_1 _2
2  2014-01-01 00:00:...

However, in 1.4.0, the filter is always false:
scala> val df = sqlContext.createDataFrame(Seq((1,
Timestamp.valueOf("2015-01-01 00:00:00")), (2,
Timestamp.valueOf("2014-01-01 00:00:00"))))
df: org.apache.spark.sql.DataFrame = [_1: int, _2: timestamp]

scala> df.filter($"_2" <= "2014-06-01").show
+--+--+
|_1|_2|
+--+--+
+--+--+

Not sure if that is intended, but I cannot find any doc mentioning these
inconsistencies.
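
(A possible workaround, sketch only and not confirmed in this thread: make the
coercion explicit instead of relying on the implicit string/timestamp
comparison. lit comes from org.apache.spark.sql.functions.)

import org.apache.spark.sql.functions.lit

df.filter($"_2" <= lit("2014-06-01").cast("timestamp")).show
// or compare both sides as strings:
df.filter($"_2".cast("string") <= "2014-06-01").show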

Thanks.

Justin




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Inconsistent-behavior-with-Dataframe-Timestamp-between-1-3-1-and-1-4-0-tp23045.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Accumulators in Spark Streaming on UI

2015-05-26 Thread Justin Pihony
You need to make sure to name the accumulator.
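
For illustration, a minimal sketch of a named accumulator (only accumulators
created with an explicit name show up in the UI; ssc and the Kafka dstream
below are assumed to exist, and the name string is made up):

val kafkaEvents = ssc.sparkContext.accumulator(0L, "Kafka events received")
dstream.foreachRDD { rdd => kafkaEvents += rdd.count() }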

On Tue, May 26, 2015 at 2:23 PM, Snehal Nagmote nagmote.sne...@gmail.com
wrote:

 Hello all,

 I have  accumulator in spark streaming application which counts number of
 events received from Kafka.

 From the documentation, it seems the Spark UI has support to display it.

 But I am unable to see it on UI. I am using spark 1.3.1

 Do I need to call any method (print)  or am I missing something ?


 Thanks in advance,

 Snehal





Building scaladoc using build/sbt unidoc failure

2015-05-26 Thread Justin Yip
Hello,

I am trying to build scala doc from the 1.4 branch. But it failed due to
[error] (sql/compile:compile) java.lang.AssertionError: assertion failed:
List(object package$DebugNode, object package$DebugNode)

I followed the instruction on github
https://github.com/apache/spark/tree/branch-1.4/docs and used the
following command:

$ build/sbt unidoc

Please see attachment for detailed error. Did I miss anything?

Thanks.

Justin


unidoc_error.txt (30K) 
http://apache-spark-user-list.1001560.n3.nabble.com/attachment/23044/0/unidoc_error.txt




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Building-scaladoc-using-build-sbt-unidoc-failure-tp23044.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Why is RDD to PairRDDFunctions only via implicits?

2015-05-22 Thread Justin Pihony
The (crude) proof of concept seems to work:

class RDD[V](value: List[V]){
  def doStuff = println("I'm doing stuff")
}

object RDD{
  implicit def toPair[V](x:RDD[V]) = new PairRDD(List((1,2)))
}

class PairRDD[K,V](value: List[(K,V)]) extends RDD (value){
  def doPairs = println("I'm using pairs")
}

class Context{
  def parallelize[K,V](x: List[(K,V)]) = new PairRDD(x)
  def parallelize[V](x: List[V]) = new RDD(x)
}

On Fri, May 22, 2015 at 2:44 PM, Reynold Xin r...@databricks.com wrote:

 I'm not sure if it is possible to overload the map function twice, once
 for just KV pairs, and another for K and V separately.


 On Fri, May 22, 2015 at 10:26 AM, Justin Pihony justin.pih...@gmail.com
 wrote:

 This ticket https://issues.apache.org/jira/browse/SPARK-4397 improved
 the RDD API, but it could be even more discoverable if made available via
 the API directly. I assume this was originally an omission that now needs
 to be kept for backwards compatibility, but would any of the repo owners be
 open to making this more discoverable to the point of API docs and tab
 completion (while keeping both binary and source compatibility)?


 class PairRDD extends RDD{
   pair methods
 }

 RDD{
   def map[K: ClassTag, V: ClassTag](f: T => (K,V)): PairRDD[K,V]
 }

 As long as the implicits remain, then compatibility remains, but now it
 is explicit in the docs on how to get a PairRDD and in tab completion.

 Thoughts?

 Justin Pihony





Why is RDD to PairRDDFunctions only via implicits?

2015-05-22 Thread Justin Pihony
This ticket https://issues.apache.org/jira/browse/SPARK-4397 improved the
RDD API, but it could be even more discoverable if made available via the
API directly. I assume this was originally an omission that now needs to be
kept for backwards compatibility, but would any of the repo owners be open
to making this more discoverable to the point of API docs and tab
completion (while keeping both binary and source compatibility)?


class PairRDD extends RDD{
  pair methods
}

RDD{
  def map[K: ClassTag, V: ClassTag](f: T => (K,V)): PairRDD[K,V]
}

As long as the implicits remain, then compatibility remains, but now it is
explicit in the docs on how to get a PairRDD and in tab completion.

Thoughts?

Justin Pihony


Re: User Defined Type (UDT)

2015-05-20 Thread Justin Uang
Xiangrui, is there a timeline for when UDTs will become a public API? I'm
currently using them to support java 8's ZonedDateTime.

On Tue, May 19, 2015 at 3:14 PM Xiangrui Meng men...@gmail.com wrote:

 (Note that UDT is not a public API yet.)

 On Thu, May 7, 2015 at 7:11 AM, wjur wojtek.jurc...@gmail.com wrote:
  Hi all!
 
  I'm using Spark 1.3.0 and I'm struggling with a definition of a new type
 for
  a project I'm working on. I've created a case class Person(name: String)
 and
  now I'm trying to make Spark to be able serialize and deserialize the
  defined type. I made a couple of attempts but none of them worked 100%
  (there were issues either in serialization or deserialization).
 
  This is my class and the corresponding UDT.
 
  @SQLUserDefinedType(udt = classOf[PersonUDT])
  case class Person(name: String)
 
  class PersonUDT extends UserDefinedType[Person] {
override def sqlType: DataType = StructType(Seq(StructField("name",
  StringType)))
 
override def serialize(obj: Any): Seq[Any] = {

 This should return a Row instance instead of Seq[Any], because the
 sqlType is a struct type.

  obj match {
    case c: Person =>
  Seq(c.name)
  }
}
 
override def userClass: Class[Person] = classOf[Person]
 
override def deserialize(datum: Any): Person = {
  datum match {
    case values: Seq[_] =>
  assert(values.length == 1)
  Person(values.head.asInstanceOf[String])
    case values: util.ArrayList[_] =>
  Person(values.get(0).asInstanceOf[String])
  }
}
 
// In some other attempt I was creating RDD of Seq with manually
  serialized data and
// I had to override equals because two DFs with the same type weren't
  actually equal
// StructField(person,...types.PersonUDT@a096ac3)
// StructField(person,...types.PersonUDT@613fd937)
def canEqual(other: Any): Boolean = other.isInstanceOf[PersonUDT]
 
override def equals(other: Any): Boolean = other match {
  case that: PersonUDT => true
  case _ => false
}
 
override def hashCode(): Int = 1
  }
 
  This is how I create RDD of Person and then try to create a DataFrame
  val rdd = sparkContext.parallelize((1 to 100).map(i =>
 Person(i.toString)))
  val sparkDataFrame = sqlContext.createDataFrame(rdd)
 
  The second line throws an exception:
  java.lang.ClassCastException: types.PersonUDT cannot be cast to
  org.apache.spark.sql.types.StructType
  at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:316)
 
  I looked into the code in SQLContext.scala and it seems that the code
  requires UDT to be extending StructType but in fact it extends
  UserDefinedType which extends directly DataType.
  I'm not sure whether it is a bug or I just don't know how to use UDTs.
 
  Do you have any suggestions how to solve this? I based my UDT on
  ExamplePointUDT but it seems to be incorrect. Is there a working example
 for
  UDT?
 
 
  Thank you for the reply in advance!
  wjur
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/User-Defined-Type-UDT-tp22796.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark logo license

2015-05-19 Thread Justin Pihony
Thanks!

On Wed, May 20, 2015 at 12:41 AM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 Check out Apache's trademark guidelines here:
 http://www.apache.org/foundation/marks/

 Matei

 On May 20, 2015, at 12:02 AM, Justin Pihony justin.pih...@gmail.com
 wrote:

 What is the license on using the spark logo. Is it free to be used for
 displaying commercially?




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-logo-license-tp22952.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Spark logo license

2015-05-19 Thread Justin Pihony
What is the license on using the spark logo. Is it free to be used for
displaying commercially?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-logo-license-tp22952.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Windows DOS bug in windows-utils.cmd

2015-05-19 Thread Justin Pihony
When running something like this:

spark-shell --jars foo.jar,bar.jar

This keeps failing to include the tail of the jars list. Digging into the
launch scripts I found that the comma makes it so that the list was sent as
separate parameters. So, to keep things together, I tried 

spark-shell --jars "foo.jar,bar.jar"

But, this still failed as the quotes carried over into some of the string
checks and resulted in invalid character errors. So, I am curious if anybody
sees a problem with making a PR to fix the script from

...
if "x%2"=="x" (
  echo %1 requires an argument. >&2
  exit /b 1
)
set SUBMISSION_OPTS=%SUBMISSION_OPTS% %1 %2
...

TO

...
if "x%~2"=="x" (
  echo %1 requires an argument. >&2
  exit /b 1
)
set SUBMISSION_OPTS=%SUBMISSION_OPTS% %1 %~2
...

The only difference is the use of the tilde to remove any surrounding quotes
if there are some. 

I figured I would ask here first to vet any unforeseen bugs this might cause
in other systems. As far as I know this should be harmless and only make it
so that comma separated lists will work in DOS.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Windows-DOS-bug-in-windows-utils-cmd-tp22946.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



TwitterUtils on Windows

2015-05-18 Thread Justin Pihony
I am trying to print a basic twitter stream and receiving the following
error:


15/05/18 22:03:14 INFO Executor: Fetching
http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar with
timestamp 1432000973058
15/05/18 22:03:14 INFO Utils: Fetching
http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar to
C:\Users\Justin\AppData\Local\Temp\spark-4a37d3
e9-34a2-40d4-b09b-6399931f527d\userFiles-65ee748e-4721-4e16-9fe6-65933651fec1\fetchFileTemp8970201232303518432.tmp
15/05/18 22:03:14 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
at org.apache.hadoop.util.Shell.runCommand(Shell.java:482)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.ja
va:715)
at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:873)
at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:443)
at
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:374)
at
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:366)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:366)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:184)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:744)


Code is:

spark-shell --jars
\Spark\lib\spark-streaming-twitter_2.10-1.3.1.jar,\Spark\lib\twitter4j-async-3.0.3.jar,\Spark\lib\twitter4j-core-3.0.3.jar,\Spark\lib\twitter4j-media-support-3.0.3.jar,\Spark\lib\twitter4j-stream-3.0.3.jar

import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming._

System.setProperty("twitter4j.oauth.consumerKey","*")
System.setProperty("twitter4j.oauth.consumerSecret","*")
System.setProperty("twitter4j.oauth.accessToken","*")
System.setProperty("twitter4j.oauth.accessTokenSecret","*")

val ssc = new StreamingContext(sc, Seconds(10))
val stream = TwitterUtils.createStream(ssc, None)
stream.print
ssc.start


This seems to be happening at FileUtil.chmod(targetFile.getAbsolutePath,
"a+x") but I'm not sure why...



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/TwitterUtils-on-Windows-tp22939.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: TwitterUtils on Windows

2015-05-18 Thread Justin Pihony
I think I found the answer -
http://apache-spark-user-list.1001560.n3.nabble.com/Error-while-running-example-scala-application-using-spark-submit-td10056.html

Do I have no way of running this in Windows locally?
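
(A possible workaround, not verified here: the NullPointerException comes from
Hadoop shelling out to chmod the fetched jar, which on Windows needs
winutils.exe. Pointing HADOOP_HOME at a directory containing bin\winutils.exe
before starting the shell is the usual fix; the path below is hypothetical.)

set HADOOP_HOME=C:\hadoop
set PATH=%HADOOP_HOME%\bin;%PATH%
spark-shell --jars ...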


On Mon, May 18, 2015 at 10:44 PM, Justin Pihony justin.pih...@gmail.com
wrote:

 I'm not 100% sure that is causing a problem, though. The stream still
 starts, but is giving blank output. I checked the environment variables in
 the ui and it is running local[*], so there should be no bottleneck there.

 On Mon, May 18, 2015 at 10:08 PM, Justin Pihony justin.pih...@gmail.com
 wrote:

 I am trying to print a basic twitter stream and receiving the following
 error:


 15/05/18 22:03:14 INFO Executor: Fetching
 http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar with
 timestamp 1432000973058
 15/05/18 22:03:14 INFO Utils: Fetching
 http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar to
 C:\Users\Justin\AppData\Local\Temp\spark-4a37d3

 e9-34a2-40d4-b09b-6399931f527d\userFiles-65ee748e-4721-4e16-9fe6-65933651fec1\fetchFileTemp8970201232303518432.tmp
 15/05/18 22:03:14 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID
 0)
 java.lang.NullPointerException
 at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
 at org.apache.hadoop.util.Shell.runCommand(Shell.java:482)
 at org.apache.hadoop.util.Shell.run(Shell.java:455)
 at
 org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.ja
 va:715)
 at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:873)
 at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853)
 at org.apache.spark.util.Utils$.fetchFile(Utils.scala:443)
 at

 org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:374)
 at

 org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:366)
 at

 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at

 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at

 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
 at

 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at
 org.apache.spark.executor.Executor.org
 $apache$spark$executor$Executor$$updateDependencies(Executor.scala:366)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:184)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:744)


 Code is:

 spark-shell --jars

 \Spark\lib\spark-streaming-twitter_2.10-1.3.1.jar,\Spark\lib\twitter4j-async-3.0.3.jar,\Spark\lib\twitter4j-core-3.0.3.jar,\Spark\lib\twitter4j-media-support-3.0.3.jar,\Spark\lib\twitter4j-stream-3.0.3.jar

 import org.apache.spark.streaming.twitter._
 import org.apache.spark.streaming._

 System.setProperty("twitter4j.oauth.consumerKey","*")
 System.setProperty("twitter4j.oauth.consumerSecret","*")
 System.setProperty("twitter4j.oauth.accessToken","*")
 System.setProperty("twitter4j.oauth.accessTokenSecret","*")

 val ssc = new StreamingContext(sc, Seconds(10))
 val stream = TwitterUtils.createStream(ssc, None)
 stream.print
 ssc.start


 This seems to be happening at FileUtil.chmod(targetFile.getAbsolutePath,
 "a+x") but I'm not sure why...



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/TwitterUtils-on-Windows-tp22939.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: TwitterUtils on Windows

2015-05-18 Thread Justin Pihony
I'm not 100% sure that is causing a problem, though. The stream still
starts, but is giving blank output. I checked the environment variables in
the ui and it is running local[*], so there should be no bottleneck there.

On Mon, May 18, 2015 at 10:08 PM, Justin Pihony justin.pih...@gmail.com
wrote:

 I am trying to print a basic twitter stream and receiving the following
 error:


 15/05/18 22:03:14 INFO Executor: Fetching
 http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar with
 timestamp 1432000973058
 15/05/18 22:03:14 INFO Utils: Fetching
 http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar to
 C:\Users\Justin\AppData\Local\Temp\spark-4a37d3

 e9-34a2-40d4-b09b-6399931f527d\userFiles-65ee748e-4721-4e16-9fe6-65933651fec1\fetchFileTemp8970201232303518432.tmp
 15/05/18 22:03:14 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID
 0)
 java.lang.NullPointerException
 at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
 at org.apache.hadoop.util.Shell.runCommand(Shell.java:482)
 at org.apache.hadoop.util.Shell.run(Shell.java:455)
 at
 org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.ja
 va:715)
 at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:873)
 at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853)
 at org.apache.spark.util.Utils$.fetchFile(Utils.scala:443)
 at

 org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:374)
 at

 org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:366)
 at

 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
 at

 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at
 org.apache.spark.executor.Executor.org
 $apache$spark$executor$Executor$$updateDependencies(Executor.scala:366)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:184)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:744)


 Code is:

 spark-shell --jars

 \Spark\lib\spark-streaming-twitter_2.10-1.3.1.jar,\Spark\lib\twitter4j-async-3.0.3.jar,\Spark\lib\twitter4j-core-3.0.3.jar,\Spark\lib\twitter4j-media-support-3.0.3.jar,\Spark\lib\twitter4j-stream-3.0.3.jar

 import org.apache.spark.streaming.twitter._
 import org.apache.spark.streaming._

 System.setProperty("twitter4j.oauth.consumerKey","*")
 System.setProperty("twitter4j.oauth.consumerSecret","*")
 System.setProperty("twitter4j.oauth.accessToken","*")
 System.setProperty("twitter4j.oauth.accessTokenSecret","*")

 val ssc = new StreamingContext(sc, Seconds(10))
 val stream = TwitterUtils.createStream(ssc, None)
 stream.print
 ssc.start


 This seems to be happening at FileUtil.chmod(targetFile.getAbsolutePath,
 "a+x") but I'm not sure why...



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/TwitterUtils-on-Windows-tp22939.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Trying to understand sc.textFile better

2015-05-17 Thread Justin Pihony
All,
I am trying to understand the textFile method deeply, but I think my
lack of deep Hadoop knowledge is holding me back here. Let me lay out my
understanding and maybe you can correct anything that is incorrect

When sc.textFile(path) is called, then defaultMinPartitions is used,
which is really just math.min(taskScheduler.defaultParallelism, 2). Let's
assume we are using the SparkDeploySchedulerBackend and this is 
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(),
2))
So, now let's say the default is 2, going back to the textFile, this is
passed in to HadoopRDD. The true size is determined in getPartitions() using
inputFormat.getSplits(jobConf, minPartitions). But, from what I can find,
the partition count is merely a hint and is in fact mostly ignored, so you will
probably get the total number of blocks.
OK, this fits with expectations. However, what if the default is not used and
you provide a partition size that is larger than the block size. If my
research is right and the getSplits call simply ignores this parameter, then
wouldn't the provided min end up being ignored and you would still just get
the block size?
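
(A quick empirical check, sketch only with a hypothetical HDFS path. If I read
FileInputFormat.getSplits right, the hint is not ignored for splittable files:
it feeds goalSize = totalSize / numSplits, and splitSize = max(minSize,
min(goalSize, blockSize)), so a larger minPartitions yields more, smaller
splits, while a small value still bottoms out at roughly one split per block.)

val rdd1 = sc.textFile("hdfs:///data/big.log")        // defaultMinPartitions
val rdd2 = sc.textFile("hdfs:///data/big.log", 100)   // explicit hint
println(rdd1.partitions.length)   // typically ~ number of HDFS blocks
println(rdd2.partitions.length)   // >= 100 for an uncompressed, splittable file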

Thanks,
Justin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Trying-to-understand-sc-textFile-better-tp22924.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Getting the best parameter set back from CrossValidatorModel

2015-05-16 Thread Justin Yip
Hello,

I am using MLPipeline. I would like to extract the best parameters found by
CrossValidator, but I cannot find much documentation about how to do it. Can
anyone give me some pointers?

Thanks.

Justin
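
(For reference, a sketch of one way to inspect what was selected, assuming
Spark 1.4+ where Params.extractParamMap exists and assuming the estimator
handed to CrossValidator was a Pipeline; cvModel stands for the fitted
CrossValidatorModel.)

import org.apache.spark.ml.PipelineModel

val best = cvModel.bestModel.asInstanceOf[PipelineModel]
best.stages.foreach(stage => println(stage.extractParamMap()))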




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Getting-the-best-parameter-set-back-from-CrossValidatorModel-tp22915.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Best practice to avoid ambiguous columns in DataFrame.join

2015-05-15 Thread Justin Yip
Hello,

I would like to know if there are recommended ways of preventing ambiguous
columns when joining dataframes. When we join dataframes, it often happens
that we join on columns with identical names. I could rename the columns on
the right data frame, as described in the following code. Is there a better
way to achieve this?

scala> val df = sqlContext.createDataFrame(Seq((1, "a"), (2, "b"), (3,
"b"), (4, "b")))
df: org.apache.spark.sql.DataFrame = [_1: int, _2: string]

scala> val df2 = sqlContext.createDataFrame(Seq((1, 10), (2, 20), (3, 30),
(4, 40)))
df2: org.apache.spark.sql.DataFrame = [_1: int, _2: int]

scala> df.join(df2.withColumnRenamed("_1", "right_key"), $"_1" ===
$"right_key").printSchema

Thanks.

Justin




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-to-avoid-ambiguous-columns-in-DataFrame-join-tp22907.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Custom Aggregate Function for DataFrame

2015-05-15 Thread Justin Yip
Hi Ayan,

I have a DF constructed from the following case class Event:

case class State(attr1: String, ...)

case class Event(
  userId: String,
  time: Long,
  state: State
)

I would like to generate a DF which contains the latest state of each
userId. I could first compute the latest time for each user and join it
back to the original data frame, but that involves two shuffles. Hence I
would like to see if there are ways to improve the performance.

Thanks.

Justin
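
(For reference, a one-shuffle sketch that is not from this thread: if the
original RDD[Event] used to build the DataFrame is still around (called
events here, an assumed name), the latest event per user can be kept with a
single reduceByKey and turned back into a DataFrame, assuming the fields of
State are SQL-compatible types.)

import sqlContext.implicits._

val latestPerUser = events                       // events: RDD[Event]
  .map(e => (e.userId, e))
  .reduceByKey((a, b) => if (a.time >= b.time) a else b)
  .values
  .toDF()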


On Fri, May 15, 2015 at 6:32 AM, ayan guha guha.a...@gmail.com wrote:

 can you kindly elaborate on this? it should be possible to write udafs in
 similar lines of sum/min etc.

 On Fri, May 15, 2015 at 5:49 AM, Justin Yip yipjus...@prediction.io
 wrote:

 Hello,

 May I know if there is a way to implement an aggregate function for grouped
 data in a DataFrame? I dug into the doc but didn't find any, apart from the
 UDF functions which apply on a Row. Maybe I have missed something. Thanks.

 Justin

 --
 View this message in context: Custom Aggregate Function for DataFrame
 http://apache-spark-user-list.1001560.n3.nabble.com/Custom-Aggregate-Function-for-DataFrame-tp22893.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.




 --
 Best Regards,
 Ayan Guha



Re: Best practice to avoid ambiguous columns in DataFrame.join

2015-05-15 Thread Justin Yip
Thanks Michael,

This is very helpful. I have a follow up question related to NaFunctions.
Usually after a left outer join, we get lots of null value and we need to
handle them before further processing. I have the following piece of code,
the _1 column is duplicated and crashes the .na.fill functions.

From your answer, it appears that Spark 1.4 resolves this issue as only a
single _1 column is outputted. You know if there is a good workaround for
Spark 1.3?

scala df3.show
_1 a
1  a
2  b
3  b
4  b

scala df4.show
_1 b
1  10
2  null
3  3
4  0

scala> df3.join(df4, df3("_1") === df4("_1")).na.fill(-999)
org.apache.spark.sql.AnalysisException: Reference '_1' is ambiguous, could
be: _1#33, _1#31.;
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:229)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:128)
at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:161)
...

Thanks!

Justin
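
(A possible Spark 1.3 workaround, sketch only: rename the key on the right
side before the join so no duplicate column name ever reaches na.fill; the
extra right_key column can be projected away afterwards with select.)

val joined = df3.join(df4.withColumnRenamed("_1", "right_key"),
                      $"_1" === $"right_key", "left_outer")
joined.na.fill(-999).show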



On Fri, May 15, 2015 at 3:55 PM, Michael Armbrust mich...@databricks.com
wrote:

 There are several ways to solve this ambiguity:

 *1. use the DataFrames to get the attribute so its already resolved and
 not just a string we need to map to a DataFrame.*

  df.join(df2, df("_1") === df2("_1"))

 *2. Use aliases*

  df.as('a).join(df2.as('b), $"a._1" === $"b._1")

 *3. rename the columns as you suggested.*

  df.join(df2.withColumnRenamed("_1", "right_key"), $"_1" ===
  $"right_key").printSchema

 *4. (Spark 1.4 only) use def join(right: DataFrame, usingColumn: String):
 DataFrame*

  df.join(df1, "_1")

 This has the added benefit of only outputting a single _1 column.

 On Fri, May 15, 2015 at 3:44 PM, Justin Yip yipjus...@prediction.io
 wrote:

 Hello,

 I would like to know if there are recommended ways of preventing
 ambiguous columns when joining dataframes. When we join dataframes, it
 often happens that we join on columns with identical names. I could rename
 the columns on the right data frame, as described in the following code. Is
 there a better way to achieve this?

 scala> val df = sqlContext.createDataFrame(Seq((1, "a"), (2, "b"), (3,
 "b"), (4, "b")))
 df: org.apache.spark.sql.DataFrame = [_1: int, _2: string]

 scala> val df2 = sqlContext.createDataFrame(Seq((1, 10), (2, 20), (3,
 30), (4, 40)))
 df2: org.apache.spark.sql.DataFrame = [_1: int, _2: int]

 scala> df.join(df2.withColumnRenamed("_1", "right_key"), $"_1" ===
 $"right_key").printSchema

 Thanks.

 Justin

 --
 View this message in context: Best practice to avoid ambiguous columns
 in DataFrame.join
 http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-to-avoid-ambiguous-columns-in-DataFrame-join-tp22907.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.





Custom Aggregate Function for DataFrame

2015-05-14 Thread Justin Yip
Hello,

May I know if there is a way to implement an aggregate function for grouped data
in a DataFrame? I dug into the doc but didn't find any, apart from the UDF
functions which apply on a Row. Maybe I have missed something. Thanks.

Justin




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Custom-Aggregate-Function-for-DataFrame-tp22893.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Creating StructType with DataFrame.withColumn

2015-04-30 Thread Justin Yip
Hello,

I would like to add a StructType to DataFrame. What would be the best way
to do it? Not sure if it is possible using withColumn. A possible way is to
convert the dataframe into an RDD[Row], add the struct, and then convert it
back to a dataframe. But that seems like overkill.

I guess I may have missed something crucial. Can anyone give me some
pointers?

Thanks.

Justin
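
(For reference, a sketch assuming Spark 1.4+, where the struct function in
org.apache.spark.sql.functions builds a nested StructType column directly in
withColumn; df and the column names a and b are assumptions.)

import org.apache.spark.sql.functions.struct

val withNested = df.withColumn("ab", struct(df("a"), df("b")))
withNested.printSchema()   // "ab" appears as a struct column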




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Creating-StructType-with-DataFrame-withColumn-tp22715.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: casting timestamp into long fail in Spark 1.3.1

2015-04-30 Thread Justin Yip
After some trial and error, using DataType solves the problem:

df.withColumn("millis", $"eventTime".cast(
org.apache.spark.sql.types.LongType) * 1000)

Justin

On Thu, Apr 30, 2015 at 3:41 PM, Justin Yip yipjus...@prediction.io wrote:

 Hello,

 I was able to cast a timestamp into long using
  df.withColumn("millis", $"eventTime".cast("long") * 1000)
 in spark 1.3.0.

 However, this statement returns a failure with spark 1.3.1. I got the
 following exception:

 Exception in thread main org.apache.spark.sql.types.DataTypeException:
 Unsupported dataType: long. If you have a struct and a field name of it has
 any special characters, please use backticks (`) to quote that field name,
 e.g. `x+y`. Please note that backtick itself is not supported in a field
 name.

 at
 org.apache.spark.sql.types.DataTypeParser$class.toDataType(DataTypeParser.scala:95)

 at
 org.apache.spark.sql.types.DataTypeParser$$anon$1.toDataType(DataTypeParser.scala:107)

 at
 org.apache.spark.sql.types.DataTypeParser$.apply(DataTypeParser.scala:111)

 at org.apache.spark.sql.Column.cast(Column.scala:636)

 Is there any change in the casting logic which may lead to this failure?

 Thanks.

 Justin

 --
 View this message in context: casting timestamp into long fail in Spark
 1.3.1
 http://apache-spark-user-list.1001560.n3.nabble.com/casting-timestamp-into-long-fail-in-Spark-1-3-1-tp22727.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.



casting timestamp into long fail in Spark 1.3.1

2015-04-30 Thread Justin Yip
Hello,

I was able to cast a timestamp into long using
df.withColumn("millis", $"eventTime".cast("long") * 1000)
in spark 1.3.0.

However, this statement returns a failure with spark 1.3.1. I got the
following exception:

Exception in thread main org.apache.spark.sql.types.DataTypeException:
Unsupported dataType: long. If you have a struct and a field name of it has
any special characters, please use backticks (`) to quote that field name,
e.g. `x+y`. Please note that backtick itself is not supported in a field
name.

at
org.apache.spark.sql.types.DataTypeParser$class.toDataType(DataTypeParser.scala:95)

at
org.apache.spark.sql.types.DataTypeParser$$anon$1.toDataType(DataTypeParser.scala:107)

at
org.apache.spark.sql.types.DataTypeParser$.apply(DataTypeParser.scala:111)

at org.apache.spark.sql.Column.cast(Column.scala:636)

Is there any change in the casting logic which may lead to this failure?

Thanks.

Justin




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/casting-timestamp-into-long-fail-in-Spark-1-3-1-tp22727.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Powered By Spark

2015-04-27 Thread Justin
Hi,

Would you mind adding our company to the Powered By Spark page?

organization name: atp
URL: https://atp.io
a list of which Spark components you are using: SparkSQL, MLLib, Databricks
Cloud
and a short description of your use case: Predictive models and
learning algorithms to improve the relevance of programmatic marketing


Thanks!
Justin




Justin Barton
CTO
+1 (718) 404 9272
+44 203 290 9272
atp.io | jus...@atp.io | find us https://atp.io/find-us


Column renaming after DataFrame.groupBy

2015-04-21 Thread Justin Yip
Hello,

I would like to rename a column after aggregation. In the following code, the
column name is "SUM(_1#179)"; is there a way to rename it to a more
friendly name?

scala> val d = sqlContext.createDataFrame(Seq((1, 2), (1, 3), (2, 10)))
scala> d.groupBy("_1").sum().printSchema
root
 |-- _1: integer (nullable = false)
 |-- SUM(_1#179): long (nullable = true)
 |-- SUM(_2#180): long (nullable = true)

Thanks.

Justin
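
(A sketch of one way to control the output names, assuming the Spark 1.3
functions API: aggregate explicitly and alias each result.)

import org.apache.spark.sql.functions.sum

d.groupBy("_1").agg(sum("_1").as("sum_1"), sum("_2").as("sum_2")).printSchema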




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Column-renaming-after-DataFrame-groupBy-tp22586.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Expected behavior for DataFrame.unionAll

2015-04-14 Thread Justin Yip
That explains it. Thanks Reynold.

Justin

On Mon, Apr 13, 2015 at 11:26 PM, Reynold Xin r...@databricks.com wrote:

 I think what happened was applying the narrowest possible type. Type
 widening is required, and as a result, the narrowest type is string between
 a string and an int.


 https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala#L144



 On Tue, Apr 7, 2015 at 5:00 PM, Justin Yip yipjus...@prediction.io
 wrote:

 Hello,

 I am experimenting with DataFrame. I tried to construct two DataFrames
 with:
 1. case class A(a: Int, b: String)
 scala adf.printSchema()
 root
  |-- a: integer (nullable = false)
  |-- b: string (nullable = true)

 2. case class B(a: String, c: Int)
 scala bdf.printSchema()
 root
  |-- a: string (nullable = true)
  |-- c: integer (nullable = false)


 Then I unioned the these two DataFrame with the unionAll function, and I
 get the following schema. It is kind of a mixture of A and B.

 scala val udf = adf.unionAll(bdf)
 scala udf.printSchema()
 root
  |-- a: string (nullable = false)
  |-- b: string (nullable = true)

 The unionAll documentation says it behaves like the SQL UNION ALL
 function. However, unioning incompatible types is not well defined for SQL.
 Is there any expected behavior for unioning incompatible data frames?

 Thanks.

 Justin





Catching executor exception from executor in driver

2015-04-14 Thread Justin Yip
Hello,

I would like to know if there is a way of catching an exception thrown from an
executor in the driver program. Here is an example:

try {
  val x = sc.parallelize(Seq(1,2,3)).map(e => e / 0).collect
} catch {
  case e: SparkException => {
    println(s"ERROR: $e")
    println(s"CAUSE: ${e.getCause}")
  }
}

Output:
ERROR: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 1 in stage 1.0 failed 4 times, most recent failure: Lost task 1.3 in
stage 1.0 (TID 15, pio1.c.ace-lotus-714.internal):
java.lang.ArithmeticException: / by zero
at
$line71.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply$mcII$sp(console:51)
...
CAUSE: null

The exception cause is a null value. Is there any way that I can catch the
ArithmeticException?

Thanks

Justin




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Catching-executor-exception-from-executor-in-driver-tp22495.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Fwd: Expected behavior for DataFrame.unionAll

2015-04-13 Thread Justin Yip
Hello,

I am experimenting with DataFrame. I tried to construct two DataFrames with:
1. case class A(a: Int, b: String)
scala adf.printSchema()
root
 |-- a: integer (nullable = false)
 |-- b: string (nullable = true)

2. case class B(a: String, c: Int)
scala bdf.printSchema()
root
 |-- a: string (nullable = true)
 |-- c: integer (nullable = false)


Then I unioned the these two DataFrame with the unionAll function, and I
get the following schema. It is kind of a mixture of A and B.

scala val udf = adf.unionAll(bdf)
scala udf.printSchema()
root
 |-- a: string (nullable = false)
 |-- b: string (nullable = true)

The unionAll documentation says it behaves like the SQL UNION ALL function.
However, unioning incompatible types is not well defined for SQL. Is there
any expected behavior for unioning incompatible data frames?

Thanks.

Justin




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Expected-behavior-for-DataFrame-unionAll-tp22487.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How to use Joda Time with Spark SQL?

2015-04-12 Thread Justin Yip
Cheng, this is great info. I have a follow-up question. There are a few
very common data types (e.g. Joda DateTime) that are not directly supported
by SparkSQL. Do you know if there are any plans for accommodating some
common data types in SparkSQL? They don't need to be a first class
datatype, but if they are available as UDT and provided by the SparkSQL
library, that will make DataFrame users' life easier.

Justin

On Sat, Apr 11, 2015 at 5:41 AM, Cheng Lian lian.cs@gmail.com wrote:

 One possible approach can be defining a UDT (user-defined type) for Joda
 time. A UDT maps an arbitrary type to and from Spark SQL data types. You
 may check the ExamplePointUDT [1] for more details.

 [1]: https://github.com/apache/spark/blob/694aef0d71d2683eaf63cbd1d8e95c
 2da423b72e/sql/core/src/main/scala/org/apache/spark/sql/
 test/ExamplePointUDT.scala


 On 4/8/15 6:09 AM, adamgerst wrote:

 I've been using Joda Time in all my spark jobs (by using the nscala-time
 package) and have not run into any issues until I started trying to use
 spark sql.  When I try to convert a case class that has a
 com.github.nscala_time.time.Imports.DateTime object in it, an exception
 is
 thrown for with a MatchError

 My assumption is that this is because the basic types of spark sql are
 java.sql.Timestamp and java.sql.Date and therefor spark doesn't know what
 to
 do about the DateTime value.

 How can I get around this? I would prefer not to have to change my code to
 make the values be Timestamps but I'm concerned that might be the only
 way.
 Would something like implicit conversions work here?

 It seems that even if I specify the schema manually then I would still
 have
 the issue since you have to specify the column type which has to be of
 type
 org.apache.spark.sql.types.DataType



 --
 View this message in context: http://apache-spark-user-list.
 1001560.n3.nabble.com/How-to-use-Joda-Time-with-Spark-SQL-tp22415.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




The $ notation for DataFrame Column

2015-04-10 Thread Justin Yip
Hello,

The DataFrame documentation always uses $"columnX" to refer to a column,
but I cannot find much information about it. Maybe I have missed something.
Can anyone point me to the doc about the $, if there is any?

Thanks.

Justin
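
(For reference, a sketch of what the $ shorthand expands to: it is a string
interpolator brought in by the implicits import on SQLContext, and $"col" is
just another way to write df("col"); df below is any DataFrame.)

import sqlContext.implicits._     // provides the $"..." syntax (StringToColumn)

df.select($"columnX")             // same column as df("columnX")
df.filter($"columnX" > 1)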


DataFrame column name restriction

2015-04-10 Thread Justin Yip
Hello,

Are there any restrictions on column names? I tried to use ".", but
sqlContext.sql cannot find the column. I would guess that "." is tricky as
it affects accessing StructType fields, but are there any more restrictions on
column names?

scala> case class A(a: Int)
defined class A

scala> sqlContext.createDataFrame(Seq(A(10), A(20))).withColumn("b.b", $"a"
+ 1)
res19: org.apache.spark.sql.DataFrame = [a: int, b.b: int]

scala> res19.registerTempTable("res19")

scala> res19.select("a")
res24: org.apache.spark.sql.DataFrame = [a: int]

scala> res19.select("a", "b.b")
org.apache.spark.sql.AnalysisException: cannot resolve 'b.b' given input
columns a, b.b;
at
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)



Thanks.

Justin


Re: DataFrame groupBy MapType

2015-04-07 Thread Justin Yip
Thanks Michael. Will submit a ticket.

Justin

On Mon, Apr 6, 2015 at 1:53 PM, Michael Armbrust mich...@databricks.com
wrote:

 I'll add that I don't think there is a convenient way to do this in the
 Column API ATM, but would welcome a JIRA for adding it :)

 On Mon, Apr 6, 2015 at 1:45 PM, Michael Armbrust mich...@databricks.com
 wrote:

 In HiveQL, you should be able to express this as:

 SELECT ... FROM table GROUP BY m['SomeKey']

 On Sat, Apr 4, 2015 at 5:25 PM, Justin Yip yipjus...@prediction.io
 wrote:

 Hello,

 I have a case class like this:

 case class A(
   m: Map[Long, Long],
   ...
 )

 and constructed a DataFrame from Seq[A].

  I would like to perform a groupBy on A.m("SomeKey"). I can implement a
 UDF, create a new Column then invoke a groupBy on the new Column. But is it
 the idiomatic way of doing such operation?

 Can't find much info about operating MapType on Column in the doc.

 Thanks ahead!

 Justin






Expected behavior for DataFrame.unionAll

2015-04-07 Thread Justin Yip
Hello,

I am experimenting with DataFrame. I tried to construct two DataFrames with:
1. case class A(a: Int, b: String)
scala adf.printSchema()
root
 |-- a: integer (nullable = false)
 |-- b: string (nullable = true)

2. case class B(a: String, c: Int)
scala bdf.printSchema()
root
 |-- a: string (nullable = true)
 |-- c: integer (nullable = false)


Then I unioned the these two DataFrame with the unionAll function, and I
get the following schema. It is kind of a mixture of A and B.

scala val udf = adf.unionAll(bdf)
scala udf.printSchema()
root
 |-- a: string (nullable = false)
 |-- b: string (nullable = true)

The unionAll documentation says it behaves like the SQL UNION ALL function.
However, unioning incompatible types is not well defined for SQL. Is there
any expected behavior for unioning incompatible data frames?

Thanks.

Justin


DataFrame degraded performance after DataFrame.cache

2015-04-07 Thread Justin Yip
Hello,

I have a parquet file of around 55M rows (~ 1G on disk). Performing simple
grouping operation is pretty efficient (I get results within 10 seconds).
However, after calling DataFrame.cache, I observe a significant performance
degradation; the same operation now takes 3+ minutes.

My hunch is that DataFrame cannot leverage its columnar format after
persisting in memory. But cannot find anywhere from the doc mentioning this.

Did I miss anything?

Thanks!

Justin


Re: DataFrame degraded performance after DataFrame.cache

2015-04-07 Thread Justin Yip
The schema has a StructType.

Justin

On Tue, Apr 7, 2015 at 6:58 PM, Yin Huai yh...@databricks.com wrote:

 Hi Justin,

 Does the schema of your data have any decimal, array, map, or struct type?

 Thanks,

 Yin

 On Tue, Apr 7, 2015 at 6:31 PM, Justin Yip yipjus...@prediction.io
 wrote:

 Hello,

 I have a parquet file of around 55M rows (~ 1G on disk). Performing
 simple grouping operation is pretty efficient (I get results within 10
 seconds). However, after called DataFrame.cache, I observe a significant
 performance degrade, the same operation now takes 3+ minutes.

 My hunch is that DataFrame cannot leverage its columnar format after
 persisting in memory. But cannot find anywhere from the doc mentioning this.

 Did I miss anything?

 Thanks!

 Justin





Re: DataFrame degraded performance after DataFrame.cache

2015-04-07 Thread Justin Yip
Thanks for the explanation Yin.

Justin

On Tue, Apr 7, 2015 at 7:36 PM, Yin Huai yh...@databricks.com wrote:

 I think the slowness is caused by the way that we serialize/deserialize
 the value of a complex type. I have opened
 https://issues.apache.org/jira/browse/SPARK-6759 to track the improvement.

 On Tue, Apr 7, 2015 at 6:59 PM, Justin Yip yipjus...@prediction.io
 wrote:

 The schema has a StructType.

 Justin

 On Tue, Apr 7, 2015 at 6:58 PM, Yin Huai yh...@databricks.com wrote:

 Hi Justin,

 Does the schema of your data have any decimal, array, map, or struct
 type?

 Thanks,

 Yin

 On Tue, Apr 7, 2015 at 6:31 PM, Justin Yip yipjus...@prediction.io
 wrote:

 Hello,

 I have a parquet file of around 55M rows (~ 1G on disk). Performing
 simple grouping operation is pretty efficient (I get results within 10
 seconds). However, after called DataFrame.cache, I observe a significant
 performance degrade, the same operation now takes 3+ minutes.

 My hunch is that DataFrame cannot leverage its columnar format after
 persisting in memory. But cannot find anywhere from the doc mentioning 
 this.

 Did I miss anything?

 Thanks!

 Justin







DataFrame groupBy MapType

2015-04-04 Thread Justin Yip
Hello,

I have a case class like this:

case class A(
  m: Map[Long, Long],
  ...
)

and constructed a DataFrame from Seq[A].

I would like to perform a groupBy on A.m("SomeKey"). I can implement a UDF,
create a new Column then invoke a groupBy on the new Column. But is it the
idiomatic way of doing such operation?

Can't find much info about operating MapType on Column in the doc.

Thanks ahead!

Justin


Re: MLlib: save models to HDFS?

2015-04-03 Thread Justin Yip
Hello Zhou,

You can look at the recommendation template
http://templates.prediction.io/PredictionIO/template-scala-parallel-recommendation
of PredictionIO. PredictionIO is built on the top of spark. And this
template illustrates how you can save the ALS model to HDFS and the reload
it later.

Justin
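
(A minimal sketch, assuming a Spark version where MatrixFactorizationModel
implements Saveable, 1.3+ if I recall correctly; the HDFS path is
hypothetical.)

import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

model.save(sc, "hdfs:///models/als")
val reloaded = MatrixFactorizationModel.load(sc, "hdfs:///models/als")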


On Fri, Apr 3, 2015 at 9:16 AM, S. Zhou myx...@yahoo.com.invalid wrote:

 I am new to MLib so I have a basic question: is it possible to save MLlib
 models (particularly CF models) to HDFS and then reload it later? If yes,
 could u share some sample code (I could not find it in MLlib tutorial).
 Thanks!



Re: StackOverflow Problem with 1.3 mllib ALS

2015-04-02 Thread Justin Yip
Thanks Xiangrui,

I used 80 iterations to demonstrates the marginal diminishing return in
prediction quality :)

Justin
On Apr 2, 2015 00:16, Xiangrui Meng men...@gmail.com wrote:

 I think before 1.3 you also get the stackoverflow problem in > ~35
 iterations. In 1.3.x, please use setCheckpointInterval to solve this
 problem, which is available in the current master and 1.3.1 (to be
 released soon). Btw, do you find 80 iterations are needed for
 convergence? -Xiangrui

 On Wed, Apr 1, 2015 at 11:54 PM, Justin Yip yipjus...@prediction.io
 wrote:
  Hello,
 
  I have been using Mllib's ALS in 1.2 and it works quite well. I have just
  upgraded to 1.3 and I encountered stackoverflow problem.
 
  After some digging, I realized that when the iteration count is > ~35, I get
  the overflow problem. However, I can get at least 80 iterations with ALS in
 1.2.
 
  Is there any change to the ALS algorithm? And are there any ways to
 achieve
  more iterations?
 
  Thanks.
 
  Justin

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
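
(A sketch of the checkpoint-interval approach suggested above, assuming the
mllib ALS in Spark 1.3.1+; the checkpoint directory is hypothetical and should
live on a reliable filesystem such as HDFS, and ratings: RDD[Rating] is
assumed to exist.)

import org.apache.spark.mllib.recommendation.ALS

sc.setCheckpointDir("hdfs:///tmp/als-checkpoints")

val model = new ALS()
  .setRank(10)
  .setIterations(80)
  .setCheckpointInterval(10)   // truncate the lineage every 10 iterations
  .run(ratings)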




StackOverflow Problem with 1.3 mllib ALS

2015-04-02 Thread Justin Yip
Hello,

I have been using Mllib's ALS in 1.2 and it works quite well. I have just
upgraded to 1.3 and I encountered stackoverflow problem.

After some digging, I realized that when the iteration count is > ~35, I get
the overflow problem. However, I can get at least 80 iterations with ALS in 1.2.

Is there any change to the ALS algorithm? And are there any ways to achieve
more iterations?

Thanks.

Justin


Catching InvalidClassException in sc.objectFile

2015-03-19 Thread Justin Yip
Hello,

I have persisted a RDD[T] to disk through saveAsObjectFile. Then I
changed the implementation of T. When I read the file with sc.objectFile
using the new binary, I got the exception of java.io.InvalidClassException,
which is expected.

I try to catch this error via SparkException in the driver program.
However, both getCause() and getSuppressed() are empty.

What is the recommended way of catching this exception?

Thanks.

Justin


Did DataFrames break basic SQLContext?

2015-03-18 Thread Justin Pihony
I started to play with 1.3.0 and found that there are a lot of breaking
changes. Previously, I could do the following:

case class Foo(x: Int)
val rdd = sc.parallelize(List(Foo(1)))
import sqlContext._
rdd.registerTempTable("foo")

Now, I am not able to directly use my RDD object and have it implicitly
become a DataFrame. It can be used as a DataFrameHolder, of which I could
write:

rdd.toDF.registerTempTable("foo")

But, that is kind of a pain in comparison. The other problem for me is that
I keep getting a SQLException:

java.sql.SQLException: Failed to start database 'metastore_db' with
class loader  sun.misc.Launcher$AppClassLoader@10393e97, see the next
exception for details.

This seems to be a dependency on Hive, when previously (1.2.0) there was no
such dependency. I can open tickets for these, but wanted to ask here
first... maybe I am doing something wrong?

Thanks,
Justin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Did-DataFrames-break-basic-SQLContext-tp22120.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Did DataFrames break basic SQLContext?

2015-03-18 Thread Justin Pihony
It appears that the metastore_db problem is related to
https://issues.apache.org/jira/browse/SPARK-4758. I had another shell open
that was stuck. This is probably a bug, though?

import sqlContext.implicits._
case class Foo(x: Int)
val rdd = sc.parallelize(List(Foo(1)))
rdd.toDF

results in a frozen shell after this line:

INFO MetaStoreDirectSql: MySQL check failed, assuming we are not on
mysql: Lexical error at line 1, column 5.  Encountered: @ (64), after :
.

which, locks the internally created metastore_db


On Wed, Mar 18, 2015 at 11:20 AM, Justin Pihony justin.pih...@gmail.com
wrote:

 I started to play with 1.3.0 and found that there are a lot of breaking
 changes. Previously, I could do the following:

 case class Foo(x: Int)
 val rdd = sc.parallelize(List(Foo(1)))
 import sqlContext._
 rdd.registerTempTable("foo")

 Now, I am not able to directly use my RDD object and have it implicitly
 become a DataFrame. It can be used as a DataFrameHolder, of which I could
 write:

 rdd.toDF.registerTempTable("foo")

 But, that is kind of a pain in comparison. The other problem for me is that
 I keep getting a SQLException:

 java.sql.SQLException: Failed to start database 'metastore_db' with
 class loader  sun.misc.Launcher$AppClassLoader@10393e97, see the next
 exception for details.

 This seems to be a dependency on Hive, when previously (1.2.0) there was no
 such dependency. I can open tickets for these, but wanted to ask here
 firstmaybe I am doing something wrong?

 Thanks,
 Justin



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Did-DataFrames-break-basic-SQLContext-tp22120.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Bug in Streaming files?

2015-03-14 Thread Justin Pihony
All,
Looking into this StackOverflow question
(https://stackoverflow.com/questions/29022379/spark-streaming-hdfs/29036469),
it appears that there is a bug when utilizing the newFilesOnly parameter in
FileInputDStream. Before creating a ticket, I wanted to verify it here. The
gist is that this code is wrong:

val modTimeIgnoreThreshold = math.max(
  initialModTimeIgnoreThreshold,                 // initial threshold based on newFilesOnly setting
  currentTime - durationToRemember.milliseconds  // trailing end of the remember window
)

The problem is that if you set newFilesOnly to false, then the
initialModTimeIgnoreThreshold is always 0, so it is always dropped by the max
operation. The best you can get, then, is files that were put in the
directory within (duration) of the start.

Is this a bug or expected behavior? It seems like a bug to me.

If I am correct, this appears to be a bigger fix than just using min as it
would break other functionality.
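
To make the issue concrete, here is a small sketch with made-up numbers; only
the shape of the computation matters:

val currentTime = 1000000L                 // ms at stream start (made up)
val rememberWindowMs = 60000L              // durationToRemember (made up)

val initialModTimeIgnoreThreshold = 0L     // what newFilesOnly = false yields
val threshold = math.max(initialModTimeIgnoreThreshold, currentTime - rememberWindowMs)
// threshold == 940000L, so any file modified more than 60 seconds before the start is
// ignored, even though newFilesOnly = false suggests all existing files should be read.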






SparkSQL JSON array support

2015-03-05 Thread Justin Pihony
Is there any plans of supporting JSON arrays more fully? Take for example:

val myJson =
  sqlContext.jsonRDD(sc.parallelize(List("""{"foo":[{"bar":1},{"baz":2}]}""")))
myJson.registerTempTable("JsonTest")

I would like a way to pull out parts of the array data based on a key:

sql("SELECT foo[bar] FROM JsonTest") // projects only the object with bar; the
rest would be null
 
I could even work around this if there was some way to access the key name
from the SchemaRDD:

myJson.filter(x => x(0).asInstanceOf[Seq[Row]].exists(y => y.key == "bar"))
      .map(x => x(0).asInstanceOf[Seq[Row]].filter(y => y.key == "bar"))
// This does the same as above, except it also filters out objects without a
// "bar" key

This is the closest suggestion I could find thus far,
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView 
which still does not solve the problem of pulling out the keys.

I tried with a UDF also, but could not currently make that work either.

If there isn't anything in the works, then would it be appropriate to create
a ticket for this?

Thanks,
Justin






Re: Spark SQL Static Analysis

2015-03-04 Thread Justin Pihony
Thanks!

On Wed, Mar 4, 2015 at 3:58 PM, Michael Armbrust mich...@databricks.com
wrote:

 It is somewhat out of date, but here is what we have so far:
 https://github.com/marmbrus/sql-typed

 On Wed, Mar 4, 2015 at 12:53 PM, Justin Pihony justin.pih...@gmail.com
 wrote:

 I am pretty sure that I saw a presentation where SparkSQL could be executed
 with static analysis, however I cannot find the presentation now, nor can I
 find any documentation or research papers on the topic. So, I am curious if
 there is indeed any work going on for this topic. The two things I would be
 interested in would be to be able to gain compile-time safety, as well as
 gain the ability to work on my data as a type instead of a row (i.e.,
 result.map(x => x.Age) instead of having to use Row.get)










Spark SQL Static Analysis

2015-03-04 Thread Justin Pihony
I am pretty sure that I saw a presentation where SparkSQL could be executed
with static analysis, however I cannot find the presentation now, nor can I
find any documentation or research papers on the topic. So, I am curious if
there is indeed any work going on for this topic. The two things I would be
interested in would be to be able to gain compile-time safety, as well as
gain the ability to work on my data as a type instead of a row (i.e.,
result.map(x => x.Age) instead of having to use Row.get)
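
To illustrate what I mean, here is a hedged sketch (the names are made up, and
the DataFrame call assumes the 1.3-style API) contrasting the two styles:

case class Person(name: String, age: Int)
val people = sc.parallelize(Seq(Person("Ada", 36), Person("Grace", 45)))

// Untyped: a wrong column index or type only fails at runtime.
val df = sqlContext.createDataFrame(people)
val agesUntyped = df.map(row => row.getInt(1))

// What compile-time checking would buy: the field name and type are verified
// by the compiler.
val agesTyped = people.map(p => p.age)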








SQLContext.applySchema strictness

2015-02-13 Thread Justin Pihony
Per the documentation:

  It is important to make sure that the structure of every Row of the
provided RDD matches the provided schema. Otherwise, there will be runtime
exception.

However, it appears that this is not being enforced. 

import org.apache.spark.sql._
val sqlContext = new SQLContext(sc)
val struct = StructType(List(StructField("test", BooleanType, true)))
val myData = sc.parallelize(List(Row(0), Row(true), Row("stuff")))
val schemaData = sqlContext.applySchema(myData, struct) // No error
schemaData.collect()(0).getBoolean(0) // Only now will I receive an error

Is this expected or a bug?

Thanks,
Justin






Re: SQLContext.applySchema strictness

2015-02-13 Thread Justin Pihony
OK, but what about on an action, like collect()? Shouldn't it be able to
determine the correctness at that time?
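
In the meantime, here is a minimal sketch of the workaround I am considering
(my own code, not Spark behavior): validate the rows eagerly myself, at the
cost of the extra full scan Yin mentions below. The type cases would need to
be extended for a real schema, and it assumes the same imports as the code
quoted below.

def conforms(row: Row, schema: StructType): Boolean =
  row.length == schema.fields.length &&
    schema.fields.zipWithIndex.forall { case (field, i) =>
      (row.isNullAt(i) && field.nullable) || (field.dataType match {
        case BooleanType => row(i).isInstanceOf[Boolean]
        case IntegerType => row(i).isInstanceOf[Int]
        case StringType  => row(i).isInstanceOf[String]
        case _           => true
      })
    }

val invalid = myData.filter(row => !conforms(row, struct)).count()
require(invalid == 0, s"$invalid rows do not match the provided schema")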

On Fri, Feb 13, 2015 at 4:49 PM, Yin Huai yh...@databricks.com wrote:

 Hi Justin,

 It is expected. We do not check if the provided schema matches rows since
 all rows need to be scanned to give a correct answer.

 Thanks,

 Yin

 On Fri, Feb 13, 2015 at 1:33 PM, Justin Pihony justin.pih...@gmail.com
 wrote:

 Per the documentation:

   It is important to make sure that the structure of every Row of the
 provided RDD matches the provided schema. Otherwise, there will be runtime
 exception.

 However, it appears that this is not being enforced.

 import org.apache.spark.sql._
 val sqlContext = new SQLContext(sc)
 val struct = StructType(List(StructField("test", BooleanType, true)))
 val myData = sc.parallelize(List(Row(0), Row(true), Row("stuff")))
 val schemaData = sqlContext.applySchema(myData, struct) // No error
 schemaData.collect()(0).getBoolean(0) // Only now will I receive an error

 Is this expected or a bug?

 Thanks,
 Justin








Aggregate order semantics when spilling

2015-01-20 Thread Justin Uang
Hi,

I am trying to aggregate a key based on some timestamp, and I believe that
spilling to disk is changing the order of the data fed into the combiner.

I have some timeseries data that is of the form: (key, date, other
data)

Partition 1
(A, 2, ...)
(B, 4, ...)
(A, 1, ...)
(A, 3, ...)
(B, 6, ...)

which I then partition by key, then sort within the partition:

Partition 1
(A, 1, ...)
(A, 2, ...)
(A, 3, ...)
(A, 4, ...)

Partition 2
(B, 4, ...)
(B, 6, ...)

If I run a combineByKey with the same partitioner, then the items for each
key will be fed into the ExternalAppendOnlyMap in the correct order.
However, if I spill, then the time slices are spilled to disk as multiple
partial combiners. When it's time to merge the spilled combiners for each
key, the combiners are combined in the wrong order.

For example, if during a groupByKey, [(A, 1, ...), (A, 2...)] and
[(A, 3, ...), (A, 4, ...)] are spilled separately, it's possible that
the combiners can be combined in the wrong order, like [(A, 3, ...),
(A, 4, ...), (A, 1, ...), (A, 2, ...)], which invalidates the
invariant that all the values for A are passed in order to the combiners.

I'm not an expert, but I suspect that this is because we use a heap ordered
by key when iterating, which doesn't retain the order of the spilled
combiners. Perhaps we can order our mergeHeap by (hash_key, spill_index),
where spill_index is incremented each time we spill? This would mean that
we would pop and merge the combiners of each key in order, resulting in
[(A, 1, ...), (A, 2, ...), (A, 3, ...), (A, 4, ...)].

Thanks in advance for the help! If there is a way to do this already in
Spark 1.2, can someone point it out to me?
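
To make the requirement concrete, here is a minimal sketch (made-up data) of
the kind of order-dependent combiner I have in mind: it collects each key's
dates in time order by appending, so mergeValue assumes values arrive sorted
within the partition, and mergeCombiners is only correct when the first
buffer's slice precedes the second's, which is exactly what spilling breaks.

import org.apache.spark.SparkContext._   // pair RDD functions

val sorted = sc.parallelize(Seq(("A", 1), ("A", 2), ("A", 3), ("A", 4)), numSlices = 1)

val datesPerKey = sorted.combineByKey[Vector[Int]](
  (d: Int) => Vector(d),                          // createCombiner
  (buf: Vector[Int], d: Int) => buf :+ d,         // mergeValue: append in arrival order
  (b1: Vector[Int], b2: Vector[Int]) => b1 ++ b2  // mergeCombiners: order-sensitive
)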

Best,

Justin


Re: Aggregate order semantics when spilling

2015-01-20 Thread Justin Uang
Hi Andrew,

Thanks for your response! For our use case, we aren't actually grouping,
but rather updating running aggregates. I just picked grouping because it
made the example easier to write out. However, when we merge combiners, the
combiners have to have data that are adjacent to each other in the original
partition.

I feel that requiring groupByKey/cogroup to insert values into the correct
place is quite expensive, and may not be possible for combiners that are
trying to collapse down the data while assuming order. Would it be really
expensive or perilous to the API if we just had combineByKey merge
combiners in the same order as the data slices they represent? I have a
very simple prototype that adds this additional semantic (it's only a 16
line diff):

https://github.com/justinuang/spark/commit/b92ee6a6dbf70207eca68296289cb62c3cea76b8

It looks like the additional comparison adds only trivial runtime overhead, and
it doesn't break any backwards compatibility.

Thanks,

Justin

On Tue, Jan 20, 2015 at 8:03 PM, Andrew Or and...@databricks.com wrote:

 Hi Justin,

 I believe the intended semantics of groupByKey or cogroup is that the
 ordering *within a key* is not preserved if you spill. In fact, the test
 cases for the ExternalAppendOnlyMap only assert that the Set representation
 of the results is as expected (see this line
 https://github.com/apache/spark/blob/9d9294aebf7208a76f43d8fc5a0489a83d7215f4/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala#L265).
 This is because these Spark primitives literally just group the values by a
 key but do not provide any ordering guarantees.

 However, if ordering within a key is a requirement for your application,
 then you may need to write your own PairRDDFunction that calls
 combineByKey. You can model your method after groupByKey, but change the
 combiner function slightly to take ordering into account. This may add some
 overhead to your application since you need to insert every value in the
 appropriate place, but since you're spilling anyway the overhead will
 likely be shadowed by disk I/O.

 Let me know if that works.
 -Andrew


 2015-01-20 9:18 GMT-08:00 Justin Uang justin.u...@gmail.com:

 Hi,

 I am trying to aggregate a key based on some timestamp, and I believe
 that spilling to disk is changing the order of the data fed into the
 combiner.

 I have some timeseries data that is of the form: (key, date, other
 data)

 Partition 1
 (A, 2, ...)
 (B, 4, ...)
 (A, 1, ...)
 (A, 3, ...)
 (B, 6, ...)

 which I then partition by key, then sort within the partition:

 Partition 1
 (A, 1, ...)
 (A, 2, ...)
 (A, 3, ...)
 (A, 4, ...)

 Partition 2
 (B, 4, ...)
 (B, 6, ...)

 If I run a combineByKey with the same partitioner, then the items for
 each key will be fed into the ExternalAppendOnlyMap in the correct order.
 However, if I spill, then the time slices are spilled to disk as multiple
 partial combiners. When it's time to merge the spilled combiners for each
 key, the combiners are combined in the wrong order.

 For example, if during a groupByKey, [(A, 1, ...), (A, 2...)] and
 [(A, 3, ...), (A, 4, ...)] are spilled separately, it's possible that
 the combiners can be combined in the wrong order, like [(A, 3, ...),
 (A, 4, ...), (A, 1, ...), (A, 2, ...)], which invalidates the
 invariant that all the values for A are passed in order to the combiners.

 I'm not an expert, but I suspect that this is because we use a heap
 ordered by key when iterating, which doesn't retain the order of the spilled
 combiners. Perhaps we can order our mergeHeap by (hash_key, spill_index),
 where spill_index is incremented each time we spill? This would mean that
 we would pop and merge the combiners of each key in order, resulting in
 [(A, 1, ...), (A, 2, ...), (A, 3, ...), (A, 4, ...)].

 Thanks in advance for the help! If there is a way to do this already in
 Spark 1.2, can someone point it out to me?

 Best,

 Justin





Accumulator value in Spark UI

2015-01-14 Thread Justin Yip
Hello,

The accumulator documentation says that if the accumulator is named, it will
be displayed in the web UI. However, I cannot find it anywhere.

Do I need to specify anything in the Spark UI config?
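
For reference, here is a minimal sketch of what I am doing (the name and input
path are made up); the accumulator is named, but I still do not see it:

val errors = sc.accumulator(0, "Parse errors")

sc.textFile("hdfs:///some/input")
  .foreach(line => if (line.isEmpty) errors += 1)

println("errors = " + errors.value)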

Thanks.

Justin


Re: Accumulator value in Spark UI

2015-01-14 Thread Justin Yip
Found it. Thanks Patrick.

Justin

On Wed, Jan 14, 2015 at 10:38 PM, Patrick Wendell pwend...@gmail.com
wrote:

 It should appear in the page for any stage in which accumulators are
 updated.

 On Wed, Jan 14, 2015 at 6:46 PM, Justin Yip yipjus...@prediction.io
 wrote:
  Hello,
 
  From accumulator documentation, it says that if the accumulator is
 named, it
  will be displayed in the WebUI. However, I cannot find it anywhere.
 
  Do I need to specify anything in the spark ui config?
 
  Thanks.
 
  Justin



Re: How to create an empty RDD with a given type?

2015-01-12 Thread Justin Yip
Xuelin,

There is a method called emptyRDD on SparkContext
(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext)
which serves this purpose.
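
A minimal sketch of what that looks like for your RDD[String] to RDD[Int] case
(the helper name is made up):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Returns an explicitly empty RDD[Int] (rather than None) when the input has
// no elements.
def toInts(sc: SparkContext, input: RDD[String]): RDD[Int] =
  if (input.take(1).isEmpty) sc.emptyRDD[Int]
  else input.map(_.toInt)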

Justin

On Mon, Jan 12, 2015 at 9:50 PM, Xuelin Cao xuelincao2...@gmail.com wrote:



 Hi,

 I'd like to create a transform function that converts RDD[String] to
 RDD[Int].

 Occasionally, the input RDD could be an empty RDD. I just want to
 directly create an empty RDD[Int] if the input RDD is empty. And, I don't
 want to return None as the result.

 Is there an easy way to do that?





IOException: exception in uploadSinglePart

2014-11-17 Thread Justin Mills
Spark 1.1.0, running on AWS EMR cluster using yarn-client as master.

I'm getting the following error when I attempt to save an RDD to S3. I've
narrowed it down to a single partition that is ~150 MB in size (versus the
other partitions that are closer to 1 MB). I am able to work around this by
saving to an HDFS file first, then using the hadoop distcp command to get
the data up on S3, but it seems that Spark should be able to handle that.
I've even tried writing to HDFS, then running something like the following
to get the data up on S3:

sc.textFile(hdfsPath).saveAsTextFile(s3Path)

Here's the exception:

java.io.IOException: exception in uploadSinglePart

com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.uploadSinglePart(MultipartUploadOutputStream.java:164)

com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.close(MultipartUploadOutputStream.java:220)

org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)

org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:105)

org.apache.hadoop.io.compress.CompressorStream.close(CompressorStream.java:106)
java.io.FilterOutputStream.close(FilterOutputStream.java:160)

org.apache.hadoop.mapred.TextOutputFormat$LineRecordWriter.close(TextOutputFormat.java:109)

org.apache.spark.SparkHadoopWriter.close(SparkHadoopWriter.scala:101)

org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:994)

org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:979)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)

org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Any ideas of what the underlying issue could be here? It feels like a
timeout issue, but that's just a guess.

Thanks, Justin


MLLib sample data format

2014-06-22 Thread Justin Yip
Hello,

I am looking into a couple of MLLib data files in
https://github.com/apache/spark/tree/master/data/mllib. But I cannot find
any explanation for these files. Does anyone know if they are documented?

Thanks.

Justin


Re: MLLib sample data format

2014-06-22 Thread Justin Yip
Hi Shuo,

Yes. I was reading the guide as well as the sample code.

For example, in
http://spark.apache.org/docs/latest/mllib-linear-methods.html#linear-support-vector-machine-svm,
nowhere in the github repository can I find the file referenced by
sc.textFile("mllib/data/ridge-data/lpsa.data").

Thanks.

Justin


On Sun, Jun 22, 2014 at 3:24 PM, Justin Yip yipjus...@gmail.com wrote:

 Hi Shuo,

 Yes. I was reading the guide as well as the sample code.

 For example, in
 http://spark.apache.org/docs/latest/mllib-linear-methods.html#linear-support-vector-machine-svm,
 nowhere in the github repository can I find the file referenced by
 sc.textFile("mllib/data/ridge-data/lpsa.data").

 Thanks.

 Justin



 On Sun, Jun 22, 2014 at 2:40 PM, Shuo Xiang shuoxiang...@gmail.com
 wrote:

 Hi, you might find http://spark.apache.org/docs/latest/mllib-guide.html
 helpful.


 On Sun, Jun 22, 2014 at 2:35 PM, Justin Yip yipjus...@gmail.com wrote:

 Hello,

 I am looking into a couple of MLLib data files in
 https://github.com/apache/spark/tree/master/data/mllib. But I cannot
 find any explanation for these files? Does anyone know if they are
 documented?

 Thanks.

 Justin






Re: MLLib sample data format

2014-06-22 Thread Justin Yip
Hi Shuo,

Yes. I was reading the guide as well as the sample code.

For example, in
http://spark.apache.org/docs/latest/mllib-linear-methods.html#linear-support-vector-machine-svm,
nowhere in the github repository can I find the file referenced by
sc.textFile("mllib/data/ridge-data/lpsa.data").

Thanks.

Justin



On Sun, Jun 22, 2014 at 2:40 PM, Shuo Xiang shuoxiang...@gmail.com wrote:

 Hi, you might find http://spark.apache.org/docs/latest/mllib-guide.html
 helpful.


 On Sun, Jun 22, 2014 at 2:35 PM, Justin Yip yipjus...@gmail.com wrote:

 Hello,

 I am looking into a couple of MLLib data files in
 https://github.com/apache/spark/tree/master/data/mllib. But I cannot
 find any explanation for these files? Does anyone know if they are
 documented?

 Thanks.

 Justin





Re: MLLib sample data format

2014-06-22 Thread Justin Yip
I see. That's good. Thanks.
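
For anyone else looking, here is a hedged sketch of parsing the
userid::movieid::rating layout Evan describes below, using MLlib's Rating
type (the exact file name is my assumption):

import org.apache.spark.mllib.recommendation.Rating

val ratings = sc.textFile("data/mllib/sample_movielens_data.txt").map { line =>
  val Array(user, movie, rating) = line.split("::")
  Rating(user.toInt, movie.toInt, rating.toDouble)
}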

Justin


On Sun, Jun 22, 2014 at 4:59 PM, Evan Sparks evan.spa...@gmail.com wrote:

 Oh, and the movie lens one is userid::movieid::rating

 - Evan

 On Jun 22, 2014, at 3:35 PM, Justin Yip yipjus...@gmail.com wrote:

 Hello,

 I am looking into a couple of MLLib data files in
 https://github.com/apache/spark/tree/master/data/mllib. But I cannot find
 any explanation for these files? Does anyone know if they are documented?

 Thanks.

 Justin




MLLib : Decision Tree with minimum points per node

2014-06-13 Thread Justin Yip
Hello,

I have been playing around with mllib's decision tree library. It is
working great, thanks.

I have a question regarding overfitting. It appears to me that the current
implementation doesn't allow the user to specify the minimum number of samples
per node. This results in some nodes containing only very few samples, which
potentially leads to overfitting.

I would like to know if there is a workaround or any other way to prevent
overfitting. Or will decision tree support min-samples-per-node in future
releases?

Thanks.

Justin


KryoException: Unable to find class

2014-06-05 Thread Justin Yip
Hello,

I have been using Externalizer from Chill as a serialization wrapper. It
appears to me that Spark has some classloader conflict with Chill. I have the
following (simplified) program:

import java.io._
import com.twitter.chill.Externalizer

class X(val i: Int) { override def toString() = s"X(i=$i)" }

object SimpleApp {
  def main(args: Array[String]) {
    val bos = new ByteArrayOutputStream(10)
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(Externalizer(new X(10)))
    oos.close()

    val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    val y = ois.readObject.asInstanceOf[Externalizer[X]]
    println(y.get)
  }
}

When I run it as a normal program (i.e. sbt run), the program runs fine.

But when I run it with spark-submit (i.e. spark-submit --verbose --class
SimpleApp --master local[4] target/scala-2.10/simple-project_2.10-1.0.jar),
the program fails at the ois.readObject call. I got an error that Kryo fails
to find the class X.

Exception in thread "main" com.esotericsoftware.kryo.KryoException: Unable
to find class: X
at
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)

I guess the issue is that Spark has a magic classloader, and Kryo fails to
see the same classpaths.

Is there any way to remedy this issue?
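
One hedged idea I plan to try (an assumption on my part, not a confirmed fix)
is to point the thread's context classloader at the loader that loaded my
application classes before deserializing, reusing the names from the program
above:

val previous = Thread.currentThread().getContextClassLoader
try {
  // Make the application classloader visible to whatever Kryo uses for lookup.
  Thread.currentThread().setContextClassLoader(classOf[X].getClassLoader)
  val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
  val y = ois.readObject.asInstanceOf[Externalizer[X]]
  println(y.get)
} finally {
  Thread.currentThread().setContextClassLoader(previous)
}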

Thanks.

Justin