Re: newbie question for reduce

2022-01-18 Thread Sean Owen
The problem is that you are reducing a list of tuples, but your function
produces an int. The resulting int can't then be combined with the
remaining tuples by your function; reduce() has to produce the same type
as its arguments.

rdd.map(lambda x: x[1]).reduce(lambda x, y: x + y)

... would work.
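
If you want to keep the tuples and still produce an int, aggregate() is the
RDD primitive whose result type may differ from the element type. A minimal
sketch (same rdd as below; the seqOp folds each element into the int
accumulator, the combOp merges the per-partition accumulators):

rdd.aggregate(0, lambda acc, kv: acc + kv[1], lambda a, b: a + b)
# returns 6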

On Tue, Jan 18, 2022 at 8:41 PM  wrote:

> Hello
>
> Please help me take a look at why this simple reduce doesn't work.
>
> >>> rdd = sc.parallelize([("a",1),("b",2),("c",3)])
> >>>
> >>> rdd.reduce(lambda x,y: x[1]+y[1])
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "/opt/spark/python/pyspark/rdd.py", line 1001, in reduce
>     return reduce(f, vals)
>   File "/opt/spark/python/pyspark/util.py", line 74, in wrapper
>     return f(*args, **kwargs)
>   File "<stdin>", line 1, in <module>
> TypeError: 'int' object is not subscriptable
> >>>
>
>
> Spark 3.2.0
>
> Thank you.
>


newbie question for reduce

2022-01-18 Thread capitnfrakass

Hello

Please help me take a look at why this simple reduce doesn't work.


>>> rdd = sc.parallelize([("a",1),("b",2),("c",3)])
>>> rdd.reduce(lambda x,y: x[1]+y[1])

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark/python/pyspark/rdd.py", line 1001, in reduce
    return reduce(f, vals)
  File "/opt/spark/python/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "<stdin>", line 1, in <module>
TypeError: 'int' object is not subscriptable





Spark 3.2.0

Thank you.



Re: [Pyspark] How to download Zip file from SFTP location and put in into Azure Data Lake and unzip it

2022-01-18 Thread Wes Peng
How large is the file? In my experience, reading the Excel file from the
data lake and loading it as a DataFrame works great.
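
A minimal sketch of that path, assuming the zip has already been extracted
to the lake and that pandas and openpyxl are available on the driver (the
mount path below is hypothetical):

import pandas as pd

# read the Excel file on the driver, then hand it to Spark as a DataFrame
pdf = pd.read_excel("/mnt/datalake/unzipped/report.xlsx")
df = spark.createDataFrame(pdf)
df.show()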


Thanks

On 2022-01-18 22:16, Heta Desai wrote:

Hello,

I have zip files on an SFTP location. I want to download/copy those
files and put them into Azure Data Lake. Once the zip files are stored in
Azure Data Lake, I want to unzip those files and read them using Data
Frames.

The file format inside the zip is Excel. So, once the files are unzipped,
I want to read the Excel files using Spark DataFrames.

Please help me with the solution as soon as possible.

Thanks,

Heta Desai | Data | Sr Associate L1
e.heta.de...@1rivet.com | t. +91 966.225.4954






[Pyspark] How to download Zip file from SFTP location and put in into Azure Data Lake and unzip it

2022-01-18 Thread Heta Desai
Hello,

I have zip files on an SFTP location. I want to download/copy those files and
put them into Azure Data Lake. Once the zip files are stored in Azure Data
Lake, I want to unzip those files and read them using Data Frames.

The file format inside the zip is Excel. So, once the files are unzipped, I
want to read the Excel files using Spark DataFrames.

Please help me with the solution as soon as possible.
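
For reference, a minimal sketch of the SFTP copy and unzip steps, assuming
paramiko is available; the host, credentials, and paths are hypothetical,
and the upload of the extracted files into Azure Data Lake is left out:

import zipfile
import paramiko

# copy the zip from the SFTP server to local staging
transport = paramiko.Transport(("sftp.example.com", 22))
transport.connect(username="user", password="secret")
sftp = paramiko.SFTPClient.from_transport(transport)
sftp.get("/remote/files.zip", "/tmp/files.zip")
sftp.close()
transport.close()

# extract the Excel files before uploading them to the lake
with zipfile.ZipFile("/tmp/files.zip") as zf:
    zf.extractall("/tmp/unzipped")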


Thanks,
Heta Desai | Data | Sr Associate L1
e.heta.de...@1rivet.com | t. +91 966.225.4954



[ML Intermediate]: Slow fitting of Linear regression vs Sklearn

2022-01-18 Thread Hu You
Hi team,

We are testing the performance and capability of Spark for a linear
regression application, to replace at least sklearn's linear regression.
We first generated data for model fitting via
sklearn.datasets.make_regression (see the generation code here).

Then, we performed model fitting in sklearn (Python) and Spark (Scala).
We set up our models according to a post on Stack Overflow, to try to
get the same model on both sides.
The Spark setup is:

- Spark 3.1
- spark-shell -I
- Spark Standalone with 3 executors (8 cores, 128 GB each)
- Driver: spark.driver.maxResultSize=20g; spark.driver.memory=100g;
  spark.executor.memory=128g

Fitting speed:
Python: 19 s
Spark: 600+ s

Resource utilization:
Python: 150 GB
Spark: Node1 (50 GB+); Node2, Node3, and the driver node (2 GB)

The job gets stuck on:
MapPartitionsRDD [23]
treeAggregate at WeightedLeastSquares.scala:107

What we tried:

- maxTreeDepth
- repartition
- standardization

None of them had a significant effect on training speed (see the sketch
below).
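
For reference: in Spark ML, the parameter that controls the depth of the
treeAggregate step is aggregationDepth (presumably what maxTreeDepth above
refers to). A minimal PySpark sketch, not the poster's Scala code;
train_df is a hypothetical DataFrame with "features" and "label" columns:

from pyspark.ml.regression import LinearRegression

# solver="normal" selects the normal-equation path (WeightedLeastSquares),
# which is where the treeAggregate in the stack above comes from.
lr = LinearRegression(featuresCol="features", labelCol="label",
                      solver="normal",
                      aggregationDepth=2)  # depth of treeAggregate; 2 is the default
model = lr.fit(train_df)
print(model.coefficients)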

Could you please help us figure out where the issue comes from? Being 20x
slower is not acceptable for us, even though Spark has other good features.

Thanks!
You Hu



Re: Spark on k8s : spark 3.0.1 spark.kubernetes.executor.deleteontermination issue

2022-01-18 Thread Pralabh Kumar
Does the property spark.kubernetes.executor.deleteOnTermination check
whether the executor being deleted has shuffle data or not?

On Tue, 18 Jan 2022, 11:20 Pralabh Kumar,  wrote:

> Hi spark team
>
> We have the cluster-wide property spark.kubernetes.executor.deleteOnTermination
> set to true.
> During a long-running job, some of the executors that held shuffle data
> got deleted. Because of this, in the subsequent stage we get a lot of
> Spark shuffle fetch failed exceptions.
>
>
> Please let me know whether there is a way to fix it. Note that if I set the
> above property to false, I see no shuffle fetch exceptions.
>
>
> Regards
> Pralabh
>
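
For reference, a hedged sketch of the settings involved, using the standard
property names. As far as I know, deleteOnTermination only controls whether
a terminated executor's pod is deleted; shuffle-block migration comes from
the decommissioning settings, as in the next thread:

# keep terminated executor pods around for inspection (default: true)
spark.kubernetes.executor.deleteOnTermination     false
# shuffle-aware handling comes from the decommissioning settings instead
spark.decommission.enabled                        true
spark.storage.decommission.shuffleBlocks.enabled  true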


Regarding spark-3.2.0 decommission features.

2022-01-18 Thread Patidar, Mohanlal (Nokia - IN/Bangalore)
Hi,
 We're using Spark 3.2.0 and we have enabled the Spark decommission
feature. As part of validating this feature, we wanted to check whether the
RDD blocks and shuffle blocks from the decommissioned executors are migrated
to other executors.
However, we could not see this happening. Below is the configuration we used.

  1.  Spark Configuration used:
 spark.local.dir /mnt/spark-ldir
 spark.decommission.enabled true
 spark.storage.decommission.enabled true
 spark.storage.decommission.rddBlocks.enabled true
 spark.storage.decommission.shuffleBlocks.enabled true
 spark.dynamicAllocation.enabled true
  2.  Brought up the spark driver and executors on different nodes:

NAME                                           READY  STATUS   NODE
decommission-driver                            1/1    Running  Node1
gzip-compression-test-ae0b0b7e4d7fbe40-exec-1  1/1    Running  Node1
gzip-compression-test-ae0b0b7e4d7fbe40-exec-2  1/1    Running  Node2
gzip-compression-test-ae0b0b7e4d7fbe40-exec-3  1/1    Running  Node1
gzip-compression-test-ae0b0b7e4d7fbe40-exec-4  1/1    Running  Node2
gzip-compression-test-ae0b0b7e4d7fbe40-exec-5  1/1    Running  Node1
  3.  Brought down Node2, so the status of the pods is as follows:

NAME                                           READY  STATUS       NODE
decommission-driver                            1/1    Running      Node1
gzip-compression-test-ae0b0b7e4d7fbe40-exec-1  1/1    Running      Node1
gzip-compression-test-ae0b0b7e4d7fbe40-exec-2  1/1    Terminating  Node2
gzip-compression-test-ae0b0b7e4d7fbe40-exec-3  1/1    Running      Node1
gzip-compression-test-ae0b0b7e4d7fbe40-exec-4  1/1    Terminating  Node2
gzip-compression-test-ae0b0b7e4d7fbe40-exec-5  1/1    Running      Node1
  4.  Driver logs:
{"type":"log", "level":"INFO", "time":"2022-01-12T08:55:28.296Z", "timezone":"UTC", "log":"Adding decommission script to lifecycle"}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:55:28.459Z", "timezone":"UTC", "log":"Adding decommission script to lifecycle"}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:55:28.564Z", "timezone":"UTC", "log":"Adding decommission script to lifecycle"}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:55:28.601Z", "timezone":"UTC", "log":"Adding decommission script to lifecycle"}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:55:28.667Z", "timezone":"UTC", "log":"Adding decommission script to lifecycle"}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:58:21.885Z", "timezone":"UTC", "log":"Notify executor 5 to decommissioning."}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:58:21.887Z", "timezone":"UTC", "log":"Notify executor 1 to decommissioning."}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:58:21.887Z", "timezone":"UTC", "log":"Notify executor 3 to decommissioning."}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:58:21.887Z", "timezone":"UTC", "log":"Mark BlockManagers (BlockManagerId(5, X.X.X.X, 33359, None), BlockManagerId(1, X.X.X.X, 38655, None), BlockManagerId(3, X.X.X.X, 35797, None)) as being decommissioning."}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:59:24.426Z", "timezone":"UTC", "log":"Executor 2 is removed. Remove reason statistics: (gracefully decommissioned: 0, decommision unfinished: 0, driver killed: 0, unexpectedly exited: 1)."}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:59:24.426Z", "timezone":"UTC", "log":"Executor 4 is removed. Remove reason statistics: (gracefully decommissioned: 0, decommision unfinished: 0, driver killed: 0, unexpectedly exited: 2)."}
  5.  Verified by exec'ing into all of the live executors (1, 3, 5) and
checking the location /mnt/spark-ldir/: only the executor's own blockmgr
directory is present; we do not see any other executor's blocks copied to
this location.
Example:
$ kubectl exec -it gzip-compression-test-ae0b0b7e4d7fbe40-exec-1 -n test bash
$ cd /mnt/spark-ldir/
$ ls
blockmgr-60872c99-e7d6-43ba-a43e-a97fc9f619ca

Since the migration was not happening, we tried the fallback storage option
by specifying HDFS storage. But unfortunately we could not see the RDD and
shuffle blocks in the fallback storage location either. Below is the
configuration we used.


  1.  Spark Configuration Used:
 spark.decommission.enabled true
 spark.storage.decommission.enabled true
 spark.storage.decommission.rddBlocks.enabled true
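
For reference, a hedged sketch of the fallback-storage setting referred to
above, assuming the standard property name (the HDFS path is hypothetical):

 spark.storage.decommission.fallbackStorage.path hdfs://namenode:8020/spark/fallback/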