Hi,
I know, but my purpose is to transform a JSON string in a Dataset<String>
into a Dataset<Row>, while spark.readStream can only read JSON files from a
specified path.
https://stackoverflow.com/questions/48617474/how-to-convert-json-dataset-to-dataframe-in-spark-structured-streaming
gives a useful method, but my JSON records do not all share the same format.
Also, the Spark Java API does not seem to support syntax like

.select(from_json($"value", colourSchema))
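
The closest Java equivalent I can see goes through the static helpers in
org.apache.spark.sql.functions. A rough sketch, assuming the JSON schema is
known up front (the column name "value" and the field names below are only
placeholders):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Expected shape of the JSON payload; adjust the fields to the real data.
StructType jsonSchema = new StructType()
        .add("appname", DataTypes.StringType)
        .add("payload", DataTypes.StringType);

// jsonDataset holds the raw JSON string in a column named "value"
// (the default column name of a Dataset<String>).
Dataset<Row> parsed = jsonDataset
        .select(from_json(col("value"), jsonSchema).alias("data"))
        .select("data.*");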



Regard,
Junfeng Chen

On Fri, Apr 13, 2018 at 7:09 AM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> Have you read through the documentation of Structured Streaming?
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
>
> One of the basic mistakes you are making is defining the dataset with
> `spark.read()`. You define a streaming Dataset with `spark.readStream()`.
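>
> A rough sketch of what that looks like against a Kafka source (the bootstrap
> servers, topic name, and jsonSchema below are only placeholders):
>
> import static org.apache.spark.sql.functions.col;
> import static org.apache.spark.sql.functions.from_json;
>
> // Read the topic as an unbounded streaming Dataset; each record exposes a
> // binary "value" column carrying the JSON payload.
> Dataset<Row> raw = spark.readStream()
>         .format("kafka")
>         .option("kafka.bootstrap.servers", "host1:9092")
>         .option("subscribe", "mytopic")
>         .load();
>
> // Cast the payload to string and parse it with the expected schema.
> Dataset<Row> parsed = raw
>         .selectExpr("CAST(value AS STRING) AS value")
>         .select(from_json(col("value"), jsonSchema).alias("data"))
>         .select("data.*");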
>
> On Thu, Apr 12, 2018 at 3:02 AM, Junfeng Chen <darou...@gmail.com> wrote:
>
>> Hi, Tathagata
>>
>> I have tried Structured Streaming, but the line
>>
>>> Dataset<Row> rowDataset = spark.read().json(jsondataset);
>>
>>
>> always throws
>>
>>> Queries with streaming sources must be executed with writeStream.start()
>>
>>
>> But all I need to do in this step is transform the JSON string data into a
>> Dataset<Row>. How can I fix it?
>>
>> Thanks!
>>
>>
>> Regard,
>> Junfeng Chen
>>
>> On Thu, Apr 12, 2018 at 3:08 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> It's not very surprising that doing this sort of RDD to DF conversion
>>> inside DStream.foreachRDD has weird corner cases like this. In fact, you
>>> are going to have additional problems with partial parquet files (when
>>> there are failures) in this approach. I strongly suggest that you use
>>> Structured Streaming, which is designed to do this sort of processing. It
>>> will take care of tracking the written parquet files correctly.
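>>>
>>> A minimal sketch of such a parquet sink, assuming `parsed` is the streaming
>>> Dataset produced from the JSON (the paths below are only placeholders); the
>>> checkpoint location is what lets the engine track which files have been
>>> committed:
>>>
>>> import org.apache.spark.sql.streaming.StreamingQuery;
>>>
>>> // Write the parsed stream as parquet, partitioned the same way as before.
>>> StreamingQuery query = parsed.writeStream()
>>>         .format("parquet")
>>>         .partitionBy("appname")
>>>         .option("path", "/data/output/parquet")
>>>         .option("checkpointLocation", "/data/checkpoints/parquet-sink")
>>>         .outputMode("append")   // the file sink supports append mode
>>>         .start();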
>>>
>>> TD
>>>
>>> On Wed, Apr 11, 2018 at 6:58 PM, Junfeng Chen <darou...@gmail.com>
>>> wrote:
>>>
>>>> I wrote a program to read some JSON data from Kafka, intending to save
>>>> it to parquet files on HDFS.
>>>> Here is my code:
>>>>
>>>>> JavaInputDStream stream = ...
>>>>> JavaDStream rdd = stream.map...
>>>>> rdd.repartition(taskNum).foreachRDD(stringjavardd -> {
>>>>>     // convert json to df
>>>>>     Dataset<Row> df = spark.read().json(stringjavardd);
>>>>>     // add some new fields
>>>>>     JavaRDD<Row> rowJavaRDD = df.javaRDD().map...;
>>>>>     // construct new type for the added fields
>>>>>     StructType type = df.schema()...;
>>>>>     // create new dataframe
>>>>>     Dataset<Row> newdf = spark.createDataFrame(rowJavaRDD, type);
>>>>>     // save to parquet
>>>>>     newdf.repartition(taskNum).write().mode(SaveMode.Append)
>>>>>             .partitionBy("appname").parquet(savepath);
>>>>> });
>>>>
>>>>
>>>>
>>>> However, if I remove the repartition call on newdf in the parquet-writing
>>>> stage, the program always throws a NullPointerException at the JSON
>>>> conversion line:
>>>>
>>>>> java.lang.NullPointerException
>>>>>  at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1783)
>>>>> ...
>>>>
>>>>
>>>> This makes no sense to me, since the parquet-writing operation should be
>>>> in a different stage from the JSON-transforming operation.
>>>> So how can I solve it? Thanks!
>>>>
>>>> Regard,
>>>> Junfeng Chen
>>>>
>>>
>>>
>>
>
