zhztheplayer commented on a change in pull request #1253: [CALCITE-3073] 
Support read from special timestamp in KafkaAdapter
URL: https://github.com/apache/calcite/pull/1253#discussion_r291171091
 
 

 ##########
 File path: 
kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaMockConsumer.java
 ##########
 @@ -16,34 +16,122 @@
  */
 package org.apache.calcite.adapter.kafka;
 
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
 
 /**
  * A mock consumer to test Kafka adapter.
  */
-public class KafkaMockConsumer extends MockConsumer {
+public class KafkaMockConsumer extends MockConsumer<byte[], byte[]> {
+  private String topic = "testTopic";
+
   public KafkaMockConsumer(final OffsetResetStrategy offsetResetStrategy) {
     super(OffsetResetStrategy.EARLIEST);
 
-    assign(Arrays.asList(new TopicPartition("testtopic", 0)));
+    updatePartitions(topic,
+        Arrays.asList(new PartitionInfo(topic, 0, Node.noNode(), new Node[0], 
new Node[0])));
 
     HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
-    beginningOffsets.put(new TopicPartition("testtopic", 0), 0L);
+    beginningOffsets.put(new TopicPartition(topic, 0), 0L);
     updateBeginningOffsets(beginningOffsets);
+  }
 
+  /**
+   * Consumer call {@link MockConsumer#rebalance(Collection)} to assign this 
TopicPartition.
+   */
+  public void doRebalance() {
+    rebalance(Arrays.asList(new TopicPartition(topic, 0)));
+  }
+
+  /**
+   * Call {@link MockConsumer#addRecord(ConsumerRecord)} to add some test data,
 
 Review comment:
   Per Calcite's convention it's better to use "Calls" rather than "Call" for 
method/class documentations.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to