Have you read through the documentation of Structured Streaming?
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

One of the basic mistakes you are making is defining the dataset with
`spark.read()`. You define a streaming Dataset with `spark.readStream()`.
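As a rough illustration, a minimal sketch of reading JSON from Kafka and writing
parquet with Structured Streaming could look like the following. The broker
address, topic name, schema fields, and output/checkpoint paths are placeholders
I made up for the example, not values taken from your code:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQuery;
    import org.apache.spark.sql.types.StructType;

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.from_json;

    public class KafkaJsonToParquet {
        public static void main(String[] args) throws Exception {
            SparkSession spark = SparkSession.builder()
                    .appName("KafkaJsonToParquet")
                    .getOrCreate();

            // Schema of the incoming JSON records (placeholder fields).
            StructType schema = new StructType()
                    .add("appname", "string")
                    .add("payload", "string");

            // Streaming source: spark.readStream(), not spark.read().
            Dataset<Row> kafka = spark.readStream()
                    .format("kafka")
                    .option("kafka.bootstrap.servers", "host1:9092") // placeholder
                    .option("subscribe", "mytopic")                  // placeholder
                    .load();

            // Parse the Kafka value column (a JSON string) into typed columns.
            Dataset<Row> parsed = kafka
                    .selectExpr("CAST(value AS STRING) AS json")
                    .select(from_json(col("json"), schema).as("data"))
                    .select("data.*");

            // Streaming sink: writeStream() with a checkpoint location, so Spark
            // can track which parquet files were committed across failures.
            StreamingQuery query = parsed.writeStream()
                    .format("parquet")
                    .option("path", "/path/to/output")             // placeholder
                    .option("checkpointLocation", "/path/to/ckpt") // placeholder
                    .partitionBy("appname")
                    .start();

            query.awaitTermination();
        }
    }

The checkpointLocation is what lets the parquet sink record exactly which files
have been committed, which avoids the partial-file problem mentioned below.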

On Thu, Apr 12, 2018 at 3:02 AM, Junfeng Chen <darou...@gmail.com> wrote:

> Hi, Tathagata
>
> I have tried Structured Streaming, but the line
>
>> Dataset<Row> rowDataset = spark.read().json(jsondataset);
>
>
> always throws
>
>> Queries with streaming sources must be executed with writeStream.start()
>
>
> But all I need to do in this step is transform the JSON string data into
> a Dataset. How can I fix it?
>
> Thanks!
>
>
> Regard,
> Junfeng Chen
>
> On Thu, Apr 12, 2018 at 3:08 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> It's not very surprising that doing this sort of RDD to DF conversion
>> inside DStream.foreachRDD has weird corner cases like this. In fact, you
>> are going to have additional problems with partial parquet files (when
>> there are failures) in this approach. I strongly suggest that you use
>> Structured Streaming, which is designed to do this sort of processing. It
>> will take care of tracking the written parquet files correctly.
>>
>> TD
>>
>> On Wed, Apr 11, 2018 at 6:58 PM, Junfeng Chen <darou...@gmail.com> wrote:
>>
>>> I wrote a program to read some JSON data from Kafka with the purpose of
>>> saving it to parquet files on HDFS.
>>> Here is my code:
>>>
>>>> JavaInputDStream stream = ...
>>>> JavaDStream rdd = stream.map...
>>>> rdd.repartition(taskNum).foreachRDD((VoidFunction<JavaRDD<String>>)
>>>> stringjavardd -> {
>>>>     // convert json to df
>>>>     Dataset<Row> df = spark.read().json(stringjavardd);
>>>>     // add some new fields
>>>>     JavaRDD<Row> rowJavaRDD = df.javaRDD().map...;
>>>>     // construct a new type for the added fields
>>>>     StructType type = df.schema()...;
>>>>     // create the new dataframe
>>>>     Dataset<Row> newdf = spark.createDataFrame(rowJavaRDD, type);
>>>>     // save to parquet
>>>>     newdf.repartition(taskNum).write().mode(SaveMode.Append)
>>>>         .partitionBy("appname").parquet(savepath);
>>>> })
>>>
>>>
>>>
>>> However, if I remove the repartition call on newdf in the parquet-writing
>>> stage, the program always throws a NullPointerException at the JSON
>>> conversion line:
>>>
>>>> java.lang.NullPointerException
>>>>  at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1783)
>>>> ...
>>>
>>>
>>> This seems to make no sense, since the parquet-writing operation should be
>>> in a different stage from the JSON-transforming operation.
>>> So how can I solve it? Thanks!
>>>
>>> Regard,
>>> Junfeng Chen
>>>
>>
>>
>
