jia-gao commented on a change in pull request #1581:
URL: https://github.com/apache/samza/pull/1581#discussion_r798988796



##########
File path: samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
##########
@@ -128,32 +129,32 @@ public int clusterSize() {
     return Integer.parseInt(KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR());
   }
 
-
   /**
    * Read messages from the provided list of topics until {@param threshold} messages have been read or until
    * {@link #numEmptyPolls} polls return no messages.
    *
    * The default poll time out is determined by {@link #POLL_TIMEOUT_MS} and the number of empty polls are
    * determined by {@link #numEmptyPolls}
    *
-   * @param topics the list of topics to consume from
+   * @param topic the topic to consume from
    * @param threshold the number of messages to consume
    * @return the list of {@link ConsumerRecord}s whose size can be atmost {@param threshold}
    */
-  public List<ConsumerRecord<String, String>> consumeMessages(Collection<String> topics, int threshold) {
+  public List<ConsumerRecord<String, String>> consumeMessages(String topic, int threshold) {
     int emptyPollCount = 0;
     List<ConsumerRecord<String, String>> recordList = new ArrayList<>();
-    consumer.subscribe(topics);

Review comment:
       In the base class IntegrationTestHarness, the consumer is used only in 
setUp() and tearDown(), which create and close it. For tests that extend 
StreamApplicationIntegrationTestHarness, the only use of the consumer is here. 
So if we don't use the consumer here, those tests have no other place to 
access it.
   
   I don't think we should use the kafkaConsumer map in the base class: other 
classes may extend it, and they don't have the consumeMessages() method that 
breaks the tests. I'd prefer to apply the fix only where it is needed.
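
   For reference, here is a minimal sketch of the polling contract the Javadoc 
above describes: stop once the threshold is reached or once the allowed number 
of empty polls is used up. This is not the harness code. The class name 
BoundedPollSketch, the method consumeUpTo, and the Supplier standing in for 
consumer.poll() are all hypothetical, and resetting the empty-poll counter 
after a non-empty batch is an assumption on my part.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for the harness; not the actual Samza class.
class BoundedPollSketch {

  /**
   * Collects records from repeated polls until either `threshold` records
   * have been gathered or `maxEmptyPolls` consecutive polls return nothing.
   *
   * @param poll          stand-in for consumer.poll(); returns one batch per call
   * @param threshold     maximum number of records to return
   * @param maxEmptyPolls number of consecutive empty polls tolerated before giving up
   */
  static <T> List<T> consumeUpTo(Supplier<List<T>> poll, int threshold, int maxEmptyPolls) {
    int emptyPollCount = 0;
    List<T> collected = new ArrayList<>();
    while (emptyPollCount < maxEmptyPolls && collected.size() < threshold) {
      List<T> batch = poll.get();
      if (batch.isEmpty()) {
        emptyPollCount++;
        continue;
      }
      // Assumption: a non-empty batch resets the empty-poll counter.
      emptyPollCount = 0;
      for (T record : batch) {
        if (collected.size() >= threshold) {
          break; // never return more than `threshold` records
        }
        collected.add(record);
      }
    }
    return collected;
  }
}
```

   Passing the poll source in as a parameter keeps the sketch independent of 
which consumer instance is used, which is the part under discussion here.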




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.