lhotari commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r586183046
##########
File path:
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
##########
@@ -118,17 +119,18 @@ public void close() throws InterruptedException {
public void start() {
runnerThread = new Thread(() -> {
- LOG.info("Starting kafka source");
+ LOG.info("Starting kafka source on {}", kafkaSourceConfig.getTopic());
consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
LOG.info("Kafka source started.");
- ConsumerRecords<String, byte[]> consumerRecords;
while (running) {
- consumerRecords = consumer.poll(1000);
+ ConsumerRecords<Object, Object> consumerRecords = consumer.poll(1000);
CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
int index = 0;
- for (ConsumerRecord<String, byte[]> consumerRecord : consumerRecords) {
- LOG.debug("Record received from kafka, key: {}. value: {}", consumerRecord.key(), consumerRecord.value());
- KafkaRecord<V> record = new KafkaRecord<>(consumerRecord, extractValue(consumerRecord));
+ for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
+ KafkaRecord record = new KafkaRecord(consumerRecord,
Review comment:
I raised this because some projects follow a convention that the source code
should compile without Java compiler warnings: each warning is either
eliminated by fixing the code or suppressed explicitly.
Using the raw type (ignoring the type parameters) results in this kind of warning:
```
[INFO] /some/path/pulsar/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java: /some/path/pulsar/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java uses unchecked or unsafe operations.
[INFO] /some/path/pulsar/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java: Recompile with -Xlint:unchecked for details.
```
After adding `<arg>-Xlint:unchecked</arg>` to `compilerArgs` in the
`maven-compiler-plugin` configuration, the build produces detailed warnings
like these:
```
[WARNING] /some/path/pulsar/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java:[130,42] unchecked call to KafkaRecord(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String,?>,java.util.Optional<java.lang.String>,V,org.apache.pulsar.client.api.Schema<V>) as a member of the raw type org.apache.pulsar.io.kafka.KafkaAbstractSource.KafkaRecord
[WARNING] /some/path/pulsar/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java:[134,28] unchecked method invocation: method consume in class org.apache.pulsar.io.core.PushSource is applied to given types
  required: org.apache.pulsar.functions.api.Record<T>
  found: org.apache.pulsar.io.kafka.KafkaAbstractSource.KafkaRecord
[WARNING] /some/path/pulsar/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java:[134,29] unchecked conversion
  required: org.apache.pulsar.functions.api.Record<T>
  found: org.apache.pulsar.io.kafka.KafkaAbstractSource.KafkaRecord
```
Shouldn't these warnings be handled?
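
To illustrate the two usual ways of handling such warnings, here is a minimal, self-contained sketch. It is not the PR's code: `Envelope`, `Sink`, `pushTyped` and `pushUntyped` are hypothetical stand-ins for a generic record wrapper and a generic `consume` method. It shows one variant that keeps the type parameter, so no warning is produced, and one that confines the unchecked cast to a single small method and suppresses it there.

```java
import java.util.ArrayList;
import java.util.List;

final class RawTypeSketch {

    /** Hypothetical stand-in for a generic record wrapper. */
    static final class Envelope<V> {
        private final V value;

        Envelope(V value) {
            this.value = value;
        }

        V getValue() {
            return value;
        }
    }

    /** Hypothetical stand-in for a generic sink with a consume method. */
    static final class Sink<T> {
        private final List<Envelope<T>> received = new ArrayList<>();

        void consume(Envelope<T> envelope) {
            received.add(envelope);
        }

        List<Envelope<T>> received() {
            return received;
        }
    }

    // Option 1: keep the type parameter, so the compiler has nothing to warn about.
    static <T> void pushTyped(Sink<T> sink, T value) {
        Envelope<T> envelope = new Envelope<>(value);
        sink.consume(envelope);
    }

    // Option 2: when the static type is not known at the call site, keep the
    // unchecked cast in one small method and suppress the warning only there.
    @SuppressWarnings("unchecked")
    static <T> void pushUntyped(Sink<T> sink, Object value) {
        Envelope<T> envelope = new Envelope<>((T) value); // unchecked cast, suppressed
        sink.consume(envelope);
    }

    public static void main(String[] args) {
        Sink<String> sink = new Sink<>();
        pushTyped(sink, "a");
        pushUntyped(sink, "b");
        System.out.println(sink.received().size());
    }
}
```

With either variant the call sites stay free of raw types, and any remaining suppression is limited to one clearly marked method rather than the whole class.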