This is an automated email from the ASF dual-hosted git repository.
cbornet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 54f0ca1c769 [cleanup][io] Cleanup Kafka connector (#20721)
54f0ca1c769 is described below
commit 54f0ca1c769638000fbeb4d73a565216c0992c32
Author: Christophe Bornet <[email protected]>
AuthorDate: Sat Jul 22 21:40:47 2023 +0200
[cleanup][io] Cleanup Kafka connector (#20721)
---
.../org/apache/pulsar/functions/api/Record.java | 2 +-
.../apache/pulsar/io/kafka/AvroSchemaCache.java | 4 +-
.../pulsar/io/kafka/ByteBufferSchemaWrapper.java | 2 +-
.../pulsar/io/kafka/BytesWithKafkaSchema.java | 4 +-
.../apache/pulsar/io/kafka/KafkaAbstractSink.java | 2 +-
.../pulsar/io/kafka/KafkaAbstractSource.java | 25 +--
.../org/apache/pulsar/io/kafka/KafkaBytesSink.java | 2 +-
.../apache/pulsar/io/kafka/KafkaBytesSource.java | 36 ++--
.../apache/pulsar/io/kafka/KafkaSinkConfig.java | 7 -
.../apache/pulsar/io/kafka/KafkaSourceConfig.java | 7 -
.../apache/pulsar/io/kafka/KafkaStringSource.java | 6 +-
.../io/kafka/ByteBufferSchemaWrapperTest.java | 8 +-
.../pulsar/io/kafka/KafkaBytesSourceTest.java | 201 +++++++++++----------
.../io/kafka/sink/KafkaAbstractSinkTest.java | 4 +-
.../io/kafka/source/KafkaAbstractSourceTest.java | 19 +-
15 files changed, 158 insertions(+), 171 deletions(-)
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
index ea6987e5f03..0487b3d02b3 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
@@ -27,7 +27,7 @@ import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
/**
- * Pulsar Connect's Record interface. Record encapsulates the information about a record being read from a Source.
+ * Pulsar IO's Record interface. Record encapsulates the information about a record being read from a Source.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
index f0ad79549eb..4e3abe245be 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
@@ -38,9 +38,9 @@ final class AvroSchemaCache {
     private final LoadingCache<Integer, Schema<ByteBuffer>> cache = CacheBuilder
.newBuilder()
.maximumSize(100)
- .build(new CacheLoader<Integer, Schema<ByteBuffer>>() {
+ .build(new CacheLoader<>() {
@Override
-                public Schema<ByteBuffer> load(Integer schemaId) throws Exception {
+ public Schema<ByteBuffer> load(Integer schemaId) {
return fetchSchema(schemaId);
}
});
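Note on the change above: since Java 9 the diamond operator is allowed on anonymous classes, so the CacheLoader's type arguments are inferred from the cache declaration, and an overriding method may also drop a `throws` clause it does not need. A minimal, self-contained sketch of the same Guava pattern (hypothetical cache contents, not the connector's code):

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;

    public class DiamondCacheExample {
        private final LoadingCache<Integer, String> cache = CacheBuilder
                .newBuilder()
                .maximumSize(100)
                // Diamond on an anonymous class: inferred as CacheLoader<Integer, String>.
                .build(new CacheLoader<>() {
                    @Override
                    public String load(Integer id) {
                        // Narrowing away "throws Exception" is a legal override.
                        return "value-" + id;
                    }
                });

        public String get(int id) {
            return cache.getUnchecked(id);
        }
    }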
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/ByteBufferSchemaWrapper.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/ByteBufferSchemaWrapper.java
index fb4683e511d..aefe62e3857 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/ByteBufferSchemaWrapper.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/ByteBufferSchemaWrapper.java
@@ -29,7 +29,7 @@ import org.apache.pulsar.common.schema.SchemaInfo;
class ByteBufferSchemaWrapper implements Schema<ByteBuffer> {
private final Supplier<SchemaInfo> original;
- public ByteBufferSchemaWrapper(Schema original) {
+ public ByteBufferSchemaWrapper(Schema<?> original) {
this(original::getSchemaInfo);
}
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/BytesWithKafkaSchema.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/BytesWithKafkaSchema.java
index f0429c92eed..585cd927c30 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/BytesWithKafkaSchema.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/BytesWithKafkaSchema.java
@@ -26,6 +26,6 @@ import lombok.Value;
*/
@Value
public class BytesWithKafkaSchema {
- private final ByteBuffer value;
- private final int schemaId;
+ ByteBuffer value;
+ int schemaId;
}
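Note on the change above: Lombok's @Value already makes every instance field private and final (and generates the constructor, getters, equals/hashCode/toString), so the explicit modifiers were redundant. A minimal sketch of the mechanism (hypothetical class, not the connector's code):

    import lombok.Value;

    @Value
    class Point {
        int x; // compiled as "private final int x", with getX() generated
        int y;
    }
    // new Point(1, 2).getX() == 1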
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
index 8fbd6c81861..5ceea4dec8d 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
@@ -44,7 +44,7 @@ import org.apache.pulsar.io.core.SinkContext;
public abstract class KafkaAbstractSink<K, V> implements Sink<byte[]> {
private Producer<K, V> producer;
- private Properties props = new Properties();
+ private final Properties props = new Properties();
private KafkaSinkConfig kafkaSinkConfig;
@Override
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index 3d4612c039f..f8539518851 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -166,7 +166,7 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
             CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
int index = 0;
             for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
- KafkaRecord record = buildRecord(consumerRecord);
+ KafkaRecord<V> record = buildRecord(consumerRecord);
if (LOG.isDebugEnabled()) {
                     LOG.debug("Write record {} {} {}", record.getKey(), record.getValue(), record.getSchema());
}
@@ -190,7 +190,7 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
runnerThread.start();
}
-    public abstract KafkaRecord buildRecord(ConsumerRecord<Object, Object> consumerRecord);
+    public abstract KafkaRecord<V> buildRecord(ConsumerRecord<Object, Object> consumerRecord);
     protected Map<String, String> copyKafkaHeaders(ConsumerRecord<Object, Object> consumerRecord) {
if (!kafkaSourceConfig.isCopyHeadersEnabled()) {
@@ -208,7 +208,7 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
@Slf4j
protected static class KafkaRecord<V> implements Record<V> {
- private final ConsumerRecord<String, ?> record;
+ private final ConsumerRecord<?, ?> record;
private final V value;
private final Schema<V> schema;
private final Map<String, String> properties;
@@ -216,7 +216,7 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
@Getter
         private final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
-        public KafkaRecord(ConsumerRecord<String, ?> record, V value, Schema<V> schema,
+        public KafkaRecord(ConsumerRecord<?, ?> record, V value, Schema<V> schema,
Map<String, String> properties) {
this.record = record;
this.value = value;
@@ -240,7 +240,7 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
@Override
public Optional<String> getKey() {
- return Optional.ofNullable(record.key());
+            return Optional.ofNullable(record.key() instanceof String ? (String) record.key() : null);
}
@Override
@@ -263,13 +263,14 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
return properties;
}
}
-    protected static class KeyValueKafkaRecord<V> extends KafkaRecord implements KVRecord<Object, Object> {
-        private final Schema<Object> keySchema;
-        private final Schema<Object> valueSchema;
+    protected static class KeyValueKafkaRecord<K, W> extends KafkaRecord implements KVRecord<K, W> {
-        public KeyValueKafkaRecord(ConsumerRecord record, KeyValue value,
-                                   Schema<Object> keySchema, Schema<Object> valueSchema,
+        private final Schema<K> keySchema;
+        private final Schema<W> valueSchema;
+
+        public KeyValueKafkaRecord(ConsumerRecord<?, ?> record, KeyValue<K, W> value,
+                                   Schema<K> keySchema, Schema<W> valueSchema,
Map<String, String> properties) {
super(record, value, null, properties);
this.keySchema = keySchema;
@@ -277,12 +278,12 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
}
@Override
- public Schema<Object> getKeySchema() {
+ public Schema<K> getKeySchema() {
return keySchema;
}
@Override
- public Schema<Object> getValueSchema() {
+ public Schema<W> getValueSchema() {
return valueSchema;
}
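With buildRecord generified, the compiler now checks that a subclass returns a KafkaRecord matching its value type V. A hypothetical subclass (not part of this commit) showing what the new signature enforces:

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.pulsar.client.api.Schema;

    public class KafkaLongSource extends KafkaAbstractSource<Long> {
        @Override
        public KafkaRecord<Long> buildRecord(ConsumerRecord<Object, Object> consumerRecord) {
            // Returning a KafkaRecord<String> here would now be a compile error.
            return new KafkaRecord<>(consumerRecord,
                    Long.valueOf(new String((byte[]) consumerRecord.value(), StandardCharsets.UTF_8)),
                    Schema.INT64,
                    copyKafkaHeaders(consumerRecord));
        }
    }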
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java
index 2a200531b67..0ed23856995 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java
@@ -29,7 +29,7 @@ import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
/**
- * Kafka sink should treats incoming messages as pure bytes. So we don't
+ * Kafka sink should treat incoming messages as pure bytes. So we don't
* apply schema into it.
*/
@Connector(
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
index 4e35d98e0bb..51408f04519 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
@@ -70,8 +70,8 @@ import org.apache.pulsar.io.core.annotations.IOType;
public class KafkaBytesSource extends KafkaAbstractSource<ByteBuffer> {
private AvroSchemaCache schemaCache;
- private Schema keySchema;
- private Schema valueSchema;
+ private Schema<ByteBuffer> keySchema;
+ private Schema<ByteBuffer> valueSchema;
private boolean produceKeyValue;
@Override
@@ -93,11 +93,11 @@ public class KafkaBytesSource extends KafkaAbstractSource<ByteBuffer> {
}
if (keySchema.getSchemaInfo().getType() != SchemaType.STRING) {
- // if the Key is a String we can use native Pulsar Key
- // otherwise we use KeyValue schema
-            // that allows you to set a schema for the Key and a schema for the Value.
- // using SEPARATED encoding the key is saved into the binary key
- // so it is used for routing and for compaction
+ // If the Key is a String we can use native Pulsar Key.
+ // Otherwise, we use KeyValue schema.
+            // That allows you to set a schema for the Key and a schema for the Value.
+ // Using SEPARATED encoding the key is saved into the binary key,
+ // so it is used for routing and for compaction.
produceKeyValue = true;
}
@@ -114,13 +114,13 @@ public class KafkaBytesSource extends KafkaAbstractSource<ByteBuffer> {
}
@Override
-    public KafkaRecord buildRecord(ConsumerRecord<Object, Object> consumerRecord) {
+    public KafkaRecord<ByteBuffer> buildRecord(ConsumerRecord<Object, Object> consumerRecord) {
if (produceKeyValue) {
- Object key = extractSimpleValue(consumerRecord.key());
- Object value = extractSimpleValue(consumerRecord.value());
-            Schema currentKeySchema = getSchemaFromObject(consumerRecord.key(), keySchema);
-            Schema currentValueSchema = getSchemaFromObject(consumerRecord.value(), valueSchema);
- return new KeyValueKafkaRecord(consumerRecord,
+ ByteBuffer key = extractSimpleValue(consumerRecord.key());
+ ByteBuffer value = extractSimpleValue(consumerRecord.value());
+            Schema<ByteBuffer> currentKeySchema = getSchemaFromObject(consumerRecord.key(), keySchema);
+            Schema<ByteBuffer> currentValueSchema = getSchemaFromObject(consumerRecord.value(), valueSchema);
+            return new KeyValueKafkaRecord<ByteBuffer, ByteBuffer>(consumerRecord,
new KeyValue<>(key, value),
currentKeySchema,
currentValueSchema,
@@ -128,7 +128,7 @@ public class KafkaBytesSource extends KafkaAbstractSource<ByteBuffer> {
} else {
Object value = consumerRecord.value();
- return new KafkaRecord(consumerRecord,
+ return new KafkaRecord<>(consumerRecord,
extractSimpleValue(value),
getSchemaFromObject(value, valueSchema),
copyKafkaHeaders(consumerRecord));
@@ -152,7 +152,7 @@ public class KafkaBytesSource extends KafkaAbstractSource<ByteBuffer> {
}
}
-    private Schema<ByteBuffer> getSchemaFromObject(Object value, Schema fallback) {
+    private Schema<ByteBuffer> getSchemaFromObject(Object value, Schema<ByteBuffer> fallback) {
if (value instanceof BytesWithKafkaSchema) {
// this is a Struct with schema downloaded by the schema registry
// the schema may be different from record to record
@@ -179,7 +179,7 @@ public class KafkaBytesSource extends KafkaAbstractSource<ByteBuffer> {
result = Schema.BYTEBUFFER;
        } else if (StringDeserializer.class.getName().equals(kafkaDeserializerClass)) {
if (isKey) {
-                // for the key we use the String value and we want StringDeserializer
+                // for the key we use the String value, and we want StringDeserializer
props.put(key, kafkaDeserializerClass);
}
result = Schema.STRING;
@@ -206,11 +206,11 @@ public class KafkaBytesSource extends KafkaAbstractSource<ByteBuffer> {
return new ByteBufferSchemaWrapper(result);
}
- Schema getKeySchema() {
+ Schema<ByteBuffer> getKeySchema() {
return keySchema;
}
- Schema getValueSchema() {
+ Schema<ByteBuffer> getValueSchema() {
return valueSchema;
}
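The comment rewritten above is the key design point of this source: when the Kafka key is not a String, records are produced with a KeyValue schema in SEPARATED encoding, so the key bytes still land in the Pulsar message key and drive routing and compaction. A minimal client-side sketch of that schema (hypothetical topic, URL, and types, not the connector's code):

    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.common.schema.KeyValue;
    import org.apache.pulsar.common.schema.KeyValueEncodingType;

    PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650")
            .build();
    // SEPARATED: the Integer key goes into the message key (routing/compaction),
    // only the String value goes into the payload.
    Producer<KeyValue<Integer, String>> producer = client
            .newProducer(Schema.KeyValue(Schema.INT32, Schema.STRING, KeyValueEncodingType.SEPARATED))
            .topic("my-topic")
            .create();
    producer.send(new KeyValue<>(10, "hello"));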
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
index dbbf1a7b5e2..755b2c89c8f 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
@@ -43,43 +43,36 @@ public class KafkaSinkConfig implements Serializable {
private String bootstrapServers;
@FieldDoc(
- required = false,
defaultValue = "",
help = "Protocol used to communicate with Kafka brokers.")
private String securityProtocol;
@FieldDoc(
- required = false,
defaultValue = "",
help = "SASL mechanism used for Kafka client connections.")
private String saslMechanism;
@FieldDoc(
- required = false,
defaultValue = "",
help = "JAAS login context parameters for SASL connections in the
format used by JAAS configuration files.")
private String saslJaasConfig;
@FieldDoc(
- required = false,
defaultValue = "",
help = "The list of protocols enabled for SSL connections.")
private String sslEnabledProtocols;
@FieldDoc(
- required = false,
defaultValue = "",
help = "The endpoint identification algorithm to validate server
hostname using server certificate.")
private String sslEndpointIdentificationAlgorithm;
@FieldDoc(
- required = false,
defaultValue = "",
help = "The location of the trust store file.")
private String sslTruststoreLocation;
@FieldDoc(
- required = false,
defaultValue = "",
help = "The password for the trust store file.")
private String sslTruststorePassword;
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
index ad2e121d26a..5de60d2a028 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
@@ -44,43 +44,36 @@ public class KafkaSourceConfig implements Serializable {
private String bootstrapServers;
@FieldDoc(
- required = false,
defaultValue = "",
help = "Protocol used to communicate with Kafka brokers.")
private String securityProtocol;
@FieldDoc(
- required = false,
defaultValue = "",
help = "SASL mechanism used for Kafka client connections.")
private String saslMechanism;
@FieldDoc(
- required = false,
defaultValue = "",
help = "JAAS login context parameters for SASL connections in the
format used by JAAS configuration files.")
private String saslJaasConfig;
@FieldDoc(
- required = false,
defaultValue = "",
help = "The list of protocols enabled for SSL connections.")
private String sslEnabledProtocols;
@FieldDoc(
- required = false,
defaultValue = "",
help = "The endpoint identification algorithm to validate server
hostname using server certificate.")
private String sslEndpointIdentificationAlgorithm;
@FieldDoc(
- required = false,
defaultValue = "",
help = "The location of the trust store file.")
private String sslTruststoreLocation;
@FieldDoc(
- required = false,
defaultValue = "",
help = "The password for the trust store file.")
private String sslTruststorePassword;
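The `required = false` lines dropped here (and in KafkaSinkConfig above) are redundant because an annotation member declared with a default can be omitted at the use site; that this change compiles shows FieldDoc declares `required` with a default of false. The language rule, sketched with a hypothetical annotation rather than Pulsar's actual FieldDoc source:

    @interface Doc {
        boolean required() default false; // has a default: optional at use sites
        String help();                    // no default: must always be supplied
    }

    class Config {
        @Doc(help = "host name")          // required() is implicitly false
        String host;
    }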
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
index 58df1838b2e..0c96528cc93 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
@@ -27,14 +27,12 @@ import org.apache.pulsar.client.api.Schema;
*/
public class KafkaStringSource extends KafkaAbstractSource<String> {
-
@Override
-    public KafkaRecord buildRecord(ConsumerRecord<Object, Object> consumerRecord) {
-        KafkaRecord record = new KafkaRecord(consumerRecord,
+    public KafkaRecord<String> buildRecord(ConsumerRecord<Object, Object> consumerRecord) {
+        return new KafkaRecord<>(consumerRecord,
                 new String((byte[]) consumerRecord.value(), StandardCharsets.UTF_8),
Schema.STRING,
copyKafkaHeaders(consumerRecord));
- return record;
}
}
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/ByteBufferSchemaWrapperTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/ByteBufferSchemaWrapperTest.java
index e7f108bf601..eff4b294851 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/ByteBufferSchemaWrapperTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/ByteBufferSchemaWrapperTest.java
@@ -32,7 +32,7 @@ import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
public class ByteBufferSchemaWrapperTest {
@Test
- public void testGetBytesNoCopy() throws Exception {
+ public void testGetBytesNoCopy() {
byte[] originalArray = {1, 2, 3};
ByteBuffer wrapped = ByteBuffer.wrap(originalArray);
assertEquals(0, wrapped.arrayOffset());
@@ -41,7 +41,7 @@ public class ByteBufferSchemaWrapperTest {
}
@Test
- public void testGetBytesOffsetZeroDifferentLen() throws Exception {
+ public void testGetBytesOffsetZeroDifferentLen() {
byte[] originalArray = {1, 2, 3};
ByteBuffer wrapped = ByteBuffer.wrap(originalArray, 1, 2);
assertEquals(0, wrapped.arrayOffset());
@@ -52,7 +52,7 @@ public class ByteBufferSchemaWrapperTest {
}
@Test
- public void testGetBytesOffsetNonZero() throws Exception {
+ public void testGetBytesOffsetNonZero() {
byte[] originalArray = {1, 2, 3};
ByteBuffer wrapped = ByteBuffer.wrap(originalArray);
wrapped.position(1);
@@ -66,7 +66,7 @@ public class ByteBufferSchemaWrapperTest {
}
@Test
- public void testGetBytesOffsetZero() throws Exception {
+ public void testGetBytesOffsetZero() {
byte[] originalArray = {1, 2, 3};
ByteBuffer wrapped = ByteBuffer.wrap(originalArray, 0, 2);
assertEquals(0, wrapped.arrayOffset());
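These tests pin down when the wrapper may hand out the backing array without copying. The JDK behavior they rely on, in a standalone sketch: ByteBuffer.wrap(array, offset, length) sets the position and limit but leaves arrayOffset() at 0.

    import java.nio.ByteBuffer;

    public class ByteBufferOffsets {
        public static void main(String[] args) {
            byte[] original = {1, 2, 3};

            ByteBuffer whole = ByteBuffer.wrap(original);
            // Offset 0 and full remaining length: safe to return the array as-is.
            System.out.println(whole.arrayOffset() + " " + whole.remaining()); // 0 3

            ByteBuffer slice = ByteBuffer.wrap(original, 1, 2);
            // wrap() moved the position, not arrayOffset(), so the visible
            // window {2, 3} does not cover the whole array and must be copied.
            System.out.println(slice.arrayOffset() + " " + slice.position()
                    + " " + slice.remaining()); // 0 1 2
        }
    }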
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/KafkaBytesSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/KafkaBytesSourceTest.java
index d370d51cc23..401bd64c36f 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/KafkaBytesSourceTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/KafkaBytesSourceTest.java
@@ -44,6 +44,8 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.kafka.KafkaAbstractSource.KafkaRecord;
+import org.apache.pulsar.io.kafka.KafkaAbstractSource.KeyValueKafkaRecord;
import org.bouncycastle.util.encoders.Base64;
import org.mockito.Mockito;
import org.testng.annotations.Test;
@@ -88,24 +90,25 @@ public class KafkaBytesSourceTest {
}
-    private void validateSchemaNoKeyValue(String keyDeserializationClass, Schema expectedKeySchema,
-                                          String valueDeserializationClass, Schema expectedValueSchema) throws Exception {
- KafkaBytesSource source = new KafkaBytesSource();
- Map<String, Object> config = new HashMap<>();
- config.put("topic","test");
- config.put("bootstrapServers","localhost:9092");
- config.put("groupId", "test");
- config.put("valueDeserializationClass", valueDeserializationClass);
- config.put("keyDeserializationClass", keyDeserializationClass);
- config.put("consumerConfigProperties", ImmutableMap.builder()
- .put("schema.registry.url", "http://localhost:8081")
- .build());
- source.open(config, Mockito.mock(SourceContext.class));
- assertFalse(source.isProduceKeyValue());
- Schema keySchema = source.getKeySchema();
- Schema valueSchema = source.getValueSchema();
-        assertEquals(keySchema.getSchemaInfo().getType(), expectedKeySchema.getSchemaInfo().getType());
-        assertEquals(valueSchema.getSchemaInfo().getType(), expectedValueSchema.getSchemaInfo().getType());
+    private void validateSchemaNoKeyValue(String keyDeserializationClass, Schema<?> expectedKeySchema,
+                                          String valueDeserializationClass, Schema<?> expectedValueSchema) throws Exception {
+ try (KafkaBytesSource source = new KafkaBytesSource()) {
+ Map<String, Object> config = new HashMap<>();
+ config.put("topic", "test");
+ config.put("bootstrapServers", "localhost:9092");
+ config.put("groupId", "test");
+ config.put("valueDeserializationClass", valueDeserializationClass);
+ config.put("keyDeserializationClass", keyDeserializationClass);
+ config.put("consumerConfigProperties", ImmutableMap.builder()
+ .put("schema.registry.url", "http://localhost:8081")
+ .build());
+ source.open(config, Mockito.mock(SourceContext.class));
+ assertFalse(source.isProduceKeyValue());
+ Schema<ByteBuffer> keySchema = source.getKeySchema();
+ Schema<ByteBuffer> valueSchema = source.getValueSchema();
+            assertEquals(keySchema.getSchemaInfo().getType(), expectedKeySchema.getSchemaInfo().getType());
+            assertEquals(valueSchema.getSchemaInfo().getType(), expectedValueSchema.getSchemaInfo().getType());
+ }
}
@Test
@@ -120,96 +123,98 @@ public class KafkaBytesSourceTest {
public void testCopyKafkaHeadersEnabled() throws Exception {
        ByteBuffer key = ByteBuffer.wrap(new IntegerSerializer().serialize("test", 10));
        ByteBuffer value = ByteBuffer.wrap(new StringSerializer().serialize("test", "test"));
- KafkaBytesSource source = new KafkaBytesSource();
- Map<String, Object> config = new HashMap<>();
- config.put("copyHeadersEnabled", true);
- config.put("topic","test");
- config.put("bootstrapServers","localhost:9092");
- config.put("groupId", "test");
-        config.put("valueDeserializationClass", IntegerDeserializer.class.getName());
-        config.put("keyDeserializationClass", StringDeserializer.class.getName());
- config.put("consumerConfigProperties", ImmutableMap.builder()
- .put("schema.registry.url", "http://localhost:8081")
- .build());
- source.open(config, Mockito.mock(SourceContext.class));
-        ConsumerRecord record = new ConsumerRecord<Object, Object>("test", 88, 99, key, value);
- record.headers().add("k1", "v1".getBytes(StandardCharsets.UTF_8));
- record.headers().add("k2", new byte[]{0xF});
-
- Map<String, String> props = source.copyKafkaHeaders(record);
- assertEquals(props.size(), 5);
- assertTrue(props.containsKey("__kafka_topic"));
- assertTrue(props.containsKey("__kafka_partition"));
- assertTrue(props.containsKey("__kafka_offset"));
- assertTrue(props.containsKey("k1"));
- assertTrue(props.containsKey("k2"));
-
- assertEquals(props.get("__kafka_topic"), "test");
- assertEquals(props.get("__kafka_partition"), "88");
- assertEquals(props.get("__kafka_offset"), "99");
-        assertEquals(Base64.decode(props.get("k1")), "v1".getBytes(StandardCharsets.UTF_8));
- assertEquals(Base64.decode(props.get("k2")), new byte[]{0xF});
+ try (KafkaBytesSource source = new KafkaBytesSource()) {
+ Map<String, Object> config = new HashMap<>();
+ config.put("copyHeadersEnabled", true);
+ config.put("topic", "test");
+ config.put("bootstrapServers", "localhost:9092");
+ config.put("groupId", "test");
+            config.put("valueDeserializationClass", IntegerDeserializer.class.getName());
+            config.put("keyDeserializationClass", StringDeserializer.class.getName());
+ config.put("consumerConfigProperties", ImmutableMap.builder()
+ .put("schema.registry.url", "http://localhost:8081")
+ .build());
+ source.open(config, Mockito.mock(SourceContext.class));
+            ConsumerRecord<Object, Object> record = new ConsumerRecord<>("test", 88, 99, key, value);
+ record.headers().add("k1", "v1".getBytes(StandardCharsets.UTF_8));
+ record.headers().add("k2", new byte[]{0xF});
+
+ Map<String, String> props = source.copyKafkaHeaders(record);
+ assertEquals(props.size(), 5);
+ assertTrue(props.containsKey("__kafka_topic"));
+ assertTrue(props.containsKey("__kafka_partition"));
+ assertTrue(props.containsKey("__kafka_offset"));
+ assertTrue(props.containsKey("k1"));
+ assertTrue(props.containsKey("k2"));
+
+ assertEquals(props.get("__kafka_topic"), "test");
+ assertEquals(props.get("__kafka_partition"), "88");
+ assertEquals(props.get("__kafka_offset"), "99");
+            assertEquals(Base64.decode(props.get("k1")), "v1".getBytes(StandardCharsets.UTF_8));
+ assertEquals(Base64.decode(props.get("k2")), new byte[]{0xF});
+ }
}
@Test
public void testCopyKafkaHeadersDisabled() throws Exception {
        ByteBuffer key = ByteBuffer.wrap(new IntegerSerializer().serialize("test", 10));
        ByteBuffer value = ByteBuffer.wrap(new StringSerializer().serialize("test", "test"));
- KafkaBytesSource source = new KafkaBytesSource();
- Map<String, Object> config = new HashMap<>();
- config.put("topic","test");
- config.put("bootstrapServers","localhost:9092");
- config.put("groupId", "test");
-        config.put("valueDeserializationClass", IntegerDeserializer.class.getName());
-        config.put("keyDeserializationClass", StringDeserializer.class.getName());
- config.put("consumerConfigProperties", ImmutableMap.builder()
- .put("schema.registry.url", "http://localhost:8081")
- .build());
- source.open(config, Mockito.mock(SourceContext.class));
-        ConsumerRecord record = new ConsumerRecord<Object, Object>("test", 88, 99, key, value);
- record.headers().add("k1", "v1".getBytes(StandardCharsets.UTF_8));
- record.headers().add("k2", new byte[]{0xF});
-
- Map<String, String> props = source.copyKafkaHeaders(record);
- assertTrue(props.isEmpty());
+ try (KafkaBytesSource source = new KafkaBytesSource()) {
+ Map<String, Object> config = new HashMap<>();
+ config.put("topic", "test");
+ config.put("bootstrapServers", "localhost:9092");
+ config.put("groupId", "test");
+            config.put("valueDeserializationClass", IntegerDeserializer.class.getName());
+            config.put("keyDeserializationClass", StringDeserializer.class.getName());
+ config.put("consumerConfigProperties", ImmutableMap.builder()
+ .put("schema.registry.url", "http://localhost:8081")
+ .build());
+ source.open(config, Mockito.mock(SourceContext.class));
+            ConsumerRecord<Object, Object> record = new ConsumerRecord<>("test", 88, 99, key, value);
+ record.headers().add("k1", "v1".getBytes(StandardCharsets.UTF_8));
+ record.headers().add("k2", new byte[]{0xF});
+
+ Map<String, String> props = source.copyKafkaHeaders(record);
+ assertTrue(props.isEmpty());
+ }
}
-    private void validateSchemaKeyValue(String keyDeserializationClass, Schema expectedKeySchema,
-                                        String valueDeserializationClass, Schema expectedValueSchema,
+    private void validateSchemaKeyValue(String keyDeserializationClass, Schema<?> expectedKeySchema,
+                                        String valueDeserializationClass, Schema<?> expectedValueSchema,
ByteBuffer key,
ByteBuffer value) throws Exception {
- KafkaBytesSource source = new KafkaBytesSource();
- Map<String, Object> config = new HashMap<>();
- config.put("topic","test");
- config.put("bootstrapServers","localhost:9092");
- config.put("groupId", "test");
- config.put("valueDeserializationClass", valueDeserializationClass);
- config.put("keyDeserializationClass", keyDeserializationClass);
- config.put("consumerConfigProperties", ImmutableMap.builder()
- .put("schema.registry.url", "http://localhost:8081")
- .build());
- source.open(config, Mockito.mock(SourceContext.class));
- assertTrue(source.isProduceKeyValue());
- Schema keySchema = source.getKeySchema();
- Schema valueSchema = source.getValueSchema();
-        assertEquals(keySchema.getSchemaInfo().getType(), expectedKeySchema.getSchemaInfo().getType());
-        assertEquals(valueSchema.getSchemaInfo().getType(), expectedValueSchema.getSchemaInfo().getType());
-
-        KafkaAbstractSource.KafkaRecord record = source.buildRecord(new ConsumerRecord<Object, Object>("test", 0, 0, key, value));
-        assertThat(record, instanceOf(KafkaAbstractSource.KeyValueKafkaRecord.class));
-        KafkaAbstractSource.KeyValueKafkaRecord kvRecord = (KafkaAbstractSource.KeyValueKafkaRecord) record;
- assertSame(keySchema, kvRecord.getKeySchema());
- assertSame(valueSchema, kvRecord.getValueSchema());
-        assertEquals(KeyValueEncodingType.SEPARATED, kvRecord.getKeyValueEncodingType());
- KeyValue kvValue = (KeyValue) kvRecord.getValue();
- log.info("key {}", Arrays.toString(toArray(key)));
- log.info("value {}", Arrays.toString(toArray(value)));
-
-        log.info("key {}", Arrays.toString(toArray((ByteBuffer) kvValue.getKey())));
-        log.info("value {}", Arrays.toString(toArray((ByteBuffer) kvValue.getValue())));
-
-        assertEquals(ByteBuffer.wrap(toArray(key)).compareTo((ByteBuffer) kvValue.getKey()), 0);
-        assertEquals(ByteBuffer.wrap(toArray(value)).compareTo((ByteBuffer) kvValue.getValue()), 0);
+ try (KafkaBytesSource source = new KafkaBytesSource()) {
+ Map<String, Object> config = new HashMap<>();
+ config.put("topic", "test");
+ config.put("bootstrapServers", "localhost:9092");
+ config.put("groupId", "test");
+ config.put("valueDeserializationClass", valueDeserializationClass);
+ config.put("keyDeserializationClass", keyDeserializationClass);
+ config.put("consumerConfigProperties", ImmutableMap.builder()
+ .put("schema.registry.url", "http://localhost:8081")
+ .build());
+ source.open(config, Mockito.mock(SourceContext.class));
+ assertTrue(source.isProduceKeyValue());
+ Schema<ByteBuffer> keySchema = source.getKeySchema();
+ Schema<ByteBuffer> valueSchema = source.getValueSchema();
+            assertEquals(keySchema.getSchemaInfo().getType(), expectedKeySchema.getSchemaInfo().getType());
+            assertEquals(valueSchema.getSchemaInfo().getType(), expectedValueSchema.getSchemaInfo().getType());
+
+            KafkaRecord<ByteBuffer> record = source.buildRecord(new ConsumerRecord<>("test", 0, 0, key, value));
+            assertThat(record, instanceOf(KeyValueKafkaRecord.class));
+            KeyValueKafkaRecord<ByteBuffer, ByteBuffer> kvRecord = (KeyValueKafkaRecord<ByteBuffer, ByteBuffer>) record;
+ assertSame(keySchema, kvRecord.getKeySchema());
+ assertSame(valueSchema, kvRecord.getValueSchema());
+            assertEquals(KeyValueEncodingType.SEPARATED, kvRecord.getKeyValueEncodingType());
+            KeyValue<ByteBuffer, ByteBuffer> kvValue = (KeyValue<ByteBuffer, ByteBuffer>) kvRecord.getValue();
+ log.info("key {}", Arrays.toString(toArray(key)));
+ log.info("value {}", Arrays.toString(toArray(value)));
+ log.info("key {}", Arrays.toString(toArray(kvValue.getKey())));
+ log.info("value {}", Arrays.toString(toArray(kvValue.getValue())));
+
+            assertEquals(ByteBuffer.wrap(toArray(key)).compareTo(kvValue.getKey()), 0);
+            assertEquals(ByteBuffer.wrap(toArray(value)).compareTo(kvValue.getValue()), 0);
+ }
}
private static byte[] toArray(ByteBuffer b) {
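The recurring change in this test file is wrapping each KafkaBytesSource in try-with-resources; Source extends AutoCloseable, so close() runs even when an assertion throws mid-test. The pattern in isolation:

    try (KafkaBytesSource source = new KafkaBytesSource()) {
        source.open(config, Mockito.mock(SourceContext.class));
        // ... assertions that may throw ...
    } // source.close() is guaranteed to run here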
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
index ec9ee4a957d..d59cdb1d9b6 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
@@ -47,7 +47,7 @@ public class KafkaAbstractSinkTest {
private static class DummySink extends KafkaAbstractSink<String, byte[]> {
@Override
- public KeyValue extractKeyValue(Record record) {
+        public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
             return new KeyValue<>(record.getKey().orElse(null), record.getValue());
}
}
@@ -74,7 +74,7 @@ public class KafkaAbstractSinkTest {
@Test
public void testInvalidConfigWillThrownException() throws Exception {
- KafkaAbstractSink sink = new DummySink();
+ KafkaAbstractSink<String, byte[]> sink = new DummySink();
Map<String, Object> config = new HashMap<>();
SinkContext sc = new SinkContext() {
@Override
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index 6911ec2a6bf..9e0fef87a25 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -18,10 +18,8 @@
*/
package org.apache.pulsar.io.kafka.source;
-
import com.google.common.collect.ImmutableMap;
import java.time.Duration;
-import java.util.Collection;
import java.util.Collections;
import java.lang.reflect.Field;
import org.apache.kafka.clients.consumer.Consumer;
@@ -57,18 +55,17 @@ public class KafkaAbstractSourceTest {
private static class DummySource extends KafkaAbstractSource<String> {
@Override
-        public KafkaRecord buildRecord(ConsumerRecord<Object, Object> consumerRecord) {
-            KafkaRecord record = new KafkaRecord(consumerRecord,
+        public KafkaRecord<String> buildRecord(ConsumerRecord<Object, Object> consumerRecord) {
+            return new KafkaRecord<>(consumerRecord,
                     new String((byte[]) consumerRecord.value(), StandardCharsets.UTF_8),
Schema.STRING,
Collections.emptyMap());
- return record;
}
}
@Test
public void testInvalidConfigWillThrownException() throws Exception {
- KafkaAbstractSource source = new DummySource();
+ KafkaAbstractSource<String> source = new DummySource();
SourceContext ctx = mock(SourceContext.class);
Map<String, Object> config = new HashMap<>();
Assert.ThrowingRunnable openAndClose = ()->{
@@ -160,7 +157,7 @@ public class KafkaAbstractSourceTest {
    @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "Subscribe exception")
public final void throwExceptionBySubscribe() throws Exception {
- KafkaAbstractSource source = new DummySource();
+ KafkaAbstractSource<String> source = new DummySource();
KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig();
kafkaSourceConfig.setTopic("test-topic");
@@ -168,9 +165,9 @@ public class KafkaAbstractSourceTest {
kafkaSourceConfigField.setAccessible(true);
kafkaSourceConfigField.set(source, kafkaSourceConfig);
- Consumer consumer = mock(Consumer.class);
+ Consumer<Object, Object> consumer = mock(Consumer.class);
        Mockito.doThrow(new RuntimeException("Subscribe exception")).when(consumer)
- .subscribe(Mockito.any(Collection.class));
+ .subscribe(Mockito.anyCollection());
        Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer");
consumerField.setAccessible(true);
@@ -181,7 +178,7 @@ public class KafkaAbstractSourceTest {
    @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "Pool exception")
public final void throwExceptionByPoll() throws Exception {
- KafkaAbstractSource source = new DummySource();
+ KafkaAbstractSource<String> source = new DummySource();
KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig();
kafkaSourceConfig.setTopic("test-topic");
@@ -189,7 +186,7 @@ public class KafkaAbstractSourceTest {
kafkaSourceConfigField.setAccessible(true);
kafkaSourceConfigField.set(source, kafkaSourceConfig);
- Consumer consumer = mock(Consumer.class);
+ Consumer<Object, Object> consumer = mock(Consumer.class);
Mockito.doThrow(new RuntimeException("Pool exception")).when(consumer)
.poll(Mockito.any(Duration.class));
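Note on Mockito.anyCollection() used above: unlike Mockito.any(Collection.class), it is generically typed, so the stubbing compiles without a raw-type or unchecked warning. In isolation (hypothetical consumer mock, not the test's exact code):

    import static org.mockito.ArgumentMatchers.anyCollection;
    import static org.mockito.Mockito.doThrow;
    import static org.mockito.Mockito.mock;
    import org.apache.kafka.clients.consumer.Consumer;

    @SuppressWarnings("unchecked")
    Consumer<Object, Object> consumer = mock(Consumer.class);
    doThrow(new RuntimeException("Subscribe exception"))
            .when(consumer)
            .subscribe(anyCollection()); // typed matcher, no Collection.class literal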