This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b826e03 Support writing general records to Pulsar sink (#9590)
b826e03 is described below
commit b826e035fe5c55ca7b507c4d900914a8b824d915
Author: Sijie Guo <[email protected]>
AuthorDate: Sat Mar 13 18:41:27 2021 -0800
Support writing general records to Pulsar sink (#9590)
---
...ion-state.yaml => ci-integration-function.yaml} | 4 +-
build/run_integration_group.sh | 4 +-
.../apache/pulsar/functions/sink/PulsarSink.java | 46 +++-
.../pulsar/functions/sink/PulsarSinkTest.java | 166 +++++++++++++++
.../tests/integration/io/GenericRecordSource.java | 88 ++++++++
.../integration/io/GenericRecordSourceTest.java | 234 +++++++++++++++++++++
...lsar-function-state.xml => pulsar-function.xml} | 5 +-
tests/integration/src/test/resources/pulsar.xml | 2 +-
8 files changed, 531 insertions(+), 18 deletions(-)
diff --git a/.github/workflows/ci-integration-function-state.yaml
b/.github/workflows/ci-integration-function.yaml
similarity index 96%
rename from .github/workflows/ci-integration-function-state.yaml
rename to .github/workflows/ci-integration-function.yaml
index 3150b20..ca7d79a 100644
--- a/.github/workflows/ci-integration-function-state.yaml
+++ b/.github/workflows/ci-integration-function.yaml
@@ -17,7 +17,7 @@
# under the License.
#
-name: CI - Integration - Function State
+name: CI - Integration - Function & IO
on:
pull_request:
branches:
@@ -93,4 +93,4 @@ jobs:
- name: run integration tests
if: steps.docs.outputs.changed_only == 'no'
- run: ./build/run_integration_group.sh FUNCTION_STATE
\ No newline at end of file
+ run: ./build/run_integration_group.sh FUNCTION
diff --git a/build/run_integration_group.sh b/build/run_integration_group.sh
index ea9c455..b3fa5a0 100755
--- a/build/run_integration_group.sh
+++ b/build/run_integration_group.sh
@@ -67,8 +67,8 @@ test_group_cli() {
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-auth.xml
-DintegrationTests
}
-test_group_function_state() {
- mvn_run_integration_test "$@"
-DintegrationTestSuiteFile=pulsar-function-state.xml -DintegrationTests
+test_group_function() {
+ mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-function.xml
-DintegrationTests
}
test_group_messaging() {
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 50c8d8a..80d82a1 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -24,7 +24,6 @@ import java.nio.charset.StandardCharsets;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
-import lombok.val;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
@@ -39,12 +38,15 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.CryptoConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.FunctionResultRouter;
import org.apache.pulsar.functions.instance.SinkRecord;
@@ -93,10 +95,10 @@ public class PulsarSink<T> implements Sink<T> {
void close() throws Exception;
}
- private abstract class PulsarSinkProcessorBase implements
PulsarSinkProcessor<T> {
+ abstract class PulsarSinkProcessorBase implements PulsarSinkProcessor<T> {
protected Map<String, Producer<T>> publishProducers = new
ConcurrentHashMap<>();
protected Schema schema;
- protected Crypto crypto;
+ protected Crypto crypto;
protected PulsarSinkProcessorBase(Schema schema, Crypto crypto) {
this.schema = schema;
@@ -153,11 +155,16 @@ public class PulsarSink<T> implements Sink<T> {
protected Producer<T> getProducer(String producerId, String
producerName, String topicName, Schema schema) {
return publishProducers.computeIfAbsent(producerId, s -> {
try {
- return createProducer(
+ log.info("Initializing producer {} on topic {} with schema
{}",
+ producerName, topicName, schema);
+ Producer<T> producer = createProducer(
client,
topicName,
producerName,
schema != null ? schema : this.schema);
+ log.info("Initialized producer {} on topic {} with schema
{}: {} -> {}",
+ producerName, topicName, schema, producerId, producer);
+ return producer;
} catch (PulsarClientException e) {
log.error("Failed to create Producer while doing user
publish", e);
throw new RuntimeException(e);
@@ -209,13 +216,21 @@ public class PulsarSink<T> implements Sink<T> {
class PulsarSinkAtMostOnceProcessor extends PulsarSinkProcessorBase {
public PulsarSinkAtMostOnceProcessor(Schema schema, Crypto crypto) {
super(schema, crypto);
- // initialize default topic
- try {
- publishProducers.put(pulsarSinkConfig.getTopic(),
+ if (!(schema instanceof AutoConsumeSchema)) {
+ // initialize default topic
+ try {
+ publishProducers.put(pulsarSinkConfig.getTopic(),
createProducer(client, pulsarSinkConfig.getTopic(),
null, schema));
- } catch (PulsarClientException e) {
- log.error("Failed to create Producer while doing user
publish", e);
- throw new RuntimeException(e); }
+ } catch (PulsarClientException e) {
+ log.error("Failed to create Producer while doing user
publish", e);
+ throw new RuntimeException(e);
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("The Pulsar producer is not initialized until
the first record is"
+ + " published for `AUTO_CONSUME` schema.");
+ }
+ }
}
@Override
@@ -400,7 +415,16 @@ public class PulsarSink<T> implements Sink<T> {
ConsumerConfig consumerConfig = new ConsumerConfig();
consumerConfig.setSchemaProperties(pulsarSinkConfig.getSchemaProperties());
if (!StringUtils.isEmpty(pulsarSinkConfig.getSchemaType())) {
- consumerConfig.setSchemaType(pulsarSinkConfig.getSchemaType());
+ if (GenericRecord.class.isAssignableFrom(typeArg)) {
+
consumerConfig.setSchemaType(SchemaType.AUTO_CONSUME.toString());
+ SchemaType configuredSchemaType =
SchemaType.valueOf(pulsarSinkConfig.getSchemaType());
+ if (SchemaType.AUTO_CONSUME != configuredSchemaType) {
+ log.info("The configured schema type {} is not able to
write GenericRecords."
+ + " So overwrite the schema type to be {}",
configuredSchemaType, SchemaType.AUTO_CONSUME);
+ }
+ } else {
+ consumerConfig.setSchemaType(pulsarSinkConfig.getSchemaType());
+ }
return (Schema<T>)
topicSchema.getSchema(pulsarSinkConfig.getTopic(), typeArg,
consumerConfig, false);
} else {
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index a61df12..8848d41 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -29,12 +29,14 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.io.IOException;
import java.util.HashMap;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -51,12 +53,21 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaBuilder;
import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees;
+import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.instance.SinkRecord;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
+import org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessorBase;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.io.core.SinkContext;
import org.testng.Assert;
@@ -253,6 +264,62 @@ public class PulsarSinkTest {
}
@Test
+ public void testInitializeSchema() throws Exception {
+ PulsarClient pulsarClient = getPulsarClient();
+
+ // generic record type (no serde and no schema type)
+ PulsarSinkConfig pulsarSinkConfig = getPulsarConfigs();
+ pulsarSinkConfig.setSerdeClassName(null);
+ pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName());
+ PulsarSink sink = new PulsarSink(
+ pulsarClient, pulsarSinkConfig, new HashMap<>(),
mock(ComponentStatsManager.class),
+ Thread.currentThread().getContextClassLoader());
+ Schema<?> schema = sink.initializeSchema();
+ assertTrue(schema instanceof AutoConsumeSchema);
+
+ // generic record type (default serde and no schema type)
+ pulsarSinkConfig = getPulsarConfigs();
+ pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName());
+ sink = new PulsarSink(
+ pulsarClient, pulsarSinkConfig, new HashMap<>(),
mock(ComponentStatsManager.class),
+ Thread.currentThread().getContextClassLoader());
+ schema = sink.initializeSchema();
+ assertTrue(schema instanceof AutoConsumeSchema);
+
+ // generic record type (no serde and wrong schema type)
+ pulsarSinkConfig = getPulsarConfigs();
+ pulsarSinkConfig.setSerdeClassName(null);
+ pulsarSinkConfig.setSchemaType(SchemaType.AVRO.toString());
+ pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName());
+ sink = new PulsarSink(
+ pulsarClient, pulsarSinkConfig, new HashMap<>(),
mock(ComponentStatsManager.class),
+ Thread.currentThread().getContextClassLoader());
+ schema = sink.initializeSchema();
+ assertTrue(schema instanceof AutoConsumeSchema);
+
+ // generic record type (no serde and AUTO_CONSUME schema type)
+ pulsarSinkConfig = getPulsarConfigs();
+ pulsarSinkConfig.setSerdeClassName(null);
+ pulsarSinkConfig.setSchemaType(SchemaType.AUTO_CONSUME.toString());
+ pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName());
+ sink = new PulsarSink(
+ pulsarClient, pulsarSinkConfig, new HashMap<>(),
mock(ComponentStatsManager.class),
+ Thread.currentThread().getContextClassLoader());
+ schema = sink.initializeSchema();
+ assertTrue(schema instanceof AutoConsumeSchema);
+
+ // generic record type (default serde and AUTO_CONSUME schema type)
+ pulsarSinkConfig = getPulsarConfigs();
+ pulsarSinkConfig.setSchemaType(SchemaType.AUTO_CONSUME.toString());
+ pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName());
+ sink = new PulsarSink(
+ pulsarClient, pulsarSinkConfig, new HashMap<>(),
mock(ComponentStatsManager.class),
+ Thread.currentThread().getContextClassLoader());
+ schema = sink.initializeSchema();
+ assertTrue(schema instanceof AutoConsumeSchema);
+ }
+
+ @Test
public void testSinkAndMessageRouting() throws Exception {
String[] topics = {"topic-1", "topic-2", "topic-3", null};
@@ -415,6 +482,105 @@ public class PulsarSinkTest {
}
}
+    @Test
+    public void testWriteGenericRecordsAtMostOnce() throws Exception {
+        testWriteGenericRecords(ProcessingGuarantees.ATMOST_ONCE);
+    }
+
+    @Test
+    public void testWriteGenericRecordsAtLeastOnce() throws Exception {
+        testWriteGenericRecords(ProcessingGuarantees.ATLEAST_ONCE);
+    }
+
+    @Test
+    public void testWriteGenericRecordsEOS() throws Exception {
+        testWriteGenericRecords(ProcessingGuarantees.EFFECTIVELY_ONCE);
+    }
+
+    /**
+     * Writes one AVRO {@link GenericRecord} to each of several destination topics and checks that
+     * the sink creates exactly one producer per destination topic, built with the record's schema.
+     *
+     * @param guarantees processing guarantee configured on the sink; it selects which processor
+     *                   implementation is expected and which key the producer cache is expected to use
+     */
+    private void testWriteGenericRecords(ProcessingGuarantees guarantees) throws Exception {
+        String defaultTopic = "default";
+
+        PulsarSinkConfig sinkConfig = getPulsarConfigs();
+        sinkConfig.setTopic(defaultTopic);
+        sinkConfig.setTypeClassName(GenericRecord.class.getName());
+        sinkConfig.setProcessingGuarantees(guarantees);
+
+        PulsarClient client = getPulsarClient();
+        PulsarSink pulsarSink = new PulsarSink(
+                client, sinkConfig, new HashMap<>(), mock(ComponentStatsManager.class),
+                Thread.currentThread().getContextClassLoader());
+
+        pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
+
+        if (ProcessingGuarantees.ATMOST_ONCE == guarantees) {
+            assertTrue(pulsarSink.pulsarSinkProcessor instanceof PulsarSink.PulsarSinkAtMostOnceProcessor);
+        } else if (ProcessingGuarantees.ATLEAST_ONCE == guarantees) {
+            assertTrue(pulsarSink.pulsarSinkProcessor instanceof PulsarSink.PulsarSinkAtLeastOnceProcessor);
+        } else {
+            assertTrue(pulsarSink.pulsarSinkProcessor instanceof PulsarSink.PulsarSinkEffectivelyOnceProcessor);
+        }
+        PulsarSinkProcessorBase processor = (PulsarSinkProcessorBase) pulsarSink.pulsarSinkProcessor;
+        // For GenericRecord sinks no producer is expected for the default topic before the first write.
+        assertFalse(processor.publishProducers.containsKey(defaultTopic));
+
+        String[] topics = {"topic-1", "topic-2", "topic-3"};
+        for (String topic : topics) {
+            RecordSchemaBuilder builder = SchemaBuilder.record("MyRecord");
+            builder.field("number").type(SchemaType.INT32);
+            builder.field("text").type(SchemaType.STRING);
+            GenericSchema<GenericRecord> schema = Schema.generic(builder.build(SchemaType.AVRO));
+
+            GenericRecordBuilder recordBuilder = schema.newRecordBuilder();
+            recordBuilder.set("number", 1);
+            recordBuilder.set("text", topic);
+            GenericRecord genericRecord = recordBuilder.build();
+
+            SinkRecord<GenericRecord> record = new SinkRecord<>(new Record<GenericRecord>() {
+
+                @Override
+                public Optional<String> getDestinationTopic() {
+                    return Optional.of(topic);
+                }
+
+                @Override
+                public Schema<GenericRecord> getSchema() {
+                    return schema;
+                }
+
+                @Override
+                public GenericRecord getValue() {
+                    return genericRecord;
+                }
+
+                @Override
+                public Optional<String> getPartitionId() {
+                    return Optional.of(topic + "-id-1");
+                }
+
+                @Override
+                public Optional<Long> getRecordSequence() {
+                    return Optional.of(1L);
+                }
+            }, genericRecord);
+
+            pulsarSink.write(record);
+
+            // EFFECTIVELY_ONCE is expected to cache producers under "<topic>-<partitionId>";
+            // the other guarantees under the destination topic alone.
+            String expectedProducerKey = ProcessingGuarantees.EFFECTIVELY_ONCE == guarantees
+                    ? String.format("%s-%s-id-1", topic, topic)
+                    : topic;
+            assertTrue(processor.publishProducers.containsKey(expectedProducerKey));
+
+            // Every destination in `topics` is non-null, so match the topic directly.
+            verify(client.newProducer(), times(1))
+                    .topic(argThat(otherTopic -> topic.equals(otherTopic)));
+
+            verify(client, times(1))
+                    .newProducer(argThat(otherSchema -> Objects.equals(otherSchema, schema)));
+        }
+    }
+
private Optional<String> getTopicOptional(String topic) {
if (topic != null) {
return Optional.of(topic);
diff --git
a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/GenericRecordSource.java
b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/GenericRecordSource.java
new file mode 100644
index 0000000..f36b3df
--- /dev/null
+++
b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/GenericRecordSource.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaBuilder;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Source;
+import org.apache.pulsar.io.core.SourceContext;
+
+/**
+ * Test {@link Source} that emits an endless stream of AVRO {@link GenericRecord}s.
+ *
+ * <p>Every record follows the "MyBean" schema assembled in {@link #open}: an INT32 field
+ * {@code number} carrying a running counter that starts at 1, and a STRING field {@code text}
+ * carrying {@code "value-<number>"}.
+ */
+@Slf4j
+public class GenericRecordSource implements Source<GenericRecord> {
+
+    // Counter backing the "number" field; incremented once per read().
+    private AtomicInteger count = new AtomicInteger();
+    // Builder used to assemble the record schema in open().
+    private RecordSchemaBuilder recordSchemaBuilder;
+    // Generic AVRO schema attached to every emitted record.
+    private GenericSchema<GenericRecord> schema;
+    // NOTE(review): populated in open() but never read by this class.
+    private List<Field> fields;
+
+    @Override
+    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+        RecordSchemaBuilder builder = SchemaBuilder.record("MyBean");
+        this.recordSchemaBuilder = builder;
+        builder.field("number").type(SchemaType.INT32);
+        builder.field("text").type(SchemaType.STRING);
+        schema = Schema.generic(builder.build(SchemaType.AVRO));
+        fields = Arrays.asList(new Field("number", 0), new Field("text", 1));
+
+        String schemaDefinition = new String(schema.getSchemaInfo().getSchema(), StandardCharsets.UTF_8);
+        log.info("created source, schema {}", schemaDefinition);
+    }
+
+    @Override
+    public Record<GenericRecord> read() throws Exception {
+        // Throttle generation so values are not produced in a tight loop.
+        Thread.sleep(20);
+
+        final int sequence = count.incrementAndGet();
+        final GenericRecord generated = schema.newRecordBuilder()
+                .set("number", sequence)
+                .set("text", "value-" + sequence)
+                .build();
+        log.info("produced {}", generated);
+
+        return new Record<GenericRecord>() {
+            @Override
+            public Schema<GenericRecord> getSchema() {
+                return schema;
+            }
+
+            @Override
+            public GenericRecord getValue() {
+                return generated;
+            }
+        };
+    }
+
+    @Override
+    public void close() {
+        // Nothing to release: this source holds no external resources.
+    }
+}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/GenericRecordSourceTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/GenericRecordSourceTest.java
new file mode 100644
index 0000000..1b591ec
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/GenericRecordSourceTest.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import static
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR;
+import static
org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.Data;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.tests.integration.containers.StandaloneContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test showing that a Source can produce messages through the {@link GenericRecord}
+ * API, and that consumers can read those AVRO messages back both as {@link GenericRecord}s
+ * (via {@code AUTO_CONSUME}) and as a Java model ({@link MyBean}).
+ */
+@Slf4j
+public class GenericRecordSourceTest extends PulsarStandaloneTestSuite {
+
+    @Test(groups = {"source"})
+    public void testGenericRecordSource() throws Exception {
+        String outputTopicName = "test-generic-record-source-output-" + randomName(8);
+        String sourceName = "test-generic-record-source-" + randomName(8);
+        int numMessages = 10;
+
+        submitSourceConnector(
+                sourceName,
+                outputTopicName,
+                "org.apache.pulsar.tests.integration.io.GenericRecordSource",
+                JAVAJAR);
+
+        // get source info
+        getSourceInfoSuccess(container, sourceName);
+
+        // get source status
+        getSourceStatus(container, sourceName);
+
+        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
+            // Poll until the source instance reports at least numMessages written records.
+            retryStrategically((test) -> {
+                try {
+                    SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
+                    return status.getInstances().size() > 0
+                            && status.getInstances().get(0).getStatus().numWritten >= numMessages;
+                } catch (PulsarAdminException e) {
+                    return false;
+                }
+            }, 10, 200);
+
+            SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
+            assertEquals(status.getInstances().size(), 1);
+            assertTrue(status.getInstances().get(0).getStatus().numWritten >= numMessages,
+                    "expected at least " + numMessages + " written records, got "
+                            + status.getInstances().get(0).getStatus().numWritten);
+        }
+
+        consumeMessages(container, outputTopicName, numMessages);
+
+        // delete source
+        deleteSource(container, sourceName);
+
+        getSourceInfoNotFound(container, sourceName);
+    }
+
+    /** Creates the source through the admin CLI and asserts the CLI reports success. */
+    private void submitSourceConnector(String sourceName,
+                                       String outputTopicName,
+                                       String className,
+                                       String archive) throws Exception {
+        String[] commands = {
+                PulsarCluster.ADMIN_SCRIPT,
+                "sources", "create",
+                "--name", sourceName,
+                "--destinationTopicName", outputTopicName,
+                "--archive", archive,
+                "--classname", className
+        };
+        log.info("Run command : {}", StringUtils.join(commands, ' '));
+        ContainerExecResult result = container.execCmd(commands);
+        assertTrue(
+                result.getStdout().contains("\"Created successfully\""),
+                result.getStdout());
+    }
+
+    /** Asserts that {@code sources get} returns the definition of {@code sourceName}. */
+    private static void getSourceInfoSuccess(StandaloneContainer container, String sourceName) throws Exception {
+        ContainerExecResult result = container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "sources",
+                "get",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", sourceName
+        );
+        assertTrue(result.getStdout().contains("\"name\": \"" + sourceName + "\""), result.getStdout());
+    }
+
+    /** Asserts that {@code sources status} reports the source as running. */
+    private static void getSourceStatus(StandaloneContainer container, String sourceName) throws Exception {
+        ContainerExecResult result = container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "sources",
+                "status",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", sourceName
+        );
+        assertTrue(result.getStdout().contains("\"running\" : true"), result.getStdout());
+    }
+
+    /**
+     * Reads {@code numMessages} messages from {@code outputTopic} twice: once as
+     * {@link GenericRecord}s via {@code AUTO_CONSUME}, and once as {@link MyBean} via an AVRO schema.
+     * Each message must satisfy {@code text == "value-" + number}.
+     */
+    private static void consumeMessages(StandaloneContainer container, String outputTopic,
+                                        int numMessages) throws Exception {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(container.getPlainTextServiceUrl())
+                .build();
+
+        // read using Pulsar GenericRecord abstraction
+        @Cleanup
+        Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
+                .topic(outputTopic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName("test-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .startMessageIdInclusive()
+                .subscribe();
+
+        for (int i = 0; i < numMessages; i++) {
+            Message<GenericRecord> msg = consumer.receive(10, TimeUnit.SECONDS);
+            if (msg == null) {
+                // fail() throws, so execution never continues with a null message.
+                fail("message " + i + " not received in time");
+            }
+            GenericRecord value = msg.getValue();
+            log.info("received {}", value);
+            value.getFields().forEach(f -> log.info("field {} {}", f, value.getField(f)));
+            String text = (String) value.getField("text");
+            int number = (Integer) value.getField("number");
+
+            assertEquals(text, "value-" + number);
+        }
+
+        // read the same messages using a Java model
+        @Cleanup
+        Consumer<MyBean> typedConsumer = client.newConsumer(Schema.AVRO(MyBean.class))
+                .topic(outputTopic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName("test-sub-typed")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .startMessageIdInclusive()
+                .subscribe();
+
+        for (int i = 0; i < numMessages; i++) {
+            Message<MyBean> msg = typedConsumer.receive(10, TimeUnit.SECONDS);
+            if (msg == null) {
+                fail("message " + i + " not received in time");
+            }
+            MyBean bean = msg.getValue();
+            log.info("received {}", bean);
+            assertEquals(bean.getText(), "value-" + bean.getNumber());
+        }
+    }
+
+    /** Java model matching the "MyBean" schema produced by {@link GenericRecordSource}. */
+    @Data
+    @ToString
+    public static class MyBean {
+        String text;
+        int number;
+    }
+
+    /** Deletes the source and asserts the CLI reports success with nothing on stderr. */
+    private static void deleteSource(StandaloneContainer container, String sourceName) throws Exception {
+        ContainerExecResult result = container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "sources",
+                "delete",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", sourceName
+        );
+        assertTrue(result.getStdout().contains("Delete source successfully"), result.getStdout());
+        assertTrue(result.getStderr().isEmpty(), result.getStderr());
+    }
+
+    /** Asserts that {@code sources get} fails with a "doesn't exist" error for {@code sourceName}. */
+    private static void getSourceInfoNotFound(StandaloneContainer container, String sourceName) throws Exception {
+        try {
+            container.execCmd(
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "sources",
+                    "get",
+                    "--tenant", "public",
+                    "--namespace", "default",
+                    "--name", sourceName);
+            fail("Command should have exited with non-zero");
+        } catch (ContainerExecException e) {
+            String stderr = e.getResult().getStderr();
+            assertTrue(stderr.contains("Reason: Source " + sourceName + " doesn't exist"), stderr);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/tests/integration/src/test/resources/pulsar-function-state.xml
b/tests/integration/src/test/resources/pulsar-function.xml
similarity index 81%
rename from tests/integration/src/test/resources/pulsar-function-state.xml
rename to tests/integration/src/test/resources/pulsar-function.xml
index 56ff01b..026b77f 100644
--- a/tests/integration/src/test/resources/pulsar-function-state.xml
+++ b/tests/integration/src/test/resources/pulsar-function.xml
@@ -19,10 +19,11 @@
-->
<!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd" >
-<suite name="Pulsar Function State Integration Tests" verbose="2"
annotations="JDK">
- <test name="pulsar-function-state-test-suite" preserve-order="true" >
+<suite name="Pulsar Function Integration Tests" verbose="2" annotations="JDK">
+ <test name="pulsar-function-test-suite" preserve-order="true" >
<classes>
<class
name="org.apache.pulsar.tests.integration.functions.PulsarStateTest" />
+ <class
name="org.apache.pulsar.tests.integration.io.GenericRecordSourceTest" />
</classes>
</test>
</suite>
diff --git a/tests/integration/src/test/resources/pulsar.xml
b/tests/integration/src/test/resources/pulsar.xml
index debfed2..ac7de1f 100644
--- a/tests/integration/src/test/resources/pulsar.xml
+++ b/tests/integration/src/test/resources/pulsar.xml
@@ -30,7 +30,7 @@
<suite-file path="./pulsar-thread.xml" />
<suite-file path="./tiered-jcloud-storage.xml" />
<suite-file path="./tiered-filesystem-storage.xml"/>
- <suite-file path="./pulsar-function-state.xml" />
+ <suite-file path="./pulsar-function.xml" />
<suite-file path="./pulsar-messaging.xml" />
<suite-file path="./pulsar-backwards-compatibility.xml" />
</suite-files>