leilinen opened a new issue #615: how to change HoodieDeltaStreamer to do real-time calculation
URL: https://github.com/apache/incubator-hudi/issues/615
 
 
   
   
   Hi, in my project I want to consume data from a Kafka topic and upsert the data with Hudi. The HoodieDeltaStreamer is driven by a Spark context, so it is effectively a batch program: it exits after processing one batch of data. To avoid that, I tried using a **Spark streaming context**. The HoodieDeltaStreamer works on RDDs, but the streaming context reads the Kafka topic as a DStream, with code like this:
   
   ```
   private JavaDStream<GenericRecord> toDStream(OffsetRange[] offsetRanges) {
     String topicName = props.getString(KAFKA_TOPIC_NAME);
     // createDirectStream returns a DStream of decoded Kafka messages;
     // map each message to a GenericRecord
     JavaDStream<GenericRecord> recordStream = KafkaUtils.createDirectStream(
             javaStreamingContext, String.class, Object.class,
             StringDecoder.class, HoodieKafkaAvroDecoder.class,
             offsetGen.getKafkaParams(), new HashSet<>(Arrays.asList(topicName)))
         .map(obj -> (GenericRecord) obj);
     return recordStream;
   }
   ```
   `recordStream` is a `JavaDStream<GenericRecord>`, but the HoodieDeltaStreamer needs a `JavaRDD<GenericRecord>`, so I have to convert the DStream into an RDD. According to the JavaDStream API, the `compute(Time validTime)` method can **generate an RDD for the given time**. However, when my program reaches this step,
   
   ```
   JavaDStream<GenericRecord> newDataStream = toDStream(offsetRanges);
   if (newDataStream == null) {
      throw new HoodieException("Cannot fetch new stream data!");
   }
   log.info("fetch new stream data");
   newDataStream.print();
   newDataRDD = newDataStream.compute(new Time(3000L));
   
   ```
   
   I get this exception:
   
   ```
   org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@62065d12 has not been initialized
           at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:312)
           at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:88)
           at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
           at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
           at scala.Option.orElse(Option.scala:289)
           at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
           at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
           at org.apache.spark.streaming.api.java.JavaDStream.compute(JavaDStream.scala:58)
           at com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.createContext(HoodieDeltaStreamer.java:406)
           at com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.lambda$new$b912b50a$1(HoodieDeltaStreamer.java:193)
           at org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:627)
           at org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:626)
           at scala.Option.getOrElse(Option.scala:121)
   ```
   
   because the zero time of the DStream is still null: `compute()` first checks whether the given time is valid, and that check fails because the input DStream has not been initialized (the streaming context was never started).
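
   If I understand the Spark Streaming API correctly, per-batch RDDs are not pulled out with `compute()`; instead the per-batch work is registered with `foreachRDD` and the RDDs are handed over after the context is started. This is only a sketch of that pattern: `upsertBatch` is a placeholder I made up for the delta streamer's existing RDD-based write path, and the 3-second batch interval is just an example.

   ```java
   import org.apache.avro.generic.GenericRecord;
   import org.apache.spark.SparkConf;
   import org.apache.spark.streaming.Durations;
   import org.apache.spark.streaming.api.java.JavaDStream;
   import org.apache.spark.streaming.api.java.JavaStreamingContext;

   // Sketch: instead of calling compute() on a DStream that has not been
   // started, register the per-batch work and then start the context.
   JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(3));
   JavaDStream<GenericRecord> recordStream = toDStream(offsetRanges);

   recordStream.foreachRDD(rdd -> {
     if (!rdd.isEmpty()) {
       // rdd is a plain JavaRDD<GenericRecord> for this batch; hand it to
       // the RDD-based write path here.
       upsertBatch(rdd); // hypothetical helper wrapping the Hudi write logic
     }
   });

   jssc.start();            // input DStreams are initialized only after start()
   jssc.awaitTermination();
   ```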
   
   
   Alternatively, if I switch from HoodieDeltaStreamer to **Flink**: I have already created a Flink source that consumes data from the Kafka topic, but I do not know how to create a Flink sink that uses the Hudi write client.
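
   For reference, this is roughly the shape of the sink I imagine. The buffering, the flush threshold, and the commented-out write call are all placeholders of mine, not real Hudi API; the actual `HoodieWriteClient` expects a `JavaRDD<HoodieRecord>`, which is exactly what a Flink sink cannot provide, and that is the gap I am asking about.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

   // Hypothetical sketch of a Flink sink that buffers records and flushes
   // them in small batches toward Hudi.
   public class HoodieSink extends RichSinkFunction<GenericRecord> {

     private transient List<GenericRecord> buffer;

     @Override
     public void open(Configuration parameters) {
       buffer = new ArrayList<>();
     }

     @Override
     public void invoke(GenericRecord record, Context context) {
       buffer.add(record);
       if (buffer.size() >= 1000) { // flush threshold: an arbitrary assumption
         flush();
       }
     }

     private void flush() {
       // Placeholder: HoodieWriteClient.upsert takes a JavaRDD<HoodieRecord>,
       // so there is no direct call to make from a Flink operator; a non-RDD
       // write path would be needed here.
       // hudiClient.upsert(..., commitTime);  // hypothetical call
       buffer.clear();
     }
   }
   ```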
   
   I have no idea how to proceed; can you give me some suggestions? Thank you!
