Re: NullPointerException error when in repartition

2018-04-12 Thread Junfeng Chen
Hi,
I know that, but my purpose is to transform a Dataset of JSON strings into a
Dataset<Row>, while spark.readStream can only read JSON files from a
specified path.
https://stackoverflow.com/questions/48617474/how-to-convert-json-dataset-to-dataframe-in-spark-structured-streaming
gives a workable method, but in my case the formats of the JSON records are
not all the same. The Spark Java API also does not seem to support syntax
like

.select(from_json($"value", colourSchema))
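
For reference, the Java API does expose the same operation through
org.apache.spark.sql.functions; Java simply has no $"value" shorthand, and
col("value") plays that role. A minimal sketch, assuming df is a Dataset<Row>
with a string "value" column and treating colourSchema as a hypothetical
hand-built StructType standing in for the real schema:

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.from_json;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    // Hypothetical schema standing in for colourSchema above.
    StructType colourSchema = new StructType()
            .add("colour", DataTypes.StringType)
            .add("value", DataTypes.StringType);

    // col("value") replaces the Scala $"value" shorthand.
    Dataset<Row> parsed = df
            .select(from_json(col("value"), colourSchema).as("data"))
            .select("data.*");

This still requires one fixed schema per select, so it does not by itself
solve the varying-format problem raised above.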



Regards,
Junfeng Chen

On Fri, Apr 13, 2018 at 7:09 AM, Tathagata Das wrote:

> Have you read through the documentation of Structured Streaming?
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
>
> One of the basic mistakes you are making is defining the dataset with
> `spark.read()`. You define a streaming Dataset with `spark.readStream()`.
>
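A minimal Structured Streaming sketch of that suggestion, adapted to the
Kafka-to-Parquet job in this thread. The broker address, topic name, schema
variable, and paths are placeholders rather than details from the thread, and
it assumes a single known schema, which is exactly the sticking point raised
above (imports as in the earlier sketch, plus
org.apache.spark.sql.streaming.StreamingQuery):

    // Read from Kafka as a streaming Dataset instead of spark.read().
    Dataset<Row> raw = spark.readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "broker:9092")
            .option("subscribe", "mytopic")
            .load();

    // Parse the JSON payload with a known schema.
    Dataset<Row> parsed = raw
            .selectExpr("CAST(value AS STRING) AS json")
            .select(from_json(col("json"), schema).as("data"))
            .select("data.*");

    // Streaming queries must end in writeStream().start(); the parquet
    // sink tracks written files through its checkpoint and metadata log.
    StreamingQuery query = parsed.writeStream()
            .format("parquet")
            .option("path", "/save/path")
            .option("checkpointLocation", "/checkpoint/path")
            .partitionBy("appname")
            .outputMode("append")
            .start();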
> On Thu, Apr 12, 2018 at 3:02 AM, Junfeng Chen  wrote:
>
>> Hi, Tathagata
>>
>> I have tried Structured Streaming, but the line
>>
>>> Dataset<Row> rowDataset = spark.read().json(jsondataset);
>>
>> always throws
>>
>>> Queries with streaming sources must be executed with writeStream.start()
>>
>> But all I need to do in this step is transform the JSON string data into
>> a Dataset<Row>. How can I fix it?
>>
>> Thanks!
>>
>>
>> Regards,
>> Junfeng Chen
>>
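A note for later readers: Spark 2.4, released after this thread, added
foreachBatch, which hands each micro-batch to user code as an ordinary
non-streaming Dataset, so batch-style calls like spark.read().json(...) are
legal inside it. A rough sketch under that assumption, where lines stands for
a streaming Dataset<String> of raw JSON and the paths are made up:

    StreamingQuery q = lines.writeStream()
            .foreachBatch((Dataset<String> batch, Long batchId) -> {
                // batch is a plain Dataset here, so this no longer hits
                // "Queries with streaming sources must be executed with
                // writeStream.start()".
                Dataset<Row> df = spark.read().json(batch);
                df.write()
                  .mode(SaveMode.Append)
                  .partitionBy("appname")
                  .parquet("/save/path");
            })
            .option("checkpointLocation", "/checkpoint/path")
            .start();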
>> On Thu, Apr 12, 2018 at 3:08 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>
>>> It's not very surprising that doing this sort of RDD to DF conversion
>>> inside DStream.foreachRDD has weird corner cases like this. In fact, you
>>> are going to have additional problems with partial parquet files (when
>>> there are failures) in this approach. I strongly suggest that you use
>>> Structured Streaming, which is designed to do this sort of processing. It
>>> will take care of tracking the written parquet files correctly.
>>>
>>> TD
>>>
>>> On Wed, Apr 11, 2018 at 6:58 PM, Junfeng Chen wrote:
>>>
 I wrote a program that reads JSON data from Kafka and saves it to Parquet
 files on HDFS. Here is my code:

> JavaInputDStream stream = ...
> JavaDStream<String> rdd = stream.map...
> rdd.repartition(taskNum).foreachRDD((VoidFunction<JavaRDD<String>>) stringjavardd -> {
>     Dataset<Row> df = spark.read().json(stringjavardd); // convert json to df
>     JavaRDD<Row> rowJavaRDD = df.javaRDD().map...; // add some new fields
>     StructType type = df.schema()...; // construct new type for the added fields
>     Dataset<Row> newdf = spark.createDataFrame(rowJavaRDD, type); // create new dataframe
>     newdf.repartition(taskNum).write().mode(SaveMode.Append).partitionBy("appname").parquet(savepath); // save to parquet
> });



 However, if I remove the repartition call on newdf in the Parquet-writing
 stage, the program always throws a NullPointerException at the
 JSON-conversion line:

> java.lang.NullPointerException
>   at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1783)
> ...


 This seems to make no sense to me, since the Parquet-writing operation
 should run in a different stage from the JSON-transforming operation.
 So how can I solve it? Thanks!

 Regards,
 Junfeng Chen
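
One defensive pattern sometimes used in foreachRDD jobs like this is to skip
empty micro-batches before the DataFrame conversion. This is a general
precaution, not a confirmed fix for the getPreferredLocs NPE above:

    rdd.repartition(taskNum).foreachRDD((VoidFunction<JavaRDD<String>>) stringjavardd -> {
        // Streaming batches can be empty; converting an empty RDD does
        // no useful work, so bail out early.
        if (stringjavardd.isEmpty()) {
            return;
        }
        Dataset<Row> df = spark.read().json(stringjavardd);
        // ... remaining conversion and parquet write as in the code above
    });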


