It's not very surprising that doing this sort of RDD to DF conversion
inside DStream.foreachRDD has weird corner cases like this. In fact, you
are going to have additional problems with partial parquet files (when
there are failures) in this approach. I strongly suggest that you use
Structured Streaming, which is designed to do this sort of processing. It
will take care of tracking the written parquet files correctly.
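
For illustration, here is a rough sketch of what the Structured Streaming version could look like (the broker address, topic name, schema, and output/checkpoint paths are placeholders you would fill in; this assumes the JSON schema is known up front, which Structured Streaming requires):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

public class JsonToParquet {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder()
        .appName("json-to-parquet")
        .getOrCreate();

    // Placeholder schema for the incoming JSON records.
    StructType schema = new StructType()
        .add("appname", "string")
        .add("payload", "string");

    // Read from Kafka and parse each record's value as JSON.
    Dataset<Row> parsed = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka-broker:9092") // placeholder
        .option("subscribe", "mytopic")                         // placeholder
        .load()
        .select(from_json(col("value").cast("string"), schema).as("data"))
        .select("data.*");

    // The parquet sink plus a checkpoint location gives you exactly-once
    // tracking of written files, so partial files from failures are
    // handled for you.
    StreamingQuery query = parsed
        .writeStream()
        .format("parquet")
        .option("path", "hdfs:///path/to/output")                    // placeholder
        .option("checkpointLocation", "hdfs:///path/to/checkpoint")  // placeholder
        .partitionBy("appname")
        .start();

    query.awaitTermination();
  }
}

The fields you currently add in the map step can be expressed with withColumn() on the streaming Dataset before writeStream().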

TD

On Wed, Apr 11, 2018 at 6:58 PM, Junfeng Chen <darou...@gmail.com> wrote:

> I wrote a program to read some JSON data from Kafka, with the aim of
> saving it to Parquet files on HDFS.
> Here is my code:
>
>> JavaInputDStream stream = ...
>> JavaDStream<String> rdd = stream.map...
>> rdd.repartition(taskNum).foreachRDD((VoidFunction<JavaRDD<String>>) stringjavardd -> {
>>     Dataset<Row> df = spark.read().json(stringjavardd); // convert json to df
>>     JavaRDD<Row> rowJavaRDD = df.javaRDD().map...; // add some new fields
>>     StructType type = df.schema()...; // construct new type for the new fields
>>     Dataset<Row> newdf = spark.createDataFrame(rowJavaRDD, type); // create new dataframe
>>     newdf.repartition(taskNum).write().mode(SaveMode.Append)
>>         .partitionBy("appname").parquet(savepath); // save to parquet
>> });
>
>
>
> However, if I remove the repartition call on newdf in the parquet-writing
> stage, the program always throws a NullPointerException at the
> json-conversion line:
>
>> java.lang.NullPointerException
>>   at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1783)
>> ...
>
>
> This seems to make no sense, since the parquet-writing operation should
> be in a different stage from the json-transforming operation.
> So how can I solve this? Thanks!
>
> Regards,
> Junfeng Chen
>
