This is an automated email from the ASF dual-hosted git repository.

cbornet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 54f0ca1c769 [cleanup][io] Cleanup Kafka connector (#20721)
54f0ca1c769 is described below

commit 54f0ca1c769638000fbeb4d73a565216c0992c32
Author: Christophe Bornet <[email protected]>
AuthorDate: Sat Jul 22 21:40:47 2023 +0200

    [cleanup][io] Cleanup Kafka connector (#20721)
---
 .../org/apache/pulsar/functions/api/Record.java    |   2 +-
 .../apache/pulsar/io/kafka/AvroSchemaCache.java    |   4 +-
 .../pulsar/io/kafka/ByteBufferSchemaWrapper.java   |   2 +-
 .../pulsar/io/kafka/BytesWithKafkaSchema.java      |   4 +-
 .../apache/pulsar/io/kafka/KafkaAbstractSink.java  |   2 +-
 .../pulsar/io/kafka/KafkaAbstractSource.java       |  25 +--
 .../org/apache/pulsar/io/kafka/KafkaBytesSink.java |   2 +-
 .../apache/pulsar/io/kafka/KafkaBytesSource.java   |  36 ++--
 .../apache/pulsar/io/kafka/KafkaSinkConfig.java    |   7 -
 .../apache/pulsar/io/kafka/KafkaSourceConfig.java  |   7 -
 .../apache/pulsar/io/kafka/KafkaStringSource.java  |   6 +-
 .../io/kafka/ByteBufferSchemaWrapperTest.java      |   8 +-
 .../pulsar/io/kafka/KafkaBytesSourceTest.java      | 201 +++++++++++----------
 .../io/kafka/sink/KafkaAbstractSinkTest.java       |   4 +-
 .../io/kafka/source/KafkaAbstractSourceTest.java   |  19 +-
 15 files changed, 158 insertions(+), 171 deletions(-)

diff --git 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
index ea6987e5f03..0487b3d02b3 100644
--- 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
+++ 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
@@ -27,7 +27,7 @@ import 
org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
 
 /**
- * Pulsar Connect's Record interface. Record encapsulates the information 
about a record being read from a Source.
+ * Pulsar IO's Record interface. Record encapsulates the information about a 
record being read from a Source.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
index f0ad79549eb..4e3abe245be 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
@@ -38,9 +38,9 @@ final class AvroSchemaCache {
     private final LoadingCache<Integer, Schema<ByteBuffer>> cache = 
CacheBuilder
             .newBuilder()
             .maximumSize(100)
-            .build(new CacheLoader<Integer, Schema<ByteBuffer>>() {
+            .build(new CacheLoader<>() {
                 @Override
-                public Schema<ByteBuffer> load(Integer schemaId) throws 
Exception {
+                public Schema<ByteBuffer> load(Integer schemaId) {
                     return fetchSchema(schemaId);
                 }
             });
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/ByteBufferSchemaWrapper.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/ByteBufferSchemaWrapper.java
index fb4683e511d..aefe62e3857 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/ByteBufferSchemaWrapper.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/ByteBufferSchemaWrapper.java
@@ -29,7 +29,7 @@ import org.apache.pulsar.common.schema.SchemaInfo;
 class ByteBufferSchemaWrapper implements Schema<ByteBuffer> {
     private final Supplier<SchemaInfo> original;
 
-    public ByteBufferSchemaWrapper(Schema original) {
+    public ByteBufferSchemaWrapper(Schema<?> original) {
         this(original::getSchemaInfo);
     }
 
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/BytesWithKafkaSchema.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/BytesWithKafkaSchema.java
index f0429c92eed..585cd927c30 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/BytesWithKafkaSchema.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/BytesWithKafkaSchema.java
@@ -26,6 +26,6 @@ import lombok.Value;
  */
 @Value
 public class BytesWithKafkaSchema {
-    private final ByteBuffer value;
-    private final int schemaId;
+    ByteBuffer value;
+    int schemaId;
 }
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
index 8fbd6c81861..5ceea4dec8d 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
@@ -44,7 +44,7 @@ import org.apache.pulsar.io.core.SinkContext;
 public abstract class KafkaAbstractSink<K, V> implements Sink<byte[]> {
 
     private Producer<K, V> producer;
-    private Properties props = new Properties();
+    private final Properties props = new Properties();
     private KafkaSinkConfig kafkaSinkConfig;
 
     @Override
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index 3d4612c039f..f8539518851 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -166,7 +166,7 @@ public abstract class KafkaAbstractSource<V> extends 
PushSource<V> {
                     CompletableFuture<?>[] futures = new 
CompletableFuture<?>[consumerRecords.count()];
                     int index = 0;
                     for (ConsumerRecord<Object, Object> consumerRecord : 
consumerRecords) {
-                        KafkaRecord record = buildRecord(consumerRecord);
+                        KafkaRecord<V> record = buildRecord(consumerRecord);
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Write record {} {} {}", 
record.getKey(), record.getValue(), record.getSchema());
                         }
@@ -190,7 +190,7 @@ public abstract class KafkaAbstractSource<V> extends 
PushSource<V> {
         runnerThread.start();
     }
 
-    public abstract KafkaRecord buildRecord(ConsumerRecord<Object, Object> 
consumerRecord);
+    public abstract KafkaRecord<V> buildRecord(ConsumerRecord<Object, Object> 
consumerRecord);
 
     protected Map<String, String> copyKafkaHeaders(ConsumerRecord<Object, 
Object> consumerRecord) {
         if (!kafkaSourceConfig.isCopyHeadersEnabled()) {
@@ -208,7 +208,7 @@ public abstract class KafkaAbstractSource<V> extends 
PushSource<V> {
 
     @Slf4j
     protected static class KafkaRecord<V> implements Record<V> {
-        private final ConsumerRecord<String, ?> record;
+        private final ConsumerRecord<?, ?> record;
         private final V value;
         private final Schema<V> schema;
         private final Map<String, String> properties;
@@ -216,7 +216,7 @@ public abstract class KafkaAbstractSource<V> extends 
PushSource<V> {
         @Getter
         private final CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
 
-        public KafkaRecord(ConsumerRecord<String, ?> record, V value, 
Schema<V> schema,
+        public KafkaRecord(ConsumerRecord<?, ?> record, V value, Schema<V> 
schema,
                            Map<String, String> properties) {
             this.record = record;
             this.value = value;
@@ -240,7 +240,7 @@ public abstract class KafkaAbstractSource<V> extends 
PushSource<V> {
 
         @Override
         public Optional<String> getKey() {
-            return Optional.ofNullable(record.key());
+            return Optional.ofNullable(record.key() instanceof String ? 
(String) record.key() : null);
         }
 
         @Override
@@ -263,13 +263,14 @@ public abstract class KafkaAbstractSource<V> extends 
PushSource<V> {
             return properties;
         }
     }
-    protected static class KeyValueKafkaRecord<V> extends KafkaRecord 
implements KVRecord<Object, Object> {
 
-        private final Schema<Object> keySchema;
-        private final Schema<Object> valueSchema;
+    protected static class KeyValueKafkaRecord<K, W> extends KafkaRecord 
implements KVRecord<K, W> {
 
-        public KeyValueKafkaRecord(ConsumerRecord record, KeyValue value,
-                                   Schema<Object> keySchema, Schema<Object> 
valueSchema,
+        private final Schema<K> keySchema;
+        private final Schema<W> valueSchema;
+
+        public KeyValueKafkaRecord(ConsumerRecord<?, ?> record, KeyValue<K, W> 
value,
+                                   Schema<K> keySchema, Schema<W> valueSchema,
                                    Map<String, String> properties) {
             super(record, value, null, properties);
             this.keySchema = keySchema;
@@ -277,12 +278,12 @@ public abstract class KafkaAbstractSource<V> extends 
PushSource<V> {
         }
 
         @Override
-        public Schema<Object> getKeySchema() {
+        public Schema<K> getKeySchema() {
             return keySchema;
         }
 
         @Override
-        public Schema<Object> getValueSchema() {
+        public Schema<W> getValueSchema() {
             return valueSchema;
         }
 
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java
index 2a200531b67..0ed23856995 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java
@@ -29,7 +29,7 @@ import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
 
 /**
- * Kafka sink should treats incoming messages as pure bytes. So we don't
+ * Kafka sink should treat incoming messages as pure bytes. So we don't
  * apply schema into it.
  */
 @Connector(
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
index 4e35d98e0bb..51408f04519 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
@@ -70,8 +70,8 @@ import org.apache.pulsar.io.core.annotations.IOType;
 public class KafkaBytesSource extends KafkaAbstractSource<ByteBuffer> {
 
     private AvroSchemaCache schemaCache;
-    private Schema keySchema;
-    private Schema valueSchema;
+    private Schema<ByteBuffer> keySchema;
+    private Schema<ByteBuffer> valueSchema;
     private boolean produceKeyValue;
 
     @Override
@@ -93,11 +93,11 @@ public class KafkaBytesSource extends 
KafkaAbstractSource<ByteBuffer> {
         }
 
         if (keySchema.getSchemaInfo().getType() != SchemaType.STRING) {
-            // if the Key is a String we can use native Pulsar Key
-            // otherwise we use KeyValue schema
-            // that allows you to set a schema for the Key and a schema for 
the Value.
-            // using SEPARATED encoding the key is saved into the binary key
-            // so it is used for routing and for compaction
+            // If the Key is a String we can use native Pulsar Key.
+            // Otherwise, we use KeyValue schema.
+            // That allows you to set a schema for the Key and a schema for 
the Value.
+            // Using SEPARATED encoding the key is saved into the binary key,
+            // so it is used for routing and for compaction.
             produceKeyValue = true;
         }
 
@@ -114,13 +114,13 @@ public class KafkaBytesSource extends 
KafkaAbstractSource<ByteBuffer> {
     }
 
     @Override
-    public KafkaRecord buildRecord(ConsumerRecord<Object, Object> 
consumerRecord) {
+    public KafkaRecord<ByteBuffer> buildRecord(ConsumerRecord<Object, Object> 
consumerRecord) {
         if (produceKeyValue) {
-            Object key = extractSimpleValue(consumerRecord.key());
-            Object value = extractSimpleValue(consumerRecord.value());
-            Schema currentKeySchema = 
getSchemaFromObject(consumerRecord.key(), keySchema);
-            Schema currentValueSchema = 
getSchemaFromObject(consumerRecord.value(), valueSchema);
-            return new KeyValueKafkaRecord(consumerRecord,
+            ByteBuffer key = extractSimpleValue(consumerRecord.key());
+            ByteBuffer value = extractSimpleValue(consumerRecord.value());
+            Schema<ByteBuffer> currentKeySchema = 
getSchemaFromObject(consumerRecord.key(), keySchema);
+            Schema<ByteBuffer> currentValueSchema = 
getSchemaFromObject(consumerRecord.value(), valueSchema);
+            return new KeyValueKafkaRecord<ByteBuffer, 
ByteBuffer>(consumerRecord,
                     new KeyValue<>(key, value),
                     currentKeySchema,
                     currentValueSchema,
@@ -128,7 +128,7 @@ public class KafkaBytesSource extends 
KafkaAbstractSource<ByteBuffer> {
 
         } else {
             Object value = consumerRecord.value();
-            return new KafkaRecord(consumerRecord,
+            return new KafkaRecord<>(consumerRecord,
                     extractSimpleValue(value),
                     getSchemaFromObject(value, valueSchema),
                     copyKafkaHeaders(consumerRecord));
@@ -152,7 +152,7 @@ public class KafkaBytesSource extends 
KafkaAbstractSource<ByteBuffer> {
         }
     }
 
-    private Schema<ByteBuffer> getSchemaFromObject(Object value, Schema 
fallback) {
+    private Schema<ByteBuffer> getSchemaFromObject(Object value, 
Schema<ByteBuffer> fallback) {
         if (value instanceof BytesWithKafkaSchema) {
             // this is a Struct with schema downloaded by the schema registry
             // the schema may be different from record to record
@@ -179,7 +179,7 @@ public class KafkaBytesSource extends 
KafkaAbstractSource<ByteBuffer> {
             result = Schema.BYTEBUFFER;
         } else if 
(StringDeserializer.class.getName().equals(kafkaDeserializerClass)) {
             if (isKey) {
-                // for the key we use the String value and we want 
StringDeserializer
+                // for the key we use the String value, and we want 
StringDeserializer
                 props.put(key, kafkaDeserializerClass);
             }
             result = Schema.STRING;
@@ -206,11 +206,11 @@ public class KafkaBytesSource extends 
KafkaAbstractSource<ByteBuffer> {
         return new ByteBufferSchemaWrapper(result);
     }
 
-    Schema getKeySchema() {
+    Schema<ByteBuffer> getKeySchema() {
         return keySchema;
     }
 
-    Schema getValueSchema() {
+    Schema<ByteBuffer> getValueSchema() {
         return valueSchema;
     }
 
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
index dbbf1a7b5e2..755b2c89c8f 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
@@ -43,43 +43,36 @@ public class KafkaSinkConfig implements Serializable {
     private String bootstrapServers;
 
     @FieldDoc(
-            required = false,
             defaultValue = "",
             help = "Protocol used to communicate with Kafka brokers.")
     private String securityProtocol;
 
     @FieldDoc(
-            required = false,
             defaultValue = "",
             help = "SASL mechanism used for Kafka client connections.")
     private String saslMechanism;
 
     @FieldDoc(
-            required = false,
             defaultValue = "",
             help = "JAAS login context parameters for SASL connections in the 
format used by JAAS configuration files.")
     private String saslJaasConfig;
 
     @FieldDoc(
-            required = false,
             defaultValue = "",
             help = "The list of protocols enabled for SSL connections.")
     private String sslEnabledProtocols;
 
     @FieldDoc(
-            required = false,
             defaultValue = "",
             help = "The endpoint identification algorithm to validate server 
hostname using server certificate.")
     private String sslEndpointIdentificationAlgorithm;
 
     @FieldDoc(
-            required = false,
             defaultValue = "",
             help = "The location of the trust store file.")
     private String sslTruststoreLocation;
 
     @FieldDoc(
-            required = false,
             defaultValue = "",
             help = "The password for the trust store file.")
     private String sslTruststorePassword;
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
index ad2e121d26a..5de60d2a028 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
@@ -44,43 +44,36 @@ public class KafkaSourceConfig implements Serializable {
     private String bootstrapServers;
 
     @FieldDoc(
-            required = false,
             defaultValue = "",
             help = "Protocol used to communicate with Kafka brokers.")
     private String securityProtocol;
 
     @FieldDoc(
-            required = false,
             defaultValue = "",
             help = "SASL mechanism used for Kafka client connections.")
     private String saslMechanism;
 
     @FieldDoc(
-            required = false,
             defaultValue = "",
             help = "JAAS login context parameters for SASL connections in the 
format used by JAAS configuration files.")
     private String saslJaasConfig;
 
     @FieldDoc(
-            required = false,
             defaultValue = "",
             help = "The list of protocols enabled for SSL connections.")
     private String sslEnabledProtocols;
 
     @FieldDoc(
-            required = false,
             defaultValue = "",
             help = "The endpoint identification algorithm to validate server 
hostname using server certificate.")
     private String sslEndpointIdentificationAlgorithm;
 
     @FieldDoc(
-            required = false,
             defaultValue = "",
             help = "The location of the trust store file.")
     private String sslTruststoreLocation;
 
     @FieldDoc(
-            required = false,
             defaultValue = "",
             help = "The password for the trust store file.")
     private String sslTruststorePassword;
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
index 58df1838b2e..0c96528cc93 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
@@ -27,14 +27,12 @@ import org.apache.pulsar.client.api.Schema;
  */
 public class KafkaStringSource extends KafkaAbstractSource<String> {
 
-
     @Override
-    public KafkaRecord buildRecord(ConsumerRecord<Object, Object> 
consumerRecord) {
-        KafkaRecord record = new KafkaRecord(consumerRecord,
+    public KafkaRecord<String> buildRecord(ConsumerRecord<Object, Object> 
consumerRecord) {
+        return new KafkaRecord<>(consumerRecord,
                 new String((byte[]) consumerRecord.value(), 
StandardCharsets.UTF_8),
                 Schema.STRING,
                 copyKafkaHeaders(consumerRecord));
-        return record;
     }
 
 }
diff --git 
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/ByteBufferSchemaWrapperTest.java
 
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/ByteBufferSchemaWrapperTest.java
index e7f108bf601..eff4b294851 100644
--- 
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/ByteBufferSchemaWrapperTest.java
+++ 
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/ByteBufferSchemaWrapperTest.java
@@ -32,7 +32,7 @@ import static 
org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
 public class ByteBufferSchemaWrapperTest {
 
     @Test
-    public void testGetBytesNoCopy() throws Exception {
+    public void testGetBytesNoCopy() {
         byte[] originalArray = {1, 2, 3};
         ByteBuffer wrapped = ByteBuffer.wrap(originalArray);
         assertEquals(0, wrapped.arrayOffset());
@@ -41,7 +41,7 @@ public class ByteBufferSchemaWrapperTest {
     }
 
     @Test
-    public void testGetBytesOffsetZeroDifferentLen() throws Exception {
+    public void testGetBytesOffsetZeroDifferentLen() {
         byte[] originalArray = {1, 2, 3};
         ByteBuffer wrapped = ByteBuffer.wrap(originalArray, 1, 2);
         assertEquals(0, wrapped.arrayOffset());
@@ -52,7 +52,7 @@ public class ByteBufferSchemaWrapperTest {
     }
 
     @Test
-    public void testGetBytesOffsetNonZero() throws Exception {
+    public void testGetBytesOffsetNonZero() {
         byte[] originalArray = {1, 2, 3};
         ByteBuffer wrapped = ByteBuffer.wrap(originalArray);
         wrapped.position(1);
@@ -66,7 +66,7 @@ public class ByteBufferSchemaWrapperTest {
     }
 
     @Test
-    public void testGetBytesOffsetZero() throws Exception {
+    public void testGetBytesOffsetZero() {
         byte[] originalArray = {1, 2, 3};
         ByteBuffer wrapped = ByteBuffer.wrap(originalArray, 0, 2);
         assertEquals(0, wrapped.arrayOffset());
diff --git 
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/KafkaBytesSourceTest.java
 
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/KafkaBytesSourceTest.java
index d370d51cc23..401bd64c36f 100644
--- 
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/KafkaBytesSourceTest.java
+++ 
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/KafkaBytesSourceTest.java
@@ -44,6 +44,8 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.kafka.KafkaAbstractSource.KafkaRecord;
+import org.apache.pulsar.io.kafka.KafkaAbstractSource.KeyValueKafkaRecord;
 import org.bouncycastle.util.encoders.Base64;
 import org.mockito.Mockito;
 import org.testng.annotations.Test;
@@ -88,24 +90,25 @@ public class KafkaBytesSourceTest {
 
     }
 
-    private void validateSchemaNoKeyValue(String keyDeserializationClass, 
Schema expectedKeySchema,
-                                          String valueDeserializationClass, 
Schema expectedValueSchema) throws Exception {
-        KafkaBytesSource source = new KafkaBytesSource();
-        Map<String, Object> config = new HashMap<>();
-        config.put("topic","test");
-        config.put("bootstrapServers","localhost:9092");
-        config.put("groupId", "test");
-        config.put("valueDeserializationClass", valueDeserializationClass);
-        config.put("keyDeserializationClass", keyDeserializationClass);
-        config.put("consumerConfigProperties", ImmutableMap.builder()
-                .put("schema.registry.url", "http://localhost:8081";)
-                .build());
-        source.open(config, Mockito.mock(SourceContext.class));
-        assertFalse(source.isProduceKeyValue());
-        Schema keySchema  = source.getKeySchema();
-        Schema valueSchema = source.getValueSchema();
-        assertEquals(keySchema.getSchemaInfo().getType(), 
expectedKeySchema.getSchemaInfo().getType());
-        assertEquals(valueSchema.getSchemaInfo().getType(), 
expectedValueSchema.getSchemaInfo().getType());
+    private void validateSchemaNoKeyValue(String keyDeserializationClass, 
Schema<?> expectedKeySchema,
+                                          String valueDeserializationClass, 
Schema<?> expectedValueSchema) throws Exception {
+        try (KafkaBytesSource source = new KafkaBytesSource()) {
+            Map<String, Object> config = new HashMap<>();
+            config.put("topic", "test");
+            config.put("bootstrapServers", "localhost:9092");
+            config.put("groupId", "test");
+            config.put("valueDeserializationClass", valueDeserializationClass);
+            config.put("keyDeserializationClass", keyDeserializationClass);
+            config.put("consumerConfigProperties", ImmutableMap.builder()
+                    .put("schema.registry.url", "http://localhost:8081";)
+                    .build());
+            source.open(config, Mockito.mock(SourceContext.class));
+            assertFalse(source.isProduceKeyValue());
+            Schema<ByteBuffer> keySchema = source.getKeySchema();
+            Schema<ByteBuffer> valueSchema = source.getValueSchema();
+            assertEquals(keySchema.getSchemaInfo().getType(), 
expectedKeySchema.getSchemaInfo().getType());
+            assertEquals(valueSchema.getSchemaInfo().getType(), 
expectedValueSchema.getSchemaInfo().getType());
+        }
     }
 
     @Test
@@ -120,96 +123,98 @@ public class KafkaBytesSourceTest {
     public void testCopyKafkaHeadersEnabled() throws Exception {
         ByteBuffer key = ByteBuffer.wrap(new 
IntegerSerializer().serialize("test", 10));
         ByteBuffer value = ByteBuffer.wrap(new 
StringSerializer().serialize("test", "test"));
-        KafkaBytesSource source = new KafkaBytesSource();
-        Map<String, Object> config = new HashMap<>();
-        config.put("copyHeadersEnabled", true);
-        config.put("topic","test");
-        config.put("bootstrapServers","localhost:9092");
-        config.put("groupId", "test");
-        config.put("valueDeserializationClass", 
IntegerDeserializer.class.getName());
-        config.put("keyDeserializationClass", 
StringDeserializer.class.getName());
-        config.put("consumerConfigProperties", ImmutableMap.builder()
-                .put("schema.registry.url", "http://localhost:8081";)
-                .build());
-        source.open(config, Mockito.mock(SourceContext.class));
-        ConsumerRecord record = new ConsumerRecord<Object, Object>("test", 88, 
99, key, value);
-        record.headers().add("k1", "v1".getBytes(StandardCharsets.UTF_8));
-        record.headers().add("k2", new byte[]{0xF});
-
-        Map<String, String> props = source.copyKafkaHeaders(record);
-        assertEquals(props.size(), 5);
-        assertTrue(props.containsKey("__kafka_topic"));
-        assertTrue(props.containsKey("__kafka_partition"));
-        assertTrue(props.containsKey("__kafka_offset"));
-        assertTrue(props.containsKey("k1"));
-        assertTrue(props.containsKey("k2"));
-
-        assertEquals(props.get("__kafka_topic"), "test");
-        assertEquals(props.get("__kafka_partition"), "88");
-        assertEquals(props.get("__kafka_offset"), "99");
-        assertEquals(Base64.decode(props.get("k1")), 
"v1".getBytes(StandardCharsets.UTF_8));
-        assertEquals(Base64.decode(props.get("k2")), new byte[]{0xF});
+        try (KafkaBytesSource source = new KafkaBytesSource()) {
+            Map<String, Object> config = new HashMap<>();
+            config.put("copyHeadersEnabled", true);
+            config.put("topic", "test");
+            config.put("bootstrapServers", "localhost:9092");
+            config.put("groupId", "test");
+            config.put("valueDeserializationClass", 
IntegerDeserializer.class.getName());
+            config.put("keyDeserializationClass", 
StringDeserializer.class.getName());
+            config.put("consumerConfigProperties", ImmutableMap.builder()
+                    .put("schema.registry.url", "http://localhost:8081";)
+                    .build());
+            source.open(config, Mockito.mock(SourceContext.class));
+            ConsumerRecord<Object, Object> record = new 
ConsumerRecord<>("test", 88, 99, key, value);
+            record.headers().add("k1", "v1".getBytes(StandardCharsets.UTF_8));
+            record.headers().add("k2", new byte[]{0xF});
+
+            Map<String, String> props = source.copyKafkaHeaders(record);
+            assertEquals(props.size(), 5);
+            assertTrue(props.containsKey("__kafka_topic"));
+            assertTrue(props.containsKey("__kafka_partition"));
+            assertTrue(props.containsKey("__kafka_offset"));
+            assertTrue(props.containsKey("k1"));
+            assertTrue(props.containsKey("k2"));
+
+            assertEquals(props.get("__kafka_topic"), "test");
+            assertEquals(props.get("__kafka_partition"), "88");
+            assertEquals(props.get("__kafka_offset"), "99");
+            assertEquals(Base64.decode(props.get("k1")), 
"v1".getBytes(StandardCharsets.UTF_8));
+            assertEquals(Base64.decode(props.get("k2")), new byte[]{0xF});
+        }
     }
 
     @Test
     public void testCopyKafkaHeadersDisabled() throws Exception {
         ByteBuffer key = ByteBuffer.wrap(new 
IntegerSerializer().serialize("test", 10));
         ByteBuffer value = ByteBuffer.wrap(new 
StringSerializer().serialize("test", "test"));
-        KafkaBytesSource source = new KafkaBytesSource();
-        Map<String, Object> config = new HashMap<>();
-        config.put("topic","test");
-        config.put("bootstrapServers","localhost:9092");
-        config.put("groupId", "test");
-        config.put("valueDeserializationClass", 
IntegerDeserializer.class.getName());
-        config.put("keyDeserializationClass", 
StringDeserializer.class.getName());
-        config.put("consumerConfigProperties", ImmutableMap.builder()
-                .put("schema.registry.url", "http://localhost:8081";)
-                .build());
-        source.open(config, Mockito.mock(SourceContext.class));
-        ConsumerRecord record = new ConsumerRecord<Object, Object>("test", 88, 
99, key, value);
-        record.headers().add("k1", "v1".getBytes(StandardCharsets.UTF_8));
-        record.headers().add("k2", new byte[]{0xF});
-
-        Map<String, String> props = source.copyKafkaHeaders(record);
-        assertTrue(props.isEmpty());
+        try (KafkaBytesSource source = new KafkaBytesSource()) {
+            Map<String, Object> config = new HashMap<>();
+            config.put("topic", "test");
+            config.put("bootstrapServers", "localhost:9092");
+            config.put("groupId", "test");
+            config.put("valueDeserializationClass", 
IntegerDeserializer.class.getName());
+            config.put("keyDeserializationClass", 
StringDeserializer.class.getName());
+            config.put("consumerConfigProperties", ImmutableMap.builder()
+                    .put("schema.registry.url", "http://localhost:8081";)
+                    .build());
+            source.open(config, Mockito.mock(SourceContext.class));
+            ConsumerRecord<Object, Object> record = new 
ConsumerRecord<>("test", 88, 99, key, value);
+            record.headers().add("k1", "v1".getBytes(StandardCharsets.UTF_8));
+            record.headers().add("k2", new byte[]{0xF});
+
+            Map<String, String> props = source.copyKafkaHeaders(record);
+            assertTrue(props.isEmpty());
+        }
     }
 
-    private void validateSchemaKeyValue(String keyDeserializationClass, Schema 
expectedKeySchema,
-                                          String valueDeserializationClass, 
Schema expectedValueSchema,
+    private void validateSchemaKeyValue(String keyDeserializationClass, 
Schema<?> expectedKeySchema,
+                                          String valueDeserializationClass, 
Schema<?> expectedValueSchema,
                                           ByteBuffer key,
                                         ByteBuffer value) throws Exception {
-        KafkaBytesSource source = new KafkaBytesSource();
-        Map<String, Object> config = new HashMap<>();
-        config.put("topic","test");
-        config.put("bootstrapServers","localhost:9092");
-        config.put("groupId", "test");
-        config.put("valueDeserializationClass", valueDeserializationClass);
-        config.put("keyDeserializationClass", keyDeserializationClass);
-        config.put("consumerConfigProperties", ImmutableMap.builder()
-                .put("schema.registry.url", "http://localhost:8081";)
-                .build());
-        source.open(config, Mockito.mock(SourceContext.class));
-        assertTrue(source.isProduceKeyValue());
-        Schema keySchema  = source.getKeySchema();
-        Schema valueSchema = source.getValueSchema();
-        assertEquals(keySchema.getSchemaInfo().getType(), 
expectedKeySchema.getSchemaInfo().getType());
-        assertEquals(valueSchema.getSchemaInfo().getType(), 
expectedValueSchema.getSchemaInfo().getType());
-
-        KafkaAbstractSource.KafkaRecord record = source.buildRecord(new 
ConsumerRecord<Object, Object>("test", 0, 0, key, value));
-        assertThat(record, 
instanceOf(KafkaAbstractSource.KeyValueKafkaRecord.class));
-        KafkaAbstractSource.KeyValueKafkaRecord kvRecord = 
(KafkaAbstractSource.KeyValueKafkaRecord) record;
-        assertSame(keySchema, kvRecord.getKeySchema());
-        assertSame(valueSchema, kvRecord.getValueSchema());
-        assertEquals(KeyValueEncodingType.SEPARATED, 
kvRecord.getKeyValueEncodingType());
-        KeyValue kvValue = (KeyValue) kvRecord.getValue();
-        log.info("key {}", Arrays.toString(toArray(key)));
-        log.info("value {}", Arrays.toString(toArray(value)));
-
-        log.info("key {}", Arrays.toString(toArray((ByteBuffer) 
kvValue.getKey())));
-        log.info("value {}", Arrays.toString(toArray((ByteBuffer) 
kvValue.getValue())));
-
-        assertEquals(ByteBuffer.wrap(toArray(key)).compareTo((ByteBuffer) 
kvValue.getKey()), 0);
-        assertEquals(ByteBuffer.wrap(toArray(value)).compareTo((ByteBuffer) 
kvValue.getValue()), 0);
+        try (KafkaBytesSource source = new KafkaBytesSource()) {
+            Map<String, Object> config = new HashMap<>();
+            config.put("topic", "test");
+            config.put("bootstrapServers", "localhost:9092");
+            config.put("groupId", "test");
+            config.put("valueDeserializationClass", valueDeserializationClass);
+            config.put("keyDeserializationClass", keyDeserializationClass);
+            config.put("consumerConfigProperties", ImmutableMap.builder()
+                    .put("schema.registry.url", "http://localhost:8081";)
+                    .build());
+            source.open(config, Mockito.mock(SourceContext.class));
+            assertTrue(source.isProduceKeyValue());
+            Schema<ByteBuffer> keySchema = source.getKeySchema();
+            Schema<ByteBuffer> valueSchema = source.getValueSchema();
+            assertEquals(keySchema.getSchemaInfo().getType(), 
expectedKeySchema.getSchemaInfo().getType());
+            assertEquals(valueSchema.getSchemaInfo().getType(), 
expectedValueSchema.getSchemaInfo().getType());
+
+            KafkaRecord<ByteBuffer> record = source.buildRecord(new 
ConsumerRecord<>("test", 0, 0, key, value));
+            assertThat(record, instanceOf(KeyValueKafkaRecord.class));
+            KeyValueKafkaRecord<ByteBuffer, ByteBuffer> kvRecord = 
(KeyValueKafkaRecord<ByteBuffer, ByteBuffer>) record;
+            assertSame(keySchema, kvRecord.getKeySchema());
+            assertSame(valueSchema, kvRecord.getValueSchema());
+            assertEquals(KeyValueEncodingType.SEPARATED, 
kvRecord.getKeyValueEncodingType());
+            KeyValue<ByteBuffer, ByteBuffer> kvValue = (KeyValue<ByteBuffer, 
ByteBuffer>) kvRecord.getValue();
+            log.info("key {}", Arrays.toString(toArray(key)));
+            log.info("value {}", Arrays.toString(toArray(value)));
+            log.info("key {}", Arrays.toString(toArray(kvValue.getKey())));
+            log.info("value {}", Arrays.toString(toArray(kvValue.getValue())));
+
+            
assertEquals(ByteBuffer.wrap(toArray(key)).compareTo(kvValue.getKey()), 0);
+            
assertEquals(ByteBuffer.wrap(toArray(value)).compareTo(kvValue.getValue()), 0);
+        }
     }
 
     private static byte[] toArray(ByteBuffer b) {
diff --git 
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
 
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
index ec9ee4a957d..d59cdb1d9b6 100644
--- 
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
+++ 
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
@@ -47,7 +47,7 @@ public class KafkaAbstractSinkTest {
     private static class DummySink extends KafkaAbstractSink<String, byte[]> {
 
         @Override
-        public KeyValue extractKeyValue(Record record) {
+        public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) 
{
             return new KeyValue<>(record.getKey().orElse(null), 
record.getValue());
         }
     }
@@ -74,7 +74,7 @@ public class KafkaAbstractSinkTest {
 
     @Test
     public void testInvalidConfigWillThrownException() throws Exception {
-        KafkaAbstractSink sink = new DummySink();
+        KafkaAbstractSink<String, byte[]> sink = new DummySink();
         Map<String, Object> config = new HashMap<>();
         SinkContext sc = new SinkContext() {
             @Override
diff --git 
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
 
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index 6911ec2a6bf..9e0fef87a25 100644
--- 
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++ 
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -18,10 +18,8 @@
  */
 package org.apache.pulsar.io.kafka.source;
 
-
 import com.google.common.collect.ImmutableMap;
 import java.time.Duration;
-import java.util.Collection;
 import java.util.Collections;
 import java.lang.reflect.Field;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -57,18 +55,17 @@ public class KafkaAbstractSourceTest {
     private static class DummySource extends KafkaAbstractSource<String> {
 
         @Override
-        public KafkaRecord buildRecord(ConsumerRecord<Object, Object> 
consumerRecord) {
-            KafkaRecord record = new KafkaRecord(consumerRecord,
+        public KafkaRecord<String> buildRecord(ConsumerRecord<Object, Object> 
consumerRecord) {
+            return new KafkaRecord<>(consumerRecord,
                     new String((byte[]) consumerRecord.value(), 
StandardCharsets.UTF_8),
                     Schema.STRING,
                     Collections.emptyMap());
-            return record;
         }
     }
 
     @Test
     public void testInvalidConfigWillThrownException() throws Exception {
-        KafkaAbstractSource source = new DummySource();
+        KafkaAbstractSource<String> source = new DummySource();
         SourceContext ctx = mock(SourceContext.class);
         Map<String, Object> config = new HashMap<>();
         Assert.ThrowingRunnable openAndClose = ()->{
@@ -160,7 +157,7 @@ public class KafkaAbstractSourceTest {
 
     @Test(expectedExceptions = RuntimeException.class, 
expectedExceptionsMessageRegExp = "Subscribe exception")
     public final void throwExceptionBySubscribe() throws Exception {
-        KafkaAbstractSource source = new DummySource();
+        KafkaAbstractSource<String> source = new DummySource();
 
         KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig();
         kafkaSourceConfig.setTopic("test-topic");
@@ -168,9 +165,9 @@ public class KafkaAbstractSourceTest {
         kafkaSourceConfigField.setAccessible(true);
         kafkaSourceConfigField.set(source, kafkaSourceConfig);
 
-        Consumer consumer = mock(Consumer.class);
+        Consumer<Object, Object> consumer = mock(Consumer.class);
         Mockito.doThrow(new RuntimeException("Subscribe 
exception")).when(consumer)
-                .subscribe(Mockito.any(Collection.class));
+                .subscribe(Mockito.anyCollection());
 
         Field consumerField = 
KafkaAbstractSource.class.getDeclaredField("consumer");
         consumerField.setAccessible(true);
@@ -181,7 +178,7 @@ public class KafkaAbstractSourceTest {
 
     @Test(expectedExceptions = RuntimeException.class, 
expectedExceptionsMessageRegExp = "Pool exception")
     public final void throwExceptionByPoll() throws Exception {
-        KafkaAbstractSource source = new DummySource();
+        KafkaAbstractSource<String> source = new DummySource();
 
         KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig();
         kafkaSourceConfig.setTopic("test-topic");
@@ -189,7 +186,7 @@ public class KafkaAbstractSourceTest {
         kafkaSourceConfigField.setAccessible(true);
         kafkaSourceConfigField.set(source, kafkaSourceConfig);
 
-        Consumer consumer = mock(Consumer.class);
+        Consumer<Object, Object> consumer = mock(Consumer.class);
         Mockito.doThrow(new RuntimeException("Pool exception")).when(consumer)
                 .poll(Mockito.any(Duration.class));
 


Reply via email to