This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5ce2e51b1c16e6266b2e22c2e1b21c70057795a2 Author: David Kjerrumgaard <[email protected]> AuthorDate: Fri Mar 19 17:12:29 2021 -0700 [Issue-9926][Pulsar Functions] Pass through record properties from Pulsar Sources (#9943) Co-authored-by: David Kjerrumgaard <[email protected]> (cherry picked from commit 1273c713ce7ca2dec83e5602619f38b8303ca7a4) --- .../pulsar/functions/utils/SourceConfigUtils.java | 2 + .../tests/integration/io/TestPropertySource.java | 63 ++++++++ .../integration/io/PulsarSourcePropertyTest.java | 170 +++++++++++++++++++++ .../src/test/resources/pulsar-function-state.xml | 1 + 4 files changed, 236 insertions(+) diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java index 08b561b..904bf15 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java @@ -150,6 +150,8 @@ public class SourceConfigUtils { sinkSpecBuilder.setProducerSpec(ProducerConfigUtils.convert(sourceConfig.getProducerConfig())); } + sinkSpecBuilder.setForwardSourceMessageProperty(true); + functionDetailsBuilder.setSink(sinkSpecBuilder); // use default resources if resources not set diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestPropertySource.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestPropertySource.java new file mode 100644 index 0000000..245929b --- /dev/null +++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestPropertySource.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.io; + +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.Source; +import org.apache.pulsar.io.core.SourceContext; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +public class TestPropertySource implements Source<String> { + + @Override + public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception { + } + + @Override + public Record<String> read() throws Exception { + Thread.sleep(50); + return new Record<String>() { + @Override + public Optional<String> getKey() { + return Optional.empty(); + } + + @Override + public String getValue() { + return "property"; + } + @Override + public Map<String, String> getProperties() { + HashMap<String, String> props = new HashMap<String, String>(); + props.put("hello", "world"); + props.put("foo", "bar"); + return props; + } + }; + } + + @Override + public void close() throws Exception { + + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarSourcePropertyTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarSourcePropertyTest.java new file mode 100644 index 0000000..70452d4 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarSourcePropertyTest.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.io; + +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.functions.FunctionState; +import org.apache.pulsar.common.policies.data.SinkStatus; +import org.apache.pulsar.common.policies.data.SourceStatus; +import org.apache.pulsar.tests.integration.docker.ContainerExecException; +import org.apache.pulsar.tests.integration.docker.ContainerExecResult; +import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator; +import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime; +import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.awaitility.Awaitility; +import org.testng.annotations.Test; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR; +import static org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +/** + * Source Property related test cases. + */ +@Slf4j +public class PulsarSourcePropertyTest extends PulsarStandaloneTestSuite { + @Test(groups = {"source"}) + public void testSourceProperty() throws Exception { + String outputTopicName = "test-source-property-input-" + randomName(8); + String sourceName = "test-source-property-" + randomName(8); + submitSourceConnector(sourceName, outputTopicName, "org.apache.pulsar.tests.integration.io.TestPropertySource", JAVAJAR); + + // get source info + getSourceInfoSuccess(sourceName); + + // get source status + getSourceStatus(sourceName); + + try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) { + + Awaitility.await().ignoreExceptions().untilAsserted(() -> { + SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName); + assertEquals(status.getInstances().size(), 1); + assertTrue(status.getInstances().get(0).getStatus().numWritten > 0); + }); + } + + @Cleanup PulsarClient client = PulsarClient.builder() + .serviceUrl(container.getPlainTextServiceUrl()) + .build(); + @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING) + .topic(outputTopicName) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionName("test-sub") + .subscribe(); + + for (int i = 0; i < 10; i++) { + Message<String> msg = consumer.receive(); + assertEquals(msg.getValue(), "property"); + assertEquals(msg.getProperty("hello"), "world"); + assertEquals(msg.getProperty("foo"), "bar"); + } + + // delete source + deleteSource(sourceName); + + getSourceInfoNotFound(sourceName); + } + + private void submitSourceConnector(String sourceName, + String outputTopicName, + String className, + String archive) throws Exception { + String[] commands = { + PulsarCluster.ADMIN_SCRIPT, + "sources", "create", + "--name", sourceName, + "--destinationTopicName", outputTopicName, + "--archive", archive, + "--classname", className + }; + log.info("Run command : {}", StringUtils.join(commands, ' ')); + ContainerExecResult result = container.execCmd(commands); + assertTrue( + result.getStdout().contains("\"Created successfully\""), + result.getStdout()); + } + + private void getSourceInfoSuccess(String sourceName) throws Exception { + ContainerExecResult result = container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sources", + "get", + "--tenant", "public", + "--namespace", "default", + "--name", sourceName + ); + assertTrue(result.getStdout().contains("\"name\": \"" + sourceName + "\"")); + } + + private void getSourceStatus(String sourceName) throws Exception { + ContainerExecResult result = container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sources", + "status", + "--tenant", "public", + "--namespace", "default", + "--name", sourceName + ); + assertTrue(result.getStdout().contains("\"running\" : true")); + } + + private void deleteSource(String sourceName) throws Exception { + ContainerExecResult result = container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sources", + "delete", + "--tenant", "public", + "--namespace", "default", + "--name", sourceName + ); + assertTrue(result.getStdout().contains("Delete source successfully")); + result.assertNoStderr(); + } + + private void getSourceInfoNotFound(String sourceName) throws Exception { + try { + container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sources", + "get", + "--tenant", "public", + "--namespace", "default", + "--name", sourceName); + fail("Command should have exited with non-zero"); + } catch (ContainerExecException e) { + assertTrue(e.getResult().getStderr().contains("Reason: Source " + sourceName + " doesn't exist")); + } + } +} + diff --git a/tests/integration/src/test/resources/pulsar-function-state.xml b/tests/integration/src/test/resources/pulsar-function-state.xml index 56ff01b..ebf3392 100644 --- a/tests/integration/src/test/resources/pulsar-function-state.xml +++ b/tests/integration/src/test/resources/pulsar-function-state.xml @@ -23,6 +23,7 @@ <test name="pulsar-function-state-test-suite" preserve-order="true" > <classes> <class name="org.apache.pulsar.tests.integration.functions.PulsarStateTest" /> + <class name="org.apache.pulsar.tests.integration.io.PulsarSourcePropertyTest"/> </classes> </test> </suite>
