Hi, I know that, but my purpose is to transform the JSON strings in a Dataset<String> into a Dataset<Row>, while spark.readStream() can only read JSON files from a specified path.
https://stackoverflow.com/questions/48617474/how-to-convert-json-dataset-to-dataframe-in-spark-structured-streaming
shows the essential approach, but my JSON records do not all share the same format. Also, the Spark Java API does not seem to support Scala syntax such as

    .select(from_json($"value", colourSchema))
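The closest Java equivalent I can see (just an untested sketch; colourSchema here is a placeholder, since in my case the records do not share one schema) would be:

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.from_json;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    // Placeholder schema -- my real JSON records vary in structure.
    StructType colourSchema = new StructType()
            .add("colour", DataTypes.StringType)
            .add("count", DataTypes.IntegerType);

    // jsonStrings is the streaming Dataset<String> of raw JSON text;
    // its single column is named "value".
    Dataset<Row> parsed = jsonStrings
            .select(from_json(col("value"), colourSchema).as("data"))
            .select("data.*"); // flatten the parsed struct into top-level columns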
Regard,
Junfeng Chen

On Fri, Apr 13, 2018 at 7:09 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> Have you read through the documentation of Structured Streaming?
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
>
> One of the basic mistakes you are making is defining the dataset with
> `spark.read()`. You define a streaming Dataset with `spark.readStream()`.
>
> On Thu, Apr 12, 2018 at 3:02 AM, Junfeng Chen <darou...@gmail.com> wrote:
>
>> Hi, Tathagata
>>
>> I have tried Structured Streaming, but the line
>>
>>> Dataset<Row> rowDataset = spark.read().json(jsondataset);
>>
>> always throws
>>
>>> Queries with streaming sources must be executed with writeStream.start()
>>
>> But all I need to do in this step is transform the JSON strings into a
>> Dataset<Row>. How can I fix it?
>>
>> Thanks!
>>
>> Regard,
>> Junfeng Chen
>>
>> On Thu, Apr 12, 2018 at 3:08 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>
>>> It's not very surprising that doing this sort of RDD-to-DataFrame
>>> conversion inside DStream.foreachRDD has weird corner cases like this.
>>> In fact, you are going to have additional problems with partial parquet
>>> files (when there are failures) in this approach. I strongly suggest
>>> that you use Structured Streaming, which is designed to do this sort of
>>> processing. It will take care of tracking the written parquet files
>>> correctly.
>>>
>>> TD
>>>
>>> On Wed, Apr 11, 2018 at 6:58 PM, Junfeng Chen <darou...@gmail.com> wrote:
>>>
>>>> I wrote a program that reads some JSON data from Kafka and intends to
>>>> save it to parquet files on HDFS.
>>>> Here is my code:
>>>>
>>>>> JavaInputDStream stream = ...
>>>>> JavaDStream rdd = stream.map...
>>>>> rdd.repartition(taskNum).foreachRDD((VoidFunction<JavaRDD<String>>)
>>>>> stringJavaRDD -> {
>>>>>     Dataset<Row> df = spark.read().json(stringJavaRDD); // convert JSON to DataFrame
>>>>>     JavaRDD<Row> rowJavaRDD = df.javaRDD().map...; // add some new fields
>>>>>     StructType type = df.schema()...; // construct a new schema for the added fields
>>>>>     Dataset<Row> newdf = spark.createDataFrame(rowJavaRDD, type); // create a new DataFrame
>>>>>     newdf.repartition(taskNum).write().mode(SaveMode.Append)
>>>>>         .partitionBy("appname").parquet(savepath); // save to parquet
>>>>> });
>>>>
>>>> However, if I remove the repartition call on newdf in the parquet-writing
>>>> stage, the program always throws a NullPointerException at the JSON
>>>> conversion line:
>>>>
>>>>> java.lang.NullPointerException
>>>>> at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1783)
>>>>> ...
>>>>
>>>> That makes no sense to me: the parquet-writing operation should be in a
>>>> different stage from the JSON-transforming operation.
>>>> How can I solve it? Thanks!
>>>>
>>>> Regard,
>>>> Junfeng Chen
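P.S. For completeness, the Structured Streaming shape of this pipeline, as I understand the suggestion (a rough, untested sketch: the broker address, topic name, schema, and paths are all placeholders, and the fixed schema is exactly the part that does not fit my varying records):

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.from_json;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Requires the spark-sql-kafka-0-10 artifact on the classpath.
    Dataset<Row> raw = spark.readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "broker:9092") // placeholder
            .option("subscribe", "mytopic")                   // placeholder
            .load();

    Dataset<Row> parsed = raw
            .selectExpr("CAST(value AS STRING) AS value")
            .select(from_json(col("value"), schema).as("data")) // 'schema' is a placeholder StructType
            .select("data.*");

    parsed.writeStream()
            .format("parquet")
            .option("path", savepath)                     // placeholder
            .option("checkpointLocation", checkpointPath) // placeholder
            .partitionBy("appname")
            .outputMode("append")
            .start();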