This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d52a1b0 Pulsar IO - KafkaSource - allow to manage Avro Encoded
messages (#9448)
d52a1b0 is described below
commit d52a1b0f02e3eee94dc1465fd3d0fa101897ab99
Author: Enrico Olivelli <[email protected]>
AuthorDate: Mon Mar 15 08:33:41 2021 +0100
Pulsar IO - KafkaSource - allow to manage Avro Encoded messages (#9448)
### Motivation
Currently the KafkaSource only handles strings and byte arrays; it does not
support records with a schema.
In Kafka, messages can be Avro-encoded and their schemas managed by a
Schema Registry (by Confluent®).
### Modifications
Summary of changes:
- allow the current KafkaSource (`KafkaBytesSource`) to deal with
`io.confluent.kafka.serializers.KafkaAvroDeserializer` and copy the raw bytes
to the Pulsar topic, setting the Schema appropriately
- this source supports Schema Evolution end-to-end (i.e. add fields to the
original schema on the Kafka side, and see the new fields in the Pulsar topic,
without any reconfiguration or restart)
- add the Confluent® Schema Registry Client to the Kafka Connector NAR; its
license is compatible with the Apache License 2.0, so we can redistribute it
- the Schema Registry Client is configured through the consumerProperties
property of the source (usually you add schema.registry.url); see the sketch
after this list
- add integration tests with Kafka and Schema Registry
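For illustration, a source configuration for Avro messages could look like the
sketch below. This is a minimal example whose keys mirror the new integration
test; the broker address, topic, group id and registry URL are placeholders,
not values defined by this patch:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class AvroKafkaSourceConfigExample {
        public static Map<String, Object> buildSourceConfig() {
            // Keys mirror the sourceConfig built by the new integration test;
            // host names, topic and group id below are placeholders.
            Map<String, Object> sourceConfig = new HashMap<>();
            sourceConfig.put("bootstrapServers", "kafkacontainer:9093");
            sourceConfig.put("groupId", "test-source-group");
            sourceConfig.put("topic", "kafkasourcetopic");
            // switch the value deserializer to the Confluent Avro deserializer
            sourceConfig.put("valueDeserializationClass",
                    "io.confluent.kafka.serializers.KafkaAvroDeserializer");
            // extra consumer properties are passed through to the Kafka consumer,
            // including the Schema Registry endpoint
            sourceConfig.put("consumerConfigProperties",
                    Collections.singletonMap("schema.registry.url",
                            "http://schemaregistry:8081"));
            return sourceConfig;
        }
    }

The integration test serializes such a map to JSON and passes it to
`pulsar-admin source create --source-type kafka --sourceConfig ...`.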
### Verifying this change
The patch introduces new integration tests.
The integration tests launch a Kafka container and a Confluent Schema
Registry container, produce Avro-encoded records to Kafka, and verify that
a Pulsar consumer receives them correctly.
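On the Pulsar side the records can then be read back with an AUTO_CONSUME
consumer, as the new AvroKafkaSourceTest does. A minimal sketch (service URL,
topic and subscription names are placeholders, not part of this patch):

    import java.util.concurrent.TimeUnit;
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.client.api.schema.Field;
    import org.apache.pulsar.client.api.schema.GenericRecord;

    public class ReadAvroSourceOutputExample {
        public static void main(String[] args) throws Exception {
            try (PulsarClient client = PulsarClient.builder()
                        .serviceUrl("pulsar://localhost:6650")
                        .build();
                 Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
                        .topic("test-source-connector-output-topic")
                        .subscriptionName("sourcetester")
                        .subscribe()) {
                Message<GenericRecord> msg = consumer.receive(10, TimeUnit.SECONDS);
                while (msg != null) {
                    GenericRecord record = msg.getValue();
                    // the fields follow the Avro schema taken from the Kafka Schema Registry
                    for (Field field : record.getFields()) {
                        System.out.println(field.getName() + " = " + record.getField(field));
                    }
                    consumer.acknowledge(msg);
                    msg = consumer.receive(10, TimeUnit.SECONDS);
                }
            }
        }
    }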
---
pom.xml | 2 +
pulsar-io/kafka/pom.xml | 18 +
.../apache/pulsar/io/kafka/AvroSchemaCache.java | 109 +++++
...StringSource.java => BytesWithKafkaSchema.java} | 17 +-
.../pulsar/io/kafka/KafkaAbstractSource.java | 44 +-
.../apache/pulsar/io/kafka/KafkaBytesSource.java | 90 +++-
.../apache/pulsar/io/kafka/KafkaStringSource.java | 12 +-
.../io/kafka/source/KafkaAbstractSourceTest.java | 9 +-
.../tests/integration/io/AvroKafkaSourceTest.java | 496 +++++++++++++++++++++
.../src/test/resources/pulsar-messaging.xml | 1 +
10 files changed, 761 insertions(+), 37 deletions(-)
diff --git a/pom.xml b/pom.xml
index 436ad6b..2ad4e74 100644
--- a/pom.xml
+++ b/pom.xml
@@ -157,6 +157,8 @@ flexible messaging model and an intuitive client API.</description>
<jcip.version>1.0</jcip.version>
<prometheus-jmx.version>0.14.0</prometheus-jmx.version>
<confluent.version>5.3.2</confluent.version>
+    <kafka.confluent.schemaregistryclient.version>5.3.0</kafka.confluent.schemaregistryclient.version>
+    <kafka.confluent.avroserializer.version>5.3.0</kafka.confluent.avroserializer.version>
<kafka-avro-convert-jackson.version>1.9.13</kafka-avro-convert-jackson.version>
<aircompressor.version>0.16</aircompressor.version>
<asynchttpclient.version>2.12.1</asynchttpclient.version>
diff --git a/pulsar-io/kafka/pom.xml b/pulsar-io/kafka/pom.xml
index d76ce65..63f3387 100644
--- a/pulsar-io/kafka/pom.xml
+++ b/pulsar-io/kafka/pom.xml
@@ -36,6 +36,7 @@
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-core</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
@@ -49,11 +50,28 @@
</dependency>
<dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka-client.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.confluent</groupId>
+ <artifactId>kafka-schema-registry</artifactId>
+ <version>${kafka.confluent.schemaregistryclient.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.confluent</groupId>
+ <artifactId>kafka-avro-serializer</artifactId>
+ <version>${kafka.confluent.avroserializer.version}</version>
+ </dependency>
+
</dependencies>
<build>
diff --git
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
new file mode 100644
index 0000000..cc158e3
--- /dev/null
+++
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+
+@Slf4j
+final class AvroSchemaCache {
+    private final LoadingCache<Integer, Schema<ByteBuffer>> cache = CacheBuilder
+            .newBuilder()
+            .maximumSize(100)
+            .build(new CacheLoader<Integer, Schema<ByteBuffer>>() {
+                @Override
+                public Schema<ByteBuffer> load(Integer schemaId) throws Exception {
+                    return fetchSchema(schemaId);
+                }
+            });
+
+ private final SchemaRegistryClient schemaRegistryClient;
+
+ public AvroSchemaCache(SchemaRegistryClient schemaRegistryClient) {
+ this.schemaRegistryClient = schemaRegistryClient;
+ }
+
+ public Schema<ByteBuffer> get(int schemaId) {
+ try {
+ return cache.get(schemaId);
+ } catch (ExecutionException err) {
+ throw new RuntimeException(err.getCause());
+ }
+ }
+
+ private Schema<ByteBuffer> fetchSchema(int schemaId) {
+ try {
+ org.apache.avro.Schema schema =
schemaRegistryClient.getById(schemaId);
+ String definition = schema.toString(false);
+ log.info("Schema {} definition {}", schemaId, definition);
+ SchemaInfo schemaInfo = SchemaInfo.builder()
+ .type(SchemaType.AVRO)
+ .name(schema.getName())
+ .properties(Collections.emptyMap())
+ .schema(definition.getBytes(StandardCharsets.UTF_8)
+ ).build();
+ return new Schema<ByteBuffer>() {
+ @Override
+ public byte[] encode(ByteBuffer message) {
+ return getBytes(message);
+ }
+
+ @Override
+ public SchemaInfo getSchemaInfo() {
+ return schemaInfo;
+ }
+
+ @Override
+ public Schema<ByteBuffer> clone() {
+ return this;
+ }
+
+ @Override
+ public ByteBuffer decode(byte[] bytes, byte[] schemaVersion) {
+ throw new UnsupportedOperationException();
+ }
+ };
+ } catch (IOException | RestClientException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ private static byte[] getBytes(ByteBuffer buffer) {
+ buffer.mark();
+ byte[] avroEncodedData = new byte[buffer.remaining()];
+ buffer.get(avroEncodedData);
+ buffer.reset();
+ return avroEncodedData;
+ }
+
+}
diff --git
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/BytesWithKafkaSchema.java
similarity index 67%
copy from
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
copy to
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/BytesWithKafkaSchema.java
index 04c7793..fbc6280 100644
---
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
+++
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/BytesWithKafkaSchema.java
@@ -16,20 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.pulsar.io.kafka;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import lombok.Value;
-import java.nio.charset.StandardCharsets;
+import java.nio.ByteBuffer;
/**
- * Simple Kafka Source that just transfers the value part of the kafka records
- * as Strings
+ * This is a wrapper around a Byte array (the Avro encoded record) and a schema id in the Kafka Schema Registry.
*/
-public class KafkaStringSource extends KafkaAbstractSource<String> {
- @Override
- public String extractValue(ConsumerRecord<String, byte[]> record) {
- return new String(record.value(), StandardCharsets.UTF_8);
- }
+@Value
+public class BytesWithKafkaSchema {
+ private final ByteBuffer value;
+ private final int schemaId;
}
diff --git
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index de2e0c2..4258333 100644
---
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -19,20 +19,21 @@
package org.apache.pulsar.io.kafka;
-import java.util.Collections;
-import java.util.Objects;
import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
+import java.util.Objects;
+import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -40,13 +41,13 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
- * Simple Kafka Source to transfer messages from a Kafka topic
+ * Simple Kafka Source to transfer messages from a Kafka topic.
*/
public abstract class KafkaAbstractSource<V> extends PushSource<V> {
private static final Logger LOG =
LoggerFactory.getLogger(KafkaAbstractSource.class);
- private volatile Consumer<String, byte[]> consumer;
+ private volatile Consumer<Object, Object> consumer;
private volatile boolean running = false;
private KafkaSourceConfig kafkaSourceConfig;
private Thread runnerThread;
@@ -116,19 +117,20 @@ public abstract class KafkaAbstractSource<V> extends
PushSource<V> {
LOG.info("Kafka source stopped.");
}
+ @SuppressWarnings("unchecked")
public void start() {
runnerThread = new Thread(() -> {
- LOG.info("Starting kafka source");
+ LOG.info("Starting kafka source on {}",
kafkaSourceConfig.getTopic());
consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
LOG.info("Kafka source started.");
- ConsumerRecords<String, byte[]> consumerRecords;
while (running) {
- consumerRecords = consumer.poll(1000);
+ ConsumerRecords<Object, Object> consumerRecords =
consumer.poll(1000);
CompletableFuture<?>[] futures = new
CompletableFuture<?>[consumerRecords.count()];
int index = 0;
- for (ConsumerRecord<String, byte[]> consumerRecord :
consumerRecords) {
- LOG.debug("Record received from kafka, key: {}. value:
{}", consumerRecord.key(), consumerRecord.value());
- KafkaRecord<V> record = new KafkaRecord<>(consumerRecord,
extractValue(consumerRecord));
+ for (ConsumerRecord<Object, Object> consumerRecord :
consumerRecords) {
+ KafkaRecord record = new KafkaRecord(consumerRecord,
+ extractValue(consumerRecord),
+ extractSchema(consumerRecord));
consume(record);
futures[index] = record.getCompletableFuture();
index++;
@@ -151,18 +153,25 @@ public abstract class KafkaAbstractSource<V> extends
PushSource<V> {
runnerThread.start();
}
- public abstract V extractValue(ConsumerRecord<String, byte[]> record);
+ public Object extractValue(ConsumerRecord<Object, Object> consumerRecord) {
+ return consumerRecord.value();
+ }
+ public abstract Schema<V> extractSchema(ConsumerRecord<Object, Object>
consumerRecord);
+
+ @Slf4j
static private class KafkaRecord<V> implements Record<V> {
- private final ConsumerRecord<String, byte[]> record;
+ private final ConsumerRecord<String, ?> record;
private final V value;
+ private final Schema<V> schema;
+
@Getter
private final CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
- public KafkaRecord(ConsumerRecord<String, byte[]> record,
- V value) {
+ public KafkaRecord(ConsumerRecord<String,?> record, V value, Schema<V>
schema) {
this.record = record;
this.value = value;
+ this.schema = schema;
}
@Override
public Optional<String> getPartitionId() {
@@ -188,5 +197,10 @@ public abstract class KafkaAbstractSource<V> extends
PushSource<V> {
public void ack() {
completableFuture.complete(null);
}
+
+ @Override
+ public Schema<V> getSchema() {
+ return schema;
+ }
}
}
diff --git
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
index 59cc548..e8c42b2 100644
---
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
+++
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
@@ -19,12 +19,21 @@
package org.apache.pulsar.io.kafka;
-import java.util.Properties;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
@@ -39,18 +48,85 @@ import org.apache.pulsar.io.core.annotations.IOType;
configClass = KafkaSourceConfig.class
)
@Slf4j
-public class KafkaBytesSource extends KafkaAbstractSource<byte[]> {
+public class KafkaBytesSource extends KafkaAbstractSource<ByteBuffer> {
+
+ private AvroSchemaCache schemaCache;
+
+ private static final Collection<String> SUPPORTED_KEY_DESERIALIZERS =
+
Collections.unmodifiableCollection(Arrays.asList(StringDeserializer.class.getName()));
+
+ private static final Collection<String> SUPPORTED_VALUE_DESERIALIZERS =
+
Collections.unmodifiableCollection(Arrays.asList(ByteArrayDeserializer.class.getName(),
KafkaAvroDeserializer.class.getName()));
@Override
protected Properties beforeCreateConsumer(Properties props) {
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
+ props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
+ props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
log.info("Created kafka consumer config : {}", props);
+
+ String currentKeyDeserializer =
props.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+ if (!SUPPORTED_KEY_DESERIALIZERS.contains(currentKeyDeserializer)) {
+ throw new IllegalArgumentException("Unsupported key deserializer:
" + currentKeyDeserializer + ", only " + SUPPORTED_KEY_DESERIALIZERS);
+ }
+
+ String currentValueDeserializer =
props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+ if (!SUPPORTED_VALUE_DESERIALIZERS.contains(currentValueDeserializer))
{
+ throw new IllegalArgumentException("Unsupported value
deserializer: " + currentValueDeserializer + ", only " +
SUPPORTED_VALUE_DESERIALIZERS);
+ }
+
+ // replace KafkaAvroDeserializer with our custom implementation
+ if (currentValueDeserializer != null &&
currentValueDeserializer.equals(KafkaAvroDeserializer.class.getName())) {
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ExtractKafkaAvroSchemaDeserializer.class.getName());
+ KafkaAvroDeserializerConfig config = new
KafkaAvroDeserializerConfig(props);
+ List<String> urls = config.getSchemaRegistryUrls();
+ int maxSchemaObject = config.getMaxSchemasPerSubject();
+ SchemaRegistryClient schemaRegistryClient = new
CachedSchemaRegistryClient(urls, maxSchemaObject);
+ schemaCache = new AvroSchemaCache(schemaRegistryClient);
+ }
return props;
}
@Override
- public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
- return record.value();
+ public Object extractValue(ConsumerRecord<Object, Object> consumerRecord) {
+ Object value = consumerRecord.value();
+ if (value instanceof BytesWithKafkaSchema) {
+ return ((BytesWithKafkaSchema) value).getValue();
+ } else if (value instanceof byte[]) {
+ return ByteBuffer.wrap((byte[]) value);
+ } else if (value == null) {
+ return null;
+ } else {
+ throw new UnsupportedOperationException("Cannot extract a value
from a " + value.getClass());
+ }
}
-}
+
+ @Override
+ public org.apache.pulsar.client.api.Schema<ByteBuffer>
extractSchema(ConsumerRecord<Object, Object> consumerRecord) {
+ Object value = consumerRecord.value();
+ if (value instanceof BytesWithKafkaSchema) {
+ return schemaCache.get(((BytesWithKafkaSchema)
value).getSchemaId());
+ } else {
+ return Schema.BYTEBUFFER;
+ }
+ }
+
+ public static class ExtractKafkaAvroSchemaDeserializer implements
Deserializer<BytesWithKafkaSchema> {
+
+ @Override
+ public BytesWithKafkaSchema deserialize(String topic, byte[] payload) {
+ if (payload == null) {
+ return null;
+ } else {
+ try {
+ ByteBuffer buffer = ByteBuffer.wrap(payload);
+ buffer.get(); // magic number
+ int id = buffer.getInt();
+ return new BytesWithKafkaSchema(buffer, id);
+ } catch (Exception err) {
+ throw new SerializationException("Error deserializing Avro
message", err);
+ }
+ }
+ }
+ }
+
+}
\ No newline at end of file
diff --git
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
index 04c7793..62c4b3c 100644
---
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
+++
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
@@ -19,8 +19,8 @@
package org.apache.pulsar.io.kafka;
+import org.apache.pulsar.client.api.Schema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-
import java.nio.charset.StandardCharsets;
/**
@@ -28,8 +28,14 @@ import java.nio.charset.StandardCharsets;
* as Strings
*/
public class KafkaStringSource extends KafkaAbstractSource<String> {
+
+ @Override
+ public Object extractValue(ConsumerRecord<Object, Object> consumerRecord) {
+ return new String((byte[]) consumerRecord.value(),
StandardCharsets.UTF_8);
+ }
+
@Override
- public String extractValue(ConsumerRecord<String, byte[]> record) {
- return new String(record.value(), StandardCharsets.UTF_8);
+ public Schema<String> extractSchema(ConsumerRecord<Object, Object>
consumerRecord) {
+ return Schema.STRING;
}
}
diff --git
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index d5bda6d..789b3de 100644
---
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -51,8 +51,13 @@ public class KafkaAbstractSourceTest {
private static class DummySource extends KafkaAbstractSource<String> {
@Override
- public String extractValue(ConsumerRecord<String, byte[]> record) {
- return new String(record.value());
+ public Object extractValue(ConsumerRecord<Object, Object>
consumerRecord) {
+ return new String((byte[]) consumerRecord.value());
+ }
+
+ @Override
+ public Schema<String> extractSchema(ConsumerRecord<Object, Object>
consumerRecord) {
+ return Schema.STRING;
}
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/AvroKafkaSourceTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/AvroKafkaSourceTest.java
new file mode 100644
index 0000000..9eb29f2
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/AvroKafkaSourceTest.java
@@ -0,0 +1,496 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import com.google.gson.Gson;
+import lombok.Cleanup;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import net.jodah.failsafe.Failsafe;
+import net.jodah.failsafe.RetryPolicy;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.io.JsonEncoder;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+import org.testcontainers.utility.DockerImageName;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.SourceStatus;
+
+import static org.testng.Assert.*;
+
+/**
+ * A tester for the Kafka source with Avro messages.
+ * This test starts a PulsarCluster, a container with a Kafka Broker
+ * and a container with the Schema Registry.
+ * It populates a Kafka topic with Avro-encoded messages with a schema
+ * and then verifies that the records are correctly received
+ * by a Pulsar Consumer.
+ */
+@Slf4j
+public class AvroKafkaSourceTest extends PulsarFunctionsTestBase {
+
+ private static final String SOURCE_TYPE = "kafka";
+
+ final Duration ONE_MINUTE = Duration.ofMinutes(1);
+ final Duration TEN_SECONDS = Duration.ofSeconds(10);
+
+ final RetryPolicy statusRetryPolicy = new RetryPolicy()
+ .withMaxDuration(ONE_MINUTE)
+ .withDelay(TEN_SECONDS)
+ .onRetry(e -> log.error("Retry ... "));
+
+ private final String kafkaTopicName = "kafkasourcetopic";
+
+ private EnhancedKafkaContainer kafkaContainer;
+ private SchemaRegistryContainer schemaRegistryContainer;
+
+ protected final Map<String, Object> sourceConfig;
+ protected final String kafkaContainerName = "kafkacontainer";
+ protected final String schemaRegistryContainerName = "schemaregistry";
+
+ public AvroKafkaSourceTest() {
+ sourceConfig = new HashMap<>();
+ }
+
+ @Test(groups = "source")
+ public void test() throws Exception {
+ if (pulsarCluster == null) {
+ super.setupCluster();
+ super.setupFunctionWorkers();
+ }
+ startKafkaContainers(pulsarCluster);
+ try {
+ testSource();
+ } finally {
+ stopKafkaContainers(pulsarCluster);
+ }
+ }
+
+ private String getBootstrapServersOnDockerNetwork() {
+ return kafkaContainerName + ":9093";
+ }
+
+
+ public void startKafkaContainers(PulsarCluster cluster) throws Exception {
+ this.kafkaContainer = createKafkaContainer(cluster);
+ cluster.startService(kafkaContainerName, kafkaContainer);
+ log.info("creating schema registry kafka {}",
getBootstrapServersOnDockerNetwork());
+ this.schemaRegistryContainer = new
SchemaRegistryContainer(getBootstrapServersOnDockerNetwork());
+ cluster.startService(schemaRegistryContainerName,
schemaRegistryContainer);
+ sourceConfig.put("bootstrapServers",
getBootstrapServersOnDockerNetwork());
+ sourceConfig.put("groupId", "test-source-group");
+ sourceConfig.put("fetchMinBytes", 1L);
+ sourceConfig.put("autoCommitIntervalMs", 10L);
+ sourceConfig.put("sessionTimeoutMs", 10000L);
+ sourceConfig.put("heartbeatIntervalMs", 5000L);
+ sourceConfig.put("topic", kafkaTopicName);
+ sourceConfig.put("valueDeserializationClass",
"io.confluent.kafka.serializers.KafkaAvroDeserializer");
+ sourceConfig.put("consumerConfigProperties",
+ ImmutableMap.of("schema.registry.url",
getRegistryAddressInDockerNetwork())
+ );
+ }
+
+ private class EnhancedKafkaContainer extends KafkaContainer {
+
+ public EnhancedKafkaContainer(DockerImageName dockerImageName) {
+ super(dockerImageName);
+ }
+
+ @Override
+ public String getBootstrapServers() {
+ // we have to override this function
+ // because we want the Kafka Broker to advertise itself
+ // with the docker network address
+ // otherwise the Kafka Schema Registry won't work
+ return "PLAINTEXT://" + kafkaContainerName + ":9093";
+ }
+
+ }
+
+ protected EnhancedKafkaContainer createKafkaContainer(PulsarCluster
cluster) {
+ return (EnhancedKafkaContainer) new
EnhancedKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.0.1"))
+ .withEmbeddedZookeeper()
+ .withCreateContainerCmdModifier(createContainerCmd ->
createContainerCmd
+ .withName(kafkaContainerName)
+ );
+ }
+
+ public void stopKafkaContainers(PulsarCluster cluster) {
+ if (null != schemaRegistryContainer) {
+ cluster.stopService(schemaRegistryContainerName,
schemaRegistryContainer);
+ }
+ if (null != kafkaContainer) {
+ cluster.stopService(kafkaContainerName, kafkaContainer);
+ }
+ }
+
+ public void prepareSource() throws Exception {
+ log.info("creating topic");
+ ExecResult execResult = kafkaContainer.execInContainer(
+ "/usr/bin/kafka-topics",
+ "--create",
+ "--zookeeper",
+ getZooKeeperAddressInDockerNetwork(),
+ "--partitions",
+ "1",
+ "--replication-factor",
+ "1",
+ "--topic",
+ kafkaTopicName);
+ assertTrue(
+ execResult.getStdout().contains("Created topic"),
+ execResult.getStdout());
+
+ }
+
+ private String getZooKeeperAddressInDockerNetwork() {
+ return kafkaContainerName +":2181";
+ }
+
+ private void testSource() throws Exception {
+ final String tenant = TopicName.PUBLIC_TENANT;
+ final String namespace = TopicName.DEFAULT_NAMESPACE;
+ final String outputTopicName = "test-source-connector-"
+ + functionRuntimeType + "-output-topic-" + randomName(8);
+ final String sourceName = "test-source-connector-"
+ + functionRuntimeType + "-name-" + randomName(8);
+ final int numMessages = 10;
+
+ @Cleanup
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .build();
+
+ @Cleanup
+ PulsarAdmin admin =
PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
+ admin.topics().createNonPartitionedTopic(outputTopicName);
+
+ @Cleanup
+ Consumer<GenericRecord> consumer =
client.newConsumer(Schema.AUTO_CONSUME())
+ .topic(outputTopicName)
+ .subscriptionName("sourcetester")
+ .subscribe();
+
+ // prepare the testing environment for source
+ prepareSource();
+
+ // submit the source connector
+ submitSourceConnector(tenant, namespace, sourceName, outputTopicName);
+
+ // get source info
+ getSourceInfoSuccess(tenant, namespace, sourceName);
+
+ // get source status
+ Failsafe.with(statusRetryPolicy).run(() -> getSourceStatus(tenant,
namespace, sourceName));
+
+ // produce messages
+ List<MyBean> messages = produceSourceMessages(numMessages);
+
+ // wait for source to process messages
+ Failsafe.with(statusRetryPolicy).run(() ->
+ waitForProcessingSourceMessages(tenant, namespace, sourceName,
numMessages));
+
+ // validate the source result
+ validateSourceResultAvro(consumer, messages);
+
+ // delete the source
+ deleteSource(tenant, namespace, sourceName);
+
+ // get source info (source should be deleted)
+ getSourceInfoNotFound(tenant, namespace, sourceName);
+ }
+
+ public void validateSourceResultAvro(Consumer<GenericRecord> consumer,
+ List<MyBean> beans) throws Exception {
+ int recordsNumber = 0;
+ Message<GenericRecord> msg = consumer.receive(10, TimeUnit.SECONDS);
+ while (msg != null) {
+ GenericRecord valueRecord = msg.getValue();
+ Assert.assertNotNull(valueRecord.getFields());
+ Assert.assertTrue(valueRecord.getFields().size() > 0);
+ for (Field field : valueRecord.getFields()) {
+ log.info("field {} value {}", field,
valueRecord.getField(field));
+ }
+ assertEquals(beans.get(recordsNumber).field,
valueRecord.getField("field"));
+ consumer.acknowledge(msg);
+ recordsNumber++;
+ msg = consumer.receive(10, TimeUnit.SECONDS);
+ }
+
+ Assert.assertEquals(recordsNumber, beans.size());
+ log.info("Stop {} server container. topic: {} has {} records.",
kafkaContainerName, consumer.getTopic(), recordsNumber);
+ }
+
+ protected void getSourceInfoSuccess(String tenant,
+ String namespace,
+ String sourceName) throws Exception {
+ final String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "source",
+ "get",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sourceName
+ };
+
+ ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
+ log.info("Get source info : {}", result.getStdout());
+ assertTrue(
+ result.getStdout().contains("\"archive\": \"builtin://" +
SOURCE_TYPE + "\""),
+ result.getStdout()
+ );
+ }
+
+ protected void getSourceStatus(String tenant, String namespace, String
sourceName) throws Exception {
+
+ final String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "source",
+ "status",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sourceName
+ };
+
+ final ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
+ log.info("Get source status : {}", result.getStdout());
+
+ assertEquals(result.getExitCode(), 0);
+
+ final SourceStatus sourceStatus =
SourceStatus.decode(result.getStdout());
+
+ assertEquals(sourceStatus.getNumInstances(), 1);
+ assertEquals(sourceStatus.getNumRunning(), 1);
+ assertEquals(sourceStatus.getInstances().size(), 1);
+
assertEquals(sourceStatus.getInstances().get(0).getStatus().isRunning(), true);
+
assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumRestarts(),
0);
+
assertEquals(sourceStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(),
0);
+
+ assertTrue(result.getStdout().contains("\"running\" : true"));
+
+ }
+
+ protected void submitSourceConnector(String tenant,
+ String namespace,
+ String sourceName,
+ String outputTopicName) throws
Exception {
+ final String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "source", "create",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sourceName,
+ "--source-type", SOURCE_TYPE,
+ "--sourceConfig", new Gson().toJson(sourceConfig),
+ "--destinationTopicName", outputTopicName
+ };
+
+ log.info("Run command : {}", StringUtils.join(commands, ' '));
+ ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
+ assertTrue(
+ result.getStdout().contains("\"Created successfully\""),
+ result.getStdout());
+ }
+
+ @Data
+ public static final class MyBean {
+ private String field;
+ }
+
+ public List<MyBean> produceSourceMessages(int numMessages) throws
Exception{
+ org.apache.avro.Schema schema =
ReflectData.get().getSchema(MyBean.class);
+ String schemaDef = schema.toString(false);
+ log.info("schema {}", schemaDef);
+
+ List<MyBean> written = new ArrayList<>();
+ StringBuilder payload = new StringBuilder();
+ for (int i = 0; i < numMessages; i++) {
+ MyBean bean = new MyBean();
+ bean.setField("value" + i);
+ String serialized = serializeBeanUsingAvro(schema, bean);
+ payload.append(serialized);
+ if (i != numMessages - 1) {
+                // separate records with newlines, but do not add a trailing newline after the last one
+ payload.append("\n");
+ }
+ written.add(bean);
+ }
+
+        // write messages to Kafka using kafka-avro-console-producer
+        // we feed the serialized values to the stdin of kafka-avro-console-producer;
+        // the only practical way to do this with TestContainers is to create a bash script
+        // and execute it
+ String bashFileTemplate = "echo '"+payload+"' " +
+ "| /usr/bin/kafka-avro-console-producer " +
+ "--broker-list " + getBootstrapServersOnDockerNetwork() + " " +
+ "--property 'value.schema=" + schemaDef + "' " +
+ "--property schema.registry.url="+
getRegistryAddressInDockerNetwork() +" " +
+ "--topic "+kafkaTopicName;
+ String file = "/home/appuser/produceRecords.sh";
+
+ schemaRegistryContainer.copyFileToContainer(Transferable
+ .of(bashFileTemplate.getBytes(StandardCharsets.UTF_8),
0777), file);
+
+ ExecResult cat = schemaRegistryContainer.execInContainer("cat", file);
+ log.info("cat results: "+cat.getStdout());
+ log.info("cat stderr: "+cat.getStderr());
+
+ ExecResult execResult =
schemaRegistryContainer.execInContainer("/bin/bash", file);
+
+ log.info("script results: "+execResult.getStdout());
+ log.info("script stderr: "+execResult.getStderr());
+ assertTrue(execResult.getStdout().contains("Closing the Kafka
producer"), execResult.getStdout()+" "+execResult.getStderr());
+ assertTrue(execResult.getStderr().isEmpty(), execResult.getStderr());
+
+ log.info("Successfully produced {} messages to kafka topic {}",
numMessages, kafkaTopicName);
+ return written;
+ }
+
+ private static String serializeBeanUsingAvro(org.apache.avro.Schema
schema, MyBean bean) throws IOException {
+ DatumWriter<MyBean> userDatumWriter = new ReflectDatumWriter<>(schema);
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, stream);
+ userDatumWriter.write(bean, encoder);
+ encoder.flush();
+ String serialized = new String(stream.toByteArray(),
StandardCharsets.UTF_8);
+ return serialized;
+ }
+
+ protected void waitForProcessingSourceMessages(String tenant,
+ String namespace,
+ String sourceName,
+ int numMessages) throws
Exception {
+ final String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "source",
+ "status",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sourceName
+ };
+
+ final ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
+ log.info("Get source status : {}", result.getStdout());
+
+ assertEquals(result.getExitCode(), 0);
+
+ SourceStatus sourceStatus = SourceStatus.decode(result.getStdout());
+ assertEquals(sourceStatus.getNumInstances(), 1);
+ assertEquals(sourceStatus.getNumRunning(), 1);
+ assertEquals(sourceStatus.getInstances().size(), 1);
+ assertEquals(sourceStatus.getInstances().get(0).getInstanceId(), 0);
+
assertEquals(sourceStatus.getInstances().get(0).getStatus().isRunning(), true);
+
assertTrue(sourceStatus.getInstances().get(0).getStatus().getLastReceivedTime()
> 0);
+
assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumReceivedFromSource(),
numMessages);
+
assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumWritten(),
numMessages);
+
assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumRestarts(),
0);
+
assertEquals(sourceStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(),
0);
+ }
+
+ protected void deleteSource(String tenant, String namespace, String
sourceName) throws Exception {
+
+ final String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "source",
+ "delete",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sourceName
+ };
+
+ ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
+ assertTrue(
+ result.getStdout().contains("Delete source successfully"),
+ result.getStdout()
+ );
+ assertTrue(
+ result.getStderr().isEmpty(),
+ result.getStderr()
+ );
+ }
+
+ protected void getSourceInfoNotFound(String tenant, String namespace,
String sourceName) throws Exception {
+
+ final String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "source",
+ "get",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sourceName
+ };
+
+ try {
+ pulsarCluster.getAnyWorker().execCmd(commands);
+ fail("Command should have exited with non-zero");
+ } catch (ContainerExecException e) {
+ assertTrue(e.getResult().getStderr().contains("Reason: Source " +
sourceName + " doesn't exist"));
+ }
+ }
+
+ public class SchemaRegistryContainer extends
GenericContainer<SchemaRegistryContainer> {
+ public static final String CONFLUENT_PLATFORM_VERSION = "6.0.1";
+ private static final int SCHEMA_REGISTRY_INTERNAL_PORT = 8081;
+
+ public SchemaRegistryContainer(String boostrapServers) throws
Exception {
+ super("confluentinc/cp-schema-registry:" +
CONFLUENT_PLATFORM_VERSION);
+
+ addEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS",
boostrapServers);
+ addEnv("SCHEMA_REGISTRY_HOST_NAME", schemaRegistryContainerName);
+
+ withExposedPorts(SCHEMA_REGISTRY_INTERNAL_PORT);
+ withLogConsumer(o -> {
+ log.info("schemaregistry> {}", o.getUtf8String());
+ });
+ waitingFor(Wait.forHttp("/subjects"));
+ }
+ }
+
+ private String getRegistryAddressInDockerNetwork() {
+ return "http://"+schemaRegistryContainerName + ":8081";
+ }
+
+}
diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml
b/tests/integration/src/test/resources/pulsar-messaging.xml
index 4af0a33..14f9b04 100644
--- a/tests/integration/src/test/resources/pulsar-messaging.xml
+++ b/tests/integration/src/test/resources/pulsar-messaging.xml
@@ -24,6 +24,7 @@
<classes>
<class
name="org.apache.pulsar.tests.integration.messaging.PersistentTopicMessagingTest"
/>
<class
name="org.apache.pulsar.tests.integration.messaging.NonPersistentTopicMessagingTest"
/>
+ <class
name="org.apache.pulsar.tests.integration.io.AvroKafkaSourceTest" />
</classes>
</test>
</suite>
\ No newline at end of file