Have you read through the documentation of Structured Streaming? https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
One of the basic mistakes you are making is defining the Dataset with `spark.read()`. You define a streaming Dataset with `spark.readStream()` (a minimal sketch follows below the quoted thread).

On Thu, Apr 12, 2018 at 3:02 AM, Junfeng Chen <darou...@gmail.com> wrote:

> Hi, Tathagata
>
> I have tried structured streaming, but the line
>
>> Dataset<Row> rowDataset = spark.read().json(jsondataset);
>
> always throws
>
>> Queries with streaming sources must be executed with writeStream.start()
>
> But all I need to do in this step is transform the JSON string data to a
> Dataset. How can I fix it?
>
> Thanks!
>
> Regard,
> Junfeng Chen
>
> On Thu, Apr 12, 2018 at 3:08 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> It's not very surprising that doing this sort of RDD-to-DF conversion
>> inside DStream.foreachRDD has weird corner cases like this. In fact, you
>> are going to have additional problems with partial parquet files (when
>> there are failures) in this approach. I strongly suggest that you use
>> Structured Streaming, which is designed to do this sort of processing. It
>> will take care of tracking the written parquet files correctly.
>>
>> TD
>>
>> On Wed, Apr 11, 2018 at 6:58 PM, Junfeng Chen <darou...@gmail.com> wrote:
>>
>>> I wrote a program to read some JSON data from Kafka, intending to save
>>> it to parquet files on HDFS.
>>> Here is my code:
>>>
>>>> JavaInputDStream stream = ...
>>>> JavaDStream<String> rdd = stream.map...
>>>> rdd.repartition(taskNum).foreachRDD(stringJavaRdd -> {
>>>>     // convert JSON to a DataFrame
>>>>     Dataset<Row> df = spark.read().json(stringJavaRdd);
>>>>     // add some new fields
>>>>     JavaRDD<Row> rowJavaRDD = df.javaRDD().map...;
>>>>     // construct a new schema covering the added fields
>>>>     StructType type = df.schema()...;
>>>>     // create a new DataFrame
>>>>     Dataset<Row> newdf = spark.createDataFrame(rowJavaRDD, type);
>>>>     // save to parquet
>>>>     newdf.repartition(taskNum).write().mode(SaveMode.Append)
>>>>             .partitionBy("appname").parquet(savepath);
>>>> });
>>>
>>> However, if I remove the repartition call on newdf in the parquet
>>> writing stage, the program always throws a NullPointerException in the
>>> JSON conversion line:
>>>
>>>> java.lang.NullPointerException
>>>> at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1783)
>>>> ...
>>>
>>> That makes little sense to me, since the parquet writing operation
>>> should be in a different stage from the JSON transformation.
>>> So how can I solve it? Thanks!
>>>
>>> Regard,
>>> Junfeng Chen
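For reference, here is a minimal sketch of what the Structured Streaming version of the quoted job could look like, assuming Spark 2.x with the spark-sql-kafka-0-10 connector on the classpath. The broker address, topic name, schema fields, and paths are placeholders for illustration, not values from the original job:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQuery;
    import org.apache.spark.sql.types.StructType;

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.from_json;

    public class KafkaJsonToParquet {
        public static void main(String[] args) throws Exception {
            SparkSession spark = SparkSession.builder()
                    .appName("KafkaJsonToParquet")
                    .getOrCreate();

            // A streaming source cannot infer the JSON schema, so it must
            // be declared up front. (Hypothetical fields for illustration.)
            StructType schema = new StructType()
                    .add("appname", "string")
                    .add("payload", "string");

            // readStream, not read: this returns a streaming Dataset.
            Dataset<Row> kafka = spark.readStream()
                    .format("kafka")
                    .option("kafka.bootstrap.servers", "broker:9092") // placeholder
                    .option("subscribe", "mytopic")                   // placeholder
                    .load();

            // Parse the Kafka value column (JSON strings) into typed columns.
            Dataset<Row> parsed = kafka
                    .selectExpr("CAST(value AS STRING) AS json")
                    .select(from_json(col("json"), schema).as("data"))
                    .select("data.*");

            // writeStream tracks which parquet files were committed via the
            // checkpoint, so failures do not leave partial files behind.
            StreamingQuery query = parsed.writeStream()
                    .format("parquet")
                    .option("path", "/save/path")                     // placeholder
                    .option("checkpointLocation", "/checkpoint/path") // placeholder
                    .partitionBy("appname")
                    .start();

            query.awaitTermination();
        }
    }

The `checkpointLocation` is what lets the parquet sink record exactly which files have been committed, which is the file-tracking behavior TD refers to above.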