Re: Long-Running Spark application doesn't clean old shuffle data correctly

2019-07-21 Thread Keith Chapman
Hi Alex,

Shuffle files in Spark are deleted when the object holding a reference to
the shuffle file on disk goes out of scope (i.e., is garbage collected by the
JVM). Could it be the case that you are keeping these objects alive?
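
For illustration, here is a minimal sketch (not from the original thread; the job and names are placeholders) of how a driver-side reference can pin shuffle files, and how letting it go out of scope allows the ContextCleaner to remove them:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("shuffle-ref-sketch"))

// Holding shuffled RDDs in a long-lived collection keeps their ShuffleDependency
// reachable, so the ContextCleaner never deletes the corresponding shuffle files.
val kept = scala.collection.mutable.ListBuffer.empty[org.apache.spark.rdd.RDD[(Int, Int)]]

for (_ <- 1 to 100) {
  val shuffled = sc.parallelize(1 to 1000000).map(x => (x % 10, x)).reduceByKey(_ + _)
  shuffled.count()     // runs a job that writes shuffle files
  // kept += shuffled  // keeping this reference would pin the shuffle files on disk
  // With no surviving reference, a driver GC (e.g. the periodic one controlled by
  // spark.cleaner.periodicGC.interval) lets the cleaner remove the shuffle files.
}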

Regards,
Keith.

http://keith-chapman.com


On Sun, Jul 21, 2019 at 12:19 AM Alex Landa  wrote:

> Thanks,
> I looked into these options, the cleaner periodic interval is set to 30
> min by default.
> The block option for shuffle -
> *spark.cleaner.referenceTracking.blocking.shuffle* - is set to false by
> default.
> What are the implications of setting it to true?
> Will it make the driver slower?
>
> Thanks,
> Alex
>
> On Sun, Jul 21, 2019 at 9:06 AM Prathmesh Ranaut Gmail <
> prathmesh.ran...@gmail.com> wrote:
>
>> This is the job of ContextCleaner. There are a few properties that you can
>> tweak to see if that helps:
>> spark.cleaner.periodicGC.interval
>> spark.cleaner.referenceTracking
>> spark.cleaner.referenceTracking.blocking.shuffle
>>
>> Regards
>> Prathmesh Ranaut
>>
>> On Jul 21, 2019, at 11:31 AM, Alex Landa  wrote:
>>
>> Hi,
>>
>> We are running a long-running Spark application (which executes lots of
>> quick jobs using our scheduler) on a Spark 2.4.0 standalone cluster.
>> We see that old shuffle files (a week old, for example) are not deleted
>> during the execution of the application, which leads to out-of-disk-space
>> errors on the executor.
>> If we re-deploy the application, the Spark cluster takes care of the
>> cleaning and deletes the old shuffle data (since we have
>> /-Dspark.worker.cleanup.enabled=true/ in the worker config).
>> I don't want to re-deploy our app every week or two; I want to be able to
>> configure Spark to clean old shuffle data (as it should).
>>
>> How can I configure Spark to delete old shuffle data during the lifetime
>> of the application (not after)?
>>
>>
>> Thanks,
>> Alex
>>
>>


Re: Sorting tuples with byte key and byte value

2019-07-15 Thread Keith Chapman
Hi Supun,

A couple of things with regard to your question.

--executor-cores sets the number of worker threads per executor JVM. According
to your requirement this should be set to 8.

*repartitionAndSortWithinPartitions* is an RDD operation, and RDD operations in
Spark are less performant than Dataframe operations, both in terms of execution
and memory. I would rather use the Dataframe sort operation if performance is
key.
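
As a point of comparison, a minimal sketch (Spark 2.x Dataframe API; the key/value columns are illustrative, not the original benchmark data) of doing the sort through Dataframes:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("dataframe-sort-sketch").getOrCreate()

// Stand-in for the generated key/value records.
val records = spark.range(0, 128L * 1000000L)
  .selectExpr("id % 1048576 as key", "id as value")

// A Dataframe sort is planned as a range-partitioned exchange plus a
// per-partition sort, operating on Tungsten binary rows rather than Java objects.
val sorted = records.sort(col("key"))
sorted.foreach(_ => ())   // force evaluation without collecting to the driver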

Regards,
Keith.

http://keith-chapman.com


On Mon, Jul 15, 2019 at 8:45 AM Supun Kamburugamuve <
supun.kamburugam...@gmail.com> wrote:

> Hi all,
>
> We are trying to measure the sorting performance of Spark. We have a 16
> node cluster with 48 cores and 256GB of ram in each machine and 10Gbps
> network.
>
> Let's say we are running with 128 parallel tasks and each partition
> generates about 1GB of data (total 128GB).
>
> We are using the method *repartitionAndSortWithinPartitions*
>
> A standalone cluster is used with the following configuration.
>
> SPARK_WORKER_CORES=1
> SPARK_WORKER_MEMORY=16G
> SPARK_WORKER_INSTANCES=8
>
> --executor-memory 16G --executor-cores 1 --num-executors 128
>
> I believe this sets 128 executors to run the job each having 16GB of
> memory and spread across 16 nodes with 8 threads in each node. This
> configuration runs very slowly. The program doesn't use disks to read or
> write data (data generated in-memory and we don't write to file after
> sorting).
>
> It seems even though the data size is small, it uses disk for the shuffle.
> We are not sure our configurations are optimal to achieve the best
> performance.
>
> Best,
> Supun..
>
>


Re: Override jars in spark submit

2019-06-19 Thread Keith Chapman
Hi Naresh,

You could use "--conf spark.driver.extraClassPath=/path/to/your.jar". Note
that the jar will not be shipped to the executors; if it's a class that is
needed on the executors as well, you should provide "--conf
spark.executor.extraClassPath=/path/to/your.jar". Note that if you do
provide the executor extraClassPath, the jar file needs to be present on all
the executors.

Regards,
Keith.

http://keith-chapman.com


On Wed, Jun 19, 2019 at 8:57 PM naresh Goud 
wrote:

> Hello All,
>
> How can we override jars in spark submit?
> We have hive-exec-spark jar which is available as part of default spark
> cluster jars.
> We wanted to override above mentioned jar in spark submit with latest
> version jar.
> How do we do that ?
>
>
> Thank you,
> Naresh
> --
> Thanks,
> Naresh
> www.linkedin.com/in/naresh-dulam
> http://hadoopandspark.blogspot.com/
>
>


Re: [pyspark 2.3] count followed by write on dataframe

2019-05-20 Thread Keith Chapman
Yes that is correct, that would cause computation twice. If you want the
computation to happen only once you can cache the dataframe and call count
and write on the cached dataframe.
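
A minimal sketch (shown in Scala; the PySpark calls are analogous, and the paths and filter are illustrative) of caching so that count() and the write reuse a single computation:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cache-count-write-sketch").getOrCreate()
import spark.implicits._

val df = spark.read.parquet("/path/to/input")    // hypothetical input
  .filter($"amount" > 0)                         // hypothetical transformation

df.cache()                       // or persist(StorageLevel.MEMORY_AND_DISK)
val n = df.count()               // first action computes the dataframe and caches it
df.write.mode("overwrite").parquet("/path/to/output")   // reuses the cached data
df.unpersist()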

Regards,
Keith.

http://keith-chapman.com


On Mon, May 20, 2019 at 6:43 PM Rishi Shah  wrote:

> Hi All,
>
> Just wanted to confirm my understanding around actions on a dataframe. If the
> dataframe is not persisted at any point, and count() is called on it followed
> by a write action, this would trigger the dataframe computation twice (which
> could be a performance hit for a larger dataframe). Could anyone please help
> confirm?
>
> --
> Regards,
>
> Rishi Shah
>


Pyspark error when converting string to timestamp in map function

2018-08-17 Thread Keith Chapman
Hi all,

I'm trying to create a dataframe enforcing a schema so that I can write it
to a parquet file. The schema has timestamps and I get an error with
pyspark. The following is a snippet of code that exhibits the problem,

from pyspark.sql.types import StructType, StructField, TimestampType

df = sqlctx.range(1000)
schema = StructType([StructField('a', TimestampType(), True)])
df1 = sqlctx.createDataFrame(df.rdd.map(row_gen_func), schema)

row_gen_func is a function that returns timestamp strings of the form
"2018-03-21 11:09:44"

When I run this with Spark 2.2 I get the following error,

raise TypeError("%s can not accept object %r in type %s" % (dataType, obj,
type(obj)))
TypeError: TimestampType can not accept object '2018-03-21 08:06:17' in
type 

Regards,
Keith.

http://keith-chapman.com


Re: GC- Yarn vs Standalone K8

2018-06-11 Thread Keith Chapman
Spark on EMR is configured to use the CMS GC, specifically the following flags:

spark.executor.extraJavaOptions  -verbose:gc -XX:+PrintGCDetails
-XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC
-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70
-XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'
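
These are normally set through spark-defaults or the EMR configuration; for completeness, a minimal sketch (the app name is illustrative, and this is not EMR's own wiring) of passing the same executor JVM options programmatically before the session starts:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("gc-tuning-sketch")
  // Executor JVM flags must be in place before the executors launch; driver-side
  // flags generally have to be passed on the spark-submit command line instead.
  .config("spark.executor.extraJavaOptions",
    "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps " +
    "-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 " +
    "-XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled")
  .getOrCreate()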


Regards,
Keith.

http://keith-chapman.com

On Mon, Jun 11, 2018 at 8:22 PM, ankit jain  wrote:

> Hi,
> Does anybody know if Yarn uses a different Garbage Collector from Spark
> standalone?
>
> We migrated our application recently from EMR to K8 (not using native Spark
> on K8 yet) and see quite a bit of performance degradation.
>
> Diving further, it seems garbage collection is running too often, up to 50%
> of task time even with a small amount of data - PFA Spark UI screenshot.
>
> I have updated the GC to G1GC and it has helped a bit - GC time has come down
> from 50% to 30%, still too high though.
>
> Also enabled -verbose:gc, so there will be many more metrics to play with, but
> any pointers meanwhile will be appreciated.
>
>
> --
> Thanks & Regards,
> Ankit.
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>


Re: Running out of space on /tmp file system while running spark job on yarn because of size of blockmgr folder

2018-03-26 Thread Keith Chapman
Hi Michael,

Sorry for the late reply. I guess you may have to set it through the Hadoop
core-site.xml file. The property you need to set is "hadoop.tmp.dir", which
defaults to "/tmp/hadoop-${user.name}".

Regards,
Keith.

http://keith-chapman.com

On Mon, Mar 19, 2018 at 1:05 PM, Michael Shtelma <mshte...@gmail.com> wrote:

> Hi Keith,
>
> Thank you for the idea!
> I have tried it, so now the executor command looks like the following:
>
> /bin/bash -c /usr/java/latest//bin/java -server -Xmx51200m
> '-Djava.io.tmpdir=my_prefered_path'
> -Djava.io.tmpdir=/tmp/hadoop-msh/nm-local-dir/usercache/
> msh/appcache/application_1521110306769_0041/container_
> 1521110306769_0041_01_04/tmp
>
> The JVM is using the second -Djava.io.tmpdir parameter and writing
> everything to the same directory as before.
>
> Best,
> Michael
> Sincerely,
> Michael Shtelma
>
>
> On Mon, Mar 19, 2018 at 6:38 PM, Keith Chapman <keithgchap...@gmail.com>
> wrote:
> > Can you try setting spark.executor.extraJavaOptions to have
> > -Djava.io.tmpdir=someValue
> >
> > Regards,
> > Keith.
> >
> > http://keith-chapman.com
> >
> > On Mon, Mar 19, 2018 at 10:29 AM, Michael Shtelma <mshte...@gmail.com>
> > wrote:
> >>
> >> Hi Keith,
> >>
> >> Thank you for your answer!
> >> I have done this, and it is working for spark driver.
> >> I would like to make something like this for the executors as well, so
> >> that the setting will be used on all the nodes, where I have executors
> >> running.
> >>
> >> Best,
> >> Michael
> >>
> >>
> >> On Mon, Mar 19, 2018 at 6:07 PM, Keith Chapman <keithgchap...@gmail.com
> >
> >> wrote:
> >> > Hi Michael,
> >> >
> >> > You could either set spark.local.dir through spark conf or
> >> > java.io.tmpdir
> >> > system property.
> >> >
> >> > Regards,
> >> > Keith.
> >> >
> >> > http://keith-chapman.com
> >> >
> >> > On Mon, Mar 19, 2018 at 9:59 AM, Michael Shtelma <mshte...@gmail.com>
> >> > wrote:
> >> >>
> >> >> Hi everybody,
> >> >>
> >> >> I am running spark job on yarn, and my problem is that the blockmgr-*
> >> >> folders are being created under
> >> >> /tmp/hadoop-msh/nm-local-dir/usercache/msh/appcache/application_id/*
> >> >> The size of this folder can grow to a significant size and does not
> >> >> really fit into /tmp file system for one job, which makes a real
> >> >> problem for my installation.
> >> >> I have redefined hadoop.tmp.dir in core-site.xml and
> >> >> yarn.nodemanager.local-dirs in yarn-site.xml pointing to other
> >> >> location and expected that the block manager will create the files
> >> >> there and not under /tmp, but this is not the case. The files are
> >> >> created under /tmp.
> >> >>
> >> >> I am wondering if there is a way to make spark not use /tmp at all
> and
> >> >> configure it to create all the files somewhere else ?
> >> >>
> >> >> Any assistance would be greatly appreciated!
> >> >>
> >> >> Best,
> >> >> Michael
> >> >>
> >> >> 
> -
> >> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >> >>
> >> >
> >
> >
>


Re: Running out of space on /tmp file system while running spark job on yarn because of size of blockmgr folder

2018-03-19 Thread Keith Chapman
Can you try setting spark.executor.extraJavaOptions to have
-Djava.io.tmpdir=someValue

Regards,
Keith.

http://keith-chapman.com

On Mon, Mar 19, 2018 at 10:29 AM, Michael Shtelma <mshte...@gmail.com>
wrote:

> Hi Keith,
>
> Thank you for your answer!
> I have done this, and it is working for spark driver.
> I would like to make something like this for the executors as well, so
> that the setting will be used on all the nodes, where I have executors
> running.
>
> Best,
> Michael
>
>
> On Mon, Mar 19, 2018 at 6:07 PM, Keith Chapman <keithgchap...@gmail.com>
> wrote:
> > Hi Michael,
> >
> > You could either set spark.local.dir through spark conf or java.io.tmpdir
> > system property.
> >
> > Regards,
> > Keith.
> >
> > http://keith-chapman.com
> >
> > On Mon, Mar 19, 2018 at 9:59 AM, Michael Shtelma <mshte...@gmail.com>
> wrote:
> >>
> >> Hi everybody,
> >>
> >> I am running spark job on yarn, and my problem is that the blockmgr-*
> >> folders are being created under
> >> /tmp/hadoop-msh/nm-local-dir/usercache/msh/appcache/application_id/*
> >> The size of this folder can grow to a significant size and does not
> >> really fit into /tmp file system for one job, which makes a real
> >> problem for my installation.
> >> I have redefined hadoop.tmp.dir in core-site.xml and
> >> yarn.nodemanager.local-dirs in yarn-site.xml pointing to other
> >> location and expected that the block manager will create the files
> >> there and not under /tmp, but this is not the case. The files are
> >> created under /tmp.
> >>
> >> I am wondering if there is a way to make spark not use /tmp at all and
> >> configure it to create all the files somewhere else ?
> >>
> >> Any assistance would be greatly appreciated!
> >>
> >> Best,
> >> Michael
> >>
> >> -
> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>
> >
>


Re: Running out of space on /tmp file system while running spark job on yarn because of size of blockmgr folder

2018-03-19 Thread Keith Chapman
Hi Michael,

You could either set spark.local.dir through the Spark conf or set the
java.io.tmpdir system property.
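
A minimal sketch (the directory path is illustrative) of the first option, setting spark.local.dir through the Spark conf:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Points Spark's scratch space away from /tmp. Note that on YARN this is
// normally overridden by yarn.nodemanager.local-dirs, so the YARN setting
// may still win for executor containers.
val conf = new SparkConf()
  .setAppName("local-dir-sketch")
  .set("spark.local.dir", "/data/spark-scratch")

val spark = SparkSession.builder().config(conf).getOrCreate()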

Regards,
Keith.

http://keith-chapman.com

On Mon, Mar 19, 2018 at 9:59 AM, Michael Shtelma  wrote:

> Hi everybody,
>
> I am running spark job on yarn, and my problem is that the blockmgr-*
> folders are being created under
> /tmp/hadoop-msh/nm-local-dir/usercache/msh/appcache/application_id/*
> The size of this folder can grow to a significant size and does not
> really fit into /tmp file system for one job, which makes a real
> problem for my installation.
> I have redefined hadoop.tmp.dir in core-site.xml and
> yarn.nodemanager.local-dirs in yarn-site.xml pointing to other
> location and expected that the block manager will create the files
> there and not under /tmp, but this is not the case. The files are
> created under /tmp.
>
> I am wondering if there is a way to make spark not use /tmp at all and
> configure it to create all the files somewhere else ?
>
> Any assistance would be greatly appreciated!
>
> Best,
> Michael
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Can I get my custom spark strategy to run last?

2018-03-01 Thread Keith Chapman
Hi,

I'd like to write a custom Spark strategy that runs after all the existing
Spark strategies are run. Looking through the Spark code it seems like the
custom strategies are prepended to the list of strategies in Spark. Is
there a way I could get it to run last?
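
For context, a minimal sketch (assuming the Spark 2.x ExperimentalMethods API; the strategy body is a placeholder) of how custom strategies are typically registered, which is where the prepending happens:

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// Placeholder strategy that matches nothing; a real one would pattern-match on
// logical operators and return the corresponding physical plans.
object MyStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

val spark = SparkSession.builder().appName("strategy-sketch").getOrCreate()
// extraStrategies are tried before the built-in strategies, i.e. they are
// prepended, which is the behaviour described above.
spark.experimental.extraStrategies = Seq(MyStrategy)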

Regards,
Keith.

http://keith-chapman.com


Re: Spark not releasing shuffle files in time (with very large heap)

2018-02-22 Thread Keith Chapman
My issue is that there is not enough pressure on GC, hence GC is not
kicking in fast enough to delete the shuffle files of previous iterations.
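
A minimal sketch (the interval value is illustrative) of the option mentioned in the quoted message below, making the driver-side GC that drives shuffle-file cleanup run more often:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("periodic-gc-sketch")
  // Defaults to 30min; a lower value makes the ContextCleaner trigger a driver
  // GC more often, so unreachable shuffle state is discovered and deleted sooner.
  .set("spark.cleaner.periodicGC.interval", "5min")

val sc = new SparkContext(conf)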

Regards,
Keith.

http://keith-chapman.com

On Thu, Feb 22, 2018 at 6:58 PM, naresh Goud <nareshgoud.du...@gmail.com>
wrote:

> It would be very difficult to tell without knowing what your application
> code is doing and what kind of transformations/actions it performs. From my
> previous experience, tuning application code to avoid unnecessary objects
> reduces pressure on GC.
>
>
> On Thu, Feb 22, 2018 at 2:13 AM, Keith Chapman <keithgchap...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I'm benchmarking a Spark application by running it for multiple
>> iterations; it's a benchmark that's heavy on shuffle and I run it on a local
>> machine with a very large heap (~200GB). The system has an SSD. When running
>> for 3 to 4 iterations I get into a situation where I run out of disk space
>> on the /tmp directory. On further investigation I was able to figure out
>> that the reason for this is that the shuffle files are still around:
>> because I have a very large heap, GC has not happened and hence the shuffle
>> files are not deleted. I was able to confirm this by lowering the heap size;
>> I see GC kicking in more often and the size of /tmp stays under
>> control. Is there any way I could configure Spark to handle this issue?
>>
>> One option that I have is to have GC run more often by
>> setting spark.cleaner.periodicGC.interval to a much lower value. Is
>> there a cleaner solution?
>>
>> Regards,
>> Keith.
>>
>> http://keith-chapman.com
>>
>
>


Spark not releasing shuffle files in time (with very large heap)

2018-02-22 Thread Keith Chapman
Hi,

I'm benchmarking a Spark application by running it for multiple iterations;
it's a benchmark that's heavy on shuffle and I run it on a local machine with
a very large heap (~200GB). The system has an SSD. When running for 3 to 4
iterations I get into a situation where I run out of disk space on the /tmp
directory. On further investigation I was able to figure out that the
reason for this is that the shuffle files are still around: because I have
a very large heap, GC has not happened and hence the shuffle files are not
deleted. I was able to confirm this by lowering the heap size; I see GC
kicking in more often and the size of /tmp stays under control. Is there
any way I could configure Spark to handle this issue?

One option that I have is to have GC run more often by
setting spark.cleaner.periodicGC.interval to a much lower value. Is there a
cleaner solution?

Regards,
Keith.

http://keith-chapman.com


Re: update LD_LIBRARY_PATH when running apache job in a YARN cluster

2018-01-17 Thread Keith Chapman
Hi Manuel,

You could use the following to add a path to the library search path,
--conf spark.driver.extraLibraryPath=PathToLibFolder
--conf spark.executor.extraLibraryPath=PathToLibFolder
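
A minimal sketch (the path is illustrative) of setting the executor-side option programmatically; the driver-side option generally has to go on the spark-submit command line, since the driver JVM has already started by the time application code runs:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("native-lib-sketch")
  // Prepended to the library search path (LD_LIBRARY_PATH) of each executor.
  .set("spark.executor.extraLibraryPath", "/opt/python/lib")

val sc = new SparkContext(conf)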

Thanks,
Keith.

Regards,
Keith.

http://keith-chapman.com

On Wed, Jan 17, 2018 at 5:39 PM, Manuel Sopena Ballesteros <
manuel...@garvan.org.au> wrote:

> Dear Spark community,
>
>
>
> I have Spark running in a YARN cluster and I am getting an error when
> trying to run my Python application.
>
>
>
> /home/mansop/virtenv/bin/python2.7: error while loading shared libraries:
> libpython2.7.so.1.0: cannot open shared object file: No such file or
> directory
>
>
>
> Is there a way to specify the LD_LIBRARY_PATH in the spark-submit command
> or in the config file?
>
>
>
>
>
> *Manuel Sopena Ballesteros *| Big data Engineer
> *Garvan Institute of Medical Research *
> The Kinghorn Cancer Centre, 370 Victoria Street, Darlinghurst, NSW 2010
> 
> *T:* + 61 (0)2 9355 5760 | *F:* +61 (0)2 9295 8507 | *E:* manuel...@garvan.org.au
>
>
> NOTICE
> Please consider the environment before printing this email. This message
> and any attachments are intended for the addressee named and may contain
> legally privileged/confidential/copyright information. If you are not the
> intended recipient, you should not read, use, disclose, copy or distribute
> this communication. If you have received this message in error please
> notify us at once by return email and then delete both messages. We accept
> no liability for the distribution of viruses or similar in electronic
> communications. This notice should not be removed.
>


Re: What are some disadvantages of issuing a raw sql query to spark?

2017-07-25 Thread Keith Chapman
Here is an example of a window lead function,

select *, lead(someColumn1) over (partition by someColumn2 order by
someColumn3 asc nulls first) as someName from someTable
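
For comparison, a hedged sketch of the same lead window in the Dataframe DSL (the table and column names mirror the placeholders above):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lead}

val spark = SparkSession.builder().appName("lead-window-sketch").getOrCreate()
val someTable = spark.table("someTable")   // placeholder for the table in the query above

val w = Window.partitionBy(col("someColumn2"))
  .orderBy(col("someColumn3").asc_nulls_first)

val result = someTable.withColumn("someName", lead(col("someColumn1"), 1).over(w))
result.show()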

Regards,
Keith.

http://keith-chapman.com

On Tue, Jul 25, 2017 at 9:15 AM, kant kodali <kanth...@gmail.com> wrote:

> How do I specify windowInterval and slideInterval using a raw sql string?
>
> On Tue, Jul 25, 2017 at 8:52 AM, Keith Chapman <keithgchap...@gmail.com>
> wrote:
>
>> You could issue a raw sql query to Spark; there is no particular
>> advantage or disadvantage to doing so. Spark would build a logical plan
>> from the raw sql (or DSL) and optimize on that. Ideally you would end up
>> with the same physical plan, irrespective of whether it was written in raw
>> sql or the DSL.
>>
>> Regards,
>> Keith.
>>
>> http://keith-chapman.com
>>
>> On Tue, Jul 25, 2017 at 12:50 AM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> HI All,
>>>
>>> I just want to run some spark structured streaming Job similar to this
>>>
>>> DS.filter(col("name").equalTo("john"))
>>> .groupBy(functions.window(df1.col("TIMESTAMP"), "24 hours", "24 
>>> hours"), df1.col("hourlyPay"))
>>> .agg(sum("hourlyPay").as("total"));
>>>
>>>
>>> I am wondering if I can express the above query in raw sql string?
>>>
>>> If so how would that look like and what are some of the disadvantages of 
>>> using raw sql query vs spark DSL?
>>>
>>>
>>> Thanks!
>>>
>>>
>>
>


Re: What are some disadvantages of issuing a raw sql query to spark?

2017-07-25 Thread Keith Chapman
You could issue a raw sql query to Spark; there is no particular advantage
or disadvantage to doing so. Spark would build a logical plan from the raw
sql (or DSL) and optimize on that. Ideally you would end up with the same
physical plan, irrespective of whether it was written in raw sql or the DSL.
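
A minimal sketch (table, path, and column names are illustrative) of the two forms side by side; comparing the explain() output is a quick way to check that the physical plans line up:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("sql-vs-dsl-sketch").getOrCreate()
val people = spark.read.parquet("/path/to/people")   // hypothetical input
people.createOrReplaceTempView("people")

val viaSql = spark.sql("SELECT name, age FROM people WHERE age > 30")
val viaDsl = people.filter(col("age") > 30).select("name", "age")

viaSql.explain()   // both should show the same (or an equivalent) physical plan
viaDsl.explain()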

Regards,
Keith.

http://keith-chapman.com

On Tue, Jul 25, 2017 at 12:50 AM, kant kodali  wrote:

> HI All,
>
> I just want to run some spark structured streaming Job similar to this
>
> DS.filter(col("name").equalTo("john"))
> .groupBy(functions.window(df1.col("TIMESTAMP"), "24 hours", "24 
> hours"), df1.col("hourlyPay"))
> .agg(sum("hourlyPay").as("total"));
>
>
> I am wondering if I can express the above query in raw sql string?
>
> If so how would that look like and what are some of the disadvantages of 
> using raw sql query vs spark DSL?
>
>
> Thanks!
>
>


Re: Get full RDD lineage for a spark job

2017-07-21 Thread Keith Chapman
You could also enable it with --conf spark.logLineage=true if you do not
want to change any code.

Regards,
Keith.

http://keith-chapman.com

On Fri, Jul 21, 2017 at 7:57 PM, Keith Chapman <keithgchap...@gmail.com>
wrote:

> Hi Ron,
>
> You can try using the toDebugString method on the RDD, this will print
> the RDD lineage.
>
> Regards,
> Keith.
>
> http://keith-chapman.com
>
> On Fri, Jul 21, 2017 at 11:24 AM, Ron Gonzalez <
> zlgonza...@yahoo.com.invalid> wrote:
>
>> Hi,
>>   Can someone point me to a test case or share sample code that is able
>> to extract the RDD graph from a Spark job anywhere during its lifecycle? I
>> understand that Spark has UI that can show the graph of the execution so
>> I'm hoping that is using some API somewhere that I could use.
>>   I know RDD is the actual execution graph, so if there is also a more
>> logical abstraction API closer to calls like map, filter, aggregate, etc.,
>> that would even be better.
>>   Appreciate any help...
>>
>> Thanks,
>> Ron
>>
>
>


Re: Get full RDD lineage for a spark job

2017-07-21 Thread Keith Chapman
Hi Ron,

You can try using the toDebugString method on the RDD, this will print the
RDD lineage.
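
A minimal sketch (input path and transformations are illustrative) of printing the lineage this way:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("lineage-sketch").getOrCreate()
val sc = spark.sparkContext

val rdd = sc.textFile("/path/to/input")    // hypothetical input
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

// Prints the RDD lineage, including the shuffle boundary introduced by reduceByKey.
println(rdd.toDebugString)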

Regards,
Keith.

http://keith-chapman.com

On Fri, Jul 21, 2017 at 11:24 AM, Ron Gonzalez  wrote:

> Hi,
>   Can someone point me to a test case or share sample code that is able to
> extract the RDD graph from a Spark job anywhere during its lifecycle? I
> understand that Spark has UI that can show the graph of the execution so
> I'm hoping that is using some API somewhere that I could use.
>   I know RDD is the actual execution graph, so if there is also a more
> logical abstraction API closer to calls like map, filter, aggregate, etc.,
> that would even be better.
>   Appreciate any help...
>
> Thanks,
> Ron
>


Re: Is there an api in Dataset/Dataframe that does repartitionAndSortWithinPartitions?

2017-06-24 Thread Keith Chapman
Hi Nguyen,

This looks promising and seems like I could achieve it using cluster by.
Thanks for the pointer.
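
For reference, a hedged sketch (column names and partition count are illustrative) of what CLUSTER BY roughly corresponds to on the Dataset side:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("cluster-by-sketch").getOrCreate()

val outputPartitionCount = 300
val ds = spark.range(0, 1000000)
  .select((col("id") % 1000).as("key"), col("id").as("value"))

// Hash-partition by key, then sort rows within each partition by key --
// the Dataframe analogue of repartitionAndSortWithinPartitions.
val clustered = ds.repartition(outputPartitionCount, col("key"))
  .sortWithinPartitions("key")

// Roughly equivalent raw SQL: SELECT * FROM t CLUSTER BY key
// (with spark.sql.shuffle.partitions set to 300)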

Regards,
Keith.

http://keith-chapman.com

On Sat, Jun 24, 2017 at 5:27 AM, nguyen duc Tuan <newvalu...@gmail.com>
wrote:

> Hi Chapman,
> You can use "cluster by" to do what you want.
> https://deepsense.io/optimize-spark-with-distribute-by-and-cluster-by/
>
> 2017-06-24 17:48 GMT+07:00 Saliya Ekanayake <esal...@gmail.com>:
>
>> I haven't worked with datasets but would this help
>> https://stackoverflow.com/questions/37513667/how-to-create-a-spark-dataset-from-an-rdd?
>>
>> On Jun 23, 2017 5:43 PM, "Keith Chapman" <keithgchap...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have code that does the following using RDDs,
>>>
>>> val outputPartitionCount = 300
>>> val part = new MyOwnPartitioner(outputPartitionCount)
>>> val finalRdd = myRdd.repartitionAndSortWithinPartitions(part)
>>>
>>> where myRdd is correctly formed as key, value pairs. I am looking to
>>> convert this to use Dataset/Dataframe instead of RDDs, so my question is:
>>>
>>> Is there custom partitioning of Dataset/Dataframe implemented in Spark?
>>> Can I accomplish the partial sort using mapPartitions on the resulting
>>> partitioned Dataset/Dataframe?
>>>
>>> Any thoughts?
>>>
>>> Regards,
>>> Keith.
>>>
>>> http://keith-chapman.com
>>>
>>
>


Re: Is there an api in Dataset/Dataframe that does repartitionAndSortWithinPartitions?

2017-06-24 Thread Keith Chapman
Thanks for the pointer Saliya. I'm looking for an equivalent API in
Dataset/Dataframe for repartitionAndSortWithinPartitions; I've already
converted most of the RDDs to Dataframes.

Regards,
Keith.

http://keith-chapman.com

On Sat, Jun 24, 2017 at 3:48 AM, Saliya Ekanayake <esal...@gmail.com> wrote:

> I haven't worked with datasets but would this help
> https://stackoverflow.com/questions/37513667/how-to-create-a-spark-dataset-from-an-rdd?
>
> On Jun 23, 2017 5:43 PM, "Keith Chapman" <keithgchap...@gmail.com> wrote:
>
>> Hi,
>>
>> I have code that does the following using RDDs,
>>
>> val outputPartitionCount = 300
>> val part = new MyOwnPartitioner(outputPartitionCount)
>> val finalRdd = myRdd.repartitionAndSortWithinPartitions(part)
>>
>> where myRdd is correctly formed as key, value pairs. I am looking to convert
>> this to use Dataset/Dataframe instead of RDDs, so my question is:
>>
>> Is there custom partitioning of Dataset/Dataframe implemented in Spark?
>> Can I accomplish the partial sort using mapPartitions on the resulting
>> partitioned Dataset/Dataframe?
>>
>> Any thoughts?
>>
>> Regards,
>> Keith.
>>
>> http://keith-chapman.com
>>
>


Is there an api in Dataset/Dataframe that does repartitionAndSortWithinPartitions?

2017-06-23 Thread Keith Chapman
Hi,

I have code that does the following using RDDs,

val outputPartitionCount = 300
val part = new MyOwnPartitioner(outputPartitionCount)
val finalRdd = myRdd.repartitionAndSortWithinPartitions(part)

where myRdd is correctly formed as key, value pairs. I am looking to convert
this to use Dataset/Dataframe instead of RDDs, so my question is:

Is there custom partitioning of Dataset/Dataframe implemented in Spark?
Can I accomplish the partial sort using mapPartitions on the resulting
partitioned Dataset/Dataframe?

Any thoughts?

Regards,
Keith.

http://keith-chapman.com


Re: Alternatives for dataframe collectAsList()

2017-04-04 Thread Keith Chapman
As Paul said, it really depends on what you want to do with your data;
perhaps writing it to a file would be a better option, but again it depends
on what you want to do with the data you collect.
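
Two hedged sketches of the alternatives mentioned in this thread (the query and paths are illustrative): streaming rows to the driver with toLocalIterator, or skipping the driver and writing the result out in parallel:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("collect-alternatives-sketch").getOrCreate()
val df = spark.sql("SELECT * FROM some_table")   // placeholder for the original query

// Option 1: iterate one partition at a time instead of materializing all
// 20-30 million rows on the driver at once.
val it = df.toLocalIterator()
while (it.hasNext) {
  val row = it.next()
  // process row
}

// Option 2: write the result out in parallel and read it from wherever it is needed.
df.write.mode("overwrite").parquet("/path/to/result")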

Regards,
Keith.

http://keith-chapman.com

On Tue, Apr 4, 2017 at 7:38 AM, Eike von Seggern 
wrote:

> Hi,
>
> depending on what you're trying to achieve `RDD.toLocalIterator()` might
> help you.
>
> Best
>
> Eike
>
>
> 2017-03-29 21:00 GMT+02:00 szep.laszlo.it :
>
>> Hi,
>>
>> after I created a dataset
>>
>> Dataset df = sqlContext.sql("query");
>>
>> I need to get the result values, so I call the method collectAsList():
>>
>> List list = df.collectAsList();
>>
>> But it's very slow if I work with large datasets (20-30 million records).
>> I know that the result isn't already present in the driver app, and that's
>> why it takes a long time: collectAsList() collects all the data from the
>> worker nodes.
>>
>> But then what is the right way to get the result values? Is there another
>> solution to iterate over the result dataset's rows, or to get the values?
>> Can anyone post a small, working example?
>>
>> Thanks & Regards,
>> Laszlo Szep
>>
>


Re: Having issues reading a csv file into a DataSet using Spark 2.1

2017-03-22 Thread Keith Chapman
Thanks for the advice Diego, that was very helpful. How could I read the
csv as a dataset though? I need to do a map operation over the dataset; I
just coded up an example to illustrate the issue.
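
For what it's worth, a hedged sketch (assuming Spark 2.1, a single-column csv without a header, and an illustrative map operation) of reading the csv into a typed Dataset: the case class is defined at the top level so an encoder can be derived, and the default _c0 column is renamed to match it:

import org.apache.spark.sql.SparkSession

// Defined at the top level (not inside a method) so Spark can derive an encoder.
case class Foo(text: String)

object DatasetCsvSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DatasetCsvSketch").getOrCreate()
    import spark.implicits._

    // read.csv names columns _c0, _c1, ...; rename the first one to match the
    // case class field before converting to a typed Dataset.
    val ds = spark.read.csv(args(0))        // first program argument is the csv path
      .withColumnRenamed("_c0", "text")
      .as[Foo]

    ds.map(foo => foo.text.toUpperCase).show()   // typed map over the Dataset
    spark.stop()
  }
}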

On Mar 22, 2017 6:43 PM, "Diego Fanesi" <diego.fan...@gmail.com> wrote:

> You are using spark as a library but it is much more than that. The book
> "learning Spark"  is very well done and it helped me a lot starting with
> spark. Maybe you should start from there.
>
> Those are the issues in your code:
>
> Basically, you generally don't execute spark code like that. You could but
> it is not officially supported and many functions don't work in that way.
> You should start your local cluster made of master and single worker, then
> make a jar with your code and use spark-submit to send it to the cluster.
>
> You generally never use args because spark is a multiprocess, multi-thread
> application so args will not be available everywhere.
>
> All contexts have been merged into the same context in the last versions
> of Spark, so you will need to do something like this:
>
> import org.apache.spark.sql.{DataFrame, SparkSession}
>
> object DatasetTest{
>
> val spark: SparkSession = SparkSession
>   .builder() .master("local[8]")
>   .appName("Spark basic example").getOrCreate()
>
> import spark.implicits._
>
> def main(Args: Array[String]) {
>
> var x = spark.read.format("csv").load("/home/user/data.csv")
>
> x.show()
>
> }
>
> }
>
>
> hope this helps.
>
> Diego
>
> On 22 Mar 2017 7:18 pm, "Keith Chapman" <keithgchap...@gmail.com> wrote:
>
> Hi,
>
> I'm trying to read a CSV file into a Dataset but keep having
> compilation issues. I'm using Spark 2.1 and the following is a small
> program that exhibits the issue I'm having. I've searched around but not
> found a solution that worked; I've added "import sqlContext.implicits._" as
> suggested but no luck. What am I missing? Would appreciate some advice.
>
> import org.apache.spark.sql.functions._
> import org.apache.spark.{SparkContext, SparkConf}
> import org.apache.spark.sql.{Encoder,Encoders}
>
> object DatasetTest{
>
>   def main(args: Array[String]) {
> val sparkConf = new SparkConf().setAppName("DatasetTest")
> val sc = new SparkContext(sparkConf)
> case class Foo(text: String)
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> import sqlContext.implicits._
> val ds : org.apache.spark.sql.Dataset[Foo] =
> sqlContext.read.csv(args(1)).as[Foo]
> ds.show
>   }
> }
>
> Compiling the above program gives the error below. I'd expect it to work as
> it's a simple case class; changing it to as[String] works, but I would like
> to get the case class to work.
>
> [error] /home/keith/dataset/DataSetTest.scala:13: Unable to find encoder
> for type stored in a Dataset.  Primitive types (Int, String, etc) and
> Product types (case classes) are supported by importing spark.implicits._
> Support for serializing other types will be added in future releases.
> [error] val ds : org.apache.spark.sql.Dataset[Foo] =
> sqlContext.read.csv(args(1)).as[Foo]
>
>
> Regards,
> Keith.
>
>
>


Having issues reading a csv file into a DataSet using Spark 2.1

2017-03-22 Thread Keith Chapman
Hi,

I'm trying to read a CSV file into a Dataset but keep having compilation
issues. I'm using Spark 2.1 and the following is a small program that
exhibits the issue I'm having. I've searched around but not found a solution
that worked; I've added "import sqlContext.implicits._" as suggested but no
luck. What am I missing? Would appreciate some advice.

import org.apache.spark.sql.functions._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{Encoder,Encoders}

object DatasetTest{

  def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("DatasetTest")
val sc = new SparkContext(sparkConf)
case class Foo(text: String)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val ds : org.apache.spark.sql.Dataset[Foo] =
sqlContext.read.csv(args(1)).as[Foo]
ds.show
  }
}

Compiling the above program gives the error below. I'd expect it to work as
it's a simple case class; changing it to as[String] works, but I would like to
get the case class to work.

[error] /home/keith/dataset/DataSetTest.scala:13: Unable to find encoder
for type stored in a Dataset.  Primitive types (Int, String, etc) and
Product types (case classes) are supported by importing spark.implicits._
Support for serializing other types will be added in future releases.
[error] val ds : org.apache.spark.sql.Dataset[Foo] =
sqlContext.read.csv(args(1)).as[Foo]


Regards,
Keith.


Re:

2017-01-20 Thread Keith Chapman
Hi Jacek,

I've looked at SparkListener and tried it; I see it getting fired on the
master, but I don't see it getting fired on the workers in a cluster.
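
For reference, a hedged sketch of the driver-side listener approach Jacek suggested (the cleanup logic is a placeholder); as noted above, these events fire on the driver, not inside the executors:

import org.apache.spark.sql.SparkSession
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

val spark = SparkSession.builder().appName("job-end-listener-sketch").getOrCreate()
val sc = spark.sparkContext

sc.addSparkListener(new SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    // Runs on the driver when a job finishes. To clean executor-local temp files,
    // this would have to kick off work that actually runs on the executors
    // (e.g. a small cleanup job), since listeners do not run on the workers.
    println(s"Job ${jobEnd.jobId} finished at ${jobEnd.time}")
  }
})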

Regards,
Keith.

http://keith-chapman.com

On Fri, Jan 20, 2017 at 11:09 AM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> (redirecting to users as it has nothing to do with Spark project
> development)
>
> Monitor jobs and stages using SparkListener and submit cleanup jobs where
> a condition holds.
>
> Jacek
>
> On 20 Jan 2017 3:57 a.m., "Keith Chapman" <keithgchap...@gmail.com> wrote:
>
>> Hi ,
>>
>> Is it possible for an executor (or slave) to know when an actual job
>> ends? I'm running spark on a cluster (with yarn) and my workers create some
>> temporary files that I would like to clean up once the job ends. Is there a
>> way for the worker to detect that a job has finished? I tried doing it in
>> the JobProgressListener but it does not seem to work in a cluster. The
>> event is not triggered in the worker.
>>
>> Regards,
>> Keith.
>>
>> http://keith-chapman.com
>>
>