Re: Spark on YARN not utilizing all the YARN containers available

2018-10-09 Thread Dillon Dukek
There is documentation here
http://spark.apache.org/docs/latest/running-on-yarn.html about running
spark on YARN. Like I said before you can use either the logs from the
application or the Spark UI to understand how many executors are running at
any given time. I don't think I can help much further without more
information about the specific use case.
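For reference, the running executor count is also exposed by Spark's monitoring REST API, in addition to the logs and the Executors page of the UI. A minimal sketch (assumes the requests package; the driver host below is a placeholder and 4040 is the default UI port):

import requests

driver_ui = "http://driver-host:4040"  # placeholder host, default UI port
apps = requests.get(driver_ui + "/api/v1/applications").json()
app_id = apps[0]["id"]
executors = requests.get(driver_ui + "/api/v1/applications/" + app_id + "/executors").json()
# The driver appears in this list too, so exclude it when counting executors.
print(len([e for e in executors if e["id"] != "driver"]), "executors running")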


On Tue, Oct 9, 2018 at 2:54 PM Gourav Sengupta 
wrote:

> Hi Dillon,
>
> I do think that there is a setting available where in once YARN sets up
> the containers then you do not deallocate them, I had used it previously in
> HIVE, and it just saves processing time in terms of allocating containers.
> That said I am still trying to understand how do we determine one YARN
> container = one executor in SPARK.
>
> Regards,
> Gourav
>
> On Tue, Oct 9, 2018 at 9:04 PM Dillon Dukek
>  wrote:
>
>> I'm still not sure exactly what you are meaning by saying that you have 6
>> yarn containers. Yarn should just be aware of the total available resources
>> in  your cluster and then be able to launch containers based on the
>> executor requirements you set when you submit your job. If you can, I think
>> it would be helpful to send me the command you're using to launch your
>> spark process. You should also be able to use the logs and/or the spark UI
>> to determine how many executors are running.
>>
>> On Tue, Oct 9, 2018 at 12:57 PM Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> hi,
>>>
>>> may be I am not quite clear in my head on this one. But how do we know
>>> that 1 yarn container = 1 executor?
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Tue, Oct 9, 2018 at 8:53 PM Dillon Dukek
>>>  wrote:
>>>
 Can you send how you are launching your streaming process? Also what
 environment is this cluster running in (EMR, GCP, self managed, etc)?

 On Tue, Oct 9, 2018 at 10:21 AM kant kodali  wrote:

> Hi All,
>
> I am using Spark 2.3.1 and using YARN as a cluster manager.
>
> I currently got
>
> 1) 6 YARN containers(executors=6) with 4 executor cores for each
> container.
> 2) 6 Kafka partitions from one topic.
> 3) You can assume every other configuration is set to whatever the
> default values are.
>
> Spawned a Simple Streaming Query and I see all the tasks get scheduled
> on one YARN container. am I missing any config?
>
> Thanks!
>



Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

2018-10-09 Thread Jörn Franke
Generally, please avoid System.out.println and use a logger instead, even for examples.
People may take these examples from here and put them in their production code.

> On 09.10.2018, at 15:39, Shubham Chaurasia wrote:
> 
> Alright, so it is a big project which uses a SQL store underneath.
> I extracted out the minimal code and made a smaller project out of it and 
> still it is creating multiple instances. 
> 
> Here is my project:
> 
> ├── my-datasource.iml
> ├── pom.xml
> ├── src
> │   ├── main
> │   │   ├── java
> │   │   │   └── com
> │   │   │   └── shubham
> │   │   │   ├── MyDataSource.java
> │   │   │   └── reader
> │   │   │   └── MyDataSourceReader.java
> 
> 
> MyDataSource.java
> -
> package com.shubham;
> 
> import com.shubham.reader.MyDataSourceReader;
> import org.apache.spark.sql.SaveMode;
> import org.apache.spark.sql.sources.v2.DataSourceOptions;
> import org.apache.spark.sql.sources.v2.DataSourceV2;
> import org.apache.spark.sql.sources.v2.ReadSupport;
> import org.apache.spark.sql.sources.v2.WriteSupport;
> import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
> import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
> import org.apache.spark.sql.types.StructType;
> 
> import java.util.Optional;
> 
> public class MyDataSource implements DataSourceV2, ReadSupport, WriteSupport {
> 
>   public DataSourceReader createReader(DataSourceOptions options) {
> System.out.println("MyDataSource.createReader: Going to create a new 
> MyDataSourceReader");
> return new MyDataSourceReader(options.asMap());
>   }
> 
>   public Optional createWriter(String writeUUID, StructType 
> schema, SaveMode mode, DataSourceOptions options) {
> return Optional.empty();
>   }
> }
> 
> MyDataSourceReader.java
> -
> package com.shubham.reader;
> 
> import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
> import org.apache.spark.sql.sources.v2.reader.InputPartition;
> import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
> import org.apache.spark.sql.types.StructType;
> import org.apache.spark.sql.vectorized.ColumnarBatch;
> 
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Map;
> 
> public class MyDataSourceReader implements DataSourceReader, 
> SupportsScanColumnarBatch {
> 
>   private Map options;
>   private StructType schema;
> 
>   public MyDataSourceReader(Map options) {
> System.out.println("MyDataSourceReader.MyDataSourceReader: 
> Instantiated" + this);
> this.options = options;
>   }
> 
>   @Override
>   public StructType readSchema() {
> this.schema = (new StructType())
> .add("col1", "int")
> .add("col2", "string");
> System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " 
> + this.schema);
> return this.schema;
>   }
> 
>   @Override
>   public List> planBatchInputPartitions() {
> System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this 
> + " schema: " + this.schema);
> return new ArrayList<>();
>   }
> }
> 
> 
> spark-shell output
> 
> scala> spark.read.format("com.shubham.MyDataSource").option("query", "select 
> * from some_table").load.show
> 
> MyDataSource.createReader: Going to create a new MyDataSourceReader
> MyDataSourceReader.MyDataSourceReader: 
> Instantiatedcom.shubham.reader.MyDataSourceReader@69fa5536
> MyDataSourceReader.readSchema: com.shubham.reader.MyDataSourceReader@69fa5536 
> schema: StructType(StructField(col1,IntegerType,true), 
> StructField(col2,StringType,true))
> MyDataSource.createReader: Going to create a new MyDataSourceReader
> MyDataSourceReader.MyDataSourceReader: 
> Instantiatedcom.shubham.reader.MyDataSourceReader@3095c449
> MyDataSourceReader.planBatchInputPartitions: 
> com.shubham.reader.MyDataSourceReader@3095c449 schema: null
> +++
> |col1|col2|
> +++
> +++
> 
> 
> Here 2 instances of reader, MyDataSourceReader@69fa5536 and 
> MyDataSourceReader@3095c449 are being created. Consequently schema is null in 
> MyDataSourceReader@3095c449.
> 
> Am I not doing it the correct way?
> 
> Thanks,
> Shubham
> 
>> On Tue, Oct 9, 2018 at 4:43 PM Mendelson, Assaf  
>> wrote:
>> I am using v2.4.0-RC2
>> 
>>  
>> 
>> The code as is wouldn’t run (e.g. planBatchInputPartitions returns null). 
>> How are you calling it?
>> 
>>  
>> 
>> When I do:
>> 
>> Val df = spark.read.format(mypackage).load().show()
>> 
>> I am getting a single creation, how are you creating the reader?
>> 
>>  
>> 
>> Thanks,
>> 
>> Assaf
>> 
>>  
>> 
>> From: Shubham Chaurasia [mailto:shubh.chaura...@gmail.com] 
>> Sent: Tuesday, October 9, 2018 2:02 PM
>> To: Mendelson, Assaf; user@spark.apache.org
>> Subject: Re: DataSourceV2 APIs creating multiple instances of 
>> DataSourceReader and hence not 

Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

2018-10-09 Thread Hyukjin Kwon
I took a look at the code.

val source = classOf[MyDataSource].getCanonicalName
spark.read.format(source).load().collect()

Looks like it is indeed called twice.

First call: it looks like the reader is created first to read the schema for the logical plan

test.org.apache.spark.sql.sources.v2.MyDataSourceReader.(MyDataSourceReader.java:36)
test.org.apache.spark.sql.sources.v2.MyDataSource.createReader(MyDataSource.java:35)
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$SourceHelpers.createReader(DataSourceV2Relation.scala:155)
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:172)
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:204)
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)

Second call: it creates another for the actual partitions in the physical plan

test.org.apache.spark.sql.sources.v2.MyDataSourceReader.(MyDataSourceReader.java:36)
test.org.apache.spark.sql.sources.v2.MyDataSource.createReader(MyDataSource.java:35)
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$SourceHelpers.createReader(DataSourceV2Relation.scala:155)
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation.newReader(DataSourceV2Relation.scala:61)
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$.apply(DataSourceV2Strategy.scala:103)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
scala.collection.Iterator$class.foreach(Iterator.scala:891)
scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
org.apache.spark.sql.Dataset.withAction(Dataset.scala:3360)
org.apache.spark.sql.Dataset.collect(Dataset.scala:2783)


Skimming the API doc for DataSourceReader at branch-2.4, I haven’t found a
guarantee that the reader is created only once. If that’s documented
somewhere, we should fix it in 2.4.0. If not, I think it's fine since both
calls are on the driver side and it’s something that can be worked around,
for instance with a static class or a thread local in this case.

Forwarding to the dev mailing list in case this is something we haven't
foreseen.

On Tue, Oct 9, 2018 at 9:39 PM Shubham Chaurasia wrote:

> Alright, so it is a big project which uses a SQL store underneath.
> I extracted out the minimal code and made a smaller project out of it and
> still it is creating multiple instances.
>
> Here is my project:
>
> ├── my-datasource.iml
> ├── pom.xml
> ├── src
> │   ├── main
> │   │   ├── java
> │   │   │   └── com
> │   │   │   └── shubham
> │   │   │   ├── MyDataSource.java
> │   │   │   └── reader
> │   │   │   └── MyDataSourceReader.java
>
>
> MyDataSource.java
> -
>
> package com.shubham;
>
> import com.shubham.reader.MyDataSourceReader;
> import org.apache.spark.sql.SaveMode;
> import org.apache.spark.sql.sources.v2.DataSourceOptions;
> import org.apache.spark.sql.sources.v2.DataSourceV2;
> import org.apache.spark.sql.sources.v2.ReadSupport;
> import org.apache.spark.sql.sources.v2.WriteSupport;
> import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
> import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
> import org.apache.spark.sql.types.StructType;
>
> import java.util.Optional;
>
> public class MyDataSource implements DataSourceV2, ReadSupport, WriteSupport {
>
>   

PySpark Streaming : Accessing the Remote Secured Kafka

2018-10-09 Thread Ramaswamy, Muthuraman
All,

Currently, I am using PySpark Streaming (Classic Regular DStream Style and not 
Structured Streaming). Now, our remote Kafka is secured with Kerberos.

To enable PySpark Streaming to access the secured Kafka, what steps should I
perform? Can I pass the principal/keytab and JAAS config in the spark-submit
command? What are my options? Any pointers/links on accessing the secured Kafka
broker from PySpark Streaming would be helpful.
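One commonly suggested direction, sketched below with placeholder paths and principal: ship a JAAS file and a keytab with the job and point the driver and executor JVMs at them. In practice these are usually supplied on spark-submit (--keytab, --principal, --files, --conf) rather than set in code, and note that the older 0.8 receiver-based Kafka integration predates Kafka's SASL/Kerberos support, so the 0.10+ integration or Structured Streaming is generally needed.

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

# Sketch only; keytab path, principal and JAAS file location are placeholders.
conf = (SparkConf()
        .setAppName("secured-kafka-dstream")
        .set("spark.yarn.keytab", "/path/to/user.keytab")
        .set("spark.yarn.principal", "user@EXAMPLE.COM")
        # Point driver and executor JVMs at a JAAS file describing the Kafka client login.
        .set("spark.driver.extraJavaOptions",
             "-Djava.security.auth.login.config=/path/to/kafka_jaas.conf")
        .set("spark.executor.extraJavaOptions",
             "-Djava.security.auth.login.config=/path/to/kafka_jaas.conf"))
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10)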

Appreciate your time.

Thank you,

~Muthu




Re: CSV parser - is there a way to find malformed csv record

2018-10-09 Thread Nirav Patel
Thanks Shuporno. That mode worked. I found a couple of records with
quotes inside quotes which needed to be escaped.



On Tue, Oct 9, 2018 at 1:40 PM Taylor Cox  wrote:

> Hey Nirav,
>
>
>
> Here’s an idea:
>
>
>
> Suppose your file.csv has N records, one for each line. Read the csv
> line-by-line (without spark) and attempt to parse each line. If a record is
> malformed, catch the exception and rethrow it with the line number. That
> should show you where the problematic record(s) can be found.
>
>
>
> *From:* Nirav Patel 
> *Sent:* Monday, October 8, 2018 11:57 AM
> *To:* spark users 
> *Subject:* CSV parser - is there a way to find malformed csv record
>
>
>
> I am getting `RuntimeException: Malformed CSV record` while parsing csv
> record and attaching schema at same time. Most likely there are additional
> commas or json data in some field which are not escaped properly. Is there
> a way CSV parser tells me which record is malformed?
>
>
>
>
>
> This is what I am using:
>
>
>
> val df2 = sparkSession.read
>
>   .option("inferSchema", true)
>
>   .option("multiLine", true)
>
>   .schema(headerDF.schema) // this only works without column mismatch
>
>   .csv(dataPath)
>
>
>
> Thanks
>
>
>
>




Re: [K8S] Option to keep the executor pods after job finishes

2018-10-09 Thread Yinan Li
There is currently no such an option. But this has been raised before in
https://issues.apache.org/jira/browse/SPARK-25515.


On Tue, Oct 9, 2018 at 2:17 PM Li Gao  wrote:

> Hi,
>
> Is there an option to keep the executor pods on k8s after the job
> finishes? We want to extract the logs and stats before removing the
> executor pods.
>
> Thanks,
> Li
>


Re: Spark on YARN not utilizing all the YARN containers available

2018-10-09 Thread Gourav Sengupta
Hi Dillon,

I do think that there is a setting available wherein, once YARN sets up the
containers, you do not deallocate them; I had used it previously in
HIVE, and it just saves processing time in terms of allocating containers.
That said, I am still trying to understand how we determine that one YARN
container = one executor in SPARK.

Regards,
Gourav

On Tue, Oct 9, 2018 at 9:04 PM Dillon Dukek 
wrote:

> I'm still not sure exactly what you are meaning by saying that you have 6
> yarn containers. Yarn should just be aware of the total available resources
> in  your cluster and then be able to launch containers based on the
> executor requirements you set when you submit your job. If you can, I think
> it would be helpful to send me the command you're using to launch your
> spark process. You should also be able to use the logs and/or the spark UI
> to determine how many executors are running.
>
> On Tue, Oct 9, 2018 at 12:57 PM Gourav Sengupta 
> wrote:
>
>> hi,
>>
>> may be I am not quite clear in my head on this one. But how do we know
>> that 1 yarn container = 1 executor?
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Tue, Oct 9, 2018 at 8:53 PM Dillon Dukek
>>  wrote:
>>
>>> Can you send how you are launching your streaming process? Also what
>>> environment is this cluster running in (EMR, GCP, self managed, etc)?
>>>
>>> On Tue, Oct 9, 2018 at 10:21 AM kant kodali  wrote:
>>>
 Hi All,

 I am using Spark 2.3.1 and using YARN as a cluster manager.

 I currently got

 1) 6 YARN containers(executors=6) with 4 executor cores for each
 container.
 2) 6 Kafka partitions from one topic.
 3) You can assume every other configuration is set to whatever the
 default values are.

 Spawned a Simple Streaming Query and I see all the tasks get scheduled
 on one YARN container. am I missing any config?

 Thanks!

>>>


[K8S] Option to keep the executor pods after job finishes

2018-10-09 Thread Li Gao
Hi,

Is there an option to keep the executor pods on k8s after the job finishes?
We want to extract the logs and stats before removing the executor pods.

Thanks,
Li


RE: CSV parser - is there a way to find malformed csv record

2018-10-09 Thread Taylor Cox
Hey Nirav,

Here’s an idea:

Suppose your file.csv has N records, one for each line. Read the csv 
line-by-line (without spark) and attempt to parse each line. If a record is 
malformed, catch the exception and rethrow it with the line number. That should 
show you where the problematic record(s) can be found.
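A rough sketch of that idea in Python (file name and expected column count are illustrative; with multiLine records this per-line check is only approximate):

import csv

def find_bad_records(path, expected_cols):
    # Print line numbers whose parsed column count differs from the expected one.
    with open(path, newline="") as f:
        for lineno, row in enumerate(csv.reader(f), start=1):
            if row and len(row) != expected_cols:
                print("Suspect record at line %d: %d columns" % (lineno, len(row)))

find_bad_records("file.csv", expected_cols=10)  # illustrative values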

From: Nirav Patel 
Sent: Monday, October 8, 2018 11:57 AM
To: spark users 
Subject: CSV parser - is there a way to find malformed csv record

I am getting `RuntimeException: Malformed CSV record` while parsing csv record 
and attaching schema at same time. Most likely there are additional commas or 
json data in some field which are not escaped properly. Is there a way CSV 
parser tells me which record is malformed?


This is what I am using:

val df2 = sparkSession.read
  .option("inferSchema", true)
  .option("multiLine", true)
  .schema(headerDF.schema) // this only works without column mismatch
  .csv(dataPath)

Thanks





Re: Spark on YARN not utilizing all the YARN containers available

2018-10-09 Thread Dillon Dukek
I'm still not sure exactly what you are meaning by saying that you have 6
yarn containers. Yarn should just be aware of the total available resources
in  your cluster and then be able to launch containers based on the
executor requirements you set when you submit your job. If you can, I think
it would be helpful to send me the command you're using to launch your
spark process. You should also be able to use the logs and/or the spark UI
to determine how many executors are running.

On Tue, Oct 9, 2018 at 12:57 PM Gourav Sengupta 
wrote:

> hi,
>
> may be I am not quite clear in my head on this one. But how do we know
> that 1 yarn container = 1 executor?
>
> Regards,
> Gourav Sengupta
>
> On Tue, Oct 9, 2018 at 8:53 PM Dillon Dukek
>  wrote:
>
>> Can you send how you are launching your streaming process? Also what
>> environment is this cluster running in (EMR, GCP, self managed, etc)?
>>
>> On Tue, Oct 9, 2018 at 10:21 AM kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> I am using Spark 2.3.1 and using YARN as a cluster manager.
>>>
>>> I currently got
>>>
>>> 1) 6 YARN containers(executors=6) with 4 executor cores for each
>>> container.
>>> 2) 6 Kafka partitions from one topic.
>>> 3) You can assume every other configuration is set to whatever the
>>> default values are.
>>>
>>> Spawned a Simple Streaming Query and I see all the tasks get scheduled
>>> on one YARN container. am I missing any config?
>>>
>>> Thanks!
>>>
>>


Re: Spark on YARN not utilizing all the YARN containers available

2018-10-09 Thread Gourav Sengupta
hi,

Maybe I am not quite clear in my head on this one, but how do we know that
1 yarn container = 1 executor?

Regards,
Gourav Sengupta

On Tue, Oct 9, 2018 at 8:53 PM Dillon Dukek 
wrote:

> Can you send how you are launching your streaming process? Also what
> environment is this cluster running in (EMR, GCP, self managed, etc)?
>
> On Tue, Oct 9, 2018 at 10:21 AM kant kodali  wrote:
>
>> Hi All,
>>
>> I am using Spark 2.3.1 and using YARN as a cluster manager.
>>
>> I currently got
>>
>> 1) 6 YARN containers(executors=6) with 4 executor cores for each
>> container.
>> 2) 6 Kafka partitions from one topic.
>> 3) You can assume every other configuration is set to whatever the
>> default values are.
>>
>> Spawned a Simple Streaming Query and I see all the tasks get scheduled on
>> one YARN container. am I missing any config?
>>
>> Thanks!
>>
>


Re: Spark on YARN not utilizing all the YARN containers available

2018-10-09 Thread Dillon Dukek
Can you send how you are launching your streaming process? Also what
environment is this cluster running in (EMR, GCP, self managed, etc)?

On Tue, Oct 9, 2018 at 10:21 AM kant kodali  wrote:

> Hi All,
>
> I am using Spark 2.3.1 and using YARN as a cluster manager.
>
> I currently got
>
> 1) 6 YARN containers(executors=6) with 4 executor cores for each
> container.
> 2) 6 Kafka partitions from one topic.
> 3) You can assume every other configuration is set to whatever the default
> values are.
>
> Spawned a Simple Streaming Query and I see all the tasks get scheduled on
> one YARN container. am I missing any config?
>
> Thanks!
>


Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on wo

2018-10-09 Thread zakhavan
Hello,

I'm trying to calculate the Pearson correlation between two DStreams using
sliding window in Pyspark. But I keep getting the following error:

Traceback (most recent call last):
  File
"/home/zeinab/spark-2.3.1-bin-hadoop2.7/examples/src/main/python/streaming/Cross-Corr.py",
line 63, in 
result = Statistics.corr(windowedds1,windowedds2, method="pearson")
  File
"/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/stat/_statistics.py",
line 157, in corr
  File
"/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/common.py",
line 130, in callMLlibFunc
  File
"/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/common.py",
line 122, in callJavaFunc
  File
"/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/common.py",
line 87, in _py2java
  File
"/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py",
line 555, in dumps
  File
"/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/context.py",
line 315, in __getnewargs__
Exception: It appears that you are attempting to reference SparkContext from
a broadcast variable, action, or transformation. SparkContext can only be
used on the driver, not in code that it run on workers. For more
information, see SPARK-5063.

The error comes from this line:

result = Statistics.corr(windowedds1,windowedds2, method="pearson")

First, I read the lines from 2 text files and load them into two Kafka
topics and then apply the window operation on each DStream and calculate
Pearson correlation between them.

Here is my code:

from __future__ import print_function
from future.builtins import *
from pyspark.ml.linalg import Vectors
from pyspark.mllib.stat import Statistics
from pyspark.ml.stat import Correlation
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import time
from collections import deque
import sys
from operator import add
import numpy as np
from itertools import chain
import warnings
from obspy import UTCDateTime
from obspy.signal.cross_correlation import templates_max_similarity
from obspy import read

if __name__ == "__main__":
    print("hello spark")

    sc = SparkContext("local[2]", appName="CrossCorrelation")
    ssc = StreamingContext(sc, 5)
    broker, topic1, topic2 = sys.argv[1:]
    # Connect to Kafka
    kvs1 = KafkaUtils.createStream(ssc, broker,
                                   "real-time-cross-correlation", {topic1: 1})
    kvs2 = KafkaUtils.createStream(ssc, broker,
                                   "real-time-cross-correlation", {topic2: 1})
    lines1 = kvs1.map(lambda x1: x1[1])
    ds1 = lines1.flatMap(lambda line1: line1.strip().split("\n")).map(
        lambda strelem1: float(strelem1))
    lines2 = kvs2.map(lambda x2: x2[1])
    ds2 = lines2.flatMap(lambda line2: line2.strip().split("\n")).map(
        lambda strelem2: float(strelem2))
    # Windowing
    windowedds1 = ds1.window(10, 5)
    windowedds2 = ds2.window(10, 5)
    # Correlation
    result = Statistics.corr(windowedds1, windowedds2, method="pearson")
    if result > 0.7:
        print("ds1 and ds2 are correlated!!!")

    ssc.start()
    ssc.awaitTermination()

Does anybody know what I'm doing wrong?

Thank you.
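For what it's worth, Statistics.corr operates on RDDs, not DStreams, so calling it directly on the windowed DStreams pulls the SparkContext into a closure. A hedged sketch of one workaround, reusing the names from the code above (rdd.zip assumes both windows have the same partitioning and element counts):

# Pair the two windowed streams and compute the correlation per window inside
# foreachRDD, which runs on the driver where plain RDD operations are allowed.
paired = windowedds1.transformWith(lambda rdd1, rdd2: rdd1.zip(rdd2), windowedds2)

def check_corr(time, rdd):
    if not rdd.isEmpty():
        x = rdd.map(lambda p: p[0])
        y = rdd.map(lambda p: p[1])
        r = Statistics.corr(x, y, method="pearson")
        if r > 0.7:
            print("ds1 and ds2 are correlated at", time, "r =", r)

paired.foreachRDD(check_corr)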



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Does spark.streaming.concurrentJobs still exist?

2018-10-09 Thread kant kodali
Does spark.streaming.concurrentJobs still exist?

spark.streaming.concurrentJobs (default: 1) is the number of concurrent
jobs, i.e. threads in the streaming-job-executor thread pool.

Also how is this definition different from executor-cores?
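As far as I can tell it is still read by the streaming job scheduler in 2.x, though it remains undocumented. It is a driver-side scheduling knob (how many streaming output jobs may run at once), whereas executor cores control how many tasks each executor can run in parallel. A minimal sketch of setting it:

from pyspark import SparkConf

# Sketch only: allow up to 2 streaming output jobs to be scheduled concurrently.
conf = SparkConf().set("spark.streaming.concurrentJobs", "2")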


Re: Any way to see the size of the broadcast variable?

2018-10-09 Thread V0lleyBallJunki3
Yes, each of the executors has 60GB



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark on YARN not utilizing all the YARN containers available

2018-10-09 Thread kant kodali
Hi All,

I am using Spark 2.3.1 and using YARN as a cluster manager.

I currently got

1) 6 YARN containers(executors=6) with 4 executor cores for each container.
2) 6 Kafka partitions from one topic.
3) You can assume every other configuration is set to whatever the default
values are.

Spawned a simple streaming query and I see all the tasks get scheduled on
one YARN container. Am I missing any config?

Thanks!
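For reference, in YARN mode each Spark executor runs in its own YARN container (plus one container for the application master), so the executor count and cores come from the submit-time configs. A hedged sketch of pinning them explicitly, mirroring the setup described above (the memory value is a placeholder):

from pyspark.sql import SparkSession

# Sketch only: roughly equivalent to spark-submit --num-executors 6 --executor-cores 4.
spark = (SparkSession.builder
         .appName("yarn-executor-sizing")
         .config("spark.executor.instances", "6")  # one YARN container per executor
         .config("spark.executor.cores", "4")
         .config("spark.executor.memory", "4g")    # placeholder
         .getOrCreate())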


Re: Any way to see the size of the broadcast variable?

2018-10-09 Thread Gourav Sengupta
Hi Venkat,

do your executors have that much memory?

Regards,
Gourav Sengupta

On Tue, Oct 9, 2018 at 4:44 PM V0lleyBallJunki3 
wrote:

> Hello,
>I have set the value of spark.sql.autoBroadcastJoinThreshold to a very
> high value of 20 GB. I am joining a table that I am sure is below this
> variable, however spark is doing a SortMergeJoin. If I set a broadcast hint
> then spark does a broadcast join and job finishes much faster. However,
> when
> run in production for some large tables, I run into errors. Is there a way
> to see the actual size of the table being broadcast? I wrote the table
> being
> broadcast to disk and it took only 32 MB in parquet. I tried to cache this
> table in Zeppelin and run a table.count() operation but nothing gets shown
> on on the Storage tab of the Spark History Server. spark.util.SizeEstimator
> doesn't seem to be giving accurate numbers for this table either. Any way
> to
> figure out the size of this table being broadcast?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Any way to see the size of the broadcast variable?

2018-10-09 Thread V0lleyBallJunki3
Hello,
   I have set the value of spark.sql.autoBroadcastJoinThreshold to a very
high value of 20 GB. I am joining a table that I am sure is below this
threshold, however Spark is doing a SortMergeJoin. If I set a broadcast hint
then Spark does a broadcast join and the job finishes much faster. However, when
run in production for some large tables, I run into errors. Is there a way
to see the actual size of the table being broadcast? I wrote the table being
broadcast to disk and it took only 32 MB in parquet. I tried to cache this
table in Zeppelin and run a table.count() operation but nothing gets shown
on the Storage tab of the Spark History Server. spark.util.SizeEstimator
doesn't seem to be giving accurate numbers for this table either. Any way to
figure out the size of this table being broadcast?
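One hedged way to sanity-check what the planner thinks, sketched below: the explain output shows whether a BroadcastHashJoin was chosen, and the last line pulls the optimizer's size estimate for the small side through Py4J internals (an internal API, and an estimate rather than the exact broadcast size; big_df and small_df are placeholders):

from pyspark.sql.functions import broadcast

joined = big_df.join(broadcast(small_df), "key")
joined.explain(True)  # look for BroadcastHashJoin vs SortMergeJoin in the physical plan

# Optimizer's size estimate for the small side, in bytes (internal API).
print(small_df._jdf.queryExecution().optimizedPlan().stats().sizeInBytes())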



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Internal Spark class is not registered by Kryo

2018-10-09 Thread 曹礼俊
Hi all:

I have set spark.kryo.registrationRequired=true, but an exception occurred:
java.lang.IllegalArgumentException: Class is not registered:
org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage when I
run the program.

I tried to register it manually with kryo.register() and
SparkConf.registerKryoClasses, but it still does not work.

The related JIRA is SPARK-21569. And SPARK-10251 and SPARK-6497 may be the
same problem, I think.

I would be grateful if someone could help me.

Best Regards,
Lijun Cao
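A hedged sketch of one thing to try: register the internal class by its fully qualified name through spark.kryo.classesToRegister. There is no guarantee this resolves it, since other unregistered internal classes may surface next; if they do, the practical fallback is to drop registrationRequired.

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         .config("spark.kryo.registrationRequired", "true")
         # Register the internal class by name; note the '$' for the nested class.
         .config("spark.kryo.classesToRegister",
                 "org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage")
         .getOrCreate())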


Re: CSV parser - is there a way to find malformed csv record

2018-10-09 Thread Shuporno Choudhury
Hi,
There is a way to obtain these malformed/rejected records. Rejection
can happen not only because of a column number mismatch but also if the data
does not match the data type mentioned in the schema.
To obtain the rejected records, you can do the following (a sketch follows
the doc links below):
1. Add an extra column (eg: CorruptRecCol) to your schema of type
StringType()
2. In the dataframe reader, add the *mode* 'PERMISSIVE' while
simultaneously setting *columnNameOfCorruptRecord* to the column
CorruptRecCol
3. The column CorruptRecCol will contain the complete record if it is
malformed/corrupted. On the other hand, it will be null if the record is
valid. So you can use a filter (CorruptRecCol IS NOT NULL) to obtain the
malformed/corrupted records.
You can use any column name to contain the invalid records. I have used
*CorruptRecCol* just as an example.
http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader
This example is for pyspark. Similar example will exist for Java/Scala also.
https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/DataFrameReader.html
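A minimal pyspark sketch of those steps (schema, column names and file path are purely illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.getOrCreate()

# Schema with an extra string column that will hold the raw text of bad records.
schema = StructType([
    StructField("col1", IntegerType(), True),
    StructField("col2", StringType(), True),
    StructField("CorruptRecCol", StringType(), True),
])

df = (spark.read
      .option("mode", "PERMISSIVE")
      .option("columnNameOfCorruptRecord", "CorruptRecCol")
      .option("multiLine", True)
      .schema(schema)
      .csv("/path/to/data.csv"))  # illustrative path

bad = df.filter(df.CorruptRecCol.isNotNull())  # the malformed records
bad.show(truncate=False)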


On Tue, 9 Oct 2018 at 00:27, Nirav Patel [via Apache Spark User List] <
ml+s1001560n33643...@n3.nabble.com> wrote:

> I am getting `RuntimeException: Malformed CSV record` while parsing csv
> record and attaching schema at same time. Most likely there are additional
> commas or json data in some field which are not escaped properly. Is there
> a way CSV parser tells me which record is malformed?
>
>
> This is what I am using:
>
> val df2 = sparkSession.read
>   .option("inferSchema", true)
>   .option("multiLine", true)
>   .schema(headerDF.schema) // this only works without column mismatch
>   .csv(dataPath)
>
> Thanks
>
>
>
>
> --
> If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/CSV-parser-is-there-a-way-to-find-malformed-csv-record-tp33643.html
>


-- 
--Thanks,
Shuporno Choudhury




Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

2018-10-09 Thread Shubham Chaurasia
Alright, so it is a big project which uses a SQL store underneath.
I extracted out the minimal code and made a smaller project out of it and
still it is creating multiple instances.

Here is my project:

├── my-datasource.iml
├── pom.xml
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │   └── shubham
│   │   │   ├── MyDataSource.java
│   │   │   └── reader
│   │   │   └── MyDataSourceReader.java


MyDataSource.java
-

package com.shubham;

import com.shubham.reader.MyDataSourceReader;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.types.StructType;

import java.util.Optional;

public class MyDataSource implements DataSourceV2, ReadSupport, WriteSupport {

  public DataSourceReader createReader(DataSourceOptions options) {
    System.out.println("MyDataSource.createReader: Going to create a new MyDataSourceReader");
    return new MyDataSourceReader(options.asMap());
  }

  public Optional<DataSourceWriter> createWriter(String writeUUID,
      StructType schema, SaveMode mode, DataSourceOptions options) {
    return Optional.empty();
  }
}


MyDataSourceReader.java
-

package com.shubham.reader;

import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MyDataSourceReader implements DataSourceReader,
    SupportsScanColumnarBatch {

  private Map<String, String> options;
  private StructType schema;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated" + this);
    this.options = options;
  }

  @Override
  public StructType readSchema() {
    this.schema = (new StructType())
        .add("col1", "int")
        .add("col2", "string");
    System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
    return this.schema;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
    return new ArrayList<>();
  }
}



spark-shell output

scala> spark.read.format("com.shubham.MyDataSource").option("query",
"select * from some_table").load.show

MyDataSource.createReader: Going to create a new MyDataSourceReader
MyDataSourceReader.MyDataSourceReader:
Instantiatedcom.shubham.reader.MyDataSourceReader@69fa5536
MyDataSourceReader.readSchema:
com.shubham.reader.MyDataSourceReader@69fa5536 schema:
StructType(StructField(col1,IntegerType,true),
StructField(col2,StringType,true))
MyDataSource.createReader: Going to create a new MyDataSourceReader
MyDataSourceReader.MyDataSourceReader:
Instantiatedcom.shubham.reader.MyDataSourceReader@3095c449
MyDataSourceReader.planBatchInputPartitions:
com.shubham.reader.MyDataSourceReader@3095c449 schema: null
+++
|col1|col2|
+++
+++


Here 2 instances of reader, MyDataSourceReader@69fa5536 and
MyDataSourceReader@3095c449 are being created. Consequently schema is null
in MyDataSourceReader@3095c449.

Am I not doing it the correct way?

Thanks,
Shubham

On Tue, Oct 9, 2018 at 4:43 PM Mendelson, Assaf 
wrote:

> I am using v2.4.0-RC2
>
>
>
> The code as is wouldn’t run (e.g. planBatchInputPartitions returns null).
> How are you calling it?
>
>
>
> When I do:
>
> Val df = spark.read.format(mypackage).load().show()
>
> I am getting a single creation, how are you creating the reader?
>
>
>
> Thanks,
>
> Assaf
>
>
>
> *From:* Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
> *Sent:* Tuesday, October 9, 2018 2:02 PM
> *To:* Mendelson, Assaf; user@spark.apache.org
> *Subject:* Re: DataSourceV2 APIs creating multiple instances of
> DataSourceReader and hence not preserving the state
>
>
>
>
> Thanks Assaf, you tried with *tags/v2.4.0-rc2?*
>
>
>
> Full Code:
>
>
>
> MyDataSource is the entry point which simply creates Reader and Writer
>
>
>
> public class MyDataSource implements DataSourceV2, WriteSupport,
> ReadSupport, SessionConfigSupport {
>
>
>
>   @Override public DataSourceReader createReader(DataSourceOptions
> options) {
>
> return new 


RE: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

2018-10-09 Thread Mendelson, Assaf
I am using v2.4.0-RC2

The code as is wouldn’t run (e.g. planBatchInputPartitions returns null). How 
are you calling it?

When I do:
val df = spark.read.format(mypackage).load().show()
I am getting a single creation, how are you creating the reader?

Thanks,
Assaf

From: Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
Sent: Tuesday, October 9, 2018 2:02 PM
To: Mendelson, Assaf; user@spark.apache.org
Subject: Re: DataSourceV2 APIs creating multiple instances of DataSourceReader 
and hence not preserving the state


Thanks Assaf, you tried with tags/v2.4.0-rc2?

Full Code:

MyDataSource is the entry point which simply creates Reader and Writer

public class MyDataSource implements DataSourceV2, WriteSupport, ReadSupport, 
SessionConfigSupport {

  @Override public DataSourceReader createReader(DataSourceOptions options) {
return new MyDataSourceReader(options.asMap());
  }

  @Override
  public Optional createWriter(String jobId, StructType 
schema,
  SaveMode mode, DataSourceOptions options) {
// creates a dataSourcewriter here..
return Optional.of(dataSourcewriter);
  }

  @Override public String keyPrefix() {
return "myprefix";
  }

}

public class MyDataSourceReader implements DataSourceReader, 
SupportsScanColumnarBatch {

  StructType schema = null;
  Map options;

  public MyDataSourceReader(Map options) {
System.out.println("MyDataSourceReader.MyDataSourceReader: 
Instantiated" + this);
this.options = options;
  }

  @Override
  public List> planBatchInputPartitions() {
//variable this.schema is null here since readSchema() was called on a 
different instance
System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + 
" schema: " + this.schema);
//more logic..
return null;
  }

  @Override
  public StructType readSchema() {
//some logic to discover schema
this.schema = (new StructType())
.add("col1", "int")
.add("col2", "string");
System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + 
this.schema);
return this.schema;
  }
}

Thanks,
Shubham

On Tue, Oct 9, 2018 at 3:59 PM Mendelson, Assaf 
mailto:assaf.mendel...@rsa.com>> wrote:
Could you add a fuller code example? I tried to reproduce it in my environment 
and I am getting just one instance of the reader…

Thanks,
Assaf

From: Shubham Chaurasia 
[mailto:shubh.chaura...@gmail.com]
Sent: Tuesday, October 9, 2018 9:31 AM
To: user@spark.apache.org
Subject: DataSourceV2 APIs creating multiple instances of DataSourceReader and 
hence not preserving the state


Hi All,

--Spark built with tags/v2.4.0-rc2

Consider following DataSourceReader implementation:


public class MyDataSourceReader implements DataSourceReader, 
SupportsScanColumnarBatch {

  StructType schema = null;
  Map options;

  public MyDataSourceReader(Map options) {
System.out.println("MyDataSourceReader.MyDataSourceReader: 
Instantiated" + this);
this.options = options;
  }

  @Override
  public List> planBatchInputPartitions() {
//variable this.schema is null here since readSchema() was called on a 
different instance
System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + 
" schema: " + this.schema);
//more logic..
return null;
  }

  @Override
  public StructType readSchema() {
//some logic to discover schema
this.schema = (new StructType())
.add("col1", "int")
.add("col2", "string");
System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + 
this.schema);
return this.schema;
  }
}

1) First readSchema() is called on MyDataSourceReader@instance1 which sets 
class variable schema.

2) Now when planBatchInputPartitions() is called, it is being called on a 
different instance of MyDataSourceReader and hence I am not getting the value 
of schema in method planBatchInputPartitions().



How can I get value of schema which was set in readSchema() method, in 
planBatchInputPartitions() method?



Console Logs:



scala> mysource.executeQuery("select * from movie").show



MyDataSourceReader.MyDataSourceReader: 
InstantiatedMyDataSourceReader@59ea8f1b

MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: 
StructType(StructField(col1,IntegerType,true), 
StructField(col2,StringType,true))

MyDataSourceReader.MyDataSourceReader: 
InstantiatedMyDataSourceReader@a3cd3ff

MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff schema: 
null

Thanks,
Shubham




Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

2018-10-09 Thread Shubham Chaurasia
Thanks Assaf, you tried with *tags/v2.4.0-rc2?*

Full Code:

MyDataSource is the entry point which simply creates Reader and Writer

public class MyDataSource implements DataSourceV2, WriteSupport,
ReadSupport, SessionConfigSupport {

  @Override public DataSourceReader createReader(DataSourceOptions options)
{
return new MyDataSourceReader(options.asMap());
  }

  @Override
  public Optional<DataSourceWriter> createWriter(String jobId, StructType schema,
      SaveMode mode, DataSourceOptions options) {
// creates a dataSourcewriter here..
return Optional.of(dataSourcewriter);
  }

  @Override public String keyPrefix() {
return "myprefix";
  }

}

public class MyDataSourceReader implements DataSourceReader,
SupportsScanColumnarBatch {

  StructType schema = null;
  Map<String, String> options;

  public MyDataSourceReader(Map<String, String> options) {
System.out.println("MyDataSourceReader.MyDataSourceReader:
Instantiated" + this);
this.options = options;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
//variable this.schema is null here since readSchema() was called on a
different instance
System.out.println("MyDataSourceReader.planBatchInputPartitions: " +
this + " schema: " + this.schema);
//more logic..
return null;
  }

  @Override
  public StructType readSchema() {
//some logic to discover schema
this.schema = (new StructType())
.add("col1", "int")
.add("col2", "string");
System.out.println("MyDataSourceReader.readSchema: " + this + " schema:
" + this.schema);
return this.schema;
  }
}

Thanks,
Shubham

On Tue, Oct 9, 2018 at 3:59 PM Mendelson, Assaf 
wrote:

> Could you add a fuller code example? I tried to reproduce it in my
> environment and I am getting just one instance of the reader…
>
>
>
> Thanks,
>
> Assaf
>
>
>
> *From:* Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
> *Sent:* Tuesday, October 9, 2018 9:31 AM
> *To:* user@spark.apache.org
> *Subject:* DataSourceV2 APIs creating multiple instances of
> DataSourceReader and hence not preserving the state
>
>
>
> [EXTERNAL EMAIL]
> Please report any suspicious attachments, links, or requests for sensitive
> information.
>
> Hi All,
>
>
>
> --Spark built with *tags/v2.4.0-rc2*
>
>
>
> Consider following DataSourceReader implementation:
>
>
>
> *public class *MyDataSourceReader *implements *DataSourceReader, 
> SupportsScanColumnarBatch {
>
>   StructType *schema *= *null*;
>   Map *options*;
>
>   *public *MyDataSourceReader(Map options) {
> System.*out*.println(*"MyDataSourceReader.MyDataSourceReader: 
> Instantiated" *+ *this*);
> *this*.*options *= options;
>   }
>
>   @Override
>   *public *List> planBatchInputPartitions() {
>
> *//variable this.schema is null here since readSchema() was called on a 
> different instance
> *System.*out*.println(*"MyDataSourceReader.planBatchInputPartitions: " *+ 
> *this *+ *" schema: " *+ *this*.*schema*);
>
> *//more logic..**return null*;
>   }
>
>   @Override
>   *public *StructType readSchema() {
>
> *//some logic to discover schema**this*.*schema *= (*new *StructType())
> .add(*"col1"*, *"int"*)
> .add(*"col2"*, *"string"*);
> System.*out*.println(*"MyDataSourceReader.readSchema: " *+ *this *+ *" 
> schema: " *+ *this*.*schema*);
> *return this*.*schema*;
>   }
> }
>
> 1) First readSchema() is called on MyDataSourceReader@instance1 which sets 
> class variable schema.
>
> 2) Now when planBatchInputPartitions() is called, it is being called on a 
> different instance of MyDataSourceReader and hence I am not getting the value 
> of schema in method planBatchInputPartitions().
>
>
>
> How can I get value of schema which was set in readSchema() method, in 
> planBatchInputPartitions() method?
>
>
>
> Console Logs:
>
>
>
> scala> mysource.executeQuery("select * from movie").show
>
>
>
> MyDataSourceReader.MyDataSourceReader: 
> InstantiatedMyDataSourceReader@59ea8f1b
>
> MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: 
> StructType(StructField(col1,IntegerType,true), 
> StructField(col2,StringType,true))
>
> MyDataSourceReader.MyDataSourceReader: 
> InstantiatedMyDataSourceReader@a3cd3ff
>
> MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff 
> schema: null
>
>
>
> Thanks,
>
> Shubham
>
>
>
>


RE: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

2018-10-09 Thread Mendelson, Assaf
Could you add a fuller code example? I tried to reproduce it in my environment 
and I am getting just one instance of the reader…

Thanks,
Assaf

From: Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
Sent: Tuesday, October 9, 2018 9:31 AM
To: user@spark.apache.org
Subject: DataSourceV2 APIs creating multiple instances of DataSourceReader and 
hence not preserving the state


Hi All,

--Spark built with tags/v2.4.0-rc2

Consider following DataSourceReader implementation:


public class MyDataSourceReader implements DataSourceReader, 
SupportsScanColumnarBatch {

  StructType schema = null;
  Map options;

  public MyDataSourceReader(Map options) {
System.out.println("MyDataSourceReader.MyDataSourceReader: 
Instantiated" + this);
this.options = options;
  }

  @Override
  public List> planBatchInputPartitions() {
//variable this.schema is null here since readSchema() was called on a 
different instance
System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + 
" schema: " + this.schema);
//more logic..
return null;
  }

  @Override
  public StructType readSchema() {
//some logic to discover schema
this.schema = (new StructType())
.add("col1", "int")
.add("col2", "string");
System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + 
this.schema);
return this.schema;
  }
}

1) First readSchema() is called on MyDataSourceReader@instance1 which sets 
class variable schema.

2) Now when planBatchInputPartitions() is called, it is being called on a 
different instance of MyDataSourceReader and hence I am not getting the value 
of schema in method planBatchInputPartitions().



How can I get value of schema which was set in readSchema() method, in 
planBatchInputPartitions() method?



Console Logs:



scala> mysource.executeQuery("select * from movie").show



MyDataSourceReader.MyDataSourceReader: 
InstantiatedMyDataSourceReader@59ea8f1b

MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: 
StructType(StructField(col1,IntegerType,true), 
StructField(col2,StringType,true))

MyDataSourceReader.MyDataSourceReader: 
InstantiatedMyDataSourceReader@a3cd3ff

MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff schema: 
null

Thanks,
Shubham




DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

2018-10-09 Thread Shubham Chaurasia
Hi All,

--Spark built with *tags/v2.4.0-rc2*

Consider following DataSourceReader implementation:

public class MyDataSourceReader implements DataSourceReader,
SupportsScanColumnarBatch {

  StructType schema = null;
  Map<String, String> options;

  public MyDataSourceReader(Map<String, String> options) {
System.out.println("MyDataSourceReader.MyDataSourceReader:
Instantiated" + this);
this.options = options;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
//variable this.schema is null here since readSchema() was called
on a different instance
System.out.println("MyDataSourceReader.planBatchInputPartitions: "
+ this + " schema: " + this.schema);
//more logic..
return null;
  }

  @Override
  public StructType readSchema() {
//some logic to discover schema
this.schema = (new StructType())
.add("col1", "int")
.add("col2", "string");
System.out.println("MyDataSourceReader.readSchema: " + this + "
schema: " + this.schema);
return this.schema;
  }
}

1) First readSchema() is called on MyDataSourceReader@instance1 which
sets class variable schema.
2) Now when planBatchInputPartitions() is called, it is being called
on a different instance of MyDataSourceReader and hence I am not
getting the value of schema in method planBatchInputPartitions().

How can I get value of schema which was set in readSchema() method, in
planBatchInputPartitions() method?

Console Logs:

scala> mysource.executeQuery("select * from movie").show

MyDataSourceReader.MyDataSourceReader:
InstantiatedMyDataSourceReader@59ea8f1b
MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema:
StructType(StructField(col1,IntegerType,true),
StructField(col2,StringType,true))
MyDataSourceReader.MyDataSourceReader:
InstantiatedMyDataSourceReader@a3cd3ff
MyDataSourceReader.planBatchInputPartitions:
MyDataSourceReader@a3cd3ff schema: null


Thanks,
Shubham


SparkR issue

2018-10-09 Thread ayan guha
Hi

We are seeing some weird behaviour in Spark R.

We created an R data frame with 600K records and 29 columns. Then we tried to
convert the R DF to a Spark DF using

df <- SparkR::createDataFrame(rdf)

from RStudio. It hung; we had to kill the process after 1-2 hours.

We also tried following:
df <- SparkR::createDataFrame(rdf, numPartition=4000)
df <- SparkR::createDataFrame(rdf, numPartition=300)
df <- SparkR::createDataFrame(rdf, numPartition=10)

Same result. In both scenarios RStudio seems to be working, but there is no
trace of jobs in the Spark Application Master view.

Finally, we used this:

df <- SparkR::createDataFrame(rdf, schema=schema) , schema is a StructType.

This took 25 mins to create the Spark DF. However, the job did show up in the
Application Master view and it shows only 20-30 secs. So where did the rest
of the time go?

Question:
1. Is this expected behavior? (I hope not). How should we speed up this bit?
2. We understand better options would be to read data from external
sources, but we need this data to be generated for some simulation purpose.
What's possibly going wrong?


Best
Ayan



-- 
Best Regards,
Ayan Guha