Re: Table created with saveAsTable behaves differently than a table created with spark.sql("CREATE TABLE...")

2023-01-21 Thread Peyman Mohajerian
In the case of saveAsTable("tablename") you specified the partitioning
explicitly: .partitionBy("partitionCol"), and Spark creates a datasource
table whose overwrites are governed by spark.sql.sources.partitionOverwriteMode.
A table created through a Hive session (or Hive-style CREATE TABLE) is most
likely a Hive-format table, so the insertInto goes through Hive's
dynamic-partition checks, which is why the nonstrict setting is needed there.
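
To the question further down about static partitions: yes, that kind of insert
is possible from Spark SQL as well. A hedged PySpark sketch (untested; the
table, columns, and partition value are placeholders based on the thread):

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("partition-insert-sketch")
         .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
         .config("hive.exec.dynamic.partition.mode", "nonstrict")  # for the Hive-table case
         .enableHiveSupport()
         .getOrCreate())

# Stand-in for the batch produced by the periodic process.
df = spark.createDataFrame([("a", 1, "2023-01-21")], ["col1", "col2", "partitionCol"])
df.createOrReplaceTempView("staging")

# Static partition insert: the partition value is spelled out explicitly,
# so Hive's strict mode is satisfied even without the nonstrict setting.
spark.sql("""
    INSERT OVERWRITE TABLE tablename PARTITION (partitionCol = '2023-01-21')
    SELECT col1, col2 FROM staging
""")

# Dynamic partition insert: the value comes from the data. This is what
# insertInto does, and what strict mode rejects when every partition column
# is dynamic.
spark.sql("""
    INSERT OVERWRITE TABLE tablename PARTITION (partitionCol)
    SELECT col1, col2, partitionCol FROM staging
""")
```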

On Sat, Jan 21, 2023 at 4:03 AM krexos 
wrote:

> My periodically running process writes data to a table over parquet files
> with the configuration "spark.sql.sources.partitionOverwriteMode" =
> "dynamic" with the following code:
>
> if (!tableExists) {
>   df.write
> .mode("overwrite")
> .partitionBy("partitionCol")
> .format("parquet")
> .saveAsTable("tablename")
> }else {
>   df.write
> .format("parquet")
> .mode("overwrite")
> .insertInto("table")
> }
>
> If the table doesn't exist and is created in the first clause, it works
> fine and on the next run when the table does exist and the else clause runs
> it works as expected.
>
> However, when I create the table over existing parquet files either
> through a hive session or using spark.sql("CREATE TABLE...") and then run
> the process it fails to write with the error:
>
> "org.apache.spark.SparkException: Dynamic partition strict mode requires
> at least one static partition column. To turn this off set
> hive.exec.dynamic.partition.mode=nonstrict"
> Adding this configuration to the spark conf solves the issue but I don't
> understand why it is needed when creating the table through a command but
> isn't needed when creating the table with saveAsTable.
>
> Also, I don't understand how this configuration is relevant for spark. From
> what I've read,
> static partition here means we directly specify the partition to write into
> instead of specifying the column to partition by. Is it even possible to do
> such an insert in spark (as opposed to HiveQL)?
>
> Spark 2.4, Hadoop 3.1
>
>
> thanks
>


Re: How the data is distributed

2022-06-06 Thread Peyman Mohajerian
The latter: when reading a file-based source, the data is loaded directly by
the executors on the worker nodes; it does not pass through the driver first.

On Mon, Jun 6, 2022 at 2:07 PM Sid  wrote:

> Hi experts,
>
>
> When we load any file, I know that based on the information in the spark
> session about the executors location, status and etc , the data is
> distributed among the worker nodes and executors.
>
> But I have one doubt. Is the data initially loaded on the driver and then
> it is distributed or it is directly distributed amongst the workers?
>
> Thanks,
> Sid
>


Re: Consuming from Kafka to delta table - stream or batch mode?

2022-02-24 Thread Peyman Mohajerian
If you want to batch-consume from Kafka, the trigger-once option works with
Structured Streaming, and you still get the benefit of checkpointing.
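
A minimal PySpark sketch (untested; broker, topics, and paths are placeholders,
and it assumes the spark-sql-kafka-0-10 package is on the classpath) of a
run-once Structured Streaming job that reads from Kafka and appends to a table:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-once").getOrCreate()

df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")   # placeholder
      .option("subscribe", "topic_a,topic_b")              # placeholder topics
      .option("startingOffsets", "earliest")
      .load())

# Kafka key/value arrive as binary; cast before writing.
out = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "timestamp")

# trigger(once=True) processes whatever is available and then stops, while the
# checkpoint remembers the consumed offsets for the next scheduled run.
(out.writeStream
    .format("parquet")                                     # or "delta" if Delta Lake is available
    .option("checkpointLocation", "/chk/kafka-once")       # placeholder path
    .trigger(once=True)
    .start("/data/kafka-once")                             # placeholder path
    .awaitTermination())
```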

On Thu, Feb 24, 2022 at 6:07 AM Michael Williams (SSI) <
michael.willi...@ssigroup.com> wrote:

> Hello,
>
>
>
> Our team is working with Spark (for the first time) and one of the sources
> we need to consume is Kafka (multiple topics).  Are there any practical or
> operational issues to be aware of when deciding whether to a) consume in
> batches until all messages are consumed then shut down the spark job, then
> when new messages show up, start a new job; or b) use spark streaming and
> run the job continuously?  If it makes a difference, the environment is
> on-premise spark on k8s.
>
>
>
> Any experience shared is appreciated.
>
>
>
> Thank you,
>
> Mike
>
>
>


Re: question for definition of column types

2022-01-26 Thread Peyman Mohajerian
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Avoid shadowing the built-in `list`
data = [("buck trends", "ceo", 20.00, 0.25, "100")]

# Explicit schema: salary and rate become DoubleType instead of strings
schema = StructType([StructField("name", StringType(), True),
                     StructField("title", StringType(), True),
                     StructField("salary", DoubleType(), True),
                     StructField("rate", DoubleType(), True),
                     StructField("insurance", StringType(), True)])

df = spark.createDataFrame(data=data, schema=schema)

On Wed, Jan 26, 2022 at 6:49 PM  wrote:

> when creating dataframe from a list, how can I specify the col type?
>
> such as:
>
> >>> df = spark.createDataFrame(list, ["name","title","salary","rate","insurance"])
> >>> df.show()
> +-----------+---------+------+----+---------+
> |       name|    title|salary|rate|insurance|
> +-----------+---------+------+----+---------+
> |buck trends|      ceo|    20|0.25|      100|
> |cindy banks|      cfo|    17|0.22|      120|
> |  joe coder|developer|    13| 0.2|      120|
> +-----------+---------+------+----+---------+
>
>
> >>> df.describe()
> DataFrame[summary: string, name: string, title: string, salary: string,
> rate: string, insurance: string]
>
> I want the salary, rate, insurance to be Double type, not a String type.
>
> Thank you.
> Frakass
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: DropNa in Spark for Columns

2021-02-27 Thread Peyman Mohajerian
I don't have personal experience with Koalas but it does seem to have the
same api:
https://koalas.readthedocs.io/en/latest/reference/api/databricks.koalas.DataFrame.dropna.html
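
If you want to stay in plain PySpark, a sketch (untested; uses the same toy
data as the quoted message) that computes per-column null counts in a single
aggregation and drops every column that contains any null:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("drop-null-columns").getOrCreate()
df = spark.createDataFrame([(None, 1, 2), (0, None, 2), (0, 1, 2)], ("A", "B", "C"))

# One pass over the data: count nulls per column.
null_counts = df.select(
    [F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns]
).first().asDict()

# Keep only the columns with zero nulls.
keep = [c for c, n in null_counts.items() if n == 0]
df.select(*keep).show()
# +---+
# |  C|
# +---+
# |  2|
# |  2|
# |  2|
# +---+
```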

On Fri, Feb 26, 2021 at 11:46 PM Vitali Lupusor 
wrote:

> Hello Chetan,
>
> I don’t know about Scala, but in PySpark there is no elegant way of
> dropping NAs on column axis.
>
> Here is a possible solution to your problem:
>
> >>> data = [(None, 1,  2), (0, None, 2), (0, 1, 2)]
> >>> columns = ('A', 'B', 'C')
> >>> df = spark.createDataFrame(data, columns)
> >>> df.show()
> +----+----+---+
> |   A|   B|  C|
> +----+----+---+
> |null|   1|  2|
> |   0|null|  2|
> |   0|   1|  2|
> +----+----+---+
> >>> for column in df.columns:
> ...     if df.select(column).where(df[column].isNull()).first():
> ...         df = df.drop(column)
> ...
> >>> df.show()
> +---+
> |  C|
> +---+
> |  2|
> |  2|
> |  2|
> +---+
>
> If your dataframe doesn’t exceed the size of your memory, I suggest you
> bring it into Pandas.
>
> >>> df_pd = df.toPandas()
> >>> df_pd
>      A    B  C
> 0  NaN  1.0  2
> 1  0.0  NaN  2
> 2  0.0  1.0  2
> >>> df_pd = df_pd.dropna(axis='columns')
> >>> df_pd
>    C
> 0  2
> 1  2
> 2  2
>
> Which you then can bring back into Spark:
>
> >>> df = spark.createDataFrame(df_pd)
> >>> df.show()
> +---+
> |  C|
> +---+
> |  2|
> |  2|
> |  2|
> +---+
>
> Hope that help.
>
> Regards,
> V
>
> On 27 Feb 2021, at 05:25, Chetan Khatri 
> wrote:
>
> Hi Users,
>
> What is the equivalent of df.dropna(axis='columns') of Pandas in
> Spark/Scala?
>
> Thanks
>
>
>


Re: Question on bucketing vs sorting

2020-12-31 Thread Peyman Mohajerian
So there is Hive-style partitioning, which is at-rest partitioning, versus
Spark's in-memory partitioning; make sure you're not confusing the two. If
the cardinality of the column you want to bucket by isn't too high, you don't
have data skew with respect to the buckets, each partition has at least
256MB-1GB of data, and your total data size is large enough (not a few MBs),
then you should use bucketing. Each bucket doesn't necessarily translate to
one Spark partition: if you have a couple of gigabytes per bucket (at rest),
Spark will create many partitions per bucket. So whether you sort or bucket,
the same value won't necessarily be in the same partition unless you force it
by using a specific number of partitions (same as your number of buckets),
which may not be a good idea if you have too much data per bucket.
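
For reference, a minimal PySpark sketch of the two write patterns being
compared (untested; table and path names are made up). Bucketing requires
saveAsTable, while a sort-based layout can be written as plain files:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bucket-vs-sort").enableHiveSupport().getOrCreate()
df = spark.read.parquet("s3://bucket/input/")          # placeholder path

# Bucketed table: 'A' values are hashed into a fixed number of buckets per
# partition, and the bucketing metadata enables data skipping and
# shuffle-free joins on read.
(df.distinct()
   .write
   .bucketBy(64, "A")
   .sortBy("A")
   .format("parquet")
   .saveAsTable("bucketed_table"))                     # placeholder table name

# Sorted layout: repartition on 'A' then sort within each partition, so equal
# 'A' values end up clustered together in the written files.
(df.distinct()
   .repartition(200, "A")
   .sortWithinPartitions("A")
   .write
   .parquet("s3://bucket/sorted_output/"))             # placeholder path
```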


On Thu, Dec 31, 2020 at 10:21 AM Patrik Iselind 
wrote:

> Thank you Peyman for clarifying this for me.
> Would you say there's a case for using bucketing in this case at all, or
> should I simply focus completely on the sorting solution? If so, when would
> you say bucketing is the preferred solution?
>
> Patrik Iselind
>
>
> On Thu, Dec 31, 2020 at 4:15 PM Peyman Mohajerian 
> wrote:
>
>> You can save your data to hdfs or other targets using either a sorted or
>> bucketed dataframe. In the case of bucketing you will have a different data
>> skipping mechanism when you read back the data compared to the sorted
>> version.
>>
>> On Thu, Dec 31, 2020 at 5:40 AM Patrik Iselind 
>> wrote:
>>
>>> Hi everyone,
>>>
>>> I am trying to push my understanding of bucketing vs sorting. I hope I
>>> can get some clarification from this list.
>>>
>>> Bucketing as I've come to understand it is primarily intended for when
>>> preparing the dataframe for join operations. Where the goal is to get data
>>> that will be joined together in the same partition, to make the joins
>>> faster.
>>>
>>> Sorting on the other hand is simply for when I want my data sorted,
>>> nothing strange there I guess.
>>>
>>> The effect of using bucketing, as I see it, would be the same as sorting
>>> if I'm not doing any joining and I use enough buckets, like in the
>>> following example program. Where the sorting or bucketing would replace the
>>> `?()` transformation.
>>>
>>> ```pseudo code
>>> df = spark.read.parquet("s3://...")
>>> // df contains the columns A, B, and C
>>> df2 = df.distinct().?().repartition(num_desired_partitions)
>>> df2.write.parquet("s3://,,,")
>>> ```
>>>
>>> Is my understanding correct or am I missing something?
>>>
>>> Is there a performance consideration between sorting and bucketing that
>>> I need to keep in mind?
>>>
>>> The end goal for me here is not that the data as such is sorted on the A
>>> column, it's that each  distinct value of A is kept together with all other
>>> rows which have the same value in A. If all rows with the same A value
>>> cannot fit within one partitions, then I accept that there's more than one
>>> partitions with the same value in the A column. If there's room left in the
>>> partitions, then it would be fine for rows with another value of A to fill
>>> up the partition.
>>>
>>> I would like something as depicted below
>>> ```desireable example
>>> -- Partition 1
>>> A|B|C
>>> =
>>> 2|?|?
>>> 2|?|?
>>> 2|?|?
>>> 2|?|?
>>> 2|?|?
>>> 2|?|?
>>> 2|?|?
>>> -- Partition 2
>>> A|B|C
>>> =
>>> 2|?|?
>>> 0|?|?
>>> 0|?|?
>>> 0|?|?
>>> 1|?|?
>>> ```
>>>
>>> What I don't want is something like below
>>>
>>> ```undesireable example
>>> -- Partition 1
>>> A|B|C
>>> =
>>> 0|?|?
>>> 0|?|?
>>> 1|?|?
>>> 0|?|?
>>> 1|?|?
>>> 2|?|?
>>> 1|?|?
>>> -- Partition 2
>>> A|B|C
>>> =
>>> 0|?|?
>>> 0|?|?
>>> 0|?|?
>>> 1|?|?
>>> 2|?|?
>>> ```
>>> Where the A value varies.
>>>
>>> Patrik Iselind
>>>
>>


Re: Question on bucketing vs sorting

2020-12-31 Thread Peyman Mohajerian
You can save your data to hdfs or other targets using either a sorted or
bucketed dataframe. In the case of bucketing you will have a different data
skipping mechanism when you read back the data compared to the sorted
version.

On Thu, Dec 31, 2020 at 5:40 AM Patrik Iselind  wrote:

> Hi everyone,
>
> I am trying to push my understanding of bucketing vs sorting. I hope I can
> get some clarification from this list.
>
> Bucketing as I've come to understand it is primarily intended for when
> preparing the dataframe for join operations. Where the goal is to get data
> that will be joined together in the same partition, to make the joins
> faster.
>
> Sorting on the other hand is simply for when I want my data sorted,
> nothing strange there I guess.
>
> The effect of using bucketing, as I see it, would be the same as sorting
> if I'm not doing any joining and I use enough buckets, like in the
> following example program. Where the sorting or bucketing would replace the
> `?()` transformation.
>
> ```pseudo code
> df = spark.read.parquet("s3://...")
> // df contains the columns A, B, and C
> df2 = df.distinct().?().repartition(num_desired_partitions)
> df2.write.parquet("s3://,,,")
> ```
>
> Is my understanding correct or am I missing something?
>
> Is there a performance consideration between sorting and bucketing that I
> need to keep in mind?
>
> The end goal for me here is not that the data as such is sorted on the A
> column, it's that each  distinct value of A is kept together with all other
> rows which have the same value in A. If all rows with the same A value
> cannot fit within one partitions, then I accept that there's more than one
> partitions with the same value in the A column. If there's room left in the
> partitions, then it would be fine for rows with another value of A to fill
> up the partition.
>
> I would like something as depicted below
> ```desireable example
> -- Partition 1
> A|B|C
> =
> 2|?|?
> 2|?|?
> 2|?|?
> 2|?|?
> 2|?|?
> 2|?|?
> 2|?|?
> -- Partition 2
> A|B|C
> =
> 2|?|?
> 0|?|?
> 0|?|?
> 0|?|?
> 1|?|?
> ```
>
> What I don't want is something like below
>
> ```undesireable example
> -- Partition 1
> A|B|C
> =
> 0|?|?
> 0|?|?
> 1|?|?
> 0|?|?
> 1|?|?
> 2|?|?
> 1|?|?
> -- Partition 2
> A|B|C
> =
> 0|?|?
> 0|?|?
> 0|?|?
> 1|?|?
> 2|?|?
> ```
> Where the A value varies.
>
> Patrik Iselind
>


Re: Using UDF based on Numpy functions in Spark SQL

2020-12-23 Thread Peyman Mohajerian
https://stackoverflow.com/questions/43484269/how-to-register-udf-to-use-in-sql-and-dataframe
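
Following the approach in that link, a minimal PySpark sketch (untested; the
table and data are made up). Note that a plain UDF is row-at-a-time, so to
mimic an aggregate like STDDEV you can first gather each group's values with
collect_list and hand the array to the numpy-based UDF. Also note np.std is
the population standard deviation, whereas Spark's STDDEV is the sample one,
so the two columns below will differ slightly:

```python
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.appName("numpy-udf").getOrCreate()

# Wrapping in float() because Spark cannot serialize numpy scalar types directly.
spark.udf.register("np_std", lambda xs: float(np.std(xs)) if xs else None, DoubleType())

spark.createDataFrame(
    [("c1", 10.0), ("c1", 12.0), ("c1", 14.0), ("c2", 5.0), ("c2", 9.0)],
    ["cust_id", "amount_sold"],
).createOrReplaceTempView("sales")   # placeholder data and table name

spark.sql("""
  SELECT cust_id,
         np_std(collect_list(amount_sold)) AS np_standard_deviation,
         STDDEV(amount_sold)               AS builtin_standard_deviation
  FROM sales
  GROUP BY cust_id
""").show()
```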

On Wed, Dec 23, 2020 at 12:52 PM Mich Talebzadeh 
wrote:

> Hi,
>
>
> This is a shot in the dark so to speak.
>
>
> I would like to use the standard deviation std offered by numpy in
> PySpark. I am using SQL for now
>
>
> The code as below
>
>
>   sqltext = f"""
>   SELECT
>       rs.Customer_ID
>     , rs.Number_of_orders
>     , rs.Total_customer_amount
>     , rs.Average_order
>     , rs.Standard_deviation
>   FROM
>   (
>     SELECT cust_id AS Customer_ID,
>            COUNT(amount_sold) AS Number_of_orders,
>            SUM(amount_sold) AS Total_customer_amount,
>            AVG(amount_sold) AS Average_order,
>            STDDEV(amount_sold) AS Standard_deviation
>     FROM {DB}.{table}
>     GROUP BY cust_id
>     HAVING SUM(amount_sold) > 94000
>     AND AVG(amount_sold) < STDDEV(amount_sold)
>   ) rs
>   ORDER BY 3 DESC
>   """
>
>   spark.sql(sqltext)
>
> Now if I wanted to use UDF based on numpy STD function, I can do
>
> import numpy as np
> from pyspark.sql.functions import UserDefinedFunction
> from pyspark.sql.types import DoubleType
> udf = UserDefinedFunction(np.std, DoubleType())
>
> How can I use that udf with spark SQL? I gather this is only possible
> through functional programming?
>
> Thanks,
>
> Mich
>
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: [Spark SQL]: Stability of large many-to-many joins

2020-03-20 Thread Peyman Mohajerian
Two options: either add salting to your join, or filter out the records with
frequent keys, join them separately, and then union the results back. It's
the classic skew-join issue.
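
A rough sketch of the salting option (untested; the column names come from the
simplified example in the quoted message, and the salt range of 32 is an
arbitrary choice you would tune):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("salted-join").getOrCreate()

# Stand-in data shaped like the simplified example below.
dsA = spark.createDataFrame([(1, "A"), (2, "A"), (3, "A"), (1, "B")], ["A_1", "A_2"])
dsB = spark.createDataFrame([("A", "B"), ("A", "C"), ("B", "A")], ["B_1", "B_2"])

N = 32  # number of salt values; tune so the hottest keys spread across enough tasks

# Skewed side: append a random salt in [0, N) to every row.
salted_a = dsA.withColumn("salt", (F.rand() * N).cast("int"))

# Other side: replicate every row N times, once per salt value.
salted_b = dsB.withColumn("salt", F.explode(F.array([F.lit(i) for i in range(N)])))

joined = salted_a.join(
    salted_b,
    (salted_a["A_2"] == salted_b["B_1"]) & (salted_a["salt"] == salted_b["salt"]),
    "inner",
).select("A_1", "A_2", "B_2")

joined.show()
```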

On Fri, Mar 20, 2020 at 4:12 AM nathan grand 
wrote:

> Hi,
>
> I have two very large datasets, which both have many repeated keys, which I
> wish to join.
>
> A simplified example:
>
> dsA
>
> A_1 |A_2
> 1 |A
> 2 |A
> 3 |A
> 4 |A
> 5 |A
> 1 |B
> 2 |B
> 3 |B
> 1 |C
>
> dsB
>
> B_1 |B_2
> A |B
> A |C
> A |D
> A |E
> A |F
> A |G
> B |A
> B |E
> B |G
> B |H
> C |A
> C |B
>
>
> The join I want to do is:
>
> dsA.join(dsB, dsA("A_2") === dsB($"B_1"), "INNER")
>
> However, this ends putting a lot of pressure on tasks containing frequently
> occurring keys - it's either very, very slow to complete or I encounter
> memory issues.
>
> I've played with grouping both sides by the join key prior to joining
> (which would make the join one-to-one) but memory seems to become an issue
> again as the groups are very large.
>
> Does anyone have any good suggestions as to how to make large many-to-many
> joins reliably complete in Spark??
>
> Reliability for me is much more important than speed - this is for a tool
> so I can't over-tune to specific data sizes/shapes.
>
> Thanks,
>
> Nathan
>


Re: Time-Series Forecasting

2018-09-29 Thread Peyman Mohajerian
Here's a blog on Flint:
https://databricks.com/blog/2018/09/11/introducing-flint-a-time-series-library-for-apache-spark.html
I don't have an opinion about it, just that Flint was mentioned earlier.

On Thu, Sep 20, 2018 at 2:12 AM, Gourav Sengupta 
wrote:

> Hi,
>
> If you are following the time series forecasting with the mathematical
> rigour and tractability then I think that using R is the best option. I do
> think that people tend to claim quite a lot these days that SPARK ML and
> other Python libraries are better, but just pick up a classical text book
> on time series forecasting and start asking fundamental mathematical
> questions and compare for yourself.
>
>
> Regards,
> Gourav Sengupta
>
> On Wed, Sep 19, 2018 at 5:02 PM Mina Aslani  wrote:
>
>> Hi,
>> I have a question for you. Do we have any Time-Series Forecasting library
>> in Spark?
>>
>> Best regards,
>> Mina
>>
>


Re: pyspark vector

2017-04-24 Thread Peyman Mohajerian
setVocabSize. The leading 3 is the vocabulary size, i.e. the length of the
sparse vector; the second array holds the indices of the vocabulary terms
present in the row and the third holds their counts.
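
A quick PySpark illustration (untested sketch mirroring the docs example
linked below):

```python
from pyspark.sql import SparkSession
from pyspark.ml.feature import CountVectorizer

spark = SparkSession.builder.appName("countvectorizer-demo").getOrCreate()

df = spark.createDataFrame(
    [(0, ["a", "b", "c"]), (1, ["a", "b", "b", "c", "a"])], ["id", "texts"]
)

# vocabSize caps the vocabulary at 3 terms; the fitted vocabulary length is
# the leading number in each sparse vector, e.g. (3,[0,1,2],[1.0,1.0,1.0]).
cv = CountVectorizer(inputCol="texts", outputCol="vector", vocabSize=3, minDF=1.0)
model = cv.fit(df)
model.transform(df).show(truncate=False)
```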


On Mon, Apr 24, 2017 at 5:36 PM, Zeming Yu  wrote:

> Hi all,
>
> Beginner question:
>
> what does the 3 mean in the (3,[0,1,2],[1.0,1.0,1.0])?
>
> https://spark.apache.org/docs/2.1.0/ml-features.html
>
>  id | texts                          | vector
> ----|--------------------------------|--------------------------
>  0  | Array("a", "b", "c")           | (3,[0,1,2],[1.0,1.0,1.0])
>  1  | Array("a", "b", "b", "c", "a") | (3,[0,1,2],[2.0,2.0,1.0])
>
>


Re: Ingesting data in parallel across workers in Data Frame

2017-01-20 Thread Peyman Mohajerian
The next section in the same document has a solution.
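
If there is no numeric column, one option (a hedged sketch, untested;
connection details and date ranges are placeholders) is to hand Spark an
explicit list of non-overlapping predicates; each predicate becomes one
partition of the read:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jdbc-predicates").getOrCreate()

url = "jdbc:mysql://dbhost:3306/mydb"            # placeholder
props = {"user": "user", "password": "secret",   # placeholders
         "driver": "com.mysql.jdbc.Driver"}

# Non-overlapping predicates on a non-numeric column; one partition per predicate.
predicates = [
    "created_date >= '2017-01-01' AND created_date < '2017-01-08'",
    "created_date >= '2017-01-08' AND created_date < '2017-01-15'",
    "created_date >= '2017-01-15' AND created_date < '2017-01-22'",
]

df = spark.read.jdbc(url=url, table="my_table", predicates=predicates, properties=props)
print(df.rdd.getNumPartitions())   # == len(predicates)
```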

On Fri, Jan 20, 2017 at 9:03 PM, Abhishek Gupta 
wrote:

> I am trying to load data from the database into a DataFrame using a JDBC
> driver. I want to get the data into partitions; the following document has
> a nice explanation of how to achieve this.
> https://docs.databricks.com/spark/latest/data-sources/sql-databases.html
>
> The problem I am facing is that I don't have a numeric column which can be
> used for achieving the partitioning.
>
> Any help would be appreciated.
>
>
> Thank You
>
> --Abhishek
>
>
>
>


Re: Spark SQL DataFrame to Kafka Topic

2017-01-13 Thread Peyman Mohajerian
Yes, it is called Structured Streaming:
https://docs.databricks.com/_static/notebooks/structured-streaming-kafka.html
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
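
Since the source here is a static DataFrame read from parquet (not a stream),
a plain batch write to Kafka also works in Spark 2.2+; a sketch (untested;
broker and topic are placeholders, and it assumes the spark-sql-kafka package
is on the classpath):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("df-to-kafka").getOrCreate()

df = spark.read.parquet("/temp/")     # the beacons data from the thread

# Kafka expects string/binary 'key' and 'value' columns; serialize each row as JSON.
out = df.select(F.to_json(F.struct(*df.columns)).alias("value"))

(out.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")   # placeholder
    .option("topic", "beacons")                          # placeholder
    .save())
```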

On Fri, Jan 13, 2017 at 3:32 AM, Senthil Kumar 
wrote:

> Hi Team ,
>
>  Sorry if this question already asked in this forum..
>
> Can we ingest data to Apache Kafka Topic from Spark SQL DataFrame ??
>
> Here is my Code which Reads Parquet File :
>
> *val sqlContext = new org.apache.spark.sql.SQLContext(sc);*
>
> *val df = sqlContext.read.parquet("/temp/*.parquet")*
>
> *df.registerTempTable("beacons")*
>
>
> I want to directly ingest df DataFrame to Kafka ! Is there any way to
> achieve this ??
>
>
> Cheers,
>
> Senthil
>


Re: [ML] Converting ml.DenseVector to mllib.Vector

2016-12-31 Thread Peyman Mohajerian
This may also help:
http://spark.apache.org/docs/latest/ml-migration-guides.html
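
A hedged PySpark sketch of the Spark 2.0+ conversion helpers (untested; the
gist in the thread is Scala, where the equivalents are
org.apache.spark.mllib.linalg.Vectors.fromML and
MLUtils.convertVectorColumnsFromML):

```python
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors as MLVectors
from pyspark.mllib.util import MLUtils
from pyspark.mllib.stat import Statistics

spark = SparkSession.builder.appName("ml-to-mllib").getOrCreate()

df = spark.createDataFrame(
    [(MLVectors.dense([1.0, 2.0]),), (MLVectors.dense([3.0, 4.0]),)],
    ["scaled_features"],
)

# Convert the new ml vector column to old-style mllib vectors before using
# mllib-only APIs; pyspark.mllib.linalg.Vectors.fromML does the same for a
# single vector.
df_old = MLUtils.convertVectorColumnsFromML(df, "scaled_features")
rdd_vec = df_old.rdd.map(lambda row: row.scaled_features)

print(Statistics.corr(rdd_vec))
```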

On Sat, Dec 31, 2016 at 6:51 AM, Marco Mistroni  wrote:

> Hi.
> you have a DataFrame.. there should be either a way to
> - convert a DF to a Vector without doing a cast
> - use a ML library which relies to DataFrames only
>
> I can see that your code is still importing libraries from two different
> 'machine learning ' packages
>
> import org.apache.spark.ml.feature.{MinMaxScaler, Normalizer,
> StandardScaler, VectorAssembler}
> import org.apache.spark.mllib.linalg.{DenseVector, Vector, Vectors}
>
> You should be able to find exactly same data  structures that you had in
> mllib  under the ml package.i'd advise to stick to ml libaries only,
> that will avoid confusion
>
> i concur with you, this line looks dodgy to me
>
> val rddVec = dfScaled
> .select("scaled_features")
> .rdd
> .map(_(0)
> .asInstanceOf[org.apache.spark.mllib.linalg.Vector])
>
> converting a DF to a Vector is not as simple as doing a cast (like you
> would do in Java)
>
> I did a random search and found this, mayb it'll help
>
> https://community.hortonworks.com/questions/33375/how-to-convert-a-dataframe-to-a-vectordense-in-sca.html
>
>
>
>
> hth
>  marco
>
>
>
> On Sat, Dec 31, 2016 at 4:24 AM, Jason Wolosonovich 
> wrote:
>
>> Hello All,
>>
>> I'm working through the Data Science with Scala course on Big Data
>> University and it is not updated to work with Spark 2.0, so I'm adapting
>> the code as I work through it, however I've finally run into something that
>> is over my head. I'm new to Scala as well.
>>
>> When I run this code (https://gist.github.com/jmwol
>> oso/a715cc4d7f1e7cc7951fab4edf6218b1) I get the following error:
>>
>> `java.lang.ClassCastException: org.apache.spark.ml.linalg.DenseVector
>> cannot be cast to org.apache.spark.mllib.linalg.Vector`
>>
>> I believe this is occurring at line 107 of the gist above. The code
>> starting at this line (and continuing to the end of the gist) is the
>> current code in the course.
>>
>> If I try to map to any other class type, then I have problems with the
>> `Statistics.corr(rddVec)`.
>>
>> How can I convert `rddVec` from an `ml.linalg.DenseVector` into an
>> `mllib.linalg.Vector` for use with `Statistics`?
>>
>> Thanks!
>>
>> -Jason
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: how to find NaN values of each row of spark dataframe to decide whether the row is dropped or not

2016-09-26 Thread Peyman Mohajerian
Also take a look at this API:

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameNaFunctions
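
For the threshold requirement described in the quoted message, DataFrame.dropna
(DataFrameNaFunctions.drop in Scala) already takes a thresh argument: the
minimum number of non-null values a row must have to be kept. A small PySpark
sketch (untested):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("nan-rows").getOrCreate()

df = spark.createDataFrame(
    [(1.0, None, None), (2.0, 3.0, None), (4.0, 5.0, 6.0)], ["f1", "f2", "f3"]
)

# Keep only rows that have at least 2 non-null values (i.e. at most 1 missing
# here), then fill the remaining nulls with a placeholder value.
cleaned = df.dropna(thresh=2).fillna(0.0)
cleaned.show()
```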

On Mon, Sep 26, 2016 at 1:09 AM, Bedrytski Aliaksandr 
wrote:

> Hi Muhammet,
>
> python also supports sql queries:
> http://spark.apache.org/docs/latest/sql-programming-guide.html#running-sql-queries-programmatically
>
> Regards,
> --
>   Bedrytski Aliaksandr
>   sp...@bedryt.ski
>
>
>
> On Mon, Sep 26, 2016, at 10:01, muhammet pakyürek wrote:
>
>
>
>
> but my request is related to Python, because I have designed a preprocessing
> step for data which looks for rows including NaN values. If the number of
> NaNs in a row is above the threshold, the row is deleted; otherwise it is
> filled with a predictive value. Therefore I need a Python version of this
> process.
>
>
> --
>
> *From:* Bedrytski Aliaksandr 
> *Sent:* Monday, September 26, 2016 7:53 AM
> *To:* muhammet pakyürek
> *Cc:* user@spark.apache.org
> *Subject:* Re: how to find NaN values of each row of spark dataframe to
> decide whether the rows is dropeed or not
>
> Hi Muhammet,
>
> have you tried to use sql queries?
>
> spark.sql("""
>     SELECT
>         field1,
>         field2,
>         field3
>     FROM table1
>     WHERE
>         field1 != 'NaN'
>         AND field2 != 'NaN'
>         AND field3 != 'NaN'
> """)
>
>
> This query filters rows containing Nan for a table with 3 columns.
>
> Regards,
> --
>   Bedrytski Aliaksandr
>   sp...@bedryt.ski
>
>
>
> On Mon, Sep 26, 2016, at 09:30, muhammet pakyürek wrote:
>
>
> Is there any way to do this directly? If not, is there any way to do this
> indirectly using other data structures of Spark?
>
>
>
>


Re: how to decide which part of process use spark dataframe and pandas dataframe?

2016-09-26 Thread Peyman Mohajerian
A simple way to do that is to collect the data to the driver when you need
to use Python pandas.
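
A minimal sketch (untested; path and columns are placeholders) of the usual
boundary: do the heavy filtering and aggregation in Spark, and only bring the
now-small result into pandas on the driver:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("spark-to-pandas").getOrCreate()
df = spark.read.parquet("/data/events/")       # placeholder path

# Reduce in Spark (distributed), then hand the small aggregate to pandas (driver-local).
summary = df.groupBy("category").agg(F.count("*").alias("n"))
pdf = summary.toPandas()                       # must fit in driver memory
print(pdf.head())
```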

On Monday, September 26, 2016, muhammet pakyürek  wrote:

>
>
> is there a clear guide to decide the above?
>


Re: Spark Streaming-- for each new file in HDFS

2016-09-15 Thread Peyman Mohajerian
You can listen for new files in a specific directory using
streamingContext.fileStream (or textFileStream in Python). Take a look at:
http://spark.apache.org/docs/latest/streaming-programming-guide.html
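
A small PySpark DStream sketch (untested; the directory and per-batch logic
are placeholders). Each batch picks up only the files newly added to the
monitored directory:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def process(rdd):
    # Placeholder per-batch logic: just count the newly arrived records.
    if not rdd.isEmpty():
        print("new records:", rdd.count())

sc = SparkContext(appName="hdfs-file-listener")
ssc = StreamingContext(sc, batchDuration=60)    # poll the directory every 60 seconds

# Creates one RDD per batch from files newly added to the monitored directory.
lines = ssc.textFileStream("hdfs:///landing/incoming/")    # placeholder path
lines.foreachRDD(process)

ssc.start()
ssc.awaitTermination()
```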


On Thu, Sep 15, 2016 at 10:31 AM, Jörn Franke  wrote:

> Hi,
> I recommend that the third party application puts an empty file with the
> same filename as the original file, but the extension ".uploaded". This is
> an indicator that the file has been fully (!) written to the fs. Otherwise
> you risk only reading parts of the file.
> Then, you can have a file system listener for this .upload file.
>
> Spark streaming or Kafka are not needed/suitable, if the server is a file
> server. You can use oozie (maybe with a simple custom action) to poll for
> .uploaded files and transmit them.
>
> On 15 Sep 2016, at 19:00, Kappaganthu, Sivaram (ES) <
> sivaram.kappagan...@adp.com> wrote:
>
> Hello,
>
>
>
> I am a newbie to spark and I have  below requirement.
>
>
>
> Problem statement : A third party application is dumping files
> continuously in a server. Typically the count of files is 100 files  per
> hour and each file is of size less than 50MB. My application has to
>  process those files.
>
>
>
> Here
>
> 1) is it possible  for spark-stream to trigger a job after a file is
> placed instead of triggering a job at fixed batch interval?
>
> 2) If it is not possible with Spark-streaming, can we control this with
> Kafka/Flume
>
>
>
> Thanks,
>
> Sivaram
>
>
>
>


Re: Scala Vs Python

2016-09-01 Thread Peyman Mohajerian
https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

On Thu, Sep 1, 2016 at 3:01 PM, Mich Talebzadeh 
wrote:

> Hi Jacob.
>
> My understanding of Dataset is that it is basically an RDD with some
> optimization gone into it. RDD is meant to deal with unstructured data?
>
> Now DataFrame is the tabular format of RDD designed for tabular work, csv,
> SQL stuff etc.
>
> When you mention DataFrame is just an alias for Dataset[Row] does that
> mean  that it converts an RDD to DataSet thus producing a tabular format?
>
> Thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 1 September 2016 at 22:49, Jakob Odersky  wrote:
>
>> > However, what really worries me is not having Dataset APIs at all in
>> Python. I think thats a deal breaker.
>>
>> What is the functionality you are missing? In Spark 2.0 a DataFrame is
>> just an alias for Dataset[Row] ("type DataFrame = Dataset[Row]" in
>> core/.../o/a/s/sql/package.scala).
>> Since python is dynamically typed, you wouldn't really gain anything by
>> using Datasets anyway.
>>
>> On Thu, Sep 1, 2016 at 2:20 PM, ayan guha  wrote:
>>
>>> Thanks All for your replies.
>>>
>>> Feature Parity:
>>>
>>> MLLib, RDD and dataframes features are totally comparable. Streaming is
>>> now at par in functionality too, I believe. However, what really worries me
>>> is not having Dataset APIs at all in Python. I think thats a deal breaker.
>>>
>>> Performance:
>>> I do  get this bit when RDDs are involved, but not when Data frame is
>>> the only construct I am operating on.  Dataframe supposed to be
>>> language-agnostic in terms of performance.  So why people think python is
>>> slower? is it because of using UDF? Any other reason?
>>>
>>> *Is there any kind of benchmarking/stats around Python UDF vs Scala UDF
>>> comparison? like the one out there  b/w RDDs.*
>>>
>>> @Kant:  I am not comparing ANY applications. I am comparing SPARK
>>> applications only. I would be glad to hear your opinion on why pyspark
>>> applications will not work, if you have any benchmarks please share if
>>> possible.
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Sep 2, 2016 at 12:57 AM, kant kodali  wrote:
>>>
 c'mon man this is no Brainer..Dynamic Typed Languages for Large Code
 Bases or Large Scale Distributed Systems makes absolutely no sense. I can
 write a 10 page essay on why that wouldn't work so great. you might be
 wondering why would spark have it then? well probably because its ease of
 use for ML (that would be my best guess).



 On Wed, Aug 31, 2016 11:45 PM, AssafMendelson assaf.mendel...@rsa.com
 wrote:

> I believe this would greatly depend on your use case and your
> familiarity with the languages.
>
>
>
> In general, scala would have a much better performance than python and
> not all interfaces are available in python.
>
> That said, if you are planning to use dataframes without any UDF then
> the performance hit is practically nonexistent.
>
> Even if you need UDF, it is possible to write those in scala and wrap
> them for python and still get away without the performance hit.
>
> Python does not have interfaces for UDAFs.
>
>
>
> I believe that if you have large structured data and do not generally
> need UDF/UDAF you can certainly work in python without losing too much.
>
>
>
>
>
> *From:* ayan guha [mailto:[hidden email]
> ]
> *Sent:* Thursday, September 01, 2016 5:03 AM
> *To:* user
> *Subject:* Scala Vs Python
>
>
>
> Hi Users
>
>
>
> Thought to ask (again and again) the question: While I am building any
> production application, should I use Scala or Python?
>
>
>
> I have read many if not most articles but all seems pre-Spark 2.
> Anything changed with Spark 2? Either pro-scala way or pro-python way?
>
>
>
> I am thinking performance, feature parity and future direction, not so
> much in terms of skillset or ease of use.
>
>
>
> Or, if you think it is a moot point, please say so as well.
>
>
>
> Any real life example, production experience, anecdotes, personal
> taste, profanity all 

Re: AnalysisException exception while parsing XML

2016-08-31 Thread Peyman Mohajerian
here is an example:
from pyspark.sql.functions import explode, col

df1 = df0.select(explode("manager.subordinates.subordinate_clerk.duties").alias("duties-flat"),
                 col("duties-flat.duty.name").alias("duty-name"))

this is in pyspark, i may have some part of this wrong, didn't test it, but
something similar.

On Wed, Aug 31, 2016 at 5:54 PM, <srikanth.je...@gmail.com> wrote:

> How do we explode nested arrays?
>
>
>
> Thanks,
> Sreekanth Jella
>
>
>
> *From: *Peyman Mohajerian <mohaj...@gmail.com>
> *Sent: *Wednesday, August 31, 2016 7:41 PM
> *To: *srikanth.je...@gmail.com
> *Cc: *user@spark.apache.org
> *Subject: *Re: AnalysisException exception while parsing XML
>
>
>
> Once you get to the 'Array' type, you have to use explode; you cannot do
> the same traversal.
>
>
>
> On Wed, Aug 31, 2016 at 2:19 PM, <srikanth.je...@gmail.com> wrote:
>
> Hello Experts,
>
>
>
> I am using Spark XML package to parse the XML. Below exception is being
> thrown when trying to *parse a tag which exist in arrays of array depth*.
> i.e. in this case subordinate_clerk. .duty.name
>
>
>
> With below sample XML, issue is reproducible:
>
>
>
> 
>
>   
>
>
>
> 1
>
> mgr1
>
> 2005-07-31
>
> 
>
>   
>
> 2
>
> clerk2
>
> 2005-07-31
>
>   
>
>   
>
> 3
>
> clerk3
>
> 2005-07-31
>
>   
>
> 
>
>
>
>   
>
>   
>
>
>
>11
>
>mgr11
>
> 
>
>   
>
> 12
>
> clerk12
>
> 
>
>  
>
>first duty
>
>  
>
>  
>
>second duty
>
>  
>
>
>
>   
>
> 
>
>
>
>   
>
> 
>
>
>
>
>
> scala> df.select( 
> "manager.subordinates.subordinate_clerk.duties.duty.name").show
>
>
>
> Exception is:
>
>  org.apache.spark.sql.AnalysisException: cannot resolve 
> 'manager.subordinates.subordinate_clerk.duties.duty[name]' due to data type 
> mismatch: argument 2 requires integral type, however, 'name' is of string 
> type.;
>
>at 
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>
>at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:65)
>
>at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57)
>
>at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
>
>at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
>
>at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
>
>at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:334)
>
>at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
>
>at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
>
>at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)
>
>at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
>at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>
>at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>
>at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>
>at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>
>at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>
>at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>
>at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>
>at 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>
>at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>
>at 
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>
>at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>
>at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)
>
>at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode

Re: AnalysisException exception while parsing XML

2016-08-31 Thread Peyman Mohajerian
Once you get to the 'Array' type, you have to use explode; you cannot do the
same traversal.

On Wed, Aug 31, 2016 at 2:19 PM,  wrote:

> Hello Experts,
>
>
>
> I am using Spark XML package to parse the XML. Below exception is being
> thrown when trying to *parse a tag which exist in arrays of array depth*.
> i.e. in this case subordinate_clerk. .duty.name
>
>
>
> With below sample XML, issue is reproducible:
>
>
>
> 
>
>   
>
>
>
> 1
>
> mgr1
>
> 2005-07-31
>
> 
>
>   
>
> 2
>
> clerk2
>
> 2005-07-31
>
>   
>
>   
>
> 3
>
> clerk3
>
> 2005-07-31
>
>   
>
> 
>
>
>
>   
>
>   
>
>
>
>11
>
>mgr11
>
> 
>
>   
>
> 12
>
> clerk12
>
> 
>
>  
>
>first duty
>
>  
>
>  
>
>second duty
>
>  
>
>
>
>   
>
> 
>
>
>
>   
>
> 
>
>
>
>
>
> scala> df.select( 
> "manager.subordinates.subordinate_clerk.duties.duty.name").show
>
>
>
> Exception is:
>
>  org.apache.spark.sql.AnalysisException: cannot resolve 
> 'manager.subordinates.subordinate_clerk.duties.duty[name]' due to data type 
> mismatch: argument 2 requires integral type, however, 'name' is of string 
> type.;
>
>at 
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>
>at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:65)
>
>at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57)
>
>at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
>
>at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
>
>at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
>
>at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:334)
>
>at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
>
>at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
>
>at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)
>
>at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
>at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>
>at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>
>at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>
>at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>
>at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>
>at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>
>at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>
>at 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>
>at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>
>at 
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>
>at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>
>at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)
>
>at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332)
>
> ... more
>
>
>
>
>
>
>
>
>
> scala> df.printSchema
>
> root
>
>  |-- manager: struct (nullable = true)
>
>  ||-- dateOfJoin: string (nullable = true)
>
>  ||-- id: long (nullable = true)
>
>  ||-- name: string (nullable = true)
>
>  ||-- subordinates: struct (nullable = true)
>
>  |||-- subordinate_clerk: array (nullable = true)
>
>  ||||-- element: struct (containsNull = true)
>
>  |||||-- cid: long (nullable = true)
>
>  |||||-- cname: string (nullable = true)
>
>  |||||-- dateOfJoin: string (nullable = true)
>
>  |||||-- duties: struct (nullable = true)
>
>  ||||||-- duty: array (nullable = true)
>
>  |||||||-- element: struct (containsNull = true)
>
>  ||||||||-- name: string (nullable = true)
>
>
>
>
>
>
>
> Versions info:
>
> Spark - 1.6.0
>
> Scala - 2.10.5
>
> Spark XML - com.databricks:spark-xml_2.10:0.3.3
>
>
>
> Please let me know if there is a solution or workaround for this?
>
>
>
> Thanks,
>
> Sreekanth
>
>
>


Re: update specifc rows to DB using sqlContext

2016-08-11 Thread Peyman Mohajerian
Alternatively, you should be able to write to a new (staging) table and use a
trigger or some other mechanism to update the particular rows. I don't have
any experience with this myself, but see this documentation:
https://docs.cloud.databricks.com/docs/latest/databricks_guide/index.html#03%20Data%20Sources/5%20Databases%20%26%20Other%20Data%20Sources/2%20JDBC%20for%20SQL%20Databases.html
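
A sketch of the first half of that approach, writing the ML results to a
staging table over JDBC (untested; connection details and table names are
placeholders; the UPDATE from staging to the main table would then be done on
the MySQL side):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jdbc-staging-write").getOrCreate()

# result_df holds TimeSeriesID plus the recomputed NumOfSamples from the ML step.
result_df = spark.createDataFrame([("1000", 20)], ["TimeSeriesID", "NumOfSamples"])

(result_df.write
    .format("jdbc")
    .option("url", "jdbc:mysql://dbhost:3306/mydb")   # placeholder
    .option("dbtable", "ms_staging")                  # placeholder staging table
    .option("user", "user")
    .option("password", "secret")
    .mode("overwrite")
    .save())

# Then, outside Spark (e.g. via a trigger or a scheduled statement):
#   UPDATE ms JOIN ms_staging s ON ms.TimeSeriesID = s.TimeSeriesID
#   SET ms.NumOfSamples = s.NumOfSamples;
```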



On Thu, Aug 11, 2016 at 4:14 AM, Mich Talebzadeh 
wrote:

> in that case one alternative would be to save the new table on hdfs and
> then using some simple ETL load it  into a staging table in MySQL and
> update the original table from staging table
>
> The whole thing can be done in a shell script.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 11 August 2016 at 11:52, sujeet jog  wrote:
>
>> I read the table via spark SQL , and perform some  ML activity on the
>> data , and the resultant will be to update some specific columns with the
>> ML improvised result,
>> hence i do not have a option to do the whole operation in MySQL,
>>
>>
>> Thanks,
>> Sujeet
>>
>> On Thu, Aug 11, 2016 at 3:29 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Ok it is clearer now.
>>>
>>> You are using Spark as the query tool on an RDBMS table? Read table via
>>> JDBC, write back updating certain records.
>>>
>>> I have not done this myself but I suspect the issue would be if Spark
>>> write will commit the transaction and maintains ACID compliance. (locking
>>> the rows etc).
>>>
>>> I know it cannot do this to a Hive transactional table.
>>>
>>> Any reason why you are not doing the whole operation in MySQL itself?
>>>
>>> HTH
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 11 August 2016 at 10:46, sujeet jog  wrote:
>>>
 1 ) using mysql DB
 2 ) will be inserting/update/overwrite to the same table
 3 ) i want to update a specific column in a record, the data is read
 via Spark SQL,

 on the below table which is read via sparkSQL, i would like to update
 the NumOfSamples column .

 consider DF as the dataFrame which holds the records,  registered as
 temporary table MS .

 spark.sqlContext.write.format("jdbc").option("url", url
 ).option("dbtable", "update ms  set NumOfSamples = 20 where 'TimeSeriesID =
 '1000'" As MS ).save

 I believe updating a record via sparkSQL is not supported,  the only
 workaround is to open up a jdbc connection without using spark API's and do
 a direct update ?..

 Sample Ex : -

 mysql> show columns from ms;
 +--+-+--+-+-+---+
 | Field| Type| Null | Key | Default | Extra |
 +--+-+--+-+-+---+
 | TimeSeriesID | varchar(20) | YES  | | NULL|   |
 | NumOfSamples | int(11) | YES  | | NULL|   |
 +--+-+--+-+-+---+


 Thanks,
 Sujeet



 On Tue, Aug 9, 2016 at 6:31 PM, Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> Hi,
>
>
>1. what is the underlying DB, say Hive etc
>2. Is table transactional or you are going to do insert/overwrite
>to the same table
>3. can you do all this in the database itself assuming it is an
>RDBMS
>4. Can you provide the sql or pseudo code for such an update
>
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> 

Re: Machine learning question (suing spark)- removing redundant factors while doing clustering

2016-08-08 Thread Peyman Mohajerian
You can try 'feature importances' or 'feature selection', depending on what
else you want to do with the remaining features. Let's say you are trying to
do classification: some of the Spark libraries have a model attribute called
'featureImportances' that tells you which features are more dominant in your
classification, and you can then run your model again with the smaller set of
features.
The two approaches are quite different: what I'm suggesting involves training
(supervised learning) in the context of a target function, whereas with SVD
you are doing unsupervised learning.
On Mon, Aug 8, 2016 at 7:23 PM, Rohit Chaddha 
wrote:

> I would rather have less features to make better inferences on the data
> based on the smaller number of factors,
> Any suggestions Sean ?
>
> On Mon, Aug 8, 2016 at 11:37 PM, Sean Owen  wrote:
>
>> Yes, that's exactly what PCA is for as Sivakumaran noted. Do you
>> really want to select features or just obtain a lower-dimensional
>> representation of them, with less redundancy?
>>
>> On Mon, Aug 8, 2016 at 4:10 PM, Tony Lane  wrote:
>> > There must be an algorithmic way to figure out which of these factors
>> > contribute the least and remove them in the analysis.
>> > I am hoping same one can throw some insight on this.
>> >
>> > On Mon, Aug 8, 2016 at 7:41 PM, Sivakumaran S 
>> wrote:
>> >>
>> >> Not an expert here, but the first step would be devote some time and
>> >> identify which of these 112 factors are actually causative. Some domain
>> >> knowledge of the data may be required. Then, you can start of with PCA.
>> >>
>> >> HTH,
>> >>
>> >> Regards,
>> >>
>> >> Sivakumaran S
>> >>
>> >> On 08-Aug-2016, at 3:01 PM, Tony Lane  wrote:
>> >>
>> >> Great question Rohit.  I am in my early days of ML as well and it
>> would be
>> >> great if we get some idea on this from other experts on this group.
>> >>
>> >> I know we can reduce dimensions by using PCA, but i think that does not
>> >> allow us to understand which factors from the original are we using in
>> the
>> >> end.
>> >>
>> >> - Tony L.
>> >>
>> >> On Mon, Aug 8, 2016 at 5:12 PM, Rohit Chaddha <
>> rohitchaddha1...@gmail.com>
>> >> wrote:
>> >>>
>> >>>
>> >>> I have a data-set where each data-point has 112 factors.
>> >>>
>> >>> I want to remove the factors which are not relevant, and say reduce
>> to 20
>> >>> factors out of these 112 and then do clustering of data-points using
>> these
>> >>> 20 factors.
>> >>>
>> >>> How do I do these and how do I figure out which of the 20 factors are
>> >>> useful for analysis.
>> >>>
>> >>> I see SVD and PCA implementations, but I am not sure if these give
>> which
>> >>> elements are removed and which are remaining.
>> >>>
>> >>> Can someone please help me understand what to do here
>> >>>
>> >>> thanks,
>> >>> -Rohit
>> >>>
>> >>
>> >>
>> >
>>
>
>


Re: ML PipelineModel to be scored locally

2016-07-20 Thread Peyman Mohajerian
One option is to save the model in parquet or json format and then build
your own prediction code. Some also use:

https://github.com/jpmml/jpmml-sparkml

It depends on the model (e.g. ml vs. mllib) and other factors whether this
works or not. A couple of weeks ago there was a long discussion on this
topic.

On Wed, Jul 20, 2016 at 7:08 AM, Simone Miraglia 
wrote:

> Hi all,
>
> I am working on the following use case involving ML Pipelines.
>
> 1. I created a Pipeline composed from a set of stages
> 2. I called "fit" method on my training set
> 3. I validated my model by calling "transform" on my test set
> 4. I stored my fitted Pipeline to a shared folder
>
> Then I have a very low latency interactive application (say a kinda of web
> service), that should work as follows:
> 1. The app receives a request
> 2. A scoring needs to be made, according to my fitted PipelineModel
> 3. The app sends the score to the caller, in a synchronous fashion
>
> Is there a way to call the .transform method of the PipelineModel over a
> single Row?
>
> I will definitely not want to parallelize a single record to a DataFrame,
> nor relying on Spark Streaming due to latency requirements.
> I would like to use something similar to mllib .predict(Vector) method
> which does not rely on Spark Context performing all the computation locally.
>
> Thanks in advance
> Best
>


Re: Is that possible to feed web request via spark application directly?

2016-06-15 Thread Peyman Mohajerian
There are a variety of REST API services you can use, but you must consider
carefully whether it makes sense to start a Spark job based on individual
requests, unless you mean based on some triggering event you want to start
a Spark job, in which case it makes sense to use the RESTful service.
Whether your Spark cluster is multi-tenant depends on the scheduler you
use, cluster size and other factors. But you seem to be mixing terminology,
a given application can certainly generate many tasks and you could deploy
many applications on a single Spark cluster, whether you do them all
concurrently or not is where the issue of multi-tenancy comes into picture.

On Wed, Jun 15, 2016 at 8:19 PM, Yu Wei  wrote:

> Hi,
>
> I'm learning spark recently. I have one question about spark.
>
>
> Is it possible to feed web requests via spark application directly? Is
> there any library to be used?
>
> Or do I need to write the results from spark to HDFS/HBase?
>
>
> Is one spark application only to be designed to implement one single task?
> Or could multiple tasks be combined into one application?
>
>
>
> Thanks,
>
> Jared
>
>
>


Re: Apache Flink

2016-04-17 Thread Peyman Mohajerian
Microbatching is certainly not a waste of time; you are making way too strong
a statement. In fact, in certain cases one-tuple-at-a-time processing makes no
sense; it all depends on the use case. If you look at the history of the Storm
project, you will see that microbatching was added to Storm later, as Trident,
specifically for microbatching/windowing.
In certain cases you are doing aggregation/windowing, throughput is the
dominant design consideration, and you don't care what each individual
event/tuple does. For example, if you push different event types to separate
Kafka topics and all you care about is a count, what is the need for
single-event processing?

On Sun, Apr 17, 2016 at 12:43 PM, Corey Nolet  wrote:

> i have not been intrigued at all by the microbatching concept in Spark. I
> am used to CEP in real streams processing environments like Infosphere
> Streams & Storm where the granularity of processing is at the level of each
> individual tuple and processing units (workers) can react immediately to
> events being received and processed. The closest Spark streaming comes to
> this concept is the notion of "state" that that can be updated via the
> "updateStateBykey()" functions which are only able to be run in a
> microbatch. Looking at the expected design changes to Spark Streaming in
> Spark 2.0.0, it also does not look like tuple-at-a-time processing is on
> the radar for Spark, though I have seen articles stating that more effort
> is going to go into the Spark SQL layer in Spark streaming which may make
> it more reminiscent of Esper.
>
> For these reasons, I have not even tried to implement CEP in Spark. I feel
> it's a waste of time without immediate tuple-at-a-time processing. Without
> this, they avoid the whole problem of "back pressure" (though keep in mind,
> it is still very possible to overload the Spark streaming layer with stages
> that will continue to pile up and never get worked off) but they lose the
> granular control that you get in CEP environments by allowing the rules &
> processors to react with the receipt of each tuple, right away.
>
> Awhile back, I did attempt to implement an InfoSphere Streams-like API [1]
> on top of Apache Storm as an example of what such a design may look like.
> It looks like Storm is going to be replaced in the not so distant future by
> Twitter's new design called Heron. IIRC, Heron does not have an open source
> implementation as of yet.
>
> [1] https://github.com/calrissian/flowmix
>
> On Sun, Apr 17, 2016 at 3:11 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi Corey,
>>
>> Can you please point me to docs on using Spark for CEP? Do we have a set
>> of CEP libraries somewhere. I am keen on getting hold of adaptor libraries
>> for Spark something like below
>>
>>
>>
>> Thanks
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 17 April 2016 at 16:07, Corey Nolet  wrote:
>>
>>> One thing I've noticed about Flink in my following of the project has
>>> been that it has established, in a few cases, some novel ideas and
>>> improvements over Spark. The problem with it, however, is that both the
>>> development team and the community around it are very small and many of
>>> those novel improvements have been rolled directly into Spark in subsequent
>>> versions. I was considering changing over my architecture to Flink at one
>>> point to get better, more real-time CEP streaming support, but in the end I
>>> decided to stick with Spark and just watch Flink continue to pressure it
>>> into improvement.
>>>
>>> On Sun, Apr 17, 2016 at 11:03 AM, Koert Kuipers 
>>> wrote:
>>>
 i never found much info that flink was actually designed to be fault
 tolerant. if fault tolerance is more bolt-on/add-on/afterthought then that
 doesn't bode well for large scale data processing. spark was designed with
 fault tolerance in mind from the beginning.

 On Sun, Apr 17, 2016 at 9:52 AM, Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> Hi,
>
> I read the benchmark published by Yahoo. Obviously they already use
> Storm and inevitably very familiar with that tool. To start with although
> these benchmarks were somehow interesting IMO, it lend itself to an
> assurance that the tool chosen for their platform is still the best 
> choice.
> So inevitably the benchmarks and the tests were done to support
> primary their approach.
>
> In general anything which is not done through TCP Council or similar
> body is questionable..
> Their argument is that because Spark handles data streaming in micro
> batches then inevitably it introduces this in-built latency as per 

Re: Spark replacing Hadoop

2016-04-14 Thread Peyman Mohajerian
Cloud adds another dimension:
The fact that in the cloud compute and storage are decoupled (S3 with EMR, or
blob storage with HDInsight) means that in the cloud Hadoop ends up being more
of a compute engine, and a lot of the governance and security features are
irrelevant or less important because data at rest is outside Hadoop.
Currently the biggest reason to run Spark in Hadoop in the cloud is YARN, but
if you decide to use Mesos or standalone mode then again you may not need
Hadoop. Databricks adds another dimension to this in the cloud, which I won't
comment on.

But on-premise I think you can argue that HDFS is here to stay in many forms,
e.g. Isilon, object stores and other storage types, not just local disk. The
HDFS API actually works over Azure's Data Lake Store completely independently
of Hadoop!

On Thu, Apr 14, 2016 at 1:29 PM, Cody Koeninger  wrote:

> I've been using spark for years and have (thankfully) been able to
> avoid needing HDFS, aside from one contract where it was already in
> use.
>
> At this point, many of the people I know would consider Kafka to be
> more important than HDFS.
>
> On Thu, Apr 14, 2016 at 3:11 PM, Jörn Franke  wrote:
> > I do not think so. Hadoop provides an ecosystem in which you can deploy
> > different engines, such as MR, HBase, TEZ, Spark, Flink, titandb, hive,
> > solr... I observe also that commercial analytical tools use one or more
> of
> > these engines to execute their code in a distributed fashion. You  need
> this
> > flexibility to have an ecosystem suitable for your needs -especially In
> the
> > area of security. HDFS is one key element for the storage and locality.
> > Spark itself cannot provide such a complete ecosystem but is part of
> > ecosystems.
> >
> > On 14 Apr 2016, at 21:13, Ashok Kumar 
> wrote:
> >
> > Hi,
> >
> > I hear that some saying that Hadoop is getting old and out of date and
> will
> > be replaced by Spark!
> >
> > Does this make sense and if so how accurate is it?
> >
> > Best
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Sqoop on Spark

2016-04-06 Thread Peyman Mohajerian
It is using a JDBC driver; I know that's the case for Teradata:
http://developer.teradata.com/connectivity/articles/teradata-connector-for-hadoop-now-available

The Teradata Connector (which is used by Cloudera and Hortonworks) for doing
Sqoop-style loads is parallelized and works with ORC and probably other
formats as well. It uses JDBC for each connection between data nodes and the
AMP (compute) nodes, and there is an additional layer that coordinates all of
it. I know Oracle has a similar technology; I've used it and had to supply
the JDBC driver.

The Teradata Connector is for batch data copy; QueryGrid is for interactive
data movement.

On Wed, Apr 6, 2016 at 4:05 PM, Yong Zhang <java8...@hotmail.com> wrote:

> If they do that, they must provide a customized input format, instead of
> through JDBC.
>
> Yong
>
> --
> Date: Wed, 6 Apr 2016 23:56:54 +0100
> Subject: Re: Sqoop on Spark
> From: mich.talebza...@gmail.com
> To: mohaj...@gmail.com
> CC: jornfra...@gmail.com; msegel_had...@hotmail.com; guha.a...@gmail.com;
> linguin@gmail.com; user@spark.apache.org
>
>
> SAP Sybase IQ does that and I believe SAP Hana as well.
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 6 April 2016 at 23:49, Peyman Mohajerian <mohaj...@gmail.com> wrote:
>
> For some MPP relational stores (not operational) it may be feasible to run
> Spark jobs and also have data locality. I know QueryGrid (Teradata) and
> PolyBase (microsoft) use data locality to move data between their MPP and
> Hadoop.
> I would guess (have no idea) someone like IBM already is doing that for
> Spark, maybe a bit off topic!
>
> On Wed, Apr 6, 2016 at 3:29 PM, Jörn Franke <jornfra...@gmail.com> wrote:
>
> Well I am not sure, but using a database as storage for Spark, such as relational
> databases or certain NoSQL databases (e.g. MongoDB), is generally a
> bad idea - no data locality, it cannot handle really big data volumes for
> compute, and you may potentially overload an operational database.
> And if your job fails for whatever reason (e.g. scheduling) then you have
> to pull everything out again. Sqoop and HDFS seem to me the more elegant
> solution together with Spark. These "assumptions" on parallelism have to be
> made anyway with any solution.
> Of course you can always redo things, but why - what benefit do you
> expect? A real big data platform has to support many different tools anyway,
> otherwise people doing analytics will be limited.
>
> On 06 Apr 2016, at 20:05, Michael Segel <msegel_had...@hotmail.com> wrote:
>
> I don't think it's necessarily a bad idea.
>
> Sqoop is an ugly tool and it requires you to make some assumptions as a
> way to gain parallelism. (Not that most of the assumptions are not valid
> for most of the use cases…)
>
> Depending on what you want to do… your data may not be persisted on HDFS.
> There are use cases where your cluster is used for compute and not storage.
>
> I’d say that spending time re-inventing the wheel can be a good thing.
> It would be a good idea for many to rethink their ingestion process so
> that they can have a nice ‘data lake’ and not a ‘data sewer’. (Stealing
> that term from Dean Wampler. ;-)
>
> Just saying. ;-)
>
> -Mike
>
> On Apr 5, 2016, at 10:44 PM, Jörn Franke <jornfra...@gmail.com> wrote:
>
> I do not think you can be more resource efficient. In the end you have to
> store the data on HDFS anyway. You have a lot of development effort for
> doing something like Sqoop, especially with error handling.
> You may create a ticket with the Sqoop guys to support Spark as an
> execution engine and maybe it is less effort to plug it in there.
> Maybe if your cluster is loaded then you may want to add more machines or
> improve the existing programs.
>
> On 06 Apr 2016, at 07:33, ayan guha <guha.a...@gmail.com> wrote:
>
> One of the reasons in my mind is to avoid MapReduce applications completely
> during ingestion, if possible. Also, I can then use a Spark standalone
> cluster to ingest, even if my Hadoop cluster is heavily loaded. What do you
> guys think?
>
> On Wed, Apr 6, 2016 at 3:13 PM, Jörn Franke <jornfra...@gmail.com> wrote:
>
> Why do you want to reimplement something which is already there?
>
> On 06 Apr 2016, at 06:47, ayan guha <guha.a...@gmail.com> wrote:
>
> Hi
>
> Thanks for the reply. My use case is to query ~40 tables from Oracle (using index
> and incremental only) and add data to existing Hive tables. Also, it would be good
> to have an option to create Hive tables, driven by job-specific configuration.

Re: Sqoop on Spark

2016-04-06 Thread Peyman Mohajerian
For some MPP relational stores (not operational ones) it may be feasible to run
Spark jobs and also have data locality. I know QueryGrid (Teradata) and
PolyBase (Microsoft) use data locality to move data between their MPP systems and
Hadoop.
I would guess (I have no idea) that someone like IBM is already doing that for
Spark; maybe a bit off topic!

On Wed, Apr 6, 2016 at 3:29 PM, Jörn Franke  wrote:

> Well I am not sure, but using a database as storage for Spark, such as relational
> databases or certain NoSQL databases (e.g. MongoDB), is generally a
> bad idea - no data locality, it cannot handle really big data volumes for
> compute, and you may potentially overload an operational database.
> And if your job fails for whatever reason (e.g. scheduling) then you have
> to pull everything out again. Sqoop and HDFS seem to me the more elegant
> solution together with Spark. These "assumptions" on parallelism have to be
> made anyway with any solution.
> Of course you can always redo things, but why - what benefit do you
> expect? A real big data platform has to support many different tools anyway,
> otherwise people doing analytics will be limited.
>
> On 06 Apr 2016, at 20:05, Michael Segel  wrote:
>
> I don't think it's necessarily a bad idea.
>
> Sqoop is an ugly tool and it requires you to make some assumptions as a
> way to gain parallelism. (Not that most of the assumptions are not valid
> for most of the use cases…)
>
> Depending on what you want to do… your data may not be persisted on HDFS.
> There are use cases where your cluster is used for compute and not storage.
>
> I’d say that spending time re-inventing the wheel can be a good thing.
> It would be a good idea for many to rethink their ingestion process so
> that they can have a nice ‘data lake’ and not a ‘data sewer’. (Stealing
> that term from Dean Wampler. ;-)
>
> Just saying. ;-)
>
> -Mike
>
> On Apr 5, 2016, at 10:44 PM, Jörn Franke  wrote:
>
> I do not think you can be more resource efficient. In the end you have to
> store the data on HDFS anyway. You have a lot of development effort for
> doing something like Sqoop, especially with error handling.
> You may create a ticket with the Sqoop guys to support Spark as an
> execution engine and maybe it is less effort to plug it in there.
> Maybe if your cluster is loaded then you may want to add more machines or
> improve the existing programs.
>
> On 06 Apr 2016, at 07:33, ayan guha  wrote:
>
> One of the reasons in my mind is to avoid MapReduce applications completely
> during ingestion, if possible. Also, I can then use a Spark standalone
> cluster to ingest, even if my Hadoop cluster is heavily loaded. What do you
> guys think?
>
> On Wed, Apr 6, 2016 at 3:13 PM, Jörn Franke  wrote:
>
>> Why do you want to reimplement something which is already there?
>>
>> On 06 Apr 2016, at 06:47, ayan guha  wrote:
>>
>> Hi
>>
>> Thanks for the reply. My use case is to query ~40 tables from Oracle (using
>> index and incremental only) and add data to existing Hive tables. Also, it
>> would be good to have an option to create Hive tables, driven by job-specific
>> configuration.
>>
>> What do you think?
>>
>> Best
>> Ayan
>>
>> On Wed, Apr 6, 2016 at 2:30 PM, Takeshi Yamamuro 
>> wrote:
>>
>>> Hi,
>>>
>>> It depends on your use case using sqoop.
>>> What's it like?
>>>
>>> // maropu
>>>
>>> On Wed, Apr 6, 2016 at 1:26 PM, ayan guha  wrote:
>>>
 Hi All

 Asking for opinions: is it possible/advisable to use Spark to replace what
 Sqoop does? Any existing projects done along similar lines?

 --
 Best Regards,
 Ayan Guha

>>>
>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
>
>


Re: Repeating Records w/ Spark + Avro?

2016-03-11 Thread Peyman Mohajerian
Here is the reason for the behavior:
'''Note:''' Because Hadoop's RecordReader class re-uses the same Writable
object for each record, directly caching the returned RDD or directly
passing it to an aggregation or shuffle operation will create many
references to the same object. If you plan to directly cache, sort, or
aggregate Hadoop writable objects, you should first copy them using a map function.

https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/SparkContext.html

So it is Hadoop related.
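
For reference, a minimal sketch of that copy, using the same read as in the question
below: GenericData.deepCopy sidesteps the protected clone(), and the records are
rendered to strings before being pulled back to the driver (the path is hypothetical):

import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable

val path = "/path/to/data.avro"

val rdd = sc.newAPIHadoopFile(path,
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable])

// copy each record out of the re-used Writable before it leaves the iterator
val copied = rdd.map { case (key, _) =>
  val datum = key.datum()
  GenericData.get().deepCopy(datum.getSchema, datum)
}

// the copies are independent objects, so caching or aggregating won't alias them
copied.cache()
copied.map(_.toString).take(10).foreach(println)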

On Fri, Mar 11, 2016 at 3:19 PM, Chris Miller 
wrote:

> I have a bit of a strange situation:
>
> *
> import org.apache.avro.generic.{GenericData, GenericRecord}
> import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper, AvroKey}
> import org.apache.avro.mapreduce.AvroKeyInputFormat
> import org.apache.hadoop.io.{NullWritable, WritableUtils}
>
> val path = "/path/to/data.avro"
>
> val rdd = sc.newAPIHadoopFile(path,
> classOf[AvroKeyInputFormat[GenericRecord]],
> classOf[AvroKey[GenericRecord]], classOf[NullWritable])
> rdd.take(10).foreach( x => println( x._1.datum() ))
> *
>
> In this situation, I get the right number of records returned, and if I
> look at the contents of rdd I see the individual records as tuple2's...
> however, if I println on each one as shown above, I get the same result
> every time.
>
> Apparently this has to do with something in Spark or Avro keeping a
> reference to the item it's iterating over, so I need to clone the object
> before I use it. However, if I try to clone it (from the spark-shell
> console), I get:
>
> *
> rdd.take(10).foreach( x => {
>   val clonedDatum = x._1.datum().clone()
>   println(clonedDatum.datum())
> })
>
> :37: error: method clone in class Object cannot be accessed in
> org.apache.avro.generic.GenericRecord
>  Access to protected method clone not permitted because
>  prefix type org.apache.avro.generic.GenericRecord does not conform to
>  class $iwC where the access take place
> val clonedDatum = x._1.datum().clone()
> *
>
> So, how can I clone the datum?
>
> Seems I'm not the only one who ran into this problem:
> https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/102. I
> can't figure out how to fix it in my case without hacking away like the
> person in the linked PR did.
>
> Suggestions?
>
> --
> Chris Miller
>


Re: Is Spark right for us?

2016-03-06 Thread Peyman Mohajerian
If your relational database has enough computing power, you don't have to
change it. You can just run SQL queries on top of it, or even run Spark
queries over it. There is no hard and fast rule about using big data tools.
Usually people or organizations don't jump into big data for one specific
use case; it is a journey that involves multiple use cases, future growth
and a lot more. If your data already fits in a relational store and you can
use the existing SQL analytics and BI tools, why consider other options,
unless you want to learn something new or your data will grow over time and
you want to future-proof it?

On Sun, Mar 6, 2016 at 12:59 PM, Krishna Sankar  wrote:

> Good question. It comes down to computational complexity, computational scale
> and data volume.
>
>1. If you can store the data on a single server or a small cluster of
>db servers (say MySQL) then HDFS/Spark might be overkill
>2. If you can run the computation/process the data on a single machine
>(remember, servers with 512 GB memory and quad-core CPUs can do a lot of
>stuff) then Spark is overkill
>3. Even if you can do the computations in #1 & #2 above in a pipeline and
>tolerate the elapsed time, Spark might be overkill
>4. But if you require data/computation parallelism or distributed
>processing of data due to computational complexity, data volume or time
>constraints, incl. real-time analytics, Spark is the right stack.
>5. Taking a quick look at what you have described so far, Spark is
>probably not needed.
>
> Cheers & HTH
> 
>
> On Sun, Mar 6, 2016 at 9:17 AM, Laumegui Deaulobi <
> guillaume.bilod...@gmail.com> wrote:
>
>> Our problem space is survey analytics.  Each survey comprises a set of
>> questions, with each question having a set of possible answers.  Survey
>> fill-out tasks are sent to users, who have until a certain date to
>> complete
>> it.  Based on these survey fill-outs, reports need to be generated.  Each
>> report deals with a subset of the survey fill-outs, and comprises a set of
>> data points (average rating for question 1, min/max for question 2, etc.)
>>
>> We are dealing with rather large data sets - although reading the internet
>> we get the impression that everyone is analyzing petabytes of data...
>>
>> Users: up to 100,000
>> Surveys: up to 100,000
>> Questions per survey: up to 100
>> Possible answers per question: up to 10
>> Survey fill-outs / user: up to 10
>> Reports: up to 100,000
>> Data points per report: up to 100
>>
>> Data is currently stored in a relational database but a migration to a
>> different kind of store is possible.
>>
>> The naive algorithm for report generation can be summed up as this:
>>
>> for each report to be generated {
>>   for each report data point to be calculated {
>> calculate data point
>> add data point to report
>>   }
>>   publish report
>> }
>>
>> In order to deal with the upper limits of these values, we will need to
>> distribute this algorithm to a compute / data cluster as much as possible.
>>
>> I've read about frameworks such as Apache Spark but also Hadoop, GridGain,
>> HazelCast and several others, and am still confused as to how each of
>> these
>> can help us and how they fit together.
>>
>> Is Spark the right framework for us?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-right-for-us-tp26412.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>


Re: Aster Functions equivalent in spark : cfilter, npath and sessionize

2015-10-29 Thread Peyman Mohajerian
Some of the Aster functions you are referring to can be done using Window
functions in SparkSQL:
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
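
For the sessionize case specifically, here is a minimal sketch with window functions.
The column names, the 30-minute gap and the toy data are assumptions, not Aster's
exact semantics; in this 1.5-era API the window functions need a HiveContext, and the
same lag/sum/when calls exist in PySpark:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// hypothetical clickstream: user id and event time in epoch seconds
val events = sqlContext.createDataFrame(Seq(
  ("u1", 100L), ("u1", 160L), ("u1", 5000L),
  ("u2", 50L), ("u2", 3000L)
)).toDF("userId", "eventTime")

val byUser = Window.partitionBy("userId").orderBy("eventTime")

// start a new session whenever the gap to the previous event exceeds 1800 seconds
val sessionized = events
  .withColumn("prevTime", lag("eventTime", 1).over(byUser))
  .withColumn("newSession",
    when(col("prevTime").isNull || col("eventTime") - col("prevTime") > 1800, 1).otherwise(0))
  .withColumn("sessionId", sum("newSession").over(byUser))  // running sum = session id per user

sessionized.show()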

On Thu, Oct 29, 2015 at 12:16 PM, didier vila 
wrote:

> Good morning all,
>
> I am interested to know if there are some Aster-equivalent functions in
> Spark.
>
>
>- In particular, I would like to sessionize, i.e. create sessions from
>event data.
>
>
>
>- I would like to create some nPath-style sequences based on a specific
>pattern.
>
> https://aster-community.teradata.com/docs/DOC-1544
>
>
>- I would like to derive some affinities.
>
> https://aster-community.teradata.com/docs/DOC-1552
>
> I am a Python user of Spark and I would like to demonstrate that Spark is a
> very good candidate instead of Aster (Aster will run on Hadoop soon).
>
> Do you have any Spark Python code to demonstrate to my colleagues that Spark
> is the best option?
>
> thanks for this.
>
> sparkthewolf
>
>
>
>
>


Re: Is there any Spark SQL reference manual?

2015-09-11 Thread Peyman Mohajerian
http://docs.datastax.com/en/datastax_enterprise/4.6/datastax_enterprise/spark/sparkSqlSupportedSyntax.html

On Fri, Sep 11, 2015 at 8:15 AM, Richard Hillegas 
wrote:

> The latest Derby SQL Reference manual (version 10.11) can be found here:
> https://db.apache.org/derby/docs/10.11/ref/index.html. It is, indeed,
> very useful to have a comprehensive reference guide. The Derby build
> scripts can also produce a BNF description of the grammar--but that is not
> part of the public documentation for the project. The BNF is trivial to
> generate because it is an artifact of the JavaCC grammar generator which
> Derby uses.
>
> I appreciate the difficulty of maintaining a formal reference guide for a
> rapidly evolving SQL dialect like Spark's.
>
> A machine-generated BNF, however, is easy to imagine. But perhaps not so
> easy to implement. Spark's SQL grammar is implemented in Scala, extending
> the DSL support provided by the Scala language. I am new to programming in
> Scala, so I don't know whether the Scala ecosystem provides any good tools
> for reverse-engineering a BNF from a class which extends
> scala.util.parsing.combinator.syntactical.StandardTokenParsers.
>
> Thanks,
> -Rick
>
> vivekw...@gmail.com wrote on 09/11/2015 05:05:47 AM:
>
> > From: vivek bhaskar 
> > To: Ted Yu 
> > Cc: user 
> > Date: 09/11/2015 05:06 AM
> > Subject: Re: Is there any Spark SQL reference manual?
> > Sent by: vivekw...@gmail.com
>
> >
> > Hi Ted,
> >
> > The link you mention does not have a complete list of the supported syntax.
> > For example, a few supported constructs are listed as "Supported Hive
> > features", but that does not claim to be exhaustive (and even if it is,
> > one has to filter out many lines from the Hive QL reference and
> > still will not be sure it is all there, due to version mismatches).
> >
> > Quickly searching online gives me a link for another popular open
> > source project which has a good SQL reference:
> > https://db.apache.org/derby/docs/10.1/ref/crefsqlj23296.html
> >
> > I had a similar expectation when I was looking for all supported DDL
> > and DML syntax along with their extensions. For example:
> > a. Select expressions along with supported extensions, i.e. where
> > clause, group by, the different supported joins, etc.
> > b. SQL format for Create, Insert, Alter table, etc.
> > c. SQL for Insert, Update, Delete, etc. along with their extensions.
> > d. Syntax for view creation, if supported
> > e. Syntax for the explain mechanism
> > f. List of supported functions, operators, etc. I can see that 100s
> > of functions were added in 1.5, but then you have to do a lot of
> > cross-checking from code to JIRA tickets.
> >
> > So I wanted a piece of documentation that provides all such
> > information in a single place.
> >
> > Regards,
> > Vivek
> >
> > On Fri, Sep 11, 2015 at 4:29 PM, Ted Yu  wrote:
> > You may have seen this:
> > https://spark.apache.org/docs/latest/sql-programming-guide.html
> >
> > Please suggest what should be added.
> >
> > Cheers
> >
> > On Fri, Sep 11, 2015 at 3:43 AM, vivek bhaskar 
> wrote:
> > Hi all,
> >
> > I am looking for a reference manual for Spark SQL, something like
> > many database vendors have. I could find one for Hive QL
> > (https://cwiki.apache.org/confluence/display/Hive/LanguageManual) but not
> > anything specific to Spark SQL.
> >
> > Please suggest. A SQL reference specific to the latest release would be of
> > great help.
> >
> > Regards,
> > Vivek
>
>


Re: Twitter streaming with apache spark stream only a small amount of tweets

2015-07-29 Thread Peyman Mohajerian
This question was answered with sample code a couple of days ago; please
look back.

On Sat, Jul 25, 2015 at 11:43 PM, Zoran Jeremic zoran.jere...@gmail.com
wrote:

 Hi,

 I discovered what the problem is here. The Twitter public stream is limited to
 1% of overall tweets (https://goo.gl/kDwnyS), so that's why I can't
 access all the tweets posted with specific hashtag using approach that I
 posted in previous email, so I guess this approach would not work for me.
 The other problem is that filtering has a limit of 400 hashtags (
 https://goo.gl/BywrAk), so in order to follow more than 400 hashtags I
 need more parallel streams.

 This brings me back to my previous question (https://goo.gl/bVDkHx). In
 my application I need to follow more than 400 hashtags, and I need to
 collect each tweet having one of these hashtags. Another complication is
 that users could add new hashtags or remove old hashtags, so I have to
 update the stream in real time.
 My earlier approach without Apache Spark was to create twitter4j user
 stream with an initial filter, and each time a new hashtag had to be added, stop
 the stream, add the new hashtag and run it again. When a stream had 400 hashtags, I
 initialized a new stream with new credentials. This was really complex, and I
 was hoping that Apache Spark would make it simpler. However, I've been trying
 for days to find a solution, and have had no success.

 If I have to use the same approach I used with twitter4j, I have to solve
 2 problems:
 - how to run multiple Twitter streams in the same Spark context
 - how to add new hashtags to the existing filter

 I hope that somebody will have some more elegant solution and idea, and
 tell me that I missed something obvious.

 Thanks,
 Zoran

 On Sat, Jul 25, 2015 at 8:44 PM, Zoran Jeremic zoran.jere...@gmail.com
 wrote:

 Hi,

 I've implemented Twitter streaming as in the code given at the bottom of
 email. It finds some tweets based on the hashtags I'm following. However,
 it seems that a large amount of tweets is missing. I've tried to post some
 tweets that I'm following in the application, and none of them was received
 in application. I also checked some hashtags (e.g. #android) on Twitter
 using Live and I could see that almost each second something was posted
 with that hashtag, and my application received only 3-4 posts in one minute.

 I didn't have this problem in earlier non-spark version of application
 which used twitter4j to access user stream API. I guess this is some
 trending stream, but I couldn't find anything that explains which Twitter
 API is used in Spark Twitter Streaming and how to create a stream that will
 access everything posted on Twitter.

 I hope somebody could explain what is the problem and how to solve this.

 Thanks,
 Zoran


  def initializeStreaming(){
    val config = getTwitterConfigurationBuilder.build()
    val auth: Option[twitter4j.auth.Authorization] =
      Some(new twitter4j.auth.OAuthAuthorization(config))
    val stream: DStream[Status] = TwitterUtils.createStream(ssc, auth)
    val filtered_statuses = stream.transform(rdd => {
      val filtered = rdd.filter(status => {
        var found = false
        for (tag <- hashTagsList) {
          if (status.getText.toLowerCase.contains(tag)) {
            found = true
          }
        }
        found
      })
      filtered
    })
    filtered_statuses.foreachRDD(rdd => {
      rdd.collect.foreach(t => {
        println(t)
      })
    })
    ssc.start()
  }








Re: Twitter streaming with apache spark stream only a small amount of tweets

2015-07-29 Thread Peyman Mohajerian
The subject was 'How to restart Twitter spark stream'.
It may not be exactly what you are looking for, but I thought it did touch
on some aspect of your question.

On Wed, Jul 29, 2015 at 10:26 AM, Zoran Jeremic zoran.jere...@gmail.com
wrote:

 Can you send me the subject of that email? I can't find any email
 suggesting a solution to that problem. There is an email, *Twitter4j streaming
 question*, but it doesn't have any sample code. It just confirms what I
 explained earlier: without filtering, Twitter will limit you to 1% of
 tweets, and if you use the filter API, Twitter limits you to 400 hashtags you
 can follow.

 Thanks,
 Zoran

 On Wed, Jul 29, 2015 at 8:40 AM, Peyman Mohajerian mohaj...@gmail.com
 wrote:

 This question was answered with sample code a couple of days ago; please
 look back.

 On Sat, Jul 25, 2015 at 11:43 PM, Zoran Jeremic zoran.jere...@gmail.com
 wrote:

 Hi,

 I discovered what the problem is here. The Twitter public stream is limited
 to 1% of overall tweets (https://goo.gl/kDwnyS), so that's why I can't
 access all the tweets posted with specific hashtag using approach that I
 posted in previous email, so I guess this approach would not work for me.
 The other problem is that filtering has a limit of 400 hashtags (
 https://goo.gl/BywrAk), so in order to follow more than 400 hashtags I
 need more parallel streams.

 This brings me back to my previous question (https://goo.gl/bVDkHx). In
 my application I need to follow more than 400 hashtags, and I need to
 collect each tweet having one of these hashtags. Another complication is
 that users could add new hashtags or remove old hashtags, so I have to
 update the stream in real time.
 My earlier approach without Apache Spark was to create twitter4j user
 stream with an initial filter, and each time a new hashtag had to be added, stop
 the stream, add the new hashtag and run it again. When a stream had 400 hashtags, I
 initialized a new stream with new credentials. This was really complex, and I
 was hoping that Apache Spark would make it simpler. However, I've been trying
 for days to find a solution, and have had no success.

 If I have to use the same approach I used with twitter4j, I have to
 solve 2 problems:
 - how to run multiple Twitter streams in the same Spark context
 - how to add new hashtags to the existing filter

 I hope that somebody will have some more elegant solution and idea, and
 tell me that I missed something obvious.

 Thanks,
 Zoran

 On Sat, Jul 25, 2015 at 8:44 PM, Zoran Jeremic zoran.jere...@gmail.com
 wrote:

 Hi,

 I've implemented Twitter streaming as in the code given at the bottom
 of email. It finds some tweets based on the hashtags I'm following.
 However, it seems that a large amount of tweets is missing. I've tried to
 post some tweets that I'm following in the application, and none of them
 was received in application. I also checked some hashtags (e.g. #android)
 on Twitter using Live and I could see that almost each second something was
 posted with that hashtag, and my application received only 3-4 posts in one
 minute.

 I didn't have this problem in earlier non-spark version of application
 which used twitter4j to access user stream API. I guess this is some
 trending stream, but I couldn't find anything that explains which Twitter
 API is used in Spark Twitter Streaming and how to create a stream that will
 access everything posted on Twitter.

 I hope somebody could explain what is the problem and how to solve this.

 Thanks,
 Zoran


  def initializeStreaming(){
    val config = getTwitterConfigurationBuilder.build()
    val auth: Option[twitter4j.auth.Authorization] =
      Some(new twitter4j.auth.OAuthAuthorization(config))
    val stream: DStream[Status] = TwitterUtils.createStream(ssc, auth)
    val filtered_statuses = stream.transform(rdd => {
      val filtered = rdd.filter(status => {
        var found = false
        for (tag <- hashTagsList) {
          if (status.getText.toLowerCase.contains(tag)) {
            found = true
          }
        }
        found
      })
      filtered
    })
    filtered_statuses.foreachRDD(rdd => {
      rdd.collect.foreach(t => {
        println(t)
      })
    })
    ssc.start()
  }









 --

 ***
 Zoran Jeremic, PhD
 Senior System Analyst  Programmer

 Athabasca University
 Tel: +1 604 92 89 944
 E-mail: zoran.jere...@gmail.com zoran.jere...@va.mod.gov.rs
 Homepage:  http://zoranjeremic.org

 **