bossenti opened a new issue, #1676:
URL: https://github.com/apache/streampipes/issues/1676

   ### Discussed in https://github.com/apache/streampipes/discussions/1626
   
   <div type='discussions-op-text'>
   
   <sup>Originally posted by **luoluoyuyu** May 27, 2023</sup>
   ### Problem Description
   `SpKafkaConsumer` creates its consumer asynchronously. If no offset has
   been committed for the topic yet, any data the producer writes to the topic
   while the consumer is still connecting is lost. (A scenario that triggers
   the problem can be found in
   https://github.com/apache/streampipes/blob/dev/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersTest.java#LL28C1-L28C1)
   ```
   public class AdaptersTest {
     public void testAdapter(AdapterTesterBase adapterTester) throws Exception {
       adapterTester.startAdapterService();
       AdapterDescription adapterDescription = adapterTester.prepareAdapter();
       adapterTester.startAdapter(adapterDescription);
       List<Map<String, Object>> data = adapterTester.generateData();
       adapterTester.validateData(data);
     }
   }
   ```
   ### Reason
   In `AdaptersTest`, for example, the consumer and the producer are created
   asynchronously. When the consumer group has no committed offset for the
   topic, the consumer's default behavior (`auto.offset.reset=latest`) is to
   consume only records produced after it has connected to the topic. Any
   message the producer writes while the consumer is still connecting is
   therefore lost.
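
The effect can be reproduced without a broker. The following is a minimal in-memory sketch, not Kafka client code: `OffsetResetSketch`, `startOffset`, and `visibleRecords` are hypothetical names introduced only for illustration, and the log is assumed to start at offset 0. It models how the start position is chosen when a consumer group has no committed offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

/** In-memory model of how a consumer group picks its start offset (illustrative only). */
public class OffsetResetSketch {

  /** Returns the offset at which a consumer would start reading a partition. */
  public static long startOffset(OptionalLong committedOffset, long logEndOffset,
                                 String autoOffsetReset) {
    if (committedOffset.isPresent()) {
      // A committed offset always wins; the reset policy is not consulted.
      return committedOffset.getAsLong();
    }
    switch (autoOffsetReset) {
      case "earliest":
        return 0L;            // assumption: the log starts at offset 0
      case "latest":
        return logEndOffset;  // skip everything already written
      default:
        throw new IllegalArgumentException("unsupported policy: " + autoOffsetReset);
    }
  }

  /** Returns the records a consumer starting at the given offset can still read. */
  public static List<String> visibleRecords(List<String> log, long startOffset) {
    return new ArrayList<>(log.subList((int) startOffset, log.size()));
  }

  public static void main(String[] args) {
    // The producer wrote two records before the consumer finished connecting.
    List<String> log = List.of("event-0", "event-1");
    OptionalLong noCommit = OptionalLong.empty();

    long latest = startOffset(noCommit, log.size(), "latest");
    long earliest = startOffset(noCommit, log.size(), "earliest");

    System.out.println("latest   -> " + visibleRecords(log, latest));
    System.out.println("earliest -> " + visibleRecords(log, earliest));
  }
}
```

With `latest`, the two records written during the connection phase are never delivered; with `earliest`, the consumer still sees both.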
   
   ### Solutions
   Add the corresponding configuration item so that, when no offset has been
   committed for the consumer group, consumption starts from the beginning of
   the topic:
   
   ```
   public class ConsumerConfigFactory extends AbstractConfigFactory {
     private static final String AUTO_OFFSET_RESET_CONFIG_DEFAULT = "earliest";

     @Override
     public Properties makeDefaultProperties() {
       Properties props = new Properties();
       //.....
       props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG_DEFAULT);
       return props;
     }
   }
   ```
   
   
   
   </div>


