quitozang opened a new issue #2274:
URL: https://github.com/apache/hudi/issues/2274


   **The RDD in AvroKafkaSource.java is not persisted**
   
   **Describe the problem you faced**
   
     ```
     @Override
     protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
       OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
       long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
       LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
       if (totalNewMsgs <= 0) {
         return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));
       }
       JavaRDD<GenericRecord> newDataRDD = toRDD(offsetRanges);
       return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
     }
   
     private JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
       return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
               LocationStrategies.PreferConsistent()).map(obj -> (GenericRecord) obj.value());
     }
   ```
   This is part of the code in 'AvroKafkaSource.java' in the DeltaStreamer module. 
The 'newDataRDD' is not persisted, yet it is used at line 344 and at line 377 in 
DeltaSync.java (and at one further line), so the RDD is recomputed for each use 
and the data is read from Kafka repeatedly.
   My source data is stored in Kafka.
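
To illustrate the recomputation described above, here is a minimal plain-Java analogy; it does not use Spark or Hudi. A lazy `Supplier` stands in for the unpersisted RDD, and the hypothetical `persisted` wrapper stands in for caching the result (in Spark, the corresponding call would be `persist` on the `JavaRDD`). The class name, method names, and record values are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class LazyRecomputeSketch {

    // Hypothetical wrapper: computes the source once and reuses the result,
    // analogous to persisting an RDD before it is used more than once.
    static <T> Supplier<T> persisted(Supplier<T> source) {
        return new Supplier<T>() {
            private T cached;
            private boolean computed;

            @Override
            public synchronized T get() {
                if (!computed) {
                    cached = source.get();
                    computed = true;
                }
                return cached;
            }
        };
    }

    // Uses the data twice (like the two usages in DeltaSync.java) and
    // returns how many times the underlying "read" was executed.
    static int readsAfterTwoUses(boolean persist) {
        AtomicInteger reads = new AtomicInteger();

        // Stand-in for the expensive read done by KafkaUtils.createRDD.
        Supplier<List<String>> source = () -> {
            reads.incrementAndGet();
            return Arrays.asList("record-1", "record-2");
        };

        Supplier<List<String>> data = persist ? persisted(source) : source;
        data.get().size();     // first use
        data.get().isEmpty();  // second use
        return reads.get();
    }

    public static void main(String[] args) {
        System.out.println("unpersisted reads: " + readsAfterTwoUses(false)); // prints 2
        System.out.println("persisted reads: " + readsAfterTwoUses(true));    // prints 1
    }
}
```

Without the wrapper, every use triggers the read again, which is the behavior reported here for 'newDataRDD'. With it, the read runs once and later uses reuse the result.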
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Send test data to a Kafka topic
   2. Start the DeltaStreamer task
   
   **Expected behavior**
   
   The 'KafkaUtils.createRDD' method in 'AvroKafkaSource.java' should be 
executed only once.
   
   **Environment Description**
   
   * Hudi version : 0.6.0
   
   * Spark version : 2.4.4
   
   * Hive version : 3.1.2
   
   * Hadoop version : 3.2.1
   
   * Storage (HDFS/S3/GCS..) : HDFS
   
   * Running on Docker? (yes/no) : no
   

