Unsubscribe

2020-05-05 Thread Zeming Yu
Unsubscribe

Get Outlook for Android



unsubscribe

2020-04-29 Thread Zeming Yu
unsubscribe

Get Outlook for Android



[no subject]

2020-04-28 Thread Zeming Yu
Unsubscribe

Get Outlook for Android



Re: examples for flattening dataframes using pyspark

2017-05-27 Thread Zeming Yu
Sorry, sent the incomplete email by mistake. Here's the full email:


> Hi,
>
> I need to flatten a nested dataframe and I'm following this example:
> https://docs.databricks.com/spark/latest/spark-sql/complex-types.html
>
> Just wondering:
> 1. How can I test for the existence of an item before retrieving it?
> Say, test whether "b" exists before adding it to my flat dataframe:
>
> events = jsonToDataFrame("""
> {
>   "a": {
>     "b": 1,
>     "c": 2
>   }
> }
> """)
>
>
> 2. how can I loop through "b" and "c" and do some aggregation (e.g.
> finding the maximum, minimum)?
>
>
>
> Does anyone know of any examples of how to do these tasks?
>
> Thanks!
> Zeming
>
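
A minimal sketch of one way to handle both points, assuming Spark 2.x and a
DataFrame built from the JSON above (everything other than the "a"/"b"/"c"
field names is an assumption):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max as sql_max, min as sql_min
from pyspark.sql.types import StructType

spark = SparkSession.builder.getOrCreate()

# same shape as the example: one struct column "a" with nested fields "b" and "c"
events = spark.read.json(
    spark.sparkContext.parallelize(['{"a": {"b": 1, "c": 2}}']))

# 1. test for the existence of a nested field by inspecting the schema,
#    rather than the data, before selecting it
a_field = next(f for f in events.schema.fields if f.name == "a")
a_type = a_field.dataType
a_fields = [f.name for f in a_type.fields] if isinstance(a_type, StructType) else []

if "b" in a_fields:
    flat = events.select(*[col("a." + f).alias(f) for f in a_fields])
    flat.show()

# 2. loop through the nested fields and aggregate each one
aggs = []
for f in a_fields:
    aggs += [sql_max("a." + f).alias(f + "_max"),
             sql_min("a." + f).alias(f + "_min")]
events.agg(*aggs).show()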


examples for flattening dataframes using pyspark

2017-05-27 Thread Zeming Yu
Hi,

I need to flatten a nested dataframe and I'm following this example:
https://docs.databricks.com/spark/latest/spark-sql/complex-types.html

Just wondering:
1. How can I test for the existence of an item before retrieving it?
Say, test whether "b" exists before adding it to my flat dataframe:

events = jsonToDataFrame("""
{
  "a": {
    "b": 1,
    "c": 2
  }
}
""")


2.



Does anyone know of any examples of how to do these tasks?

Thanks!
Zeming


using pandas and pyspark to run ETL job - always failing after about 40 minutes

2017-05-26 Thread Zeming Yu
Hi,

I tried running the ETL job a few times. It always fails after 40 minutes
or so. When I relaunch Jupyter and rerun the job, it runs without error.
Then it fails again after some time. Just wondering if anyone else has
encountered this before?

Here's the error message:


Exception happened during processing of request from ('127.0.0.1', 40292)

Traceback (most recent call last):
  File "/home/ubuntu/anaconda3/lib/python3.5/socketserver.py", line
313, in _handle_request_noblock
self.process_request(request, client_address)
  File "/home/ubuntu/anaconda3/lib/python3.5/socketserver.py", line
341, in process_request
self.finish_request(request, client_address)
  File "/home/ubuntu/anaconda3/lib/python3.5/socketserver.py", line
354, in finish_request
self.RequestHandlerClass(request, client_address, self)
  File "/home/ubuntu/anaconda3/lib/python3.5/socketserver.py", line
681, in __init__
self.handle()
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/pyspark/accumulators.py",
line 235, in handle
num_updates = read_int(self.rfile)
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/pyspark/serializers.py",
line 577, in read_int
raise EOFError
EOFError
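
A hedged guess rather than a confirmed diagnosis: this EOFError comes from the
accumulator-update handler on the driver noticing that its socket closed, which
often just means the driver or a Python worker died underneath it (memory
pressure is a common culprit when mixing pandas and Spark) rather than anything
in the job logic itself. One thing worth trying is relaunching the notebook
with more driver headroom; the values below are placeholders, not
recommendations:

bin/pyspark --driver-memory 8g --conf spark.driver.maxResultSize=4g --executor-cores 2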


Re: what does this error mean?

2017-05-13 Thread Zeming Yu
Another error. Anyone have any idea?

This one happens when I try to convert a Spark dataframe to pandas:

---------------------------------------------------------------------------
Py4JError                                 Traceback (most recent call last)
/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/pyspark/sql/dataframe.py in collect(self)
    390         with SCCallSiteSync(self._sc) as css:
--> 391             port = self._jdf.collectToPython()
    392         return list(_load_from_socket(port, BatchedSerializer(PickleSerializer(

/home/ubuntu/anaconda3/lib/python3.5/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134

/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

/home/ubuntu/anaconda3/lib/python3.5/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 "An error occurred while calling {0}{1}{2}".
--> 327                 format(target_id, ".", name))
    328         else:

Py4JError: An error occurred while calling o69.collectToPython

During handling of the above exception, another exception occurred:

IndexError                                Traceback (most recent call last)
/home/ubuntu/anaconda3/lib/python3.5/site-packages/py4j/java_gateway.py in _get_connection(self)
    826         try:
--> 827             connection = self.deque.pop()
    828         except IndexError:

IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

ConnectionRefusedError                    Traceback (most recent call last)
/home/ubuntu/anaconda3/lib/python3.5/site-packages/py4j/java_gateway.py in start(self)
    962         try:
--> 963             self.socket.connect((self.address, self.port))
    964             self.is_connected = True

ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Py4JNetworkError                          Traceback (most recent call last)
<ipython-input> in <module>()
      7         'lead_time', 'dep_weekday', 'dep_weeknum',
      8         'days_to_last_holiday', 'days_to_next_holiday',
----> 9         'duration_minutes', 'stop_minutes').toPandas()
     10 flight_pd.head()

/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/pyspark/sql/dataframe.py in toPandas(self)
   1583         """
   1584         import pandas as pd
-> 1585         return pd.DataFrame.from_records(self.collect(), columns=self.columns)
   1586
   1587         ##

/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/pyspark/sql/dataframe.py in collect(self)
    389         """
    390         with SCCallSiteSync(self._sc) as css:
--> 391             port = self._jdf.collectToPython()
    392         return list(_load_from_socket(port, BatchedSerializer(PickleSerializer(
    393

/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/pyspark/traceback_utils.py in __exit__(self, type, value, tb)
     76         SCCallSiteSync._spark_stack_depth -= 1
     77         if SCCallSiteSync._spark_stack_depth == 0:
---> 78             self._context._jsc.setCallSite(None)

/home/ubuntu/anaconda3/lib/python3.5/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1129             proto.END_COMMAND_PART
   1130
-> 1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
   1133             answer, self.gateway_client, self.target_id, self.name)

/home/ubuntu/anaconda3/lib/python3.5/site-packages/py4j/java_gateway.py in send_command(self, command, retry, binary)
    879             if `binary` is `True`.
    880         """
--> 881         connection = self._get_connection()
    882         try:
    883             response = connection.send_command(command)

/home/ubuntu/anaconda3/lib/python3.5/site-packages/py4j/java_gateway.py in _get_connection(self)
    827             connection = self.deque.pop()
    828         except IndexError:
--> 829             connection = self._create_connection()
    830         return connection
    831

/home/ubuntu/anaconda3/lib/python3.5/site-packages/py4j/java_gateway.py in _create_connection(self)
    833         connection = GatewayConnection(
    834             self.gateway_parameters, self.gateway_property)
--> 835         connection.start()
    836         return connection
    837

/home/ubuntu/anaconda3/lib/python3.5/site-packages/py4j/java_gateway.py in start(self)
    968                 "server ({0}:{1})".format(self.address, self.port)
    969             logger.exception(msg)
--> 970             raise Py4JNetworkError(msg, e)
    971
    972     def close(self, reset=False):

Py4JNetworkError: An error occurred while trying to connect to the Java

what does this error mean?

2017-05-13 Thread Zeming Yu
My code runs error-free on my local PC. I just tried running the same code on
an Ubuntu machine on EC2 and got the error below. Any idea where to start
in terms of debugging?

---------------------------------------------------------------------------
Py4JError                                 Traceback (most recent call last)
<ipython-input> in <module>()
----> 1 output.show(2)

/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/pyspark/sql/dataframe.py in show(self, n, truncate)
    316         """
    317         if isinstance(truncate, bool) and truncate:
--> 318             print(self._jdf.showString(n, 20))
    319         else:
    320             print(self._jdf.showString(n, int(truncate)))

/home/ubuntu/anaconda3/lib/python3.5/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134
   1135         for temp_arg in temp_args:

/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/home/ubuntu/anaconda3/lib/python3.5/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    325             raise Py4JError(
    326                 "An error occurred while calling {0}{1}{2}".
--> 327                 format(target_id, ".", name))
    328         else:
    329             type = answer[1]

Py4JError: An error occurred while calling o648.showString


how to set up h2o sparkling water on jupyter notebook on a windows machine

2017-05-08 Thread Zeming Yu
Hi,

I'm a newbie, so please bear with me.

*I'm using a Windows 10 machine. I installed Spark here:*
C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7

*I also installed h2o sparkling water here:*
C:\sparkling-water-2.1.1

*I use this code on the command line to launch a Jupyter notebook for pyspark:*
cd C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7
bin\pyspark --executor-cores 2

*I then run this code inside the Jupyter notebook to start Spark:*

import os
import sys

spark_home = r"C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7"

if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')

# put pyspark and the bundled py4j on the Python path
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python', 'lib', 'py4j-0.10.4-src.zip'))

exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())


*My question is: how can I get H2O running in the same Jupyter notebook?*
I spent hours trying to google for the answer but couldn't figure out how
to do it...

Many thanks in advance!
Zeming
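
Not an authoritative answer, but a sketch of the way Sparkling Water's Python
package is usually attached to an already-running pyspark session. The path
and zip name below are assumptions based on the 2.1.x download layout and will
likely need adjusting:

import sys

# assumption: this is where the Sparkling Water download keeps its Python
# package; the exact zip name differs between releases
sys.path.insert(0, r"C:\sparkling-water-2.1.1\py\build\dist\h2o_pysparkling_2.1-2.1.1.zip")

import h2o
from pysparkling import H2OContext

# `spark` is the SparkSession created by shell.py in the cell above
hc = H2OContext.getOrCreate(spark)   # starts an H2O cloud on top of the running Spark app
print(hc)                            # shows the H2O Flow URL, reachable from the browser

If the package is also published for your version, pip-installing
h2o_pysparkling_2.1 into the same Anaconda environment should make the
sys.path step unnecessary.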


how to check whether spill over to hard drive happened or not

2017-05-06 Thread Zeming Yu
Hi,

I'm running pyspark on my local PC in standalone mode.

After a pyspark window function on a dataframe, I did a groupby query on
the dataframe.
The groupby query turns out to be very slow (10+ minutes on a small data
set).
I then cached the dataframe and re-ran the same query. The query remained
very slow.

I could also hear noises from the hard drive - I assume the PC was busy
reading from and writing to the hard drive. Is this an indication that the
dataframe has spilled over to the hard drive?

What's the best method for monitoring what's happening? How can I prevent
this from happening?

Thanks!
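
Not a definitive answer, but a couple of checks that can be done without
leaving the notebook; the Spark UI is where spill is actually reported
(per stage, when it happens). A small sketch, assuming the cached dataframe
is called df:

# assumption: df is the dataframe that was cached
df.cache()
df.count()                            # forces the cache to materialise
print(df.storageLevel)                # shows whether disk is part of the storage level
print(spark.sparkContext.uiWebUrl)    # open this URL; the Storage and Stages tabs
                                      # report memory vs. disk usage and per-stage spill

# the groupby after a window function runs through a shuffle; on a single
# local PC the default of 200 shuffle partitions is often too many
spark.conf.set("spark.sql.shuffle.partitions", "8")   # placeholder value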


Re: take the difference between two columns of a dataframe in pyspark

2017-05-06 Thread Zeming Yu
OK. I've worked it out.

from pyspark.sql.functions import col

df.withColumn('diff', col('A') - col('B'))

On Sun, May 7, 2017 at 11:49 AM, Zeming Yu <zemin...@gmail.com> wrote:

> Say I have the following dataframe with two numeric columns A and B,
> what's the best way to add a column showing the difference between the two
> columns?
>
> +-+--+
> |A| B|
> +-+--+
> |786.31999|786.12|
> |   786.12|786.12|
> |   786.42|786.12|
> |   786.72|786.12|
> |   786.92|786.12|
> |   786.92|786.12|
> |   786.72|786.12|
> |   786.72|786.12|
> |   827.72|786.02|
> |   827.72|786.02|
> +-+--+
>
>
> I could probably figure out how to do this via a UDF, but is a UDF generally
> slower?
>
>
> Thanks!
>
>


take the difference between two columns of a dataframe in pyspark

2017-05-06 Thread Zeming Yu
Say I have the following dataframe with two numeric columns A and B, what's
the best way to add a column showing the difference between the two columns?

+-+--+
|A| B|
+-+--+
|786.31999|786.12|
|   786.12|786.12|
|   786.42|786.12|
|   786.72|786.12|
|   786.92|786.12|
|   786.92|786.12|
|   786.72|786.12|
|   786.72|786.12|
|   827.72|786.02|
|   827.72|786.02|
+-+--+


I could probably figure out how to do this via a UDF, but is a UDF generally slower?


Thanks!
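
For completeness, a self-contained version of the approach from the reply
above (plain column arithmetic; no UDF, which in pyspark would generally be
slower because every row round-trips through Python):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# a small stand-in for the dataframe above
df = spark.createDataFrame([(786.31999, 786.12), (827.72, 786.02)], ["A", "B"])

df.withColumn("diff", col("A") - col("B")).show()
# equivalently: df.selectExpr("A", "B", "A - B AS diff").show()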


Spark books

2017-05-03 Thread Zeming Yu
I'm trying to decide whether to buy the books Learning Spark, Spark for
Machine Learning etc., or wait for a new edition covering the newer concepts
like DataFrames and Datasets. Anyone got any suggestions?


Re: parquet optimal file structure - flat vs nested

2017-04-30 Thread Zeming Yu
I thought relational databases with 6 TB of data could be quite expensive?

On 1 May 2017 12:56 am, "Muthu Jayakumar" <bablo...@gmail.com> wrote:

> I am not sure if parquet is a good fit for this? This seems more like
> filter lookup than an aggregate like query. I am curious to see what others
> have to say.
> Would it be more efficient if a relational database with the right index
> (code field in the above case) to perform more efficiently (with spark that
> uses predicate push-down)?
> Hope this helps.
>
> Thanks,
> Muthu
>
> On Sun, Apr 30, 2017 at 1:45 AM, Zeming Yu <zemin...@gmail.com> wrote:
>
>> Another question: I need to store airport info in a parquet file and
>> present it when a user makes a query.
>>
>> For example:
>>
>> "airport": {
>> "code": "TPE",
>> "name": "Taipei (Taoyuan Intl.)",
>> "longName": "Taipei, Taiwan
>> (TPE-Taoyuan Intl.)",
>> "city": "Taipei",
>> "localName": "Taoyuan Intl.",
>> "airportCityState": "Taipei,
>> Taiwan"
>>
>>
>> Is it best practice to store just the code "TPE" and then look up the
>> name "Taipei (Taoyuan Intl.)" from a relational database? Any alternatives?
>>
>> On Sun, Apr 30, 2017 at 6:34 PM, Jörn Franke <jornfra...@gmail.com>
>> wrote:
>>
>>> Depends on your queries, the data structure etc. generally flat is
>>> better, but if your query filter is on the highest level then you may have
>>> better performance with a nested structure, but it really depends
>>>
>>> > On 30. Apr 2017, at 10:19, Zeming Yu <zemin...@gmail.com> wrote:
>>> >
>>> > Hi,
>>> >
>>> > We're building a parquet based data lake. I was under the impression
>>> that flat files are more efficient than deeply nested files (say 3 or 4
>>> levels down). Is that correct?
>>> >
>>> > Thanks,
>>> > Zeming
>>>
>>
>>
>


examples of dealing with nested parquet/ dataframe file

2017-04-30 Thread Zeming Yu
Hi,

I'm still trying to decide whether to store my data as a deeply nested or a
flat parquet file.

The main reason for storing the nested file is that it keeps the data in its
raw format, with no information loss.

I have two questions:

1. Is it always necessary to flatten a nested dataframe for the purpose of
building a machine learning model? (I don't want to use the explode
function as there's only one response per row)

2. Could anyone point me to a few examples of dealing with deeply nested
(say 5 levels deep) dataframes in pyspark?
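
Not a full answer, but a sketch of the two patterns that tend to come up with
deeply nested frames: selecting leaves by dot path, and a schema-driven
flattener. The name nested_df below is hypothetical, and arrays are left
untouched since explode() is not wanted here. As far as I know, pyspark's ML
stages (e.g. VectorAssembler) only accept top-level columns, so the model
inputs usually do end up flat even if the stored file stays nested.

from pyspark.sql.functions import col
from pyspark.sql.types import StructType

# 1. individual leaves can be selected directly by dot path:
#    nested_df.select(col("a.b.c.price").alias("price"))      # hypothetical path

# 2. a generic flattener that walks the schema and aliases every leaf field
def flatten(schema, prefix=None):
    cols = []
    for field in schema.fields:
        name = field.name if prefix is None else prefix + "." + field.name
        if isinstance(field.dataType, StructType):
            cols += flatten(field.dataType, name)
        else:
            cols.append(col(name).alias(name.replace(".", "_")))
    return cols

# flat_df = nested_df.select(flatten(nested_df.schema))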


Re: Recommended cluster parameters

2017-04-30 Thread Zeming Yu
I've got a similar question. Would you be able to provide some rough guide
(even a range is fine) on the number of nodes, cores, and total amount of
RAM required?

> Do you want to store 1 TB, 1 PB or far more?

- say 6 TB of data in parquet format on S3


> Do you want to just read that data, retrieve it then do little work on it
> and then read it, have a complex machine learning pipeline?

- I need to 1) read it and do complex machine learning, and 2) query the
last 3 months of data, visualise it, and come back with answers within
seconds of latency




On Sun, Apr 30, 2017 at 6:57 PM, yohann jardin 
wrote:

> It really depends on your needs and your data.
>
>
> Do you want to store 1 TB, 1 PB or far more? Do you want to just read that
> data, retrieve it then do little work on it and then read it, have a
> complex machine learning pipeline? Depending on the workload, the ratio
> between cores and storage will vary.
>
>
> First start with a subset of your data and do some tests on your own
> computer or (that’s better) with a little cluster of 3 nodes. This will
> help you to find your ratio between storage/cores and the needs of memory
> that you might expect if you are not using just a subset of your data but
> the whole bunch available that you (can) have.
>
>
> Then using this information and indications on Spark website (
> http://spark.apache.org/docs/latest/hardware-provisioning.html), you will
> be able to specify the hardware of one node, and how many nodes you need
> (at least 3).
>
>
> *Yohann Jardin*
> Le 4/30/2017 à 10:26 AM, rakesh sharma a écrit :
>
> Hi
>
> I would like to know the details of implementing a cluster.
>
> What kind of machines one would require, how many nodes, number of cores
> etc.
>
>
> thanks
>
> rakesh
>
>
>


Re: parquet optimal file structure - flat vs nested

2017-04-30 Thread Zeming Yu
Another question: I need to store airport info in a parquet file and
present it when a user makes a query.

For example:

"airport": {
"code": "TPE",
"name": "Taipei (Taoyuan Intl.)",
"longName": "Taipei, Taiwan
(TPE-Taoyuan Intl.)",
"city": "Taipei",
"localName": "Taoyuan Intl.",
"airportCityState": "Taipei, Taiwan"


Is it best practice to store just the code "TPE" and then look up the name
"Taipei (Taoyuan Intl.)" from a relational database? Any alternatives?

On Sun, Apr 30, 2017 at 6:34 PM, Jörn Franke <jornfra...@gmail.com> wrote:

> Depends on your queries, the data structure etc. generally flat is better,
> but if your query filter is on the highest level then you may have better
> performance with a nested structure, but it really depends
>
> > On 30. Apr 2017, at 10:19, Zeming Yu <zemin...@gmail.com> wrote:
> >
> > Hi,
> >
> > We're building a parquet based data lake. I was under the impression
> that flat files are more efficient than deeply nested files (say 3 or 4
> levels down). Is that correct?
> >
> > Thanks,
> > Zeming
>


parquet optimal file structure - flat vs nested

2017-04-30 Thread Zeming Yu
Hi,

We're building a parquet based data lake. I was under the impression that
flat files are more efficient than deeply nested files (say 3 or 4 levels
down). Is that correct?

Thanks,
Zeming


Re: how to find the nearest holiday

2017-04-25 Thread Zeming Yu
Still not working. Seems like there's some syntax error.

from pyspark.sql.functions import udf
start_date_test2.withColumn("diff", datediff(start_date_test2.start_date,
start_date_test2.holiday.getItem[0])).show()

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input> in <module>()
     26
     27 from pyspark.sql.functions import udf
---> 28 start_date_test2.withColumn("diff", datediff(start_date_test2.start_date, start_date_test2.holiday.getItem[0])).show()

TypeError: 'method' object is not subscriptable
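
For the record, the TypeError above is plain Python: getItem is a method, so
it would be getItem(0) rather than getItem[0]. Also, the holiday column built
earlier in this thread is a struct (last_holiday, next_holiday) rather than an
array, so addressing the field by name is the safer route. A minimal sketch
using the names already defined above:

from pyspark.sql.functions import datediff

start_date_test2.withColumn(
    "diff",
    datediff(start_date_test2.start_date,
             start_date_test2.holiday.getField("next_holiday"))
).show()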



On Tue, Apr 25, 2017 at 10:59 PM, Pushkar.Gujar <pushkarvgu...@gmail.com>
wrote:

>
> ​You can use
> ​-
> start_date_test2.holiday.getItem[0]
>
> ​I would highly suggest you to look at latest documentation -
> http://spark.apache.org/docs/latest/api/python/index.html ​
>
>
> Thank you,
> *Pushkar Gujar*
>
>
> On Tue, Apr 25, 2017 at 8:50 AM, Zeming Yu <zemin...@gmail.com> wrote:
>
>> How could I access the first element of the holiday column?
>>
>> I tried the following code, but it doesn't work:
>> start_date_test2.withColumn("diff", datediff(start_date_test2.start_date,
>>
>> ​​
>> start_date_test2.holiday*[0]*)).show()
>>
>> On Tue, Apr 25, 2017 at 10:20 PM, Zeming Yu <zemin...@gmail.com> wrote:
>>
>>> Got it working now!
>>>
>>> Does anyone have a pyspark example of how to calculate the numbers of
>>> days from the nearest holiday based on an array column?
>>>
>>> I.e. from this table
>>>
>>> +--+---+
>>> |start_date|holiday|
>>> +--+---+
>>> |2017-08-11|[2017-05-30,2017-10-01]|
>>>
>>>
>>> calculate a column called "days_from_nearest_holiday" which calculates the 
>>> difference between 11 aug 2017 and 1 oct 2017?
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Apr 25, 2017 at 6:00 PM, Wen Pei Yu <yuw...@cn.ibm.com> wrote:
>>>
>>>> TypeError: unorderable types: str() >= datetime.date()
>>>>
>>>> Should transfer string to Date type when compare.
>>>>
>>>> Yu Wenpei.
>>>>
>>>>
>>>> - Original message -
>>>> From: Zeming Yu <zemin...@gmail.com>
>>>> To: user <user@spark.apache.org>
>>>> Cc:
>>>> Subject: how to find the nearest holiday
>>>> Date: Tue, Apr 25, 2017 3:39 PM
>>>>
>>>> I have a column of dates (date type), just trying to find the nearest
>>>> holiday of the date. Anyone has any idea what went wrong below?
>>>>
>>>>
>>>>
>>>> start_date_test = flight3.select("start_date").distinct()
>>>> start_date_test.show()
>>>>
>>>> holidays = ['2017-09-01', '2017-10-01']
>>>>
>>>> +--+
>>>> |start_date|
>>>> +--+
>>>> |2017-08-11|
>>>> |2017-09-11|
>>>> |2017-09-28|
>>>> |2017-06-29|
>>>> |2017-09-29|
>>>> |2017-07-31|
>>>> |2017-08-14|
>>>> |2017-08-18|
>>>> |2017-04-09|
>>>> |2017-09-21|
>>>> |2017-08-10|
>>>> |2017-06-30|
>>>> |2017-08-19|
>>>> |2017-07-06|
>>>> |2017-06-28|
>>>> |2017-09-14|
>>>> |2017-08-08|
>>>> |2017-08-22|
>>>> |2017-07-03|
>>>> |2017-07-30|
>>>> +--+
>>>> only showing top 20 rows
>>>>
>>>>
>>>>
>>>> index = spark.sparkContext.broadcast(sorted(holidays))
>>>>
>>>> def nearest_holiday(date):
>>>> last_holiday = index.value[0]
>>>> for next_holiday in index.value:
>>>> if next_holiday >= date:
>>>> break
>>>> last_holiday = next_holiday
>>>> if last_holiday > date:
>>>> last_holiday = None
>>>> if next_holiday < date:
>>>> next_holiday = None
>>>> return (last_holiday, next_holiday)
>>>>
>>>>
>>>> from pyspark.sql.types import *
>>>> return_type = StructType([StructField('last_holiday', StringType()),
>>>> StructField('next_holiday', StringType())])
>>>>
>>>> from pyspark.sql.functions import udf

Re: how to find the nearest holiday

2017-04-25 Thread Zeming Yu
How could I access the first element of the holiday column?

I tried the following code, but it doesn't work:
start_date_test2.withColumn("diff", datediff(start_date_test2.start_date,
start_date_test2.holiday*[0]*)).show()

On Tue, Apr 25, 2017 at 10:20 PM, Zeming Yu <zemin...@gmail.com> wrote:

> Got it working now!
>
> Does anyone have a pyspark example of how to calculate the numbers of days
> from the nearest holiday based on an array column?
>
> I.e. from this table
>
> +--+---+
> |start_date|holiday|
> +--+---+
> |2017-08-11|[2017-05-30,2017-10-01]|
>
>
> calculate a column called "days_from_nearest_holiday" which calculates the 
> difference between 11 aug 2017 and 1 oct 2017?
>
>
>
>
>
> On Tue, Apr 25, 2017 at 6:00 PM, Wen Pei Yu <yuw...@cn.ibm.com> wrote:
>
>> TypeError: unorderable types: str() >= datetime.date()
>>
>> Should transfer string to Date type when compare.
>>
>> Yu Wenpei.
>>
>>
>> - Original message -
>> From: Zeming Yu <zemin...@gmail.com>
>> To: user <user@spark.apache.org>
>> Cc:
>> Subject: how to find the nearest holiday
>> Date: Tue, Apr 25, 2017 3:39 PM
>>
>> I have a column of dates (date type), just trying to find the nearest
>> holiday of the date. Anyone has any idea what went wrong below?
>>
>>
>>
>> start_date_test = flight3.select("start_date").distinct()
>> start_date_test.show()
>>
>> holidays = ['2017-09-01', '2017-10-01']
>>
>> +--+
>> |start_date|
>> +--+
>> |2017-08-11|
>> |2017-09-11|
>> |2017-09-28|
>> |2017-06-29|
>> |2017-09-29|
>> |2017-07-31|
>> |2017-08-14|
>> |2017-08-18|
>> |2017-04-09|
>> |2017-09-21|
>> |2017-08-10|
>> |2017-06-30|
>> |2017-08-19|
>> |2017-07-06|
>> |2017-06-28|
>> |2017-09-14|
>> |2017-08-08|
>> |2017-08-22|
>> |2017-07-03|
>> |2017-07-30|
>> +--+
>> only showing top 20 rows
>>
>>
>>
>> index = spark.sparkContext.broadcast(sorted(holidays))
>>
>> def nearest_holiday(date):
>> last_holiday = index.value[0]
>> for next_holiday in index.value:
>> if next_holiday >= date:
>> break
>> last_holiday = next_holiday
>> if last_holiday > date:
>> last_holiday = None
>> if next_holiday < date:
>> next_holiday = None
>> return (last_holiday, next_holiday)
>>
>>
>> from pyspark.sql.types import *
>> return_type = StructType([StructField('last_holiday', StringType()),
>> StructField('next_holiday', StringType())])
>>
>> from pyspark.sql.functions import udf
>> nearest_holiday_udf = udf(nearest_holiday, return_type)
>>
>> start_date_test.withColumn('holiday', 
>> nearest_holiday_udf('start_date')).show(5,
>> False)
>>
>>
>> here's the error I got:
>>
>> 
>> ---
>> Py4JJavaError Traceback (most recent call
>> last)
>>  in ()
>>  24 nearest_holiday_udf = udf(nearest_holiday, return_type)
>>  25
>> ---> 26 start_date_test.withColumn('holiday', nearest_holiday_udf(
>> 'start_date')).show(5, False)
>>
>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pytho
>> n\pyspark\sql\dataframe.py in show(self, n, truncate)
>> 318 print(self._jdf.showString(n, 20))
>> 319 else:
>> --> 320 print(self._jdf.showString(n, int(truncate)))
>> 321
>> 322 def __repr__(self):
>>
>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pytho
>> n\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py in __call__(self, *args)
>>1131 answer = self.gateway_client.send_command(command)
>>1132 return_value = get_return_value(
>> -> 1133 answer, self.gateway_client, self.target_id,
>> self.name)
>>1134
>>1135 for temp_arg in temp_args:
>>
>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pytho
>> n\pyspark\sql\utils.py in deco(*a, **kw)
>>  61 def deco(*a, **kw):
>>  62 try:
>> ---> 63 return f(*a, **kw)
>>  64 except py4j.protocol.Py4JJavaError as e:
>>  65 s = e.java_exception.toString()
>>
>> C:\spark-2

Re: how to find the nearest holiday

2017-04-25 Thread Zeming Yu
Got it working now!

Does anyone have a pyspark example of how to calculate the numbers of days
from the nearest holiday based on an array column?

I.e. from this table

+--+---+
|start_date|holiday|
+--+---+
|2017-08-11|[2017-05-30,2017-10-01]|


calculate a column called "days_from_nearest_holiday" which calculates
the difference between 11 aug 2017 and 1 oct 2017?
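
A sketch of one way to get days_from_nearest_holiday, assuming the holiday
column holds 'yyyy-MM-dd' strings as shown above; the UDF name and return type
are my own choices, not from the thread:

from datetime import datetime
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

def days_from_nearest_holiday(start_date, holidays):
    # start_date arrives as datetime.date (the column is DateType);
    # holidays arrives as a sequence of 'yyyy-MM-dd' strings
    if start_date is None or not holidays:
        return None
    diffs = [abs((datetime.strptime(h, "%Y-%m-%d").date() - start_date).days)
             for h in holidays if h is not None]
    return min(diffs) if diffs else None

nearest_holiday_days_udf = udf(days_from_nearest_holiday, IntegerType())

# start_date_test2.withColumn(
#     "days_from_nearest_holiday",
#     nearest_holiday_days_udf(col("start_date"), col("holiday"))).show()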





On Tue, Apr 25, 2017 at 6:00 PM, Wen Pei Yu <yuw...@cn.ibm.com> wrote:

> TypeError: unorderable types: str() >= datetime.date()
>
> Should transfer string to Date type when compare.
>
> Yu Wenpei.
>
>
> ----- Original message -
> From: Zeming Yu <zemin...@gmail.com>
> To: user <user@spark.apache.org>
> Cc:
> Subject: how to find the nearest holiday
> Date: Tue, Apr 25, 2017 3:39 PM
>
> I have a column of dates (date type), just trying to find the nearest
> holiday of the date. Anyone has any idea what went wrong below?
>
>
>
> start_date_test = flight3.select("start_date").distinct()
> start_date_test.show()
>
> holidays = ['2017-09-01', '2017-10-01']
>
> +--+
> |start_date|
> +--+
> |2017-08-11|
> |2017-09-11|
> |2017-09-28|
> |2017-06-29|
> |2017-09-29|
> |2017-07-31|
> |2017-08-14|
> |2017-08-18|
> |2017-04-09|
> |2017-09-21|
> |2017-08-10|
> |2017-06-30|
> |2017-08-19|
> |2017-07-06|
> |2017-06-28|
> |2017-09-14|
> |2017-08-08|
> |2017-08-22|
> |2017-07-03|
> |2017-07-30|
> +--+
> only showing top 20 rows
>
>
>
> index = spark.sparkContext.broadcast(sorted(holidays))
>
> def nearest_holiday(date):
> last_holiday = index.value[0]
> for next_holiday in index.value:
> if next_holiday >= date:
> break
> last_holiday = next_holiday
> if last_holiday > date:
> last_holiday = None
> if next_holiday < date:
> next_holiday = None
> return (last_holiday, next_holiday)
>
>
> from pyspark.sql.types import *
> return_type = StructType([StructField('last_holiday', StringType()),
> StructField('next_holiday', StringType())])
>
> from pyspark.sql.functions import udf
> nearest_holiday_udf = udf(nearest_holiday, return_type)
>
> start_date_test.withColumn('holiday', 
> nearest_holiday_udf('start_date')).show(5,
> False)
>
>
> here's the error I got:
>
> 
> ---
> Py4JJavaError Traceback (most recent call
> last)
>  in ()
>  24 nearest_holiday_udf = udf(nearest_holiday, return_type)
>  25
> ---> 26 start_date_test.withColumn('holiday', nearest_holiday_udf(
> 'start_date')).show(5, False)
>
> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\
> python\pyspark\sql\dataframe.py in show(self, n, truncate)
> 318 print(self._jdf.showString(n, 20))
> 319 else:
> --> 320 print(self._jdf.showString(n, int(truncate)))
> 321
> 322 def __repr__(self):
>
> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\
> python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py in __call__(self,
> *args)
>1131 answer = self.gateway_client.send_command(command)
>1132 return_value = get_return_value(
> -> 1133 answer, self.gateway_client, self.target_id, self.name
> )
>1134
>1135 for temp_arg in temp_args:
>
> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\
> python\pyspark\sql\utils.py in deco(*a, **kw)
>  61 def deco(*a, **kw):
>  62 try:
> ---> 63 return f(*a, **kw)
>  64 except py4j.protocol.Py4JJavaError as e:
>  65 s = e.java_exception.toString()
>
> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\
> python\lib\py4j-0.10.4-src.zip\py4j\protocol.py in get_return_value(answer,
> gateway_client, target_id, name)
> 317 raise Py4JJavaError(
> 318 "An error occurred while calling {0}{1}{2}.\n"
> .
> --> 319 format(target_id, ".", name), value)
> 320 else:
> 321 raise Py4JError(
>
> Py4JJavaError: An error occurred while calling o566.showString.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 0 in stage 98.0 failed 1 times, most recent failure: Lost task 0.0 in stage
> 98.0 (TID 521, localhost, executor driver): 
> org.apache.spark.api.python.PythonException:
> Traceback (most recent call last):
>   File "C:\spark-2.1.0-bi

how to find the nearest holiday

2017-04-25 Thread Zeming Yu
I have a column of dates (date type), just trying to find the nearest
holiday of the date. Anyone has any idea what went wrong below?



start_date_test = flight3.select("start_date").distinct()
start_date_test.show()

holidays = ['2017-09-01', '2017-10-01']

+--+
|start_date|
+--+
|2017-08-11|
|2017-09-11|
|2017-09-28|
|2017-06-29|
|2017-09-29|
|2017-07-31|
|2017-08-14|
|2017-08-18|
|2017-04-09|
|2017-09-21|
|2017-08-10|
|2017-06-30|
|2017-08-19|
|2017-07-06|
|2017-06-28|
|2017-09-14|
|2017-08-08|
|2017-08-22|
|2017-07-03|
|2017-07-30|
+--+
only showing top 20 rows




index = spark.sparkContext.broadcast(sorted(holidays))

def nearest_holiday(date):
last_holiday = index.value[0]
for next_holiday in index.value:
if next_holiday >= date:
break
last_holiday = next_holiday
if last_holiday > date:
last_holiday = None
if next_holiday < date:
next_holiday = None
return (last_holiday, next_holiday)


from pyspark.sql.types import *
return_type = StructType([StructField('last_holiday', StringType()),
StructField('next_holiday', StringType())])

from pyspark.sql.functions import udf
nearest_holiday_udf = udf(nearest_holiday, return_type)

start_date_test.withColumn('holiday',
nearest_holiday_udf('start_date')).show(5, False)


here's the error I got:

---Py4JJavaError
Traceback (most recent call
last) in () 24
nearest_holiday_udf = udf(nearest_holiday, return_type) 25 ---> 26
start_date_test.withColumn('holiday',
nearest_holiday_udf('start_date')).show(5, False)
C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\dataframe.py
in show(self, n, truncate)318
print(self._jdf.showString(n, 20))319 else:--> 320
print(self._jdf.showString(n, int(truncate)))321 322
def __repr__(self):
C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py
in __call__(self, *args)   1131 answer =
self.gateway_client.send_command(command)   1132 return_value
= get_return_value(-> 1133 answer, self.gateway_client,
self.target_id, self.name)   11341135 for temp_arg in
temp_args:
C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\utils.py
in deco(*a, **kw) 61 def deco(*a, **kw): 62
try:---> 63 return f(*a, **kw) 64 except
py4j.protocol.Py4JJavaError as e: 65 s =
e.java_exception.toString()
C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\protocol.py
in get_return_value(answer, gateway_client, target_id, name)317
 raise Py4JJavaError(318 "An error
occurred while calling {0}{1}{2}.\n".--> 319
format(target_id, ".", name), value)320 else:321
  raise Py4JError(
Py4JJavaError: An error occurred while calling o566.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 98.0 failed 1 times, most recent failure: Lost task
0.0 in stage 98.0 (TID 521, localhost, executor driver):
org.apache.spark.api.python.PythonException: Traceback (most recent
call last):
  File 
"C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py",
line 174, in main
  File 
"C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py",
line 169, in process
  File 
"C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py",
line 220, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
  File 
"C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py",
line 138, in dump_stream
for obj in iterator:
  File 
"C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py",
line 209, in _batched
for item in iterator:
  File 
"C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py",
line 92, in 
  File 
"C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py",
line 68, in 
  File "", line 10, in nearest_holiday
TypeError: unorderable types: str() >= datetime.date()

at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at 
org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
at 
org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
at 

Re: udf that handles null values

2017-04-24 Thread Zeming Yu
Thank you both!

Here's the code that's working now. It's a bit hard to read due to so many
functions. Any idea how I can improve the readability?

from pyspark.sql.functions import trim, when, from_unixtime,
unix_timestamp, minute, hour

duration_test = flight2.select("stop_duration1")
duration_test.show()


duration_test.withColumn(
    'duration_h',
    when(duration_test.stop_duration1.isNull(), 999)
    .otherwise(hour(unix_timestamp(duration_test.stop_duration1, "HH'h'mm'm'").cast("timestamp")))
).show(20)


+--+
|stop_duration1|
+--+
| 0h50m|
| 3h15m|
| 8h35m|
| 1h30m|
|12h15m|
|11h50m|
|  2h5m|
|10h25m|
| 8h20m|
|  null|
| 2h50m|
| 2h30m|
| 7h45m|
| 1h10m|
| 2h15m|
|  2h0m|
|10h25m|
| 1h40m|
| 1h55m|
| 1h40m|
+--+
only showing top 20 rows

+--+--+
|stop_duration1|duration_h|
+--+--+
| 0h50m| 0|
| 3h15m| 3|
| 8h35m| 8|
| 1h30m| 1|
|12h15m|12|
|11h50m|11|
|  2h5m| 2|
|10h25m|10|
| 8h20m| 8|
|  null|   999|
| 2h50m| 2|
| 2h30m| 2|
| 7h45m| 7|
| 1h10m| 1|
| 2h15m| 2|
|  2h0m| 2|
|10h25m|10|
| 1h40m| 1|
| 1h55m| 1|
| 1h40m| 1|
+--+--+
only showing top 20 rows
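
One possible refactoring for readability, not the only way: give the
expression a name and keep the null handling in one place. regexp_extract
(an alternative to the unix_timestamp/hour route, assumed to be acceptable
here) pulls the hour straight out of strings like '12h15m':

from pyspark.sql.functions import when, regexp_extract

def duration_hours(duration_col):
    """Hour portion of strings like '12h15m'; 999 when the value is null."""
    hours = regexp_extract(duration_col, r"(\d+)h", 1).cast("int")
    return when(duration_col.isNull(), 999).otherwise(hours)

duration_test.withColumn("duration_h",
                         duration_hours(duration_test.stop_duration1)).show(20)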





On Tue, Apr 25, 2017 at 11:29 AM, Pushkar.Gujar <pushkarvgu...@gmail.com>
wrote:

> Someone had similar issue today at stackoverflow.
>
> http://stackoverflow.com/questions/43595201/python-how-
> to-convert-pyspark-column-to-date-type-if-there-are-null-
> values/43595728#43595728
>
>
> Thank you,
> *Pushkar Gujar*
>
>
> On Mon, Apr 24, 2017 at 8:22 PM, Zeming Yu <zemin...@gmail.com> wrote:
>
>> hi all,
>>
>> I tried to write a UDF that handles null values:
>>
>> def getMinutes(hString, minString):
>> if (hString != None) & (minString != None): return int(hString) * 60
>> + int(minString[:-1])
>> else: return None
>>
>> flight2 = (flight2.withColumn("duration_minutes",
>> udfGetMinutes("duration_h", "duration_m")))
>>
>>
>> but I got this error:
>>
>>   File "", line 6, in getMinutes
>> TypeError: int() argument must be a string, a bytes-like object or a number, 
>> not 'NoneType'
>>
>>
>> Does anyone know how to do this?
>>
>>
>> Thanks,
>>
>> Zeming
>>
>>
>


one hot encode a column of vector

2017-04-24 Thread Zeming Yu
How do I one-hot encode a column of arrays? e.g. ['TG', 'CA']


FYI, here's my code for one-hot encoding normal categorical columns.
How do I make it work for a column of arrays?


from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

indexers = [StringIndexer(inputCol=column, outputCol=column + "_index").fit(flight3)
            for column in list(set(['ColA', 'ColB', 'ColC']))]

pipeline = Pipeline(stages=indexers)
flight4 = pipeline.fit(flight3).transform(flight3)
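
Not certain this is what's wanted, but for a column that is already an array
of strings (e.g. ['TG', 'CA']), CountVectorizer with binary=True produces a
one-hot-style vector directly, with no StringIndexer step. A sketch with an
invented column name:

from pyspark.ml.feature import CountVectorizer

# assumption: flight3 has an array<string> column called "carriers"
cv = CountVectorizer(inputCol="carriers", outputCol="carriers_vec", binary=True)
flight4 = cv.fit(flight3).transform(flight3)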


pyspark vector

2017-04-24 Thread Zeming Yu
Hi all,

Beginner question:

what does the 3 mean in the (3,[0,1,2],[1.0,1.0,1.0])?

https://spark.apache.org/docs/2.1.0/ml-features.html

 id | texts   | vector
|-|---
 0  | Array("a", "b", "c")| (3,[0,1,2],[1.0,1.0,1.0])
 1  | Array("a", "b", "b", "c", "a")  | (3,[0,1,2],[2.0,2.0,1.0])


udf that handles null values

2017-04-24 Thread Zeming Yu
hi all,

I tried to write a UDF that handles null values:

def getMinutes(hString, minString):
if (hString != None) & (minString != None): return int(hString) * 60 +
int(minString[:-1])
else: return None

flight2 = (flight2.withColumn("duration_minutes",
udfGetMinutes("duration_h", "duration_m")))


but I got this error:

  File "", line 6, in getMinutes
TypeError: int() argument must be a string, a bytes-like object or a
number, not 'NoneType'


Does anyone know how to do this?


Thanks,

Zeming
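
A sketch of one null-safe version, assuming duration_h and duration_m are
string columns shaped like '12' and '15m'; the pieces the snippet above leaves
out are the udf() registration and an explicit return type:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

def get_minutes(h_string, min_string):
    # returning None is fine here; Spark stores it as a SQL NULL
    if h_string is None or min_string is None:
        return None
    return int(h_string) * 60 + int(min_string[:-1])

udf_get_minutes = udf(get_minutes, IntegerType())

# flight2 = flight2.withColumn("duration_minutes",
#                              udf_get_minutes(col("duration_h"), col("duration_m")))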


Re: how to add new column using regular expression within pyspark dataframe

2017-04-22 Thread Zeming Yu
Thanks a lot!

Just another question, how can I extract the minutes as a number?

I can use:
.withColumn('duration_m',split(flight.duration,'h').getItem(1)

to get strings like '10m'

but how do I drop the character "m" at the end? I can use substr(), but
what's the function to get the length of the string so that I can do
something like substr(1, len(...)-1)?
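
Two hedged sketches for that follow-up: length() is the function that gives
the string length, but regexp_extract sidesteps the trailing "m" entirely:

from pyspark.sql.functions import regexp_extract, length, split

# regexp_extract: grab the digits in front of "m" and cast to an int
flight = flight.withColumn(
    'duration_m',
    regexp_extract(flight.duration, r'(\d+)m', 1).cast('int'))

# length(): usable if you prefer the split/substr route, e.g.
# length(split(flight.duration, 'h').getItem(1)) is the length of strings like '10m'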

On Thu, Apr 20, 2017 at 11:36 PM, Pushkar.Gujar <pushkarvgu...@gmail.com>
wrote:

> Can be as  simple as -
>
> from pyspark.sql.functions import split
>
> flight.withColumn('hour',split(flight.duration,'h').getItem(0))
>
>
> Thank you,
> *Pushkar Gujar*
>
>
> On Thu, Apr 20, 2017 at 4:35 AM, Zeming Yu <zemin...@gmail.com> wrote:
>
>> Any examples?
>>
>> On 20 Apr. 2017 3:44 pm, "颜发才(Yan Facai)" <facai@gmail.com> wrote:
>>
>>> How about using `withColumn` and UDF?
>>>
>>> example:
>>> + https://gist.github.com/zoltanctoth/2deccd69e3d1cde1dd78
>>> <https://gist.github.com/zoltanctoth/2deccd69e3d1cde1dd78>
>>> + https://ragrawal.wordpress.com/2015/10/02/spark-custom-udf-example/
>>>
>>>
>>>
>>> On Mon, Apr 17, 2017 at 8:25 PM, Zeming Yu <zemin...@gmail.com> wrote:
>>>
>>>> I've got a dataframe with a column looking like this:
>>>>
>>>> display(flight.select("duration").show())
>>>>
>>>> ++
>>>> |duration|
>>>> ++
>>>> |  15h10m|
>>>> |   17h0m|
>>>> |  21h25m|
>>>> |  14h30m|
>>>> |  24h50m|
>>>> |  26h10m|
>>>> |  14h30m|
>>>> |   23h5m|
>>>> |  21h30m|
>>>> |  11h50m|
>>>> |  16h10m|
>>>> |  15h15m|
>>>> |  21h25m|
>>>> |  14h25m|
>>>> |  14h40m|
>>>> |   16h0m|
>>>> |  24h20m|
>>>> |  14h30m|
>>>> |  14h25m|
>>>> |  14h30m|
>>>> ++
>>>> only showing top 20 rows
>>>>
>>>>
>>>>
>>>> I need to extract the hour as a number and store it as an additional
>>>> column within the same dataframe. What's the best way to do that?
>>>>
>>>>
>>>> I tried the following, but it failed:
>>>>
>>>> import re
>>>> def getHours(x):
>>>>   return re.match('([0-9]+(?=h))', x)
>>>> temp = flight.select("duration").rdd.map(lambda
>>>> x:getHours(x[0])).toDF()
>>>> temp.select("duration").show()
>>>>
>>>>
>>>> error message:
>>>>
>>>>
>>>> ---Py4JJavaError
>>>>  Traceback (most recent call 
>>>> last) in ()  2 def getHours(x): 
>>>>  3   return re.match('([0-9]+(?=h))', x)> 4 temp = 
>>>> flight.select("duration").rdd.map(lambda x:getHours(x[0])).toDF()  5 
>>>> temp.select("duration").show()
>>>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py
>>>>  in toDF(self, schema, sampleRatio) 55 [Row(name=u'Alice', 
>>>> age=1)] 56 """---> 57 return 
>>>> sparkSession.createDataFrame(self, schema, sampleRatio) 58  59 
>>>> RDD.toDF = toDF
>>>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py
>>>>  in createDataFrame(self, data, schema, samplingRatio, verifySchema)
>>>> 518 519 if isinstance(data, RDD):--> 520 rdd, 
>>>> schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
>>>> 521 else:522 rdd, schema = 
>>>> self._createFromLocal(map(prepare, data), schema)
>>>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py
>>>>  in _createFromRDD(self, rdd, schema, samplingRatio)358 """
>>>> 359 if schema is None or isinstance(schema, (list, tuple)):--> 360 
>>>> struct = self._inferSchema(rdd, samplingRatio)361  
>>>>converter = _create_converter(struct)362 rdd = 
>>>> rdd.map(converter)
>>>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py
>>>>  in _inf

Re: how to add new column using regular expression within pyspark dataframe

2017-04-20 Thread Zeming Yu
Any examples?

On 20 Apr. 2017 3:44 pm, "颜发才(Yan Facai)" <facai@gmail.com> wrote:

> How about using `withColumn` and UDF?
>
> example:
> + https://gist.github.com/zoltanctoth/2deccd69e3d1cde1dd78
> <https://gist.github.com/zoltanctoth/2deccd69e3d1cde1dd78>
> + https://ragrawal.wordpress.com/2015/10/02/spark-custom-udf-example/
>
>
>
> On Mon, Apr 17, 2017 at 8:25 PM, Zeming Yu <zemin...@gmail.com> wrote:
>
>> I've got a dataframe with a column looking like this:
>>
>> display(flight.select("duration").show())
>>
>> ++
>> |duration|
>> ++
>> |  15h10m|
>> |   17h0m|
>> |  21h25m|
>> |  14h30m|
>> |  24h50m|
>> |  26h10m|
>> |  14h30m|
>> |   23h5m|
>> |  21h30m|
>> |  11h50m|
>> |  16h10m|
>> |  15h15m|
>> |  21h25m|
>> |  14h25m|
>> |  14h40m|
>> |   16h0m|
>> |  24h20m|
>> |  14h30m|
>> |  14h25m|
>> |  14h30m|
>> ++
>> only showing top 20 rows
>>
>>
>>
>> I need to extract the hour as a number and store it as an additional
>> column within the same dataframe. What's the best way to do that?
>>
>>
>> I tried the following, but it failed:
>>
>> import re
>> def getHours(x):
>>   return re.match('([0-9]+(?=h))', x)
>> temp = flight.select("duration").rdd.map(lambda x:getHours(x[0])).toDF()
>> temp.select("duration").show()
>>
>>
>> error message:
>>
>>
>> ---Py4JJavaError
>>  Traceback (most recent call 
>> last) in ()  2 def getHours(x):   
>>3   return re.match('([0-9]+(?=h))', x)> 4 temp = 
>> flight.select("duration").rdd.map(lambda x:getHours(x[0])).toDF()  5 
>> temp.select("duration").show()
>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py
>>  in toDF(self, schema, sampleRatio) 55 [Row(name=u'Alice', 
>> age=1)] 56 """---> 57 return 
>> sparkSession.createDataFrame(self, schema, sampleRatio) 58  59 
>> RDD.toDF = toDF
>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py
>>  in createDataFrame(self, data, schema, samplingRatio, verifySchema)518  
>>519 if isinstance(data, RDD):--> 520 rdd, schema = 
>> self._createFromRDD(data.map(prepare), schema, samplingRatio)521 
>> else:522 rdd, schema = self._createFromLocal(map(prepare, 
>> data), schema)
>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py
>>  in _createFromRDD(self, rdd, schema, samplingRatio)358 """
>> 359 if schema is None or isinstance(schema, (list, tuple)):--> 360   
>>   struct = self._inferSchema(rdd, samplingRatio)361 
>> converter = _create_converter(struct)362 rdd = 
>> rdd.map(converter)
>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py
>>  in _inferSchema(self, rdd, samplingRatio)329 :return: 
>> :class:`pyspark.sql.types.StructType`330 """--> 331 
>> first = rdd.first()332 if not first:333 raise 
>> ValueError("The first row in RDD is empty, "
>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\rdd.py 
>> in first(self)   1359 ValueError: RDD is empty   1360 """-> 
>> 1361 rs = self.take(1)   1362 if rs:   1363 
>> return rs[0]
>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\rdd.py 
>> in take(self, num)   13411342 p = range(partsScanned, 
>> min(partsScanned + numPartsToTry, totalParts))-> 1343 res = 
>> self.context.runJob(self, takeUpToNumLeft, p)   13441345 
>> items += res
>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\context.py
>>  in runJob(self, rdd, partitionFunc, partitions, allowLocal)963 
>> # SparkContext#runJob.964 mappedRDD = 
>> rdd.mapPartitions(partitionFunc)--> 965 port = 
>> self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
>> 966 return list(_load_from_socket(port, 
>> mappedRDD._jrdd_deserializer))967
>> C:\spark-2.1.0-bin-h

how to add new column using regular expression within pyspark dataframe

2017-04-17 Thread Zeming Yu
I've got a dataframe with a column looking like this:

display(flight.select("duration").show())

++
|duration|
++
|  15h10m|
|   17h0m|
|  21h25m|
|  14h30m|
|  24h50m|
|  26h10m|
|  14h30m|
|   23h5m|
|  21h30m|
|  11h50m|
|  16h10m|
|  15h15m|
|  21h25m|
|  14h25m|
|  14h40m|
|   16h0m|
|  24h20m|
|  14h30m|
|  14h25m|
|  14h30m|
++
only showing top 20 rows



I need to extract the hour as a number and store it as an additional column
within the same dataframe. What's the best way to do that?


I tried the following, but it failed:

import re
def getHours(x):
  return re.match('([0-9]+(?=h))', x)
temp = flight.select("duration").rdd.map(lambda x:getHours(x[0])).toDF()
temp.select("duration").show()


error message:


---Py4JJavaError
Traceback (most recent call
last) in ()  2 def
getHours(x):  3   return re.match('([0-9]+(?=h))', x)> 4 temp
= flight.select("duration").rdd.map(lambda x:getHours(x[0])).toDF()
  5 temp.select("duration").show()
C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py
in toDF(self, schema, sampleRatio) 55 [Row(name=u'Alice',
age=1)] 56 """---> 57 return
sparkSession.createDataFrame(self, schema, sampleRatio) 58  59
RDD.toDF = toDF
C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py
in createDataFrame(self, data, schema, samplingRatio, verifySchema)
518 519 if isinstance(data, RDD):--> 520 rdd,
schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
   521 else:522 rdd, schema =
self._createFromLocal(map(prepare, data), schema)
C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py
in _createFromRDD(self, rdd, schema, samplingRatio)358 """
   359 if schema is None or isinstance(schema, (list,
tuple)):--> 360 struct = self._inferSchema(rdd,
samplingRatio)361 converter =
_create_converter(struct)362 rdd = rdd.map(converter)
C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py
in _inferSchema(self, rdd, samplingRatio)329 :return:
:class:`pyspark.sql.types.StructType`330 """--> 331
 first = rdd.first()332 if not first:333
raise ValueError("The first row in RDD is empty, "
C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\rdd.py
in first(self)   1359 ValueError: RDD is empty   1360
"""-> 1361 rs = self.take(1)   1362 if rs:   1363
   return rs[0]
C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\rdd.py
in take(self, num)   13411342 p = range(partsScanned,
min(partsScanned + numPartsToTry, totalParts))-> 1343 res
= self.context.runJob(self, takeUpToNumLeft, p)   13441345
items += res
C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\context.py
in runJob(self, rdd, partitionFunc, partitions, allowLocal)963
# SparkContext#runJob.964 mappedRDD =
rdd.mapPartitions(partitionFunc)--> 965 port =
self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd,
partitions)966 return list(_load_from_socket(port,
mappedRDD._jrdd_deserializer))967
C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py
in __call__(self, *args)   1131 answer =
self.gateway_client.send_command(command)   1132 return_value
= get_return_value(-> 1133 answer, self.gateway_client,
self.target_id, self.name)   11341135 for temp_arg in
temp_args:
C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\utils.py
in deco(*a, **kw) 61 def deco(*a, **kw): 62
try:---> 63 return f(*a, **kw) 64 except
py4j.protocol.Py4JJavaError as e: 65 s =
e.java_exception.toString()
C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\protocol.py
in get_return_value(answer, gateway_client, target_id, name)317
 raise Py4JJavaError(318 "An error
occurred while calling {0}{1}{2}.\n".--> 319
format(target_id, ".", name), value)320 else:321
  raise Py4JError(
Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 75.0 failed 1 times, most recent failure: Lost task
0.0 in stage 75.0 (TID 1035, localhost, executor driver):
org.apache.spark.api.python.PythonException: Traceback (most recent
call last):
  File 
"C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py",
line 174, in main
  File 

Re: optimising storage and ec2 instances

2017-04-11 Thread Zeming Yu
> everything works best if your sources are a few tens to hundreds of MB or
> more

Are you referring to the size of the zip file or individual unzipped files?

Any issues with storing a 60 MB zipped file containing heaps of text files
inside?

On 11 Apr. 2017 9:09 pm, "Steve Loughran" <ste...@hortonworks.com> wrote:

>
> > On 11 Apr 2017, at 11:07, Zeming Yu <zemin...@gmail.com> wrote:
> >
> > Hi all,
> >
> > I'm a beginner with spark, and I'm wondering if someone could provide
> guidance on the following 2 questions I have.
> >
> > Background: I have a data set growing by 6 TB p.a. I plan to use spark
> to read in all the data, manipulate it and build a predictive model on it
> (say GBM) I plan to store the data in S3, and use EMR to launch spark,
> reading in data from S3.
> >
> > 1. Which option is best for storing the data on S3 for the purpose of
> analysing it in EMR spark?
> > Option A: storing the 6TB file as 173 million individual text files
> > Option B: zipping up the above 173 million text files as 240,000 zip
> files
> > Option C: appending the individual text files, so have 240,000 text
> files p.a.
> > Option D: combining the text files even further
> >
>
> everything works best if your sources are a few tens to hundreds of MB or
> more of your data, work can be partitioned up by file. If you use more
> structured formats (avro compressed with snappy, etc), you can throw > 1
> executor at work inside a file. Structure is handy all round, even if its
> just adding timestamp and provenance columns to each data file.
>
> there's the HAR file format from Hadoop which can merge lots of small
> files into larger ones, allowing work to be scheduled per har file.
> Recommended for HDFS as it hates small files, on S3 you still have limits
> on small files (including throttling of HTTP requests to shards of a
> bucket), but they are less significant.
>
> One thing to be aware is that the s3 clients spark use are very
> inefficient in listing wide directory trees, and Spark not always the best
> at partitioning work because of this. You can accidentally create a very
> inefficient tree structure like datasets/year=2017/month=5/day=10/hour=12/,
> with only one file per hour. Listing and partitioning suffers here, and
> while s3a on Hadoop 2.8 is better here, Spark hasn't yet fully adapted to
> those changes (use of specific API calls). There's also a lot more to be
> done in S3A to handle wildcards in the directory tree much more efficiently
> (HADOOP-13204); needs to address pattens like 
> (datasets/year=201?/month=*/day=10)
> without treewalking and without fetching too much data from wildcards near
> the top of the tree. We need to avoid implementing something which works
> well on *my* layouts, but absolutely dies on other people's. As is usual in
> OSS, help welcome; early testing here as critical as coding, so as to
> ensure things will work with your file structures
>
> -Steve
>
>
> > 2. Any recommendations on the EMR set up to analyse the 6TB of data all
> at once and build a GBM, in terms of
> > 1) The type of EC2 instances I need?
> > 2) The number of such instances I need?
> > 3) Rough estimate of cost?
> >
>
> no opinion there
>
> >
> > Thanks so much,
> > Zeming
> >
>
>


optimising storage and ec2 instances

2017-04-11 Thread Zeming Yu
Hi all,

I'm a beginner with spark, and I'm wondering if someone could provide
guidance on the following 2 questions I have.

Background: I have a data set growing by 6 TB p.a. I plan to use Spark to
read in all the data, manipulate it, and build a predictive model on it (say
a GBM). I plan to store the data in S3 and use EMR to launch Spark, reading
in the data from S3.

1. Which option is best for storing the data on S3 for the purpose of
analysing it in EMR spark?
Option A: storing the 6TB file as 173 million individual text files
Option B: zipping up the above 173 million text files as 240,000 zip files
Option C: appending the individual text files, so have 240,000 text files
p.a.
Option D: combining the text files even further

2. Any recommendations on the EMR set up to analyse the 6TB of data all at
once and build a GBM, in terms of
1) The type of EC2 instances I need?
2) The number of such instances I need?
3) Rough estimate of cost?


Thanks so much,
Zeming