This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b826e03  Support writing general records to Pulsar sink (#9590)
b826e03 is described below

commit b826e035fe5c55ca7b507c4d900914a8b824d915
Author: Sijie Guo <[email protected]>
AuthorDate: Sat Mar 13 18:41:27 2021 -0800

    Support writing general records to Pulsar sink (#9590)
---
 ...ion-state.yaml => ci-integration-function.yaml} |   4 +-
 build/run_integration_group.sh                     |   4 +-
 .../apache/pulsar/functions/sink/PulsarSink.java   |  46 +++-
 .../pulsar/functions/sink/PulsarSinkTest.java      | 166 +++++++++++++++
 .../tests/integration/io/GenericRecordSource.java  |  88 ++++++++
 .../integration/io/GenericRecordSourceTest.java    | 234 +++++++++++++++++++++
 ...lsar-function-state.xml => pulsar-function.xml} |   5 +-
 tests/integration/src/test/resources/pulsar.xml    |   2 +-
 8 files changed, 531 insertions(+), 18 deletions(-)

diff --git a/.github/workflows/ci-integration-function-state.yaml 
b/.github/workflows/ci-integration-function.yaml
similarity index 96%
rename from .github/workflows/ci-integration-function-state.yaml
rename to .github/workflows/ci-integration-function.yaml
index 3150b20..ca7d79a 100644
--- a/.github/workflows/ci-integration-function-state.yaml
+++ b/.github/workflows/ci-integration-function.yaml
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-name: CI - Integration - Function State
+name: CI - Integration - Function & IO
 on:
   pull_request:
     branches:
@@ -93,4 +93,4 @@ jobs:
 
       - name: run integration tests
         if: steps.docs.outputs.changed_only == 'no'
-        run: ./build/run_integration_group.sh FUNCTION_STATE
\ No newline at end of file
+        run: ./build/run_integration_group.sh FUNCTION
diff --git a/build/run_integration_group.sh b/build/run_integration_group.sh
index ea9c455..b3fa5a0 100755
--- a/build/run_integration_group.sh
+++ b/build/run_integration_group.sh
@@ -67,8 +67,8 @@ test_group_cli() {
   mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-auth.xml 
-DintegrationTests
 }
 
-test_group_function_state() {
-  mvn_run_integration_test "$@" 
-DintegrationTestSuiteFile=pulsar-function-state.xml -DintegrationTests
+test_group_function() {
+  mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-function.xml 
-DintegrationTests
 }
 
 test_group_messaging() {
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 50c8d8a..80d82a1 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -24,7 +24,6 @@ import java.nio.charset.StandardCharsets;
 import lombok.Builder;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
-import lombok.val;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.CompressionType;
@@ -39,12 +38,15 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchema;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.CryptoConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.ProducerConfig;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.instance.FunctionResultRouter;
 import org.apache.pulsar.functions.instance.SinkRecord;
@@ -93,10 +95,10 @@ public class PulsarSink<T> implements Sink<T> {
         void close() throws Exception;
     }
 
-    private abstract class PulsarSinkProcessorBase implements 
PulsarSinkProcessor<T> {
+    abstract class PulsarSinkProcessorBase implements PulsarSinkProcessor<T> {
         protected Map<String, Producer<T>> publishProducers = new 
ConcurrentHashMap<>();
         protected Schema schema;
-        protected  Crypto crypto;
+        protected Crypto crypto;
 
         protected PulsarSinkProcessorBase(Schema schema, Crypto crypto) {
             this.schema = schema;
@@ -153,11 +155,16 @@ public class PulsarSink<T> implements Sink<T> {
         protected Producer<T> getProducer(String producerId, String 
producerName, String topicName, Schema schema) {
             return publishProducers.computeIfAbsent(producerId, s -> {
                 try {
-                    return createProducer(
+                    log.info("Initializing producer {} on topic {} with schema 
{}",
+                        producerName, topicName, schema);
+                    Producer<T> producer = createProducer(
                             client,
                             topicName,
                             producerName,
                             schema != null ? schema : this.schema);
+                    log.info("Initialized producer {} on topic {} with schema 
{}: {} -> {}",
+                        producerName, topicName, schema, producerId, producer);
+                    return producer;
                 } catch (PulsarClientException e) {
                     log.error("Failed to create Producer while doing user 
publish", e);
                     throw new RuntimeException(e);
@@ -209,13 +216,21 @@ public class PulsarSink<T> implements Sink<T> {
     class PulsarSinkAtMostOnceProcessor extends PulsarSinkProcessorBase {
         public PulsarSinkAtMostOnceProcessor(Schema schema, Crypto crypto) {
             super(schema, crypto);
-            // initialize default topic
-            try {
-                publishProducers.put(pulsarSinkConfig.getTopic(),
+            if (!(schema instanceof AutoConsumeSchema)) {
+                // initialize default topic
+                try {
+                    publishProducers.put(pulsarSinkConfig.getTopic(),
                         createProducer(client, pulsarSinkConfig.getTopic(), 
null, schema));
-            } catch (PulsarClientException e) {
-                log.error("Failed to create Producer while doing user 
publish", e);
-                throw new RuntimeException(e);            }
+                } catch (PulsarClientException e) {
+                    log.error("Failed to create Producer while doing user 
publish", e);
+                    throw new RuntimeException(e);
+                }
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("The Pulsar producer is not initialized until 
the first record is"
+                        + " published for `AUTO_CONSUME` schema.");
+                }
+            }
         }
 
         @Override
@@ -400,7 +415,16 @@ public class PulsarSink<T> implements Sink<T> {
         ConsumerConfig consumerConfig = new ConsumerConfig();
         
consumerConfig.setSchemaProperties(pulsarSinkConfig.getSchemaProperties());
         if (!StringUtils.isEmpty(pulsarSinkConfig.getSchemaType())) {
-            consumerConfig.setSchemaType(pulsarSinkConfig.getSchemaType());
+            if (GenericRecord.class.isAssignableFrom(typeArg)) {
+                
consumerConfig.setSchemaType(SchemaType.AUTO_CONSUME.toString());
+                SchemaType configuredSchemaType = 
SchemaType.valueOf(pulsarSinkConfig.getSchemaType());
+                if (SchemaType.AUTO_CONSUME != configuredSchemaType) {
+                    log.info("The configured schema type {} is not able to 
write GenericRecords."
+                        + " So overwrite the schema type to be {}", 
configuredSchemaType, SchemaType.AUTO_CONSUME);
+                }
+            } else {
+                consumerConfig.setSchemaType(pulsarSinkConfig.getSchemaType());
+            }
             return (Schema<T>) 
topicSchema.getSchema(pulsarSinkConfig.getTopic(), typeArg,
                     consumerConfig, false);
         } else {
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index a61df12..8848d41 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -29,12 +29,14 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
@@ -51,12 +53,21 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaBuilder;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.instance.SinkRecord;
 import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
+import org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessorBase;
 import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.io.core.SinkContext;
 import org.testng.Assert;
@@ -253,6 +264,62 @@ public class PulsarSinkTest {
     }
 
     @Test
+    public void testInitializeSchema() throws Exception {
+        PulsarClient pulsarClient = getPulsarClient();
+
+        // generic record type (no serde and no schema type)
+        PulsarSinkConfig pulsarSinkConfig = getPulsarConfigs();
+        pulsarSinkConfig.setSerdeClassName(null);
+        pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName());
+        PulsarSink sink = new PulsarSink(
+            pulsarClient, pulsarSinkConfig, new HashMap<>(), 
mock(ComponentStatsManager.class),
+            Thread.currentThread().getContextClassLoader());
+        Schema<?> schema = sink.initializeSchema();
+        assertTrue(schema instanceof AutoConsumeSchema);
+
+        // generic record type (default serde and no schema type)
+        pulsarSinkConfig = getPulsarConfigs();
+        pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName());
+        sink = new PulsarSink(
+            pulsarClient, pulsarSinkConfig, new HashMap<>(), 
mock(ComponentStatsManager.class),
+            Thread.currentThread().getContextClassLoader());
+        schema = sink.initializeSchema();
+        assertTrue(schema instanceof AutoConsumeSchema);
+
+        // generic record type (no serde and wrong schema type)
+        pulsarSinkConfig = getPulsarConfigs();
+        pulsarSinkConfig.setSerdeClassName(null);
+        pulsarSinkConfig.setSchemaType(SchemaType.AVRO.toString());
+        pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName());
+        sink = new PulsarSink(
+            pulsarClient, pulsarSinkConfig, new HashMap<>(), 
mock(ComponentStatsManager.class),
+            Thread.currentThread().getContextClassLoader());
+        schema = sink.initializeSchema();
+        assertTrue(schema instanceof AutoConsumeSchema);
+
+        // generic record type (no serde and AUTO_CONSUME schema type)
+        pulsarSinkConfig = getPulsarConfigs();
+        pulsarSinkConfig.setSerdeClassName(null);
+        pulsarSinkConfig.setSchemaType(SchemaType.AUTO_CONSUME.toString());
+        pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName());
+        sink = new PulsarSink(
+            pulsarClient, pulsarSinkConfig, new HashMap<>(), 
mock(ComponentStatsManager.class),
+            Thread.currentThread().getContextClassLoader());
+        schema = sink.initializeSchema();
+        assertTrue(schema instanceof AutoConsumeSchema);
+
+        // generic record type (default serde and AUTO_CONSUME schema type)
+        pulsarSinkConfig = getPulsarConfigs();
+        pulsarSinkConfig.setSchemaType(SchemaType.AUTO_CONSUME.toString());
+        pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName());
+        sink = new PulsarSink(
+            pulsarClient, pulsarSinkConfig, new HashMap<>(), 
mock(ComponentStatsManager.class),
+            Thread.currentThread().getContextClassLoader());
+        schema = sink.initializeSchema();
+        assertTrue(schema instanceof AutoConsumeSchema);
+    }
+
+    @Test
     public void testSinkAndMessageRouting() throws Exception {
 
         String[] topics = {"topic-1", "topic-2", "topic-3", null};
@@ -415,6 +482,105 @@ public class PulsarSinkTest {
         }
     }
 
+    @Test
+    public void testWriteGenericRecordsAtMostOnce() throws Exception {
+        testWriteGenericRecords(ProcessingGuarantees.ATMOST_ONCE);
+    }
+
+    @Test
+    public void testWriteGenericRecordsAtLeastOnce() throws Exception {
+        testWriteGenericRecords(ProcessingGuarantees.ATLEAST_ONCE);
+    }
+
+    @Test
+    public void testWriteGenericRecordsEOS() throws Exception {
+        testWriteGenericRecords(ProcessingGuarantees.EFFECTIVELY_ONCE);
+    }
+
+    private void testWriteGenericRecords(ProcessingGuarantees guarantees) 
throws Exception {
+        String defaultTopic = "default";
+
+        PulsarSinkConfig sinkConfig = getPulsarConfigs();
+        sinkConfig.setTopic(defaultTopic);
+        sinkConfig.setTypeClassName(GenericRecord.class.getName());
+        sinkConfig.setProcessingGuarantees(guarantees);
+
+        PulsarClient client = getPulsarClient();
+        PulsarSink pulsarSink = new PulsarSink(
+            client, sinkConfig, new HashMap<>(), 
mock(ComponentStatsManager.class),
+            Thread.currentThread().getContextClassLoader());
+
+        pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
+
+        if (ProcessingGuarantees.ATMOST_ONCE == guarantees) {
+            assertTrue(pulsarSink.pulsarSinkProcessor instanceof 
PulsarSink.PulsarSinkAtMostOnceProcessor);
+        } else if (ProcessingGuarantees.ATLEAST_ONCE == guarantees) {
+            assertTrue(pulsarSink.pulsarSinkProcessor instanceof 
PulsarSink.PulsarSinkAtLeastOnceProcessor);
+        } else {
+            assertTrue(pulsarSink.pulsarSinkProcessor instanceof 
PulsarSink.PulsarSinkEffectivelyOnceProcessor);
+        }
+        PulsarSinkProcessorBase processor = (PulsarSinkProcessorBase) 
pulsarSink.pulsarSinkProcessor;
+        assertFalse(processor.publishProducers.containsKey(defaultTopic));
+
+        String[] topics = { "topic-1", "topic-2", "topic-3" };
+        for (String topic : topics) {
+
+            RecordSchemaBuilder builder = SchemaBuilder.record("MyRecord");
+            builder.field("number").type(SchemaType.INT32);
+            builder.field("text").type(SchemaType.STRING);
+            GenericSchema<GenericRecord> schema = 
Schema.generic(builder.build(SchemaType.AVRO));
+
+            GenericRecordBuilder recordBuilder = schema.newRecordBuilder();
+            recordBuilder.set("number", 1);
+            recordBuilder.set("text", topic);
+
+            GenericRecord genericRecord = recordBuilder.build();
+
+            SinkRecord<GenericRecord> record = new SinkRecord<>(new 
Record<GenericRecord>() {
+
+                @Override
+                public Optional<String> getDestinationTopic() {
+                    return Optional.of(topic);
+                }
+
+                @Override
+                public Schema<GenericRecord> getSchema() {
+                    return schema;
+                }
+
+                @Override
+                public GenericRecord getValue() {
+                    return genericRecord;
+                }
+
+                @Override
+                public Optional<String> getPartitionId() {
+                    return Optional.of(topic + "-id-1");
+                }
+
+                @Override
+                public Optional<Long> getRecordSequence() {
+                    return Optional.of(1L);
+                }
+            }, genericRecord);
+
+            pulsarSink.write(record);
+
+            if (ProcessingGuarantees.EFFECTIVELY_ONCE == guarantees) {
+                
assertTrue(processor.publishProducers.containsKey(String.format("%s-%s-id-1", 
topic, topic)));
+            } else {
+                assertTrue(processor.publishProducers.containsKey(topic));
+            }
+            verify(client.newProducer(), times(1))
+                .topic(argThat(
+                    otherTopic -> topic != null ? topic.equals(otherTopic) : 
defaultTopic.equals(otherTopic)));
+
+            verify(client, times(1))
+                .newProducer(argThat(
+                    otherSchema -> Objects.equals(otherSchema, schema)));
+        }
+    }
+
     private Optional<String> getTopicOptional(String topic) {
         if (topic != null) {
             return Optional.of(topic);
diff --git 
a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/GenericRecordSource.java
 
b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/GenericRecordSource.java
new file mode 100644
index 0000000..f36b3df
--- /dev/null
+++ 
b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/GenericRecordSource.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaBuilder;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Source;
+import org.apache.pulsar.io.core.SourceContext;
+
+/**
+ * A source that generates {@link GenericRecord}s.
+ */
+@Slf4j
+public class GenericRecordSource implements Source<GenericRecord> {
+
+    private RecordSchemaBuilder recordSchemaBuilder;
+    private GenericSchema<GenericRecord> schema;
+    private List<Field> fields;
+    private AtomicInteger count = new AtomicInteger();
+
+    @Override
+    public void open(Map<String, Object> config, SourceContext sourceContext) 
throws Exception {
+        this.recordSchemaBuilder = SchemaBuilder.record("MyBean");
+        this.recordSchemaBuilder.field("number").type(SchemaType.INT32);
+        this.recordSchemaBuilder.field("text").type(SchemaType.STRING);
+        schema = 
Schema.generic(this.recordSchemaBuilder.build(SchemaType.AVRO));
+        fields = Arrays.asList(new Field("number", 0),
+            new Field("text", 1));
+        log.info("created source, schema {}", new 
String(schema.getSchemaInfo().getSchema(), StandardCharsets.UTF_8));
+    }
+
+    @Override
+    public Record<GenericRecord> read() throws Exception {
+        // slow down the production of values
+        Thread.sleep(20);
+
+        int value = count.incrementAndGet();
+        GenericRecord record = schema.newRecordBuilder()
+            .set("number", value)
+            .set("text", "value-" + value)
+            .build();
+        log.info("produced {}", record);
+        return new Record<GenericRecord>() {
+            @Override
+            public GenericRecord getValue() {
+                return record;
+            }
+
+            @Override
+            public Schema<GenericRecord> getSchema() {
+                return schema;
+            }
+        };
+    }
+
+    @Override
+    public void close() {
+        // no-op
+    }
+}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/GenericRecordSourceTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/GenericRecordSourceTest.java
new file mode 100644
index 0000000..1b591ec
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/GenericRecordSourceTest.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import static 
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR;
+import static 
org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.Data;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.tests.integration.containers.StandaloneContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testng.annotations.Test;
+
+/**
+ * This test demonstrates how a Source can create messages using the GenericRecord API
+ * and how a consumer can consume them as AVRO messages, both as GenericRecord and as a Java model.
+ */
+@Slf4j
+public class GenericRecordSourceTest extends PulsarStandaloneTestSuite {
+
+    @Test(groups = {"source"})
+    public void testGenericRecordSource() throws Exception {
+        String outputTopicName = "test-state-source-output-" + randomName(8);
+        String sourceName = "test-state-source-" + randomName(8);
+        int numMessages = 10;
+
+        submitSourceConnector(
+            sourceName,
+            outputTopicName,
+            "org.apache.pulsar.tests.integration.io.GenericRecordSource", 
JAVAJAR);
+
+        // get source info
+        getSourceInfoSuccess(container, sourceName);
+
+        // get source status
+        getSourceStatus(container, sourceName);
+
+        try (PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
+
+            retryStrategically((test) -> {
+                try {
+                    SourceStatus status = 
admin.sources().getSourceStatus("public", "default", sourceName);
+                    return status.getInstances().size() > 0
+                        && status.getInstances().get(0).getStatus().numWritten 
>= 10;
+                } catch (PulsarAdminException e) {
+                    return false;
+                }
+            }, 10, 200);
+
+            SourceStatus status = admin.sources().getSourceStatus("public", 
"default", sourceName);
+            assertEquals(status.getInstances().size(), 1);
+            assertTrue(status.getInstances().get(0).getStatus().numWritten >= 
10);
+        }
+
+        consumeMessages(container, outputTopicName, numMessages);
+
+        // delete source
+        deleteSource(container, sourceName);
+
+        getSourceInfoNotFound(container, sourceName);
+
+    }
+
+    private void submitSourceConnector(String sourceName,
+                                       String outputTopicName,
+                                       String className,
+                                       String archive) throws Exception {
+        String[] commands = {
+            PulsarCluster.ADMIN_SCRIPT,
+            "sources", "create",
+            "--name", sourceName,
+            "--destinationTopicName", outputTopicName,
+            "--archive", archive,
+            "--classname", className
+        };
+        log.info("Run command : {}", StringUtils.join(commands, ' '));
+        ContainerExecResult result = container.execCmd(commands);
+        assertTrue(
+            result.getStdout().contains("\"Created successfully\""),
+            result.getStdout());
+    }
+
+    private static void getSourceInfoSuccess(StandaloneContainer container, 
String sourceName) throws Exception {
+        ContainerExecResult result = container.execCmd(
+            PulsarCluster.ADMIN_SCRIPT,
+            "sources",
+            "get",
+            "--tenant", "public",
+            "--namespace", "default",
+            "--name", sourceName
+        );
+        assertTrue(result.getStdout().contains("\"name\": \"" + sourceName + 
"\""));
+    }
+
+    private static void getSourceStatus(StandaloneContainer container,String 
sourceName) throws Exception {
+        ContainerExecResult result = container.execCmd(
+            PulsarCluster.ADMIN_SCRIPT,
+            "sources",
+            "status",
+            "--tenant", "public",
+            "--namespace", "default",
+            "--name", sourceName
+        );
+        assertTrue(result.getStdout().contains("\"running\" : true"));
+    }
+
+    private static void consumeMessages(StandaloneContainer container, String 
outputTopic,
+                                        int numMessages) throws Exception {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+            .serviceUrl(container.getPlainTextServiceUrl())
+            .build();
+
+        // read using Pulsar GenericRecord abstraction
+        @Cleanup
+        Consumer<GenericRecord> consumer = 
client.newConsumer(Schema.AUTO_CONSUME())
+            .topic(outputTopic)
+            .subscriptionType(SubscriptionType.Exclusive)
+            .subscriptionName("test-sub")
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .startMessageIdInclusive()
+            .subscribe();
+
+        for (int i = 0; i < numMessages; i++) {
+            Message<GenericRecord> msg = consumer.receive(10, 
TimeUnit.SECONDS);
+            if (msg == null) {
+                fail("message "+i+" not received in time");
+                return;
+            }
+            log.info("received {}", msg.getValue());
+            msg.getValue().getFields().forEach( f -> {
+                log.info("field {} {}", f, msg.getValue().getField(f));
+            });
+            String text = (String) msg.getValue().getField("text");
+            int number = (Integer) msg.getValue().getField("number");
+
+            assertEquals(text, "value-" + number);
+        }
+
+        @Cleanup
+        Consumer<MyBean> typedConsumer = 
client.newConsumer(Schema.AVRO(MyBean.class))
+            .topic(outputTopic)
+            .subscriptionType(SubscriptionType.Exclusive)
+            .subscriptionName("test-sub-typed")
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .startMessageIdInclusive()
+            .subscribe();
+
+        for (int i = 0; i < numMessages; i++) {
+            Message<MyBean> msg = typedConsumer.receive(10, TimeUnit.SECONDS);
+            if (msg == null) {
+                fail("message "+i+" not received in time");
+                return;
+            }
+            log.info("received {}", msg.getValue());
+            String text = msg.getValue().getText();
+            int number = msg.getValue().getNumber();
+            assertEquals(text, "value-" + number);
+        }
+
+    }
+
+    @Data
+    @ToString
+    public static class MyBean {
+        String text;
+        int number;
+    }
+
+    private static void deleteSource(StandaloneContainer container, String 
sourceName) throws Exception {
+        ContainerExecResult result = container.execCmd(
+            PulsarCluster.ADMIN_SCRIPT,
+            "sources",
+            "delete",
+            "--tenant", "public",
+            "--namespace", "default",
+            "--name", sourceName
+        );
+        assertTrue(result.getStdout().contains("Delete source successfully"));
+        assertTrue(result.getStderr().isEmpty());
+    }
+
+    private static void getSourceInfoNotFound(StandaloneContainer container, 
String sourceName) throws Exception {
+        try {
+            container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "sources",
+                "get",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", sourceName);
+            fail("Command should have exited with non-zero");
+        } catch (ContainerExecException e) {
+            assertTrue(e.getResult().getStderr().contains("Reason: Source " + 
sourceName + " doesn't exist"));
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/tests/integration/src/test/resources/pulsar-function-state.xml 
b/tests/integration/src/test/resources/pulsar-function.xml
similarity index 81%
rename from tests/integration/src/test/resources/pulsar-function-state.xml
rename to tests/integration/src/test/resources/pulsar-function.xml
index 56ff01b..026b77f 100644
--- a/tests/integration/src/test/resources/pulsar-function-state.xml
+++ b/tests/integration/src/test/resources/pulsar-function.xml
@@ -19,10 +19,11 @@
 
 -->
 <!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd" >
-<suite name="Pulsar Function State Integration Tests" verbose="2" 
annotations="JDK">
-    <test name="pulsar-function-state-test-suite" preserve-order="true" >
+<suite name="Pulsar Function Integration Tests" verbose="2" annotations="JDK">
+    <test name="pulsar-function-test-suite" preserve-order="true" >
         <classes>
             <class 
name="org.apache.pulsar.tests.integration.functions.PulsarStateTest" />
+            <class 
name="org.apache.pulsar.tests.integration.io.GenericRecordSourceTest" />
         </classes>
     </test>
 </suite>
diff --git a/tests/integration/src/test/resources/pulsar.xml 
b/tests/integration/src/test/resources/pulsar.xml
index debfed2..ac7de1f 100644
--- a/tests/integration/src/test/resources/pulsar.xml
+++ b/tests/integration/src/test/resources/pulsar.xml
@@ -30,7 +30,7 @@
         <suite-file path="./pulsar-thread.xml" />
         <suite-file path="./tiered-jcloud-storage.xml" />
         <suite-file path="./tiered-filesystem-storage.xml"/>
-        <suite-file path="./pulsar-function-state.xml" />
+        <suite-file path="./pulsar-function.xml" />
         <suite-file path="./pulsar-messaging.xml" />
         <suite-file path="./pulsar-backwards-compatibility.xml" />
     </suite-files>

Reply via email to