Re: Release Apache Spark 2.4.4

2019-08-13 Thread Terry Kim
Can the following be included?

[SPARK-27234][SS][PYTHON] Use InheritableThreadLocal for current epoch in
EpochTracker (to support Python UDFs)


Thanks,
Terry

On Tue, Aug 13, 2019 at 10:24 PM Wenchen Fan  wrote:

> +1
>
> On Wed, Aug 14, 2019 at 12:52 PM Holden Karau 
> wrote:
>
>> +1
>> Does anyone have any critical fixes they’d like to see in 2.4.4?
>>
>> On Tue, Aug 13, 2019 at 5:22 PM Sean Owen  wrote:
>>
>>> Seems fine to me if there are enough valuable fixes to justify another
>>> release. If there are any other important fixes imminent, it's fine to
>>> wait for those.
>>>
>>>
>>> On Tue, Aug 13, 2019 at 6:16 PM Dongjoon Hyun 
>>> wrote:
>>> >
>>> > Hi, All.
>>> >
>>> > Spark 2.4.3 was released three months ago (8th May).
>>> > As of today (13th August), there are 112 commits (75 JIRAs) in
>>> `branch-24` since 2.4.3.
>>> >
>>> > It would be great if we can have Spark 2.4.4.
>>> > Shall we start `2.4.4 RC1` next Monday (19th August)?
>>> >
>>> > Last time, there was a request for K8s issue and now I'm waiting for
>>> SPARK-27900.
>>> > Please let me know if there is another issue.
>>> >
>>> > Thanks,
>>> > Dongjoon.
>>>
>>> -
>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>
>>> --
>> Twitter: https://twitter.com/holdenkarau
>> Books (Learning Spark, High Performance Spark, etc.):
>> https://amzn.to/2MaRAG9  
>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>
>


Re: Release Apache Spark 2.4.4

2019-08-13 Thread Wenchen Fan
+1

On Wed, Aug 14, 2019 at 12:52 PM Holden Karau  wrote:

> +1
> Does anyone have any critical fixes they’d like to see in 2.4.4?
>
> On Tue, Aug 13, 2019 at 5:22 PM Sean Owen  wrote:
>
>> Seems fine to me if there are enough valuable fixes to justify another
>> release. If there are any other important fixes imminent, it's fine to
>> wait for those.
>>
>>
>> On Tue, Aug 13, 2019 at 6:16 PM Dongjoon Hyun 
>> wrote:
>> >
>> > Hi, All.
>> >
>> > Spark 2.4.3 was released three months ago (8th May).
>> > As of today (13th August), there are 112 commits (75 JIRAs) in
>> `branch-24` since 2.4.3.
>> >
>> > It would be great if we can have Spark 2.4.4.
>> > Shall we start `2.4.4 RC1` next Monday (19th August)?
>> >
>> > Last time, there was a request for K8s issue and now I'm waiting for
>> SPARK-27900.
>> > Please let me know if there is another issue.
>> >
>> > Thanks,
>> > Dongjoon.
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>> --
> Twitter: https://twitter.com/holdenkarau
> Books (Learning Spark, High Performance Spark, etc.):
> https://amzn.to/2MaRAG9  
> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>


Re: Any advice how to do this usecase in spark sql ?

2019-08-13 Thread Jörn Franke
Have you tried to join both datasets, filter accordingly and then write the 
full dataset to your filesystem?
Alternatively, work with a NoSQL database that you update by key (e.g., it sounds like a
key/value store could be useful for you).

However, it could also be that you need to do more, depending on your use case.
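
A minimal sketch of that join-and-filter approach, assuming dataframe1 is the Parquet
lookup table and dataframe2 is one already-materialized micro-batch of the Kafka stream
(paths and column names are illustrative, not from the thread):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder.appName("lookup-refresh-sketch").getOrCreate()

val lookup = spark.read.parquet("hdfs:///data/lookup")   // dataframe1
val batch  = spark.read.parquet("hdfs:///data/batch")    // dataframe2, simplified to a static read

// Keep the lookup rows whose column1 has no match in the batch, then append the
// batch values, so matched lookup values are effectively replaced by the batch.
val unmatched = lookup.join(batch, lookup("column1") === batch("columnX"), "left_anti")
val refreshed = unmatched.select(col("column1"))
  .union(batch.select(col("columnX").as("column1")))

// Write to a new location rather than overwriting the path that is still being read.
refreshed.write.mode("overwrite").parquet("hdfs:///data/lookup_refreshed")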

> On Aug 14, 2019, at 05:08, Shyam P wrote:
> 
> Hi,
> Any advice on how to do this in Spark SQL?
> 
> I have a scenario as below
> 
> dataframe1   = loaded from an HDFS Parquet file.
> 
> dataframe2 =   read from a Kafka Stream.
> 
> If the column1 value of dataframe1 exists in the columnX values of dataframe2,
> then I need to replace the column1 value of dataframe1.
> 
> Else, add the column1 value of dataframe1 to dataframe2 as a new record.
> 
> 
> 
> In essence, I need to implement a lookup dataframe which is refreshable.
> 
> For more information please check
> 
> https://stackoverflow.com/questions/57479581/how-to-do-this-scenario-in-spark-streaming?noredirect=1#comment101437596_57479581
>  
> 
> Let me know if you need more info.
> 
> Thanks


Re: Continuous processing mode and python udf

2019-08-13 Thread Hyukjin Kwon
that's fixed in
https://github.com/apache/spark/commit/b83b7927b3a85c1a4945e2224ed811b5bb804477
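
For reference, continuous processing mode itself is opted into per query with
Trigger.Continuous; a minimal Scala sketch (the rate source and console sink are
illustrative, and the Python UDF part of the question is specific to PySpark):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder.appName("continuous-sketch").getOrCreate()

// Continuous mode takes a checkpoint interval instead of a micro-batch interval.
val query = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()
  .writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()

query.awaitTermination()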

On Tue, Aug 13, 2019 at 12:37 PM, zenglong chen wrote:

> Does Spark 2.4.0 support Python UDFs with Continuous Processing mode?
> I tried it and got an error like the one below:
>  WARN scheduler.TaskSetManager: Lost task 4.0 in stage 0.0 (TID 4,
> 172.22.9.179, executor 1): java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:347)
> at scala.None$.get(Option.scala:345)
> at
> org.apache.spark.sql.execution.streaming.continuous.ContinuousQueuedDataReader.next(ContinuousQueuedDataReader.scala:116)
> at
> org.apache.spark.sql.execution.streaming.continuous.ContinuousDataSourceRDD$$anon$1.getNext(ContinuousDataSourceRDD.scala:83)
> at
> org.apache.spark.sql.execution.streaming.continuous.ContinuousDataSourceRDD$$anon$1.getNext(ContinuousDataSourceRDD.scala:81)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
> at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
> at
> org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.writeIteratorToStream(PythonUDFRunner.scala:50)
> at
> org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
> at
> org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)
>


Re: Release Apache Spark 2.4.4

2019-08-13 Thread Holden Karau
+1
Does anyone have any critical fixes they’d like to see in 2.4.4?

On Tue, Aug 13, 2019 at 5:22 PM Sean Owen  wrote:

> Seems fine to me if there are enough valuable fixes to justify another
> release. If there are any other important fixes imminent, it's fine to
> wait for those.
>
>
> On Tue, Aug 13, 2019 at 6:16 PM Dongjoon Hyun 
> wrote:
> >
> > Hi, All.
> >
> > Spark 2.4.3 was released three months ago (8th May).
> > As of today (13th August), there are 112 commits (75 JIRAs) in
> `branch-24` since 2.4.3.
> >
> > It would be great if we can have Spark 2.4.4.
> > Shall we start `2.4.4 RC1` next Monday (19th August)?
> >
> > Last time, there was a request for K8s issue and now I'm waiting for
> SPARK-27900.
> > Please let me know if there is another issue.
> >
> > Thanks,
> > Dongjoon.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
> --
Twitter: https://twitter.com/holdenkarau
Books (Learning Spark, High Performance Spark, etc.):
https://amzn.to/2MaRAG9  
YouTube Live Streams: https://www.youtube.com/user/holdenkarau


RE: Release Apache Spark 2.4.4

2019-08-13 Thread Kazuaki Ishizaki
Thanks, Dongjoon!
+1

Kazuaki Ishizaki,



From:   Hyukjin Kwon 
To: Takeshi Yamamuro 
Cc: Dongjoon Hyun , dev 
, User 
Date:   2019/08/14 09:21
Subject: [EXTERNAL] Re: Release Apache Spark 2.4.4



+1

On Wed, Aug 14, 2019 at 9:13 AM, Takeshi Yamamuro wrote:
Hi,

Thanks for your notification, Dongjoon!
I put some links for the other committers/PMCs to access the info easily:

A commit list in github from the last release: 
https://github.com/apache/spark/compare/5ac2014e6c118fbeb1fe8e5c8064c4a8ee9d182a...branch-2.4
An issue list in jira: 
https://issues.apache.org/jira/projects/SPARK/versions/12345466#release-report-tab-body
The 5 correctness issues resolved in branch-2.4:
https://issues.apache.org/jira/browse/SPARK-27798?jql=project%20%3D%2012315420%20AND%20fixVersion%20%3D%2012345466%20AND%20labels%20in%20(%27correctness%27)%20ORDER%20BY%20priority%20DESC%2C%20key%20ASC

Anyway, +1

Best,
Takeshi

On Wed, Aug 14, 2019 at 8:25 AM DB Tsai  wrote:
+1

On Tue, Aug 13, 2019 at 4:16 PM Dongjoon Hyun  
wrote:
>
> Hi, All.
>
> Spark 2.4.3 was released three months ago (8th May).
> As of today (13th August), there are 112 commits (75 JIRAs) in 
`branch-24` since 2.4.3.
>
> It would be great if we can have Spark 2.4.4.
> Shall we start `2.4.4 RC1` next Monday (19th August)?
>
> Last time, there was a request for K8s issue and now I'm waiting for 
SPARK-27900.
> Please let me know if there is another issue.
>
> Thanks,
> Dongjoon.

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



-- 
---
Takeshi Yamamuro




Any advice how to do this usecase in spark sql ?

2019-08-13 Thread Shyam P
Hi,
Any advice on how to do this in Spark SQL?

I have a scenario as below

dataframe1   = loaded from an HDFS Parquet file.

dataframe2 =   read from a Kafka Stream.

If the column1 value of dataframe1 exists in the columnX values of dataframe2,
then I need to replace the column1 value of dataframe1.

Else, add the column1 value of dataframe1 to dataframe2 as a new record.


In essence, I need to implement a lookup dataframe which is refreshable.
For more information please check

https://stackoverflow.com/questions/57479581/how-to-do-this-scenario-in-spark-streaming?noredirect=1#comment101437596_57479581


Let me know if you need more info.

Thanks


Re: Sreate temp table or view by using location

2019-08-13 Thread 霍战锋
It works by using "create temp view" and "options(path='something')"
together, thanks.

spark.sql("""create temp view people (name string, age int) using csv
options(sep=',',inferSchema='true',ignoreLeadingWhiteSpace='true',path='src/main/resources/people.txt')""")
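
For comparison, the DataFrame-API route mentioned at the end of the quoted mail builds
the same session-scoped view (a sketch assuming the existing SparkSession named spark,
with the same options and path as the SQL above):

// Read the CSV with the same options, then register a temp view tied to the session.
val people = spark.read
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("ignoreLeadingWhiteSpace", "true")
  .csv("src/main/resources/people.txt")

people.createOrReplaceTempView("people")
spark.sql("select * from people").show()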

Best Regards


On Tue, Aug 13, 2019 at 8:03 PM, 霍战锋 wrote:

> Sorry for the typo. The title is 'Create temp table or view by using
> location'.
> Best Regards
>
>
> On Tue, Aug 13, 2019 at 8:00 PM, 霍战锋 wrote:
>
>> Hi,
>>
>> I'm trying to use spark SQL to define a temp table which can be
>> destroyed automatically with the session.  But when I use the SQL as
>> below, I can't query any valid rows; meanwhile, it works when I delete the
>> word 'temp'.  Can anyone tell me how to write the right SQL?
>>
>> It doesn't work like this one.
>> spark.sql("create temp view people (name string, age int) using csv 
>> options(sep=',',inferSchema='true',ignoreLeadingWhiteSpace='true') location 
>> 'src/main/resources/people.txt'")
>>
>> It works like this one, but it's not a temp table.
>> spark.sql("create view people (name string, age int) using csv 
>> options(sep=',',inferSchema='true',ignoreLeadingWhiteSpace='true') location 
>> 'src/main/resources/people.txt'")
>>
>> To restate, I would like to use SQL to get the same result as
>> 'dataset.createOrReplaceTempView(table name)'.
>>
>> Thank you.
>>
>> Best Regards
>>
>


Re: Release Apache Spark 2.4.4

2019-08-13 Thread Sean Owen
Seems fine to me if there are enough valuable fixes to justify another
release. If there are any other important fixes imminent, it's fine to
wait for those.


On Tue, Aug 13, 2019 at 6:16 PM Dongjoon Hyun  wrote:
>
> Hi, All.
>
> Spark 2.4.3 was released three months ago (8th May).
> As of today (13th August), there are 112 commits (75 JIRAs) in `branch-24` 
> since 2.4.3.
>
> It would be great if we can have Spark 2.4.4.
> Shall we start `2.4.4 RC1` next Monday (19th August)?
>
> Last time, there was a request for K8s issue and now I'm waiting for 
> SPARK-27900.
> Please let me know if there is another issue.
>
> Thanks,
> Dongjoon.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Release Apache Spark 2.4.4

2019-08-13 Thread Hyukjin Kwon
+1

On Wed, Aug 14, 2019 at 9:13 AM, Takeshi Yamamuro wrote:

> Hi,
>
> Thanks for your notification, Dongjoon!
> I put some links for the other committers/PMCs to access the info easily:
>
> A commit list in github from the last release:
> https://github.com/apache/spark/compare/5ac2014e6c118fbeb1fe8e5c8064c4a8ee9d182a...branch-2.4
> An issue list in jira:
> https://issues.apache.org/jira/projects/SPARK/versions/12345466#release-report-tab-body
> The 5 correctness issues resolved in branch-2.4:
>
> https://issues.apache.org/jira/browse/SPARK-27798?jql=project%20%3D%2012315420%20AND%20fixVersion%20%3D%2012345466%20AND%20labels%20in%20(%27correctness%27)%20ORDER%20BY%20priority%20DESC%2C%20key%20ASC
>
> Anyway, +1
>
> Best,
> Takeshi
>
> On Wed, Aug 14, 2019 at 8:25 AM DB Tsai  wrote:
>
>> +1
>>
>> On Tue, Aug 13, 2019 at 4:16 PM Dongjoon Hyun 
>> wrote:
>> >
>> > Hi, All.
>> >
>> > Spark 2.4.3 was released three months ago (8th May).
>> > As of today (13th August), there are 112 commits (75 JIRAs) in
>> `branch-24` since 2.4.3.
>> >
>> > It would be great if we can have Spark 2.4.4.
>> > Shall we start `2.4.4 RC1` next Monday (19th August)?
>> >
>> > Last time, there was a request for K8s issue and now I'm waiting for
>> SPARK-27900.
>> > Please let me know if there is another issue.
>> >
>> > Thanks,
>> > Dongjoon.
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Release Apache Spark 2.4.4

2019-08-13 Thread Takeshi Yamamuro
Hi,

Thanks for your notification, Dongjoon!
I put some links for the other committers/PMCs to access the info easily:

A commit list in github from the last release:
https://github.com/apache/spark/compare/5ac2014e6c118fbeb1fe8e5c8064c4a8ee9d182a...branch-2.4
An issue list in jira: 
https://issues.apache.org/jira/projects/SPARK/versions/12345466#release-report-tab-body
The 5 correctness issues resolved in branch-2.4:
https://issues.apache.org/jira/browse/SPARK-27798?jql=project%20%3D%2012315420%20AND%20fixVersion%20%3D%2012345466%20AND%20labels%20in%20(%27correctness%27)%20ORDER%20BY%20priority%20DESC%2C%20key%20ASC

Anyway, +1

Best,
Takeshi

On Wed, Aug 14, 2019 at 8:25 AM DB Tsai  wrote:

> +1
>
> On Tue, Aug 13, 2019 at 4:16 PM Dongjoon Hyun 
> wrote:
> >
> > Hi, All.
> >
> > Spark 2.4.3 was released three months ago (8th May).
> > As of today (13th August), there are 112 commits (75 JIRAs) in
> `branch-24` since 2.4.3.
> >
> > It would be great if we can have Spark 2.4.4.
> > Shall we start `2.4.4 RC1` next Monday (19th August)?
> >
> > Last time, there was a request for K8s issue and now I'm waiting for
> SPARK-27900.
> > Please let me know if there is another issue.
> >
> > Thanks,
> > Dongjoon.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>

-- 
---
Takeshi Yamamuro


Re: Release Apache Spark 2.4.4

2019-08-13 Thread DB Tsai
+1

On Tue, Aug 13, 2019 at 4:16 PM Dongjoon Hyun  wrote:
>
> Hi, All.
>
> Spark 2.4.3 was released three months ago (8th May).
> As of today (13th August), there are 112 commits (75 JIRAs) in `branch-24` 
> since 2.4.3.
>
> It would be great if we can have Spark 2.4.4.
> Shall we start `2.4.4 RC1` next Monday (19th August)?
>
> Last time, there was a request for K8s issue and now I'm waiting for 
> SPARK-27900.
> Please let me know if there is another issue.
>
> Thanks,
> Dongjoon.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Release Apache Spark 2.4.4

2019-08-13 Thread Dongjoon Hyun
Hi, All.

Spark 2.4.3 was released three months ago (8th May).
As of today (13th August), there are 112 commits (75 JIRAs) in `branch-24`
since 2.4.3.

It would be great if we can have Spark 2.4.4.
Shall we start `2.4.4 RC1` next Monday (19th August)?

Last time, there was a request for K8s issue and now I'm waiting for
SPARK-27900.
Please let me know if there is another issue.

Thanks,
Dongjoon.


Re: Custom aggregations: modular and lightweight solutions?

2019-08-13 Thread Andrew Leverentz
Here's a simpler example that I think gets at the heart of what I'm trying
to do: DynamicSchemaExample.scala.  Here,
I'm dynamically creating a sequence of Rows and also dynamically creating a
corresponding schema (StructType), but the RowEncoder derived from the
schema doesn't seem to handle the nested structure of the Rows.  This
example fails with a similar error (in this case, "scala.Tuple2$mcDD$sp is
not a valid external type for schema of struct<_1:double,_2:double>").

If I could find a way to get this example working (for arbitrary values of
rowSize), I suspect that it would also give me a solution to the
custom-aggregation issue I outlined in my previous email.  Any suggestions
would be much appreciated.

Thanks,
~ Andrew
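
For context, that error typically means the external values handed to the RowEncoder do
not match the schema: for a StructType field the expected external type is a Row, not a
Scala tuple. A minimal sketch with illustrative names (not the code from the linked gist):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._

val spark = SparkSession.builder.appName("nested-rows-sketch").getOrCreate()

val schema = StructType(Seq(
  StructField("key", StringType),
  StructField("stats", StructType(Seq(
    StructField("_1", DoubleType),
    StructField("_2", DoubleType))))))

// The struct field is populated with a nested Row(...), not a tuple such as (1.0, 2.0);
// passing a tuple is what triggers the "not a valid external type" error.
val rows = Seq(Row("a", Row(1.0, 2.0)), Row("b", Row(3.0, 4.0)))

val ds = spark.createDataset(rows)(RowEncoder(schema))
ds.show()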



On Mon, Aug 12, 2019 at 5:31 PM Andrew Leverentz <
andrew.levere...@empiricotx.com> wrote:

> Hi All,
>
> I'm attempting to clean up some Spark code which performs groupByKey /
> mapGroups to compute custom aggregations, and I could use some help
> understanding the Spark API's necessary to make my code more modular and
> maintainable.
>
> In particular, my current approach is as follows:
>
>- Start with a Dataset[CaseClass1]
>- Apply groupByKey(f), where f is a function that extracts a tuple of
>keys
>- Apply mapGroups(g), where g computes multiple custom aggregations:
>   - Iterate through the rows in each group, updating a large, mutable
>   CustomState object.
>   - At the end of each group, transform the current key and the
>   CustomState into an instance of CaseClass2.
>
> In other words, we start with a dataset of CaseClass1 objects and end up
> with a dataset of CaseClass2 objects, using instances of a complex
> CustomState class to store the intermediate state during the aggregation.
>
> We have dozens of custom aggregation calculations to perform on this data,
> and I'd like to be able streamline the process of introducing new
> aggregations and comparing multiple parameterized variations of the same
> aggregations side-by-side.  The current approach requires us to touch
> several tightly coupled pieces of code in order to add simple variations to
> existing aggregate functions.
>
> The UDAF API seems to be designed for this use case, but I've found it to
> be just as cumbersome to create new UDAF's as it is to maintain my current
> code.
>
> To address this, I've tried a couple of approaches (described below),
> although I've run into problems with both of them.
>
> At a high level, both of my approaches require a Dataset[T], a key
> extractor function (T => K), and a collection of instances of a custom
> class GroupingCalculation[T, S, R].  Here, T is the data type of each row
> in the dataset, K is the type of the key by which the rows should be
> grouped, S is the type of the intermediate state during aggregation, and R
> is the result type of each aggregation.  In this context, the data types T
> and K are fixed, but the state and result types (S and R) may vary among
> the GroupingCalculation instances.  The resulting DataFrame will have Rows
> which are basically concatenations of {K, R1, R2, ..., Rn}, where R1, ...,
> Rn are the result types for the GroupingCollection instances.
>
> (1) My first approach operates by constructing a
> UserDefinedAggregateFunction by applying ScalaReflection.schemaFor[...] to
> T, S, and R.  After some digging and experimentation, I found a way to use
> CatalystTypeConverters and ExpressionEncoders to populate the
> MutableAggregationBuffer.  Unfortunately, once I finally got it running,
> this approach yielded a runtime 10x slower than the original approach
> described above. I suspect that adding an extra encoding/decoding layer on
> top of the UDAF was what slowed it down.  Because of this, I'm setting
> aside this approach for now.
>
> (2) Using a similar API to (1), I replaced the implementation with one
> that uses groupByKey and mapGroups.  This bypasses the need for creating a
> wrapper around UDAF.  Also, the internal state, rather than being encoded
> in a DataFrame, is simply stored in one mutable ArrayBuffer[Any] per
> group.  An implementation of this approach is available here:
> https://gist.github.com/alev000/27d10a402ad250957b792091084932f4
> I feel that this implementation is promising, but I haven't been able to
> get some of my test cases in the above Gist to pass.  In particular, my
> test cases "Test grouping calculations with various combinations of case
> classes" and "Test firstAndOnly" fail with the following runtime error
> messages, respectively:
>
>- "TestCase3 is not a valid external type for schema of
>struct"
>- "scala.Some is not a valid external type for schema of string"
>
> Would anyone be able to help me diagnose the runtime errors with approach
> (2), or to suggest a better alternative?
>
> Thanks,
> ~ Andrew
>
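
One alternative worth weighing here is the typed Aggregator API, which keeps the
state and result types in ordinary case classes and avoids the Row-level UDAF buffer; a
minimal sketch with illustrative types rather than the thread's CustomState:

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class Reading(key: String, value: Double)
case class AvgState(sum: Double, count: Long)

// Input type, intermediate state, and result type mirror the T/S/R discussed above.
object AvgAgg extends Aggregator[Reading, AvgState, Double] {
  def zero: AvgState = AvgState(0.0, 0L)
  def reduce(s: AvgState, r: Reading): AvgState = AvgState(s.sum + r.value, s.count + 1)
  def merge(a: AvgState, b: AvgState): AvgState = AvgState(a.sum + b.sum, a.count + b.count)
  def finish(s: AvgState): Double = if (s.count == 0L) 0.0 else s.sum / s.count
  def bufferEncoder: Encoder[AvgState] = Encoders.product[AvgState]
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val spark = SparkSession.builder.appName("typed-agg-sketch").getOrCreate()
import spark.implicits._

val ds = Seq(Reading("a", 1.0), Reading("a", 3.0), Reading("b", 5.0)).toDS()
ds.groupByKey(_.key).agg(AvgAgg.toColumn.name("avg_value")).show()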


Spark Streaming concurrent calls

2019-08-13 Thread Amit Sharma
I am using Kafka Spark Streaming. My UI application sends requests to
streaming through Kafka. The problem is that streaming handles one request at a
time, so if multiple users send requests at the same time, they have to wait
until earlier requests are done.
Is there any way it can handle multiple requests?


Thanks
Amit


help understanding physical plan

2019-08-13 Thread Marcelo Valle
Hi,

I have a job running on AWS EMR. It's basically a join between 2 tables
(parquet files on S3), one somewhat large (around 50 GB) and the other small
(less than 1 GB).
The small table is the result of other operations, but it was a dataframe
with `.persist(StorageLevel.MEMORY_AND_DISK_SER)` and the count on this
dataframe finishes quickly.
When I run my "LEFT_ANTI" join, I get the execution plan below. While
most of my jobs on larger amounts of data take at most 1 h on this cluster, this
one takes almost 1 day to complete.

What could I be doing wrong? I am trying to analyze the plan, but I can't
find anything that justify the slowness. It has 2 shuffles followed by a
zip, but other jobs have similar things and they are not that slow.

Could anyone point me to possible actions I could take to investigate this?

Thanks,
Marcelo.

== Physical Plan ==
*(2) Project [USAGE_AGGREGATED_METADATA_ID#1493,
SENDER_RECORDING_IDENTIFIER#1499, AIP127258 AS SENDER_IP_ID#1702,
USAGE_AGGREGATED_METADATA_HASH#1513]
+- *(2) BroadcastHashJoin [coalesce(USAGE_AGGREGATED_METADATA_ID#1493, ),
coalesce(SENDER_RECORDING_IDENTIFIER#1499, )],
[coalesce(USAGE_AGGREGATED_METADATA_ID#356, ),
coalesce(SENDER_RECORDING_IDENTIFIER#357, )], LeftAnti, BuildRight,
((USAGE_AGGREGATED_METADATA_ID#356 <=> USAGE_AGGREGATED_METADATA_ID#1493)
&& (SENDER_RECORDING_IDENTIFIER#357 <=> SENDER_RECORDING_IDENTIFIER#1499))
   :- InMemoryTableScan [USAGE_AGGREGATED_METADATA_ID#1493,
SENDER_RECORDING_IDENTIFIER#1499, USAGE_AGGREGATED_METADATA_HASH#1513]
   : +- InMemoryRelation [USAGE_AGGREGATED_METADATA_ID#1493, ISRC#1494,
ISWC#1495, RECORDING_TITLE#1496, RECORDING_DISPLAY_ARTIST#1497,
WORK_WRITERS#1498, SENDER_RECORDING_IDENTIFIER#1499,
RECORDING_VERSION_TITLE#1500, WORK_TITLE#1501, CONTENT_TYPE#1502,
USAGE_AGGREGATED_METADATA_HASH#1513], StorageLevel(disk, memory, 1 replicas)
   :   +- *(2) Project [ID#328 AS
USAGE_AGGREGATED_METADATA_ID#1493, isrc#289 AS ISRC#1494, iswc#290 AS
ISWC#1495, track_name#291 AS RECORDING_TITLE#1496, artist_name#292 AS
RECORDING_DISPLAY_ARTIST#1497, work_writer_names#293 AS WORK_WRITERS#1498,
uri#286 AS SENDER_RECORDING_IDENTIFIER#1499, null AS
RECORDING_VERSION_TITLE#1500, null AS WORK_TITLE#1501, SOUND AS
CONTENT_TYPE#1502, UDF(array(isrc#289, track_name#291, null,
artist_name#292, iswc#290, null, work_writer_names#293, SOUND)) AS
USAGE_AGGREGATED_METADATA_HASH#1513]
   :  +- *(2) BroadcastHashJoin [coalesce(isrc_1#1419, ),
coalesce(iswc_1#1420, ), coalesce(track_name_1#1421, ),
coalesce(artist_name_1#1422, ), coalesce(work_writer_names_1#1423, )],
[coalesce(isrc#289, ), coalesce(iswc#290, ), coalesce(track_name#291, ),
coalesce(artist_name#292, ), coalesce(work_writer_names#293, )], Inner,
BuildLeft, (isrc#289 <=> isrc_1#1419) && (iswc#290 <=> iswc_1#1420)) &&
(track_name#291 <=> track_name_1#1421)) && (artist_name#292 <=>
artist_name_1#1422)) && (work_writer_names#293 <=>
work_writer_names_1#1423))
   : :- BroadcastExchange
HashedRelationBroadcastMode(List(coalesce(input[1, string, true], ),
coalesce(input[2, string, true], ), coalesce(input[3, string, true], ),
coalesce(input[4, string, true], ), coalesce(input[5, string, true], )))
   : :  +- *(1) Project [ID#328, isrc#289 AS isrc_1#1419,
iswc#290 AS iswc_1#1420, track_name#291 AS track_name_1#1421,
artist_name#292 AS artist_name_1#1422, work_writer_names#293 AS
work_writer_names_1#1423]
   : : +- *(1) Filter isnotnull(ID#328)
   : :+- InMemoryTableScan [ID#328,
artist_name#292, isrc#289, iswc#290, track_name#291,
work_writer_names#293], [isnotnull(ID#328)]
   : :  +- InMemoryRelation [ID#328, isrc#289,
iswc#290, track_name#291, artist_name#292, work_writer_names#293],
StorageLevel(disk, memory, 1 replicas)
   : :+- *(2) Project [ID#328,
isrc#289, iswc#290, track_name#291, artist_name#292, work_writer_names#293]
   : :   +- *(2) BroadcastHashJoin
[coalesce(ISRC#329, ), coalesce(ISWC#330, ), coalesce(RECORDING_TITLE#331,
), coalesce(RECORDING_DISPLAY_ARTIST#332, ), coalesce(WORK_WRITERS#333, )],
[coalesce(isrc#289, ), coalesce(iswc#290, ), coalesce(track_name#291, ),
coalesce(substring(artist_name#292, 0, 1000), ),
coalesce(work_writer_names#293, )], RightOuter, BuildLeft, (isrc#289
<=> ISRC#329) && (iswc#290 <=> ISWC#330)) && (track_name#291 <=>
RECORDING_TITLE#331)) && (substring(artist_name#292, 0, 1000) <=>
RECORDING_DISPLAY_ARTIST#332)) && (work_writer_names#293 <=>
WORK_WRITERS#333))
   : :  :- BroadcastExchange
HashedRelationBroadcastMode(List(coalesce(input[1, string, true], ),
coalesce(input[2, string, true], ), coalesce(input[3, string, true], ),
coalesce(input[4, string, true], ), coalesce(input[5, string, true], )))
   : :  :  +- *(1) Project [ID#328,
ISRC#329, ISWC#330, 

Re: Sreate temp table or view by using location

2019-08-13 Thread 霍战锋
Sorry for the typo. The title is 'Create temp table or view by using
location'.
Best Regards


On Tue, Aug 13, 2019 at 8:00 PM, 霍战锋 wrote:

> Hi,
>
> I'm trying to use spark SQL to define a temp table which can be
> destroyed automatically with the session.  But when I use the SQL as
> below, I can't query any valid rows; meanwhile, it works when I delete the
> word 'temp'.  Can anyone tell me how to write the right SQL?
>
> It doesn't work like this one.
> spark.sql("create temp view people (name string, age int) using csv 
> options(sep=',',inferSchema='true',ignoreLeadingWhiteSpace='true') location 
> 'src/main/resources/people.txt'")
>
> It works like this one, but it's not a temp table.
> spark.sql("create view people (name string, age int) using csv 
> options(sep=',',inferSchema='true',ignoreLeadingWhiteSpace='true') location 
> 'src/main/resources/people.txt'")
>
> To restate, I would like to use SQL to get the same result as
> 'dataset.createOrReplaceTempView(table name)'.
>
> Thank you.
>
> Best Regards
>


Sreate temp table or view by using location

2019-08-13 Thread 霍战锋
Hi,

I'm trying to use spark SQL to define a temp table which can be
destroyed automatically with the session.  But when I use the SQL as
below, I can't query any valid rows; meanwhile, it works when I delete the
word 'temp'.  Can anyone tell me how to write the right SQL?

It doesn't work like this one.
spark.sql("create temp view people (name string, age int) using csv
options(sep=',',inferSchema='true',ignoreLeadingWhiteSpace='true')
location 'src/main/resources/people.txt'")

It works like this one, but it's not a temp table.
spark.sql("create view people (name string, age int) using csv
options(sep=',',inferSchema='true',ignoreLeadingWhiteSpace='true')
location 'src/main/resources/people.txt'")

To restate, I would like to use SQL to get the same result as
'dataset.createOrReplaceTempView(table name)'.

Thank you.

Best Regards


how to specify which partition each record send on spark structured streaming kafka sink?

2019-08-13 Thread zenglong chen
The 'key' option does not work!
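
For reference, the structured streaming Kafka sink reads an optional string or binary
column named key and hands it to the Kafka producer, whose default partitioner hashes the
key to pick a partition; any option prefixed with kafka. is passed on to the producer, so
a custom partitioner class can be supplied that way. A minimal sketch where the broker,
topic, checkpoint path, and partitioner class are assumptions:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("kafka-sink-sketch").getOrCreate()

// Toy source; in the question this would be the real input stream.
val df = spark.readStream.format("rate").option("rowsPerSecond", "10").load()

val query = df
  .selectExpr("CAST(value AS STRING) AS key", "CAST(timestamp AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")              // assumption
  .option("topic", "events")                                      // assumption
  // kafka.* options go to the producer, so a custom
  // org.apache.kafka.clients.producer.Partitioner can be plugged in here.
  .option("kafka.partitioner.class", "com.example.MyPartitioner") // hypothetical class
  .option("checkpointLocation", "/tmp/checkpoints/events")
  .start()

query.awaitTermination()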


Re: Spark streaming dataframe extract message to new columns

2019-08-13 Thread Tianlang

Hi,

Do you mean you have a column A and you want to extract A1 and A2 from A?

For example:

Column A value: 123456,2019-08-07

A1 value is 123456

A2 value is 2019-08-07

If that's the case, you can use df.select like this (split needs a delimiter;
this also assumes import org.apache.spark.sql.functions.split and
import spark.implicits._ for the 'A column syntax):

df.select(split('A, ",")(0) as "A1", split('A, ",")(1) as "A2")


Good Luck

On 2019/8/12 at 8:51 PM, Gourav Sengupta wrote:

Hi,

I think that it should be possible to write a query on the streaming 
data frame and then write the output of the query to S3 or any other 
sink layer.


Regards,
Gourav Sengupta

On Sat, Aug 10, 2019 at 9:24 AM zenglong chen wrote:


How do I extract part of a message in a streaming dataframe and make a new
column?
Thanks.


--

TianlangStudio 

Some of the biggest lies: I will start tomorrow/Others are better than 
me/I am not good enough/I don't have time/This is the way I am