eolivelli commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r585639918



##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
##########
@@ -118,17 +119,18 @@ public void close() throws InterruptedException {
 
     public void start() {
         runnerThread = new Thread(() -> {
-            LOG.info("Starting kafka source");
+            LOG.info("Starting kafka source on {}", kafkaSourceConfig.getTopic());
             consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
             LOG.info("Kafka source started.");
-            ConsumerRecords<String, byte[]> consumerRecords;
             while (running) {
-                consumerRecords = consumer.poll(1000);
+                ConsumerRecords<Object, Object> consumerRecords = consumer.poll(1000);
                 CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
                 int index = 0;
-                for (ConsumerRecord<String, byte[]> consumerRecord : consumerRecords) {
-                    LOG.debug("Record received from kafka, key: {}. value: {}", consumerRecord.key(), consumerRecord.value());
-                    KafkaRecord<V> record = new KafkaRecord<>(consumerRecord, extractValue(consumerRecord));
+                for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
+                    KafkaRecord record = new KafkaRecord(consumerRecord,

Review comment:
   @lhotari
   Adding the type parameter helps a little inside KafkaRecord: it lets the
compiler check a few relations between the moving parts.
   Here I cannot use the parameter because I do not know the final binding
at compile time; it depends on the configuration of the Consumer.
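
   A minimal sketch of the trade-off described above, not the code in this PR. The names `TypedRecord`, `BindingSketch`, `fromConfig` and the `valueType` setting are hypothetical, chosen only for illustration. Inside the record class the type parameter ties the converter to the value it produces; at the call site the converter is picked from a runtime setting, so the caller can only name the result with a wildcard (or a raw type, as in the diff).

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical stand-in for a record wrapper: V links the converter's
// output type to the type returned by getValue(), checked at compile time.
final class TypedRecord<V> {
    private final Object rawValue;
    private final V value;

    TypedRecord(Object rawValue, Function<Object, V> converter) {
        this.rawValue = rawValue;
        this.value = converter.apply(rawValue);
    }

    V getValue() {
        return value;
    }

    Object getRawValue() {
        return rawValue;
    }
}

final class BindingSketch {
    // The converter depends on a runtime setting (an assumption standing in
    // for the consumer configuration), so V is unknown to the caller and the
    // best static type available here is TypedRecord<?>.
    static TypedRecord<?> fromConfig(String valueType, Object raw) {
        if ("string".equals(valueType)) {
            Function<Object, String> toStr =
                    o -> new String((byte[]) o, StandardCharsets.UTF_8);
            return new TypedRecord<>(raw, toStr);
        }
        Function<Object, byte[]> toBytes = o -> (byte[]) o;
        return new TypedRecord<>(raw, toBytes);
    }
}
```

   With this shape, a mismatch between a converter and the value type is still rejected inside `TypedRecord`, while the loop that builds records stays agnostic about the concrete binding.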



