bossenti opened a new issue, #1676: URL: https://github.com/apache/streampipes/issues/1676
### Discussed in https://github.com/apache/streampipes/discussions/1626

<div type='discussions-op-text'>

<sup>Originally posted by **luoluoyuyu** May 27, 2023</sup>

### Problem Description

`SpKafkaConsumer` creates its consumer asynchronously. If no offset has been committed for the topic yet, any data the producer writes to the topic while the consumer is still connecting is lost. (The situation that triggers the problem can be found in https://github.com/apache/streampipes/blob/dev/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersTest.java#LL28C1-L28C1)

```
public class AdaptersTest {

  public void testAdapter(AdapterTesterBase adapterTester) throws Exception {
    adapterTester.startAdapterService();
    AdapterDescription adapterDescription = adapterTester.prepareAdapter();
    adapterTester.startAdapter(adapterDescription);
    List<Map<String, Object>> data = adapterTester.generateData();
    adapterTester.validateData(data);
  }
}
```

### Reason

In `AdaptersTest`, for example, the consumer and the producer are created asynchronously. When no consumer in the same consumer group has committed an offset for the newly created topic, the consumer's default behavior is to read only data produced after it has connected to the topic. Any message the producer sends while the consumer is still connecting is therefore lost.

### Solutions

Add the corresponding configuration item so that, when no offset has been committed for the consumer group, consumption starts from the beginning of the topic:

```
public class ConsumerConfigFactory extends AbstractConfigFactory {

  private static final String AUTO_OFFSET_RESET_CONFIG_DEFAULT = "earliest";

  @Override
  public Properties makeDefaultProperties() {
    Properties props = new Properties();
    //.....
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG_DEFAULT);
    return props;
  }
}
```

</div>
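
To make the race concrete, here is a minimal sketch that models the two offset-reset policies without a broker. It is a toy model, not StreamPipes or Kafka client code: the class `OffsetResetSketch` and its methods `startOffset` and `consume` are hypothetical names introduced only for illustration, and the model assumes a single partition whose log starts at offset 0.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical, not the Kafka client): shows which records a
// consumer group sees depending on the offset-reset policy.
final class OffsetResetSketch {

  // Returns the offset at which the consumer starts reading.
  // committedOffset == null means the group has never committed an offset.
  static long startOffset(String autoOffsetReset, Long committedOffset, long logEndOffset) {
    if (committedOffset != null) {
      return committedOffset; // a committed offset takes precedence over the policy
    }
    switch (autoOffsetReset) {
      case "earliest":
        return 0L; // assumption: the log starts at offset 0
      case "latest":
        return logEndOffset; // only records appended after this point are seen
      default:
        throw new IllegalArgumentException("unsupported policy: " + autoOffsetReset);
    }
  }

  // Returns all records from startOffset up to the current end of the log.
  static List<String> consume(List<String> log, long startOffset) {
    return new ArrayList<>(log.subList((int) startOffset, log.size()));
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();
    // The producer writes while the consumer is still connecting.
    log.add("event-0");
    log.add("event-1");

    // The consumer finishes connecting; no offset has been committed yet.
    long latest = startOffset("latest", null, log.size());
    long earliest = startOffset("earliest", null, log.size());

    // The producer writes one more record after the consumer is connected.
    log.add("event-2");

    System.out.println("latest   -> " + consume(log, latest));
    System.out.println("earliest -> " + consume(log, earliest));
  }
}
```

In this model the `latest` consumer only sees `event-2`, while the `earliest` consumer sees all three records, which matches the data loss described under Reason and the effect of the proposed default.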
