janithkv commented on a change in pull request #2911: STORM-2720 : Add TIMESTAMP option for FirstPollOffset for Kafka Trident spout URL: https://github.com/apache/storm/pull/2911#discussion_r283243069
########## File path: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterEmitTest.java ########## @@ -303,4 +315,38 @@ public void testLatestStrategyWhenTopologyIsRedeployed() { verify(collectorMock, never()).emit(anyList()); } + @Test + public void testTimeStampStrategyWhenTopologyIsRedeployed() { + /** + * TIMESTAMP strategy should be applied if the emitter is new and the topology has been redeployed (storm id has changed) + * Offset should be reset according to the offset corresponding to startTimeStamp + */ + long preRestartEmittedOffset = 20; + int preRestartEmittedRecords = 10; + long timeStampStartOffset = 2L; + long pollTimeout = 1L; + KafkaTridentSpoutBatchMetadata preExecutorRestartLastMeta = new KafkaTridentSpoutBatchMetadata(preRestartEmittedOffset, preRestartEmittedOffset + preRestartEmittedRecords - 1, "Some older topology"); + + Consumer kafkaConsumer = Mockito.mock(Consumer.class); Review comment: Aha. thanks for the explanation. That does make sense ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services