leilinen opened a new issue #615: how change HoodieDeltaStreamer with Real-time calculation
URL: https://github.com/apache/incubator-hudi/issues/615

Hi,

In my project, I want to consume data from a Kafka topic and upsert it with hoodie. In hoodie, HoodieDeltaStreamer is driven by a Spark context, so it is a batch program: it exits after processing one batch of data. I therefore use a **spark streaming context** instead.

HoodieDeltaStreamer works on RDDs, but the Spark streaming context gives me a DStream object from the Kafka topic, created with this code:

```
private JavaDStream<GenericRecord> toDStream(OffsetRange[] offsetRanges) {
  String topicName = props.getString(KAFKA_TOPIC_NAME);
  JavaDStream<GenericRecord> recordStream = null;
  recordStream = KafkaUtils.createDirectStream(javaStreamingContext, String.class, Object.class,
      StringDecoder.class, HoodieKafkaAvroDecoder.class, offsetGen.getKafkaParams(),
      new HashSet<>(Arrays.asList(topicName))).map(obj -> (GenericRecord) obj);
  return recordStream;
}
```

recordStream is a JavaDStream<GenericRecord> object, but HoodieDeltaStreamer needs a JavaRDD<GenericRecord> object, so I have to convert the JavaDStream<GenericRecord> into a JavaRDD<GenericRecord>. I looked at the JavaDStream API, and the compute(Time) method is documented as **Generate an RDD for the given duration**. However, when my program reaches this step,

```
JavaDStream<GenericRecord> newDataStream = toDStream(offsetRanges);
if (newDataStream == null) {
  throw new HoodieException("Cannot fetch new stream data!");
}
log.info("fetch new stream data");
newDataStream.print();
newDataRDD = newDataStream.compute(new Time(3000L));
```

I get an exception:

```
org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@62065d12 has not been initialized
	at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:312)
	at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:88)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
	at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
	at org.apache.spark.streaming.api.java.JavaDStream.compute(JavaDStream.scala:58)
	at com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.createContext(HoodieDeltaStreamer.java:406)
	at com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.lambda$new$b912b50a$1(HoodieDeltaStreamer.java:193)
	at org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:627)
	at org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:626)
	at scala.Option.getOrElse(Option.scala:121)
```

This happens because the zeroTime value of the DStream is still null, and the compute() method first checks whether the given time is valid.

Alternatively, I could replace HoodieDeltaStreamer with **flink**. I have created a Flink source to consume data from the Kafka topic, but I do not know how to create a Flink sink with the hoodie write client.

I have no idea how to proceed. Can you give me some suggestions? Thank you!
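
To make the failure mode concrete, here is a minimal sketch in plain Java with no Spark dependency. `ToyStream`, `foreachBatch` and `MicroBatchSketch` are hypothetical names invented for this illustration; they are not Spark or hoodie classes, and the model is a simplification. It only shows the lifecycle described above: pulling a batch by hand before the stream is started fails, while a per-batch callback registered up front receives each batch once the stream runs. In Spark Streaming itself, the analogous calls would presumably be `JavaDStream.foreachRDD(...)` followed by `JavaStreamingContext.start()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Toy stand-in for a DStream. NOT Spark code: it only models the lifecycle.
class ToyStream<T> {
    private final List<List<T>> batches;                 // data that "arrives" after start()
    private final List<Consumer<List<T>>> callbacks = new ArrayList<>();
    private boolean initialized = false;                 // plays the role of a non-null zeroTime

    ToyStream(List<List<T>> batches) {
        this.batches = batches;
    }

    // Pulling a batch by hand is rejected until the stream has been started.
    List<T> compute(int batchIndex) {
        if (!initialized) {
            throw new IllegalStateException("stream has not been initialized");
        }
        return batches.get(batchIndex);
    }

    // Register work to run on every batch (loosely modeled on foreachRDD).
    void foreachBatch(Consumer<List<T>> callback) {
        callbacks.add(callback);
    }

    // Starting the stream initializes it and hands each batch to every callback.
    void start() {
        initialized = true;
        for (List<T> batch : batches) {
            for (Consumer<List<T>> callback : callbacks) {
                callback.accept(batch);
            }
        }
    }
}

class MicroBatchSketch {
    // Mirrors the failing call in the issue: compute() before the stream is started.
    static boolean computeBeforeStartFails() {
        ToyStream<String> stream = new ToyStream<>(Arrays.asList(Arrays.asList("k1")));
        try {
            stream.compute(0);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // Callback style: the per-batch work (a stand-in for the upsert) runs once per batch.
    static List<String> runWithCallback() {
        ToyStream<String> stream = new ToyStream<>(Arrays.asList(
                Arrays.asList("k1", "k2"),
                Arrays.asList("k3")));
        List<String> upserted = new ArrayList<>();
        stream.foreachBatch(upserted::addAll);
        stream.start();
        return upserted;
    }

    public static void main(String[] args) {
        System.out.println(computeBeforeStartFails()); // true
        System.out.println(runWithCallback());         // [k1, k2, k3]
    }
}
```

The design point is that the batch is never requested from the stream; the stream pushes each batch into the registered callback, so the code that needs an RDD-like batch would live inside that callback.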
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services