lhotari commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r586183046



##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
##########
@@ -118,17 +119,18 @@ public void close() throws InterruptedException {
 
     public void start() {
         runnerThread = new Thread(() -> {
-            LOG.info("Starting kafka source");
+            LOG.info("Starting kafka source on {}", kafkaSourceConfig.getTopic());
             consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
             LOG.info("Kafka source started.");
-            ConsumerRecords<String, byte[]> consumerRecords;
             while (running) {
-                consumerRecords = consumer.poll(1000);
+                ConsumerRecords<Object, Object> consumerRecords = consumer.poll(1000);
                 CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
                 int index = 0;
-                for (ConsumerRecord<String, byte[]> consumerRecord : consumerRecords) {
-                    LOG.debug("Record received from kafka, key: {}. value: {}", consumerRecord.key(), consumerRecord.value());
-                    KafkaRecord<V> record = new KafkaRecord<>(consumerRecord, extractValue(consumerRecord));
+                for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
+                    KafkaRecord record = new KafkaRecord(consumerRecord,

Review comment:
   I made the comment because some projects follow a convention that the Java
compiler shouldn't produce any warnings for the source code. Under that
convention, each warning is either eliminated by fixing the code or explicitly
suppressed.
   
   The result of ignoring the type parameters is this type of warning:
   ```
   [INFO] /some/path/pulsar/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java: /some/path/pulsar/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java uses unchecked or unsafe operations.
   [INFO] /some/path/pulsar/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java: Recompile with -Xlint:unchecked for details.
   ```
   
   After adding `<arg>-Xlint:unchecked</arg>` to `compilerArgs` in the
`maven-compiler-plugin` configuration, the compiler produces detailed warnings
like these:
   ```
   [WARNING] /some/path/pulsar/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java:[130,42] unchecked call to KafkaRecord(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String,?>,java.util.Optional<java.lang.String>,V,org.apache.pulsar.client.api.Schema<V>) as a member of the raw type org.apache.pulsar.io.kafka.KafkaAbstractSource.KafkaRecord
   [WARNING] /some/path/pulsar/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java:[134,28] unchecked method invocation: method consume in class org.apache.pulsar.io.core.PushSource is applied to given types
     required: org.apache.pulsar.functions.api.Record<T>
     found: org.apache.pulsar.io.kafka.KafkaAbstractSource.KafkaRecord
   [WARNING] /some/path/pulsar/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java:[134,29] unchecked conversion
     required: org.apache.pulsar.functions.api.Record<T>
     found:    org.apache.pulsar.io.kafka.KafkaAbstractSource.KafkaRecord
   ```
   
   Shouldn't these warnings be handled?
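
For illustration, the two usual ways of handling this kind of warning could look roughly like the sketch below. It is a minimal, self-contained example and not the actual Pulsar code: `Wrapper`, `Sink`, `pushTyped` and `pushUnchecked` are hypothetical stand-ins for `KafkaRecord`, `PushSource` and the polling loop. The first method keeps the type parameter so that nothing unchecked remains. The second confines an unavoidable unchecked cast to one small method annotated with `@SuppressWarnings("unchecked")`.

```java
import java.util.ArrayList;
import java.util.List;

public class RawTypeSketch {

    // Hypothetical stand-in for a generic record wrapper (not the real KafkaRecord).
    static class Wrapper<V> {
        private final V value;

        Wrapper(V value) {
            this.value = value;
        }

        V getValue() {
            return value;
        }
    }

    // Hypothetical stand-in for a generic consumer of records (not the real PushSource).
    static class Sink<T> {
        private final List<Wrapper<T>> received = new ArrayList<>();

        void consume(Wrapper<T> record) {
            received.add(record);
        }

        int size() {
            return received.size();
        }
    }

    // Option 1: eliminate the warning by keeping the type parameter.
    // The diamond operator infers Wrapper<T>, so no raw type is involved.
    static <T> void pushTyped(Sink<T> sink, T value) {
        Wrapper<T> record = new Wrapper<>(value);
        sink.consume(record);
    }

    // Option 2: when an unchecked cast cannot be avoided, suppress the warning
    // on the smallest possible scope instead of leaving it in the build output.
    @SuppressWarnings("unchecked")
    static <T> void pushUnchecked(Sink<T> sink, Object value) {
        Wrapper<T> record = new Wrapper<>((T) value); // unchecked cast, deliberately suppressed
        sink.consume(record);
    }

    public static void main(String[] args) {
        Sink<String> sink = new Sink<>();
        pushTyped(sink, "typed");
        pushUnchecked(sink, "unchecked");
        System.out.println(sink.size());
    }
}
```

Writing `Wrapper record = new Wrapper(value);` with the raw type instead would compile, but it would bring back the "unchecked call" and "unchecked conversion" warnings shown above.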




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]