This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5ce2e51b1c16e6266b2e22c2e1b21c70057795a2
Author: David Kjerrumgaard <[email protected]>
AuthorDate: Fri Mar 19 17:12:29 2021 -0700

    [Issue-9926][Pulsar Functions] Pass through record properties from Pulsar Sources (#9943)
    
    Co-authored-by: David Kjerrumgaard <[email protected]>
    (cherry picked from commit 1273c713ce7ca2dec83e5602619f38b8303ca7a4)
---
 .../pulsar/functions/utils/SourceConfigUtils.java  |   2 +
 .../tests/integration/io/TestPropertySource.java   |  63 ++++++++
 .../integration/io/PulsarSourcePropertyTest.java   | 170 +++++++++++++++++++++
 .../src/test/resources/pulsar-function-state.xml   |   1 +
 4 files changed, 236 insertions(+)

diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 08b561b..904bf15 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -150,6 +150,8 @@ public class SourceConfigUtils {
             sinkSpecBuilder.setProducerSpec(ProducerConfigUtils.convert(sourceConfig.getProducerConfig()));
         }
 
+        sinkSpecBuilder.setForwardSourceMessageProperty(true);
+
         functionDetailsBuilder.setSink(sinkSpecBuilder);
 
         // use default resources if resources not set
diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestPropertySource.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestPropertySource.java
new file mode 100644
index 0000000..245929b
--- /dev/null
+++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestPropertySource.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Source;
+import org.apache.pulsar.io.core.SourceContext;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+public class TestPropertySource implements Source<String> {
+
+    @Override
+    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+    }
+
+    @Override
+    public Record<String> read() throws Exception {
+        Thread.sleep(50);
+        return new Record<String>() {
+            @Override
+            public Optional<String> getKey() {
+                return Optional.empty();
+            }
+
+            @Override
+            public String getValue() {
+                return "property";
+            }
+            @Override
+            public Map<String, String> getProperties() {
+                HashMap<String, String> props = new HashMap<String, String>();
+                props.put("hello", "world");
+                props.put("foo", "bar");
+                return props;
+            }
+        };
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarSourcePropertyTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarSourcePropertyTest.java
new file mode 100644
index 0000000..70452d4
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarSourcePropertyTest.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.functions.FunctionState;
+import org.apache.pulsar.common.policies.data.SinkStatus;
+import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
+import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
+import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR;
+import static org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ * Source Property related test cases.
+ */
+@Slf4j
+public class PulsarSourcePropertyTest extends PulsarStandaloneTestSuite {
+    @Test(groups = {"source"})
+    public void testSourceProperty() throws Exception {
+        String outputTopicName = "test-source-property-input-" + randomName(8);
+        String sourceName = "test-source-property-" + randomName(8);
+        submitSourceConnector(sourceName, outputTopicName, "org.apache.pulsar.tests.integration.io.TestPropertySource",  JAVAJAR);
+
+        // get source info
+        getSourceInfoSuccess(sourceName);
+
+        // get source status
+        getSourceStatus(sourceName);
+
+        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
+
+            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
+                SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
+                assertEquals(status.getInstances().size(), 1);
+                assertTrue(status.getInstances().get(0).getStatus().numWritten > 0);
+            });
+        }
+
+        @Cleanup PulsarClient client = PulsarClient.builder()
+                .serviceUrl(container.getPlainTextServiceUrl())
+                .build();
+        @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(outputTopicName)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName("test-sub")
+                .subscribe();
+
+        for (int i = 0; i < 10; i++) {
+            Message<String> msg = consumer.receive();
+            assertEquals(msg.getValue(), "property");
+            assertEquals(msg.getProperty("hello"), "world");
+            assertEquals(msg.getProperty("foo"), "bar");
+        }
+
+        // delete source
+        deleteSource(sourceName);
+
+        getSourceInfoNotFound(sourceName);
+    }
+
+    private void submitSourceConnector(String sourceName,
+                                       String outputTopicName,
+                                       String className,
+                                       String archive) throws Exception {
+        String[] commands = {
+                PulsarCluster.ADMIN_SCRIPT,
+                "sources", "create",
+                "--name", sourceName,
+                "--destinationTopicName", outputTopicName,
+                "--archive", archive,
+                "--classname", className
+        };
+        log.info("Run command : {}", StringUtils.join(commands, ' '));
+        ContainerExecResult result = container.execCmd(commands);
+        assertTrue(
+                result.getStdout().contains("\"Created successfully\""),
+                result.getStdout());
+    }
+
+    private void getSourceInfoSuccess(String sourceName) throws Exception {
+        ContainerExecResult result = container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "sources",
+                "get",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", sourceName
+        );
+        assertTrue(result.getStdout().contains("\"name\": \"" + sourceName + "\""));
+    }
+
+    private void getSourceStatus(String sourceName) throws Exception {
+        ContainerExecResult result = container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "sources",
+                "status",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", sourceName
+        );
+        assertTrue(result.getStdout().contains("\"running\" : true"));
+    }
+
+    private void deleteSource(String sourceName) throws Exception {
+        ContainerExecResult result = container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "sources",
+                "delete",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", sourceName
+        );
+        assertTrue(result.getStdout().contains("Delete source successfully"));
+        result.assertNoStderr();
+    }
+
+    private void getSourceInfoNotFound(String sourceName) throws Exception {
+        try {
+            container.execCmd(
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "sources",
+                    "get",
+                    "--tenant", "public",
+                    "--namespace", "default",
+                    "--name", sourceName);
+            fail("Command should have exited with non-zero");
+        } catch (ContainerExecException e) {
+            assertTrue(e.getResult().getStderr().contains("Reason: Source " + sourceName + " doesn't exist"));
+        }
+    }
+}
+
diff --git a/tests/integration/src/test/resources/pulsar-function-state.xml b/tests/integration/src/test/resources/pulsar-function-state.xml
index 56ff01b..ebf3392 100644
--- a/tests/integration/src/test/resources/pulsar-function-state.xml
+++ b/tests/integration/src/test/resources/pulsar-function-state.xml
@@ -23,6 +23,7 @@
     <test name="pulsar-function-state-test-suite" preserve-order="true" >
         <classes>
             <class name="org.apache.pulsar.tests.integration.functions.PulsarStateTest" />
+            <class name="org.apache.pulsar.tests.integration.io.PulsarSourcePropertyTest"/>
         </classes>
     </test>
 </suite>

Reply via email to