Re: org.apache.spark.shuffle.FetchFailedException: Too large frame:

2018-05-02 Thread Pralabh Kumar
I am performing a join operation. If I convert the reduce-side join to a map-side
join (so that no shuffle happens), I assume this error shouldn't occur.
Let me know if this understanding is correct.
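
For reference, a map-side join in the DataFrame API is usually expressed with a broadcast hint; because the broadcast side is shipped to the executors rather than shuffled, the oversized shuffle-block fetch cannot come from that join (other shuffles in the job can still hit it). A sketch with placeholder names (largeDf, smallDf, join_key):

import org.apache.spark.sql.functions.broadcast

// smallDf must comfortably fit in driver and executor memory for the hint to be honored
val joined = largeDf.join(broadcast(smallDf), Seq("join_key"))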

On Tue, May 1, 2018 at 9:37 PM, Ryan Blue  wrote:

> This is usually caused by skew. Sometimes you can work around it by
> increasing the number of partitions like you tried, but when that doesn’t
> work you need to change the partitioning that you’re using.
>
> If you’re aggregating, try adding an intermediate aggregation. For
> example, if your query is select sum(x), a from t group by a, then try select
> sum(partial), a from (select sum(x) as partial, a, b from t group by a, b)
> group by a.
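>
> A DataFrame-API sketch of the same two-level aggregation (assuming t is the
> DataFrame for table t; column names follow the example above):
>
> import org.apache.spark.sql.functions.sum
>
> // pre-aggregate on (a, b) so a skewed value of a is split across many groups
> val partial = t.groupBy("a", "b").agg(sum("x").as("partial"))
> // then aggregate the partial sums by a alone
> val result = partial.groupBy("a").agg(sum("partial").as("sum_x"))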
>
> rb
>
>
> On Tue, May 1, 2018 at 4:21 AM, Pralabh Kumar 
> wrote:
>
>> Hi
>>
>> I am getting the above error in Spark SQL. I have increased the number of
>> partitions (to 5000) but am still getting the same error.
>>
>> My data is most probably skewed.
>>
>>
>>
>> org.apache.spark.shuffle.FetchFailedException: Too large frame: 4247124829
>>   at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:419)
>>   at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:349)
>>
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


MappingException - org.apache.spark.mllib.classification.LogisticRegressionModel.load

2018-05-02 Thread Mina Aslani
Hi,

I used pyspark to create a Logistic Regression model, train it on my training
data, and evaluate it on my test data using the ML API. However, to use the model in
my program, I saved the model (the Logistic Regression model) and, when I
tried to load it in pyspark using

sameModel = LogisticRegressionModel.load(sc, path)

it throws the error below:

An error occurred while calling
z:org.apache.spark.mllib.classification.LogisticRegressionModel.load.

: org.json4s.package$MappingException: Did not find value which can be
converted into java.lang.String


Is there a way to load the model with ML instead of MLlib?
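
For reference, a model saved with the DataFrame-based spark.ml API has to be read back with the spark.ml loader, which takes only a path (no SparkContext), while spark.mllib models are loaded with load(sc, path). A minimal Scala sketch of the spark.ml side; trainingDf and modelPath are placeholders:

import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}

// trainingDf is a placeholder DataFrame with "label" and "features" columns
val model = new LogisticRegression().setMaxIter(100).fit(trainingDf)

model.write.overwrite().save(modelPath)                  // spark.ml writer
val sameModel = LogisticRegressionModel.load(modelPath)  // spark.ml reader: path only, no SparkContext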

Your input is appreciated.


Best regards,

Mina


AccumulatorV2 vs AccumulableParam (V1)

2018-05-02 Thread Sergey Zhemzhitsky
Hello guys,

I've started migrating my Spark jobs that use Accumulators V1 to
AccumulatorV2 and have run into the following issues:

1. LegacyAccumulatorWrapper now requires the resulting type of
AccumulableParam to implement equals. Otherwise the AccumulableParam,
automatically wrapped into a LegacyAccumulatorWrapper,
will fail with an AssertionError (SPARK-23697 [1]).

2. Existing AccumulatorV2 classes are hard to extend
easily and correctly (SPARK-24154 [2]) because of the "copy" method, which
is called during serialization and usually loses the type information of
descendant classes that don't override "copy" (it's easier to
implement an accumulator from scratch than to override it correctly).

3. The same instance of AccumulatorV2 cannot be used with the same
SparkContext multiple times (unlike AccumulableParam), failing with
"IllegalStateException: Cannot register an Accumulator twice" even
after the "reset" method has been called. So it's impossible to unregister an
already registered accumulator from user code.

4. AccumulableParam (V1) implementations are usually more or less
stateless, while AccumulatorV2 implementations are almost always
stateful, leading to (unnecessary?) type checks that AccumulableParam
avoids. For example, the typical "merge" method of an AccumulatorV2
has to check whether the incoming accumulator is of an appropriate
type, as here [3].

5. AccumulatorV2 is more difficult to implement correctly than
AccumulableParam. In the case of AccumulableParam I have to
implement just 3 methods (addAccumulator, addInPlace, zero), in the case
of AccumulatorParam just 2 methods (addInPlace, zero), but in the case of
AccumulatorV2 it's 6 methods (isZero, copy, reset, add, merge, value); see
the sketch after this list.

6. AccumulatorV2 classes can hardly be implemented as anonymous classes,
because their "copy" and "merge" methods typically require a
concrete class to perform a type check.
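
To make points 2, 4 and 6 concrete, here is a minimal sketch of a custom AccumulatorV2 (illustrative only, not taken from Spark): all six methods have to be provided, copy() must preserve the concrete type, and merge() needs a runtime type check.

import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

class SetAccumulator[T] extends AccumulatorV2[T, Set[T]] {
  private val underlying = mutable.Set.empty[T]

  override def isZero: Boolean = underlying.isEmpty
  override def copy(): SetAccumulator[T] = {
    val acc = new SetAccumulator[T]
    acc.underlying ++= underlying   // must copy state and keep the concrete type
    acc
  }
  override def reset(): Unit = underlying.clear()
  override def add(v: T): Unit = underlying += v
  override def merge(other: AccumulatorV2[T, Set[T]]): Unit = other match {
    case o: SetAccumulator[T] => underlying ++= o.underlying
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${getClass.getName} with ${other.getClass.getName}")
  }
  override def value: Set[T] = underlying.toSet
}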

I understand the motivation for AccumulatorV2 (SPARK-14654 [4]), but
I'm wondering whether there is a way to simplify the AccumulatorV2 API
to address the points described above and make it less error-prone.


[1] https://issues.apache.org/jira/browse/SPARK-23697
[2] https://issues.apache.org/jira/browse/SPARK-24154
[3] 
https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L348
[4] https://issues.apache.org/jira/browse/SPARK-14654

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Uncaught exception in thread heartbeat-receiver-event-loop-thread

2018-05-02 Thread ccherng
And reading through the comments on that issue
(https://issues.apache.org/jira/browse/SPARK-20977), it looks like it was just
ignored but marked as resolved.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



ConcurrentModificationException

2018-05-02 Thread ccherng
I have encountered the exception below running Spark 2.1.0 on EMR. The
exception is the same as reported in

"Serialization of accumulators in heartbeats is not thread-safe"
https://issues.apache.org/jira/browse/SPARK-17463

Pull requests were made and merged, and that issue was marked as resolved, but
someone named Sunil said in the comments that they were still encountering the
problem with Spark 2.0.2 on EMR. I am too. Should this issue be reopened?


18/04/30 22:54:15 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(4229,[Lscala.Tuple2;@5e8fe6a5,BlockManagerId(4229, ip-172-23-229-187.ec2.internal, 35905, None))] in 1 attempts
org.apache.spark.SparkException: Exception thrown in awaitResult
        at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:538)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:567)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:567)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.ConcurrentModificationException
        at java.util.ArrayList.writeObject(ArrayList.java:766)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
        at java.util.Collections$SynchronizedCollection.writeObject(Collections.java:2081)
        at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at

Re: Uncaught exception in thread heartbeat-receiver-event-loop-thread

2018-05-02 Thread ccherng
I have also encountered the NullPointerException in CollectionAccumulator. It
looks like there was an issue filed for this
https://issues.apache.org/jira/browse/SPARK-20977.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Problem in persisting file in S3 using Spark: xxx file does not exist Exception

2018-05-02 Thread Marco Mistroni
Hi,
sorted: I just replaced s3 with s3a. I think I recall similar issues in
the past with the AWS libraries.
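
For reference, the change boils down to using the s3a:// scheme in the output path (which typically also needs the hadoop-aws / AWS SDK jars on the classpath); a sketch with a placeholder bucket name:

dataFrame.coalesce(1)
  .write
  .format("com.databricks.spark.csv")
  .save("s3a://my-bucket/form4-results-2404.results")   // s3a:// instead of s3://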
Thx anyway for getting back
Kr

On Wed, May 2, 2018, 4:57 PM Paul Tremblay  wrote:

> I would like to see the full error. However, S3 can give misleading
> messages if you don't have the correct permissions.
>
> On Tue, Apr 24, 2018, 2:28 PM Marco Mistroni  wrote:
>
>> HI all
>>  i am using the following code for persisting data into S3 (aws keys are
>> already stored in the environment variables)
>>
>> dataFrame.coalesce(1).write.format("com.databricks.spark.csv").save(fileName)
>>
>>
>> However, i keep on receiving an exception that the file does not exist
>>
>> here's what comes from logs
>>
>> 18/04/24 22:15:32 INFO Persiste: Persisting data to text file:
>> s3://ec2-bucket-mm-spark/form4-results-2404.results
>> Exception in thread "main" java.io.IOException:
>> /form4-results-2404.results doesn't exist
>>
>> It seems that Spark expects the file to be there before writing, which
>> seems bizarre?
>>
>> I have even tried to remove the coalesce, but still got the same exception.
>> Could anyone help please?
>> kind regards
>>  marco
>>
>


Running apps over a VPN

2018-05-02 Thread Christopher Piggott
My setup is that I have a spark master (using the spark scheduler) and 32
workers registered with it but they are on a private network.  I can
connect to that private network via OpenVPN.

I would like to be able to run spark applications from a local (on my
desktop) IntelliJ but have them use the remote master/workers.

I thought this would allow that:

sparkConf.set("spark.submit.deployMode", "cluster")

but when my job runs it still complains that there are not enough
resources/workers. The master UI shows that workers have
been assigned and are in the RUNNING state, but my local Spark app doesn't
agree. It's as if the workers were assigned but the PC end doesn't know it.

I can use spark-submit.sh but I was really hoping to be able to run Spark
Applications directly from IDEA.  Possible?
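
For what it's worth, in this kind of setup the usual sticking point is not the deploy mode but whether the executors can connect back to the driver running on the desktop; a hedged sketch of the client-mode settings typically involved (all addresses and names below are placeholders, not taken from the thread):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("spark://spark-master.private:7077")      // master on the private network (placeholder)
  .appName("from-idea")
  .config("spark.driver.host", "10.8.0.6")          // the desktop's VPN address, as seen by the workers (placeholder)
  .config("spark.driver.bindAddress", "0.0.0.0")    // bind locally on all interfaces
  .getOrCreate()

Note that when the application is started directly from the IDE, the driver runs in that JVM, so it effectively behaves like client mode regardless of the deployMode setting.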


Re: Problem in persisting file in S3 using Spark: xxx file does not exist Exception

2018-05-02 Thread Paul Tremblay
I would like to see the full error. However, S3 can give misleading
messages if you don't have the correct permissions.

On Tue, Apr 24, 2018, 2:28 PM Marco Mistroni  wrote:

> HI all
>  i am using the following code for persisting data into S3 (aws keys are
> already stored in the environment variables)
>
> dataFrame.coalesce(1).write.format("com.databricks.spark.csv").save(fileName)
>
>
> However, i keep on receiving an exception that the file does not exist
>
> here's what comes from logs
>
> 18/04/24 22:15:32 INFO Persiste: Persisting data to text file:
> s3://ec2-bucket-mm-spark/form4-results-2404.results
> Exception in thread "main" java.io.IOException:
> /form4-results-2404.results doesn't exist
>
> It seems that Spark expects the file to be there before writing, which
> seems bizarre?
>
> I have even tried to remove the coalesce, but still got the same exception.
> Could anyone help please?
> kind regards
>  marco
>


[no subject]

2018-05-02 Thread Filippo Balicchia



how to trace sparkDriver context creation for pyspark

2018-05-02 Thread Mihai Iacob
 
 
I have a Python Jupyter notebook setup that creates a Spark context by default, and sometimes this fails with the following error:

18/04/30 18:03:27 WARN Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1.
18/04/30 18:03:27 ERROR SparkContext: Error initializing SparkContext.
java.net.BindException: Cannot assign requested address: Service 'sparkDriver' failed after 100 retries! Consider explicitly setting the appropriate port for the service 'sparkDriver' (for example spark.ui.port for SparkUI) to an available port or increasing spark.port.maxRetries.

I have tracked it down to two possible settings that may cause this in Spark 2.0.2, client mode, standalone cluster setup, running in Kubernetes:

spark.driver.port - we don't set it, so it should be random
spark.ui.port - we set spark.ui.enabled=false, so it should not try to bind to this port

The short story is that I do not know which of the two Spark gets confused about, and looking at the Spark code it is not clear how spark.ui.port could cause this, even though the error message lists it as a possible cause.

Question 1: have you seen this before?
Question 2: how do I trace the spark driver process? It seems that I can only set sc.logLevel after the Spark context is created, but I need to trace before the Spark context is created.

I created a log4j.properties file in the spark/conf directory and set it to TRACE, but that only gets picked up when I run a Scala Jupyter notebook, not when I run a Python Jupyter notebook, and I haven't been able to find out how to turn on the same level of tracing for a spark-driver process started via a Python Jupyter notebook.

Some things I looked at:
`SPARK_PRINT_LAUNCH_COMMAND=1 /usr/local/spark-2.0.2-bin-hadoop2.7/bin/pyspark`
Spark Command: python2.7
Python 2.7.13 |Anaconda custom (64-bit)| (default, Dec 20 2016, 23:09:15) [GCC 4.4.7 20120313 (Red Hat 4.4.7-1)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Anaconda is brought to you by Continuum Analytics.
Please check out: http://continuum.io/thanks and https://anaconda.org
Spark Command: **/usr/lib/jvm/java-8-openjdk-amd64/bin/java -cp /usr/local/spark/conf/**:/usr/local/spark/jars/* -Xmx1g org.apache.spark.deploy.SparkSubmit --name PySparkShell pyspark-shell

Process tree (ps axjf, trimmed; startup arguments and bearer tokens elided):

 PPID   PID  PGID   SID  COMMAND
    0     1     1     1  /bin/bash /usr/local/bin/start-dsx-notebook.sh
    1  1014     1     1  /bin/sh /user-home/.scripts/publishing-startup-scripts/nbexec_py_startup.sh
 1014  1026     1     1   \_ python /user-home/.scripts/system/publishing-api/py2http.py
    1  1017     1     1  su -l 1001 /usr/local/bin/start-user-notebook.sh spark-master-svc:7077 dsx ...
 1017  1018  1018  1018   \_ -su /usr/local/bin/start-user-notebook.sh spark-master-svc:7077 dsx ...
 1018  1025  1018  1018       \_ /opt/conda/bin/python /opt/conda/bin/jupyter-notebook --NotebookApp.token= --port= --no-browser
 1025  1033  1033  1033           \_ python -m ipykernel_launcher -f ...

Re: ML Linear and Logistic Regression - Poor Performance

2018-05-02 Thread Irving Duran
You may want to think about reducing the number of iterations. Right now you
have it set to 500.

Thank You,

Irving Duran


On Fri, Apr 27, 2018 at 7:15 PM Thodoris Zois  wrote:

> I am in CentOS 7 and I use Spark 2.3.0. Below I have posted my code.
> Logistic regression took 85 minutes and linear regression 127 seconds…
>
> My dataset as I said is 128 MB and contains: 1000 features and ~100
> classes.
>
>
> #SparkSession
> ss = SparkSession.builder.getOrCreate()
>
>
> start = time.time()
>
> #Read data
> trainData = ss.read.format("csv").option("inferSchema","true").load(file)
>
> #Calculate Features
> assembler = VectorAssembler(inputCols=trainData.columns[1:], outputCol="features")
> trainData = assembler.transform(trainData)
>
> #Drop columns
> dropColumns = trainData.columns
> dropColumns = [e for e in dropColumns if e not in ('_c0', 'features')]
> trainData = trainData.drop(*dropColumns)
>
> #Rename column from _c0 to label
> trainData = trainData.withColumnRenamed("_c0", "label")
>
> #Logistic regression
> lr = LogisticRegression(maxIter=500, regParam=0.3, elasticNetParam=0.8)
> lrModel = lr.fit(trainData)
>
> #Output Coefficients
> print("Coefficients: " + str(lrModel.coefficientMatrix))
>
>
>
> - Thodoris
>
>
> On 27 Apr 2018, at 22:50, Irving Duran  wrote:
>
> Are you reformatting the data correctly for logistic (meaning 0 & 1's)
> before modeling?  What are OS and spark version you using?
>
> Thank You,
>
> Irving Duran
>
>
> On Fri, Apr 27, 2018 at 2:34 PM Thodoris Zois  wrote:
>
>> Hello,
>>
>> I am running an experiment to test logistic and linear regression on
>> spark using MLlib.
>>
>> My dataset is only 128MB and something weird happens. Linear regression
>> takes about 127 seconds with either 1 or 500 iterations. On the other hand,
>> logistic regression most of the time does not manage to finish, even with
>> 1 iteration. I usually get a memory heap error.
>>
>> In both cases I use the default cores and memory for driver and I spawn 1
>> executor with 1 core and 2GBs of memory.
>>
>> Apart from that, I get a warning about NativeBLAS. I searched the Internet
>> and found that I have to install libgfortran. Even though I did, the
>> warning remains.
>>
>> Any ideas for the above?
>>
>> Thank you,
>> - Thodoris
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: spark.executor.extraJavaOptions inside application code

2018-05-02 Thread Vadim Semenov
You need to pass config before creating a session

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
// Sets a JVM system property (-Dbasicauth=...) on the executors
conf.set("spark.executor.extraJavaOptions", "-Dbasicauth=myuser:mypassword")
// The next two are equivalent: both export an environment variable named "basicauth" to the executors
conf.set("spark.executorEnv.basicauth", "myuser:mypassword")
conf.setExecutorEnv("basicauth", "myuser:mypassword")
val spark = SparkSession.builder().config(conf).appName("…").getOrCreate()


On Wed, May 2, 2018 at 6:59 AM, Agostino Calamita <
agostino.calam...@gmail.com> wrote:

> Hi all,
> I wrote an application that needs an environment variable. I can set this
> variable with
>
> --conf 'spark.executor.extraJavaOptions=-Dbasicauth=myuser:mypwd'
>
> in spark-submit and it works well in standalone cluster mode.
>
> But I want to set it inside the application code, because the variable
> contains a password.
>
> How can I do that?
>
> I tried with:
>
> SparkSession spark = SparkSession
>   .builder()
>   .appName("Java Spark Solr ETL")
>   .getOrCreate();
>
> 
> spark.sparkContext().conf().setExecutorEnv("spark.executor.extraJavaOptions",
> "-Dbasicauth=myuser:mypassword");
>
> but it doesn't work.
>
> Thanks.
>



-- 
Sent from my iPhone


Re: smarter way to "forget" DataFrame definition and stick to its values

2018-05-02 Thread Lalwani, Jayesh
There is a trade-off involved here. If you have a Spark application with a
complicated logical graph, you can either cache data at certain points in the
DAG, or not cache it. The side effect of caching data is higher memory
usage. The side effect of not caching data is higher CPU usage and perhaps IO.
Ultimately, you can increase both memory and CPU by adding more workers to your
cluster, and adding workers costs money. So your caching choices are reflected
in the overall cost of running your application. You need to do some analysis
to determine the caching configuration that will result in the lowest cost. Usually,
being selective about which dataframes to cache results in a good balance
between memory usage and CPU usage.

I would not write data back to S3 and read it back in as a practice.
Essentially, you are using S3 as a “cache”. However, reading and writing from
S3 is not a scalable solution, because it results in higher IO, and IO doesn't
scale up as easily as CPU and memory. The only time I would use S3 as a cache
is when my cached data is in the terabyte+ range. If you are caching gigabytes
of data, then you are better off caching in memory. This is 2018. Memory is
cheap but limited.

From: Valery Khamenya 
Date: Tuesday, May 1, 2018 at 9:17 AM
To: "user@spark.apache.org" 
Subject: smarter way to "forget" DataFrame definition and stick to its values

hi all

a short example before the long story:

  var accumulatedDataFrame = ... // initialize

  for (i <- 1 to 100) {
    val myTinyNewData = ... // my slowly calculated new data portion in tiny amounts
    accumulatedDataFrame = accumulatedDataFrame.union(myTinyNewData)
    // how to stick here to the values of accumulatedDataFrame only and forget definitions?!
  }

this kind of stuff is likely to get slower and slower on each iteration, even if
myTinyNewData is quite compact. Usually I write accumulatedDataFrame to S3 and
then re-load it to clear the definition history. It makes the code ugly, though.
Is there any smarter way?

It happens very often that a DataFrame is created via complex definitions. The
DataFrame is then re-used in several places, and sometimes it gets recalculated,
triggering a heavy cascade of operations.

Of course one could use the .persist or .cache modifiers, but the result is
unfortunately not transparent, and instead of speeding things up it results in
slow-downs or even lost jobs if storage resources are not enough.
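
For reference, Dataset.checkpoint is essentially a built-in version of the write-and-reload workaround: it materializes the current values to the checkpoint directory and truncates the lineage. A sketch under that assumption; initialDf and computeTinyBatch are placeholders, not names from the thread:

import org.apache.spark.sql.DataFrame

spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")   // placeholder location (HDFS/S3 in a real cluster)

var accumulated: DataFrame = initialDf                          // placeholder for the initial data
for (i <- 1 to 100) {
  val tiny = computeTinyBatch(i)                                // placeholder for the slowly calculated portion
  accumulated = accumulated.union(tiny)
  if (i % 20 == 0) {
    // eager checkpoint: writes the current rows out and replaces the accumulated union lineage
    // (recent versions also offer localCheckpoint(), which keeps the data on the executors)
    accumulated = accumulated.checkpoint()
  }
}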

Any advice?

best regards
--
Valery




Dataset Caching and Unpersisting

2018-05-02 Thread Daniele Foroni
Hi all,

I am having troubles with caching and unpersisting a dataset.
I have a cycle that at each iteration filters my dataset.
I realized that caching every x steps (e.g., 50 steps) gives good performance.

However, after a certain number of caching operations, it seems that the memory
used for caching fills up, so I think I have to unpersist the old cached dataset.

This is my code:


I tried to use an external variable to cache and unpersist it but it doesn’t 
seem to solve the problem (maybe I used it in the wrong way).
Do you kindly have any suggestion?
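
A minimal sketch of the rotate-and-unpersist pattern described above (names are illustrative; this is not the original code, which did not come through):

import org.apache.spark.sql.{Dataset, Row}

var current: Dataset[Row] = initialDs                 // placeholder for the starting dataset
var lastCached: Option[Dataset[Row]] = None

for (step <- 1 to totalSteps) {                       // totalSteps is a placeholder
  current = current.filter(conditionFor(step))        // placeholder filter for this iteration
  if (step % 50 == 0) {
    current.cache()
    current.count()                                   // materialize the new cache before dropping the old one
    lastCached.foreach(_.unpersist())                 // release the previous cached generation
    lastCached = Some(current)
  }
}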

Thank you for your support!
---
Daniele



what is the query language used for graphX?

2018-05-02 Thread kant kodali
Hi All,

what is the query language used for GraphX? Are there any plans to
introduce Gremlin, or has that idea been dropped in favor of Spark SQL?

Thanks!


spark.executor.extraJavaOptions inside application code

2018-05-02 Thread Agostino Calamita
Hi all,
I wrote an application that needs an environment variable. I can set this
variable with

--conf 'spark.executor.extraJavaOptions=-Dbasicauth=myuser:mypwd'

in spark-submit and it works well in standalone cluster mode.

But I want to set it inside the application code, because the variable
contains a password.

How can I do that?

I tried with:

SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark Solr ETL")
  .getOrCreate();


spark.sparkContext().conf().setExecutorEnv("spark.executor.extraJavaOptions",
"-Dbasicauth=myuser:mypassword");

but it doesn't work.

Thanks.


[Spark scheduling] Spark schedules single task although rdd has 48 partitions?

2018-05-02 Thread Paul Borgmans
(Please note this question was previously posted to
https://stackoverflow.com/questions/49943655/spark-schedules-single-task-although-rdd-has-48-partitions)
We are running Spark 2.3 / Python 3.5.2. For a job we run the following code
(note that the input txt files are just a simplified example; in fact
these are large binary files and sc.binaryFiles(...) runs out of memory loading
the content, therefore only the filenames are parallelized and the
executors open/read the content):
files = [u'foo.txt', u'bar.txt', u'baz.txt', etc]  # len(files) == 155
def func(filename):
from app import generate_rows
return list(generate_rows(filename))

rdd = sc.parallelize(files, numSlices=48)
rdd2 = rdd.flatMap(func)
rdd3 = rdd2.map(lambda d: Row(**d))
df = spark.createDataFrame(rdd3)
df.write.mode(u'append').partitionBy(u'foo').parquet(output_path)

Where the app is a Python module (added to Spark using --py-files app.egg), 
simplified code is like this:
def generate_rows(filename):

yield OrderedDict([
(u'filename', filename),
(u'item1', u'item1'),
etc
])

We notice that the cluster is not fully utilized during the first stages, which
we don't understand, and we are looking for ways to control this behavior.

Job 0  Stage 0   1 task    1 min   parallelize
Job 1  Stage 1   1 task    2 min   parallelize
Job 2  Stage 2   1 task    1 min   parallelize
Job 3  Stage 3   48 tasks  5 min   parallelize|mappartitions|map|mappartitions|existingRDD|sort

What are the first 3 jobs? And why isn't there one job/stage with the 48 tasks
(as expected, given the second parameter of parallelize set to 48)?

Excerpt from DEBUG logging:

18/05/02 10:09:07 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
18/05/02 10:09:07 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 0
18/05/02 10:09:07 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 1
18/05/02 10:09:07 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 1
...
18/05/02 10:09:58 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 1
18/05/02 10:09:59 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 1
18/05/02 10:10:00 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 0
18/05/02 10:10:00 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
18/05/02 10:10:00 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
18/05/02 10:10:00 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 0
18/05/02 10:10:01 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 1
18/05/02 10:10:02 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 1
...
18/05/02 10:12:03 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 1
18/05/02 10:12:04 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 1
18/05/02 10:12:05 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 0
18/05/02 10:12:05 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
18/05/02 10:12:05 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
18/05/02 10:12:05 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 0
18/05/02 10:12:05 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 1
18/05/02 10:12:06 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 1
...
18/05/02 10:12:59 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 1
18/05/02 10:13:00 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 1
18/05/02 10:13:01 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 0
18/05/02 10:13:01 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
18/05/02 10:13:03 INFO TaskSchedulerImpl: Adding task set 3.0 with 48 tasks
18/05/02 10:13:03 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 0
18/05/02 10:13:03 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 48
18/05/02 10:13:04 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 48
...
18/05/02 10:17:16 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 1
18/05/02 10:17:17 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 1
18/05/02 10:17:18 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 0
18/05/02 10:17:18 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool


Re: [Spark Streaming]: Does DStream workload run over Spark SQL engine?

2018-05-02 Thread Saisai Shao
No, DStreams are built on RDDs, so they will not leverage any Spark SQL
related features. I think you should use Structured Streaming instead, which
is based on Spark SQL.
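
For illustration, a minimal Structured Streaming job (a sketch; the rate source and console sink stand in for real sources and sinks such as Kafka). Because the streaming query is planned by the SQL engine, Catalyst optimizations and the relevant spark.sql.* settings do apply here, unlike with DStreams:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

val spark = SparkSession.builder().appName("structured-streaming-sketch").getOrCreate()
import spark.implicits._

// The rate source emits (timestamp, value) rows at a fixed rate
val input  = spark.readStream.format("rate").option("rowsPerSecond", "10").load()
val counts = input.groupBy(window($"timestamp", "10 seconds")).count()

val query = counts.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()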

Khaled Zaouk  于2018年5月2日周三 下午4:51写道:

> Hi,
>
> I have a question regarding the execution engine of Spark Streaming
> (DStream API): Does Spark streaming jobs run over the Spark SQL engine?
>
> For example, if I change a configuration parameter related to Spark SQL
> (like spark.sql.streaming.minBatchesToRetain or
> spark.sql.objectHashAggregate.sortBased.fallbackThreshold), does this
> make any difference when I run Spark streaming job (using DStream API)?
>
> Thank you!
>
> Khaled
>


[Spark Streaming]: Does DStream workload run over Spark SQL engine?

2018-05-02 Thread Khaled Zaouk
Hi,

I have a question regarding the execution engine of Spark Streaming
(DStream API): Does Spark streaming jobs run over the Spark SQL engine?

For example, if I change a configuration parameter related to Spark SQL
(like spark.sql.streaming.minBatchesToRetain or
spark.sql.objectHashAggregate.sortBased.fallbackThreshold), does this
make any difference when I run Spark streaming job (using DStream API)?

Thank you!

Khaled