This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a4ca8f4 Revert "[fix #9851] Add forwardSourceMessageProperty to
SourceConfig (#9907)" (#9945)
a4ca8f4 is described below
commit a4ca8f4b4b5fdd034f4bba2c17a0e40a6c33394b
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Thu Mar 18 15:13:58 2021 -0700
Revert "[fix #9851] Add forwardSourceMessageProperty to SourceConfig
(#9907)" (#9945)
This reverts commit d0249e5695c75d8d65a4180fd599b17971368f33.
Co-authored-by: Jerry Peng <[email protected]>
---
.../org/apache/pulsar/admin/cli/CmdSources.java | 6 -
.../apache/pulsar/admin/cli/TestCmdSources.java | 13 +-
.../org/apache/pulsar/common/io/SourceConfig.java | 2 -
.../pulsar/functions/utils/SourceConfigUtils.java | 11 --
.../functions/utils/SourceConfigUtilsTest.java | 1 -
.../tests/integration/io/TestPropertySource.java | 63 --------
.../integration/io/PulsarSourcePropertyTest.java | 169 ---------------------
.../src/test/resources/pulsar-function.xml | 1 -
8 files changed, 2 insertions(+), 264 deletions(-)
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index a6daf26..7f580f9 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -315,8 +315,6 @@ public class CmdSources extends CmdBase {
protected String sourceConfigString;
@Parameter(names = "--custom-runtime-options", description = "A string
that encodes options to customize the runtime, see docs for configured runtime
for details")
protected String customRuntimeOptions;
- @Parameter(names = "--forward-source-message-property", description =
"Forwarding input message's properties to output topic when processing")
- protected Boolean forwardSourceMessageProperty = true;
protected SourceConfig sourceConfig;
@@ -421,10 +419,6 @@ public class CmdSources extends CmdBase {
if (customRuntimeOptions != null) {
sourceConfig.setCustomRuntimeOptions(customRuntimeOptions);
}
-
- if (null != forwardSourceMessageProperty) {
-
sourceConfig.setForwardSourceMessageProperty(forwardSourceMessageProperty);
- }
// check if source configs are valid
validateSourceConfigs(sourceConfig);
}
diff --git
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 8a79c0a..4e6d95a 100644
---
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -73,7 +73,6 @@ public class TestCmdSources {
private static final Long RAM = 1024L * 1024L;
private static final Long DISK = 1024L * 1024L * 1024L;
private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon
Jul 02 00:33:15 +0000 2018\"}";
- private static final boolean FORWARD_PROPERTIES = true;
private PulsarAdmin pulsarAdmin;
private Sources source;
@@ -115,7 +114,6 @@ public class TestCmdSources {
sourceConfig.setArchive(JAR_FILE_PATH);
sourceConfig.setResources(new Resources(CPU, RAM, DISK));
sourceConfig.setConfigs(createSource.parseConfigs(SINK_CONFIG_STRING));
- sourceConfig.setForwardSourceMessageProperty(FORWARD_PROPERTIES);
return sourceConfig;
}
@@ -577,19 +575,15 @@ public class TestCmdSources {
updateSource.archive = "new-archive";
- updateSource.forwardSourceMessageProperty = true;
-
updateSource.processArguments();
updateSource.runCmd();
-
verify(source).updateSource(eq(SourceConfig.builder()
.tenant(PUBLIC_TENANT)
.namespace(DEFAULT_NAMESPACE)
.name(updateSource.name)
.archive(updateSource.archive)
- .forwardSourceMessageProperty(true)
.build()), eq(updateSource.archive), eq(new UpdateOptions()));
@@ -597,12 +591,10 @@ public class TestCmdSources {
updateSource.parallelism = 2;
- updateSource.updateAuthData = true;
-
- updateSource.forwardSourceMessageProperty = false;
-
updateSource.processArguments();
+ updateSource.updateAuthData = true;
+
UpdateOptions updateOptions = new UpdateOptions();
updateOptions.setUpdateAuthData(true);
@@ -613,7 +605,6 @@ public class TestCmdSources {
.namespace(DEFAULT_NAMESPACE)
.name(updateSource.name)
.parallelism(2)
- .forwardSourceMessageProperty(false)
.build()), eq(null), eq(updateOptions));
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
index ccd4d14..79a5f81 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
@@ -71,6 +71,4 @@ public class SourceConfig {
private BatchSourceConfig batchSourceConfig;
// batchBuilder provides two types of batch construction methods, DEFAULT
and KEY_BASED
private String batchBuilder;
-
- private Boolean forwardSourceMessageProperty;
}
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index e658f2a..b9825cc 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -148,12 +148,6 @@ public class SourceConfigUtils {
sinkSpecBuilder.setProducerSpec(ProducerConfigUtils.convert(sourceConfig.getProducerConfig()));
}
- if (sourceConfig.getForwardSourceMessageProperty() == Boolean.TRUE) {
-
sinkSpecBuilder.setForwardSourceMessageProperty(sourceConfig.getForwardSourceMessageProperty());
- } else {
- sinkSpecBuilder.setForwardSourceMessageProperty(false);
- }
-
functionDetailsBuilder.setSink(sinkSpecBuilder);
// use default resources if resources not set
@@ -242,8 +236,6 @@ public class SourceConfigUtils {
sourceConfig.setCustomRuntimeOptions(functionDetails.getCustomRuntimeOptions());
}
-
sourceConfig.setForwardSourceMessageProperty(sinkSpec.getForwardSourceMessageProperty());
-
return sourceConfig;
}
@@ -400,9 +392,6 @@ public class SourceConfigUtils {
validateBatchSourceConfigUpdate(existingConfig.getBatchSourceConfig(),
newConfig.getBatchSourceConfig());
mergedConfig.setBatchSourceConfig(newConfig.getBatchSourceConfig());
}
- if (newConfig.getForwardSourceMessageProperty() != null) {
-
mergedConfig.setForwardSourceMessageProperty(newConfig.getForwardSourceMessageProperty());
- }
return mergedConfig;
}
diff --git
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index c2dc029..20a64f8 100644
---
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -358,7 +358,6 @@ public class SourceConfigUtilsTest extends
PowerMockTestCase {
sourceConfig.setParallelism(1);
sourceConfig.setRuntimeFlags("-DKerberos");
sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
- sourceConfig.setForwardSourceMessageProperty(true);
Map<String, String> consumerConfigs = new HashMap<>();
consumerConfigs.put("security.protocal", "SASL_PLAINTEXT");
diff --git
a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestPropertySource.java
b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestPropertySource.java
deleted file mode 100644
index 245929b..0000000
---
a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestPropertySource.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.io;
-
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.Source;
-import org.apache.pulsar.io.core.SourceContext;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-public class TestPropertySource implements Source<String> {
-
- @Override
- public void open(Map<String, Object> config, SourceContext sourceContext)
throws Exception {
- }
-
- @Override
- public Record<String> read() throws Exception {
- Thread.sleep(50);
- return new Record<String>() {
- @Override
- public Optional<String> getKey() {
- return Optional.empty();
- }
-
- @Override
- public String getValue() {
- return "property";
- }
- @Override
- public Map<String, String> getProperties() {
- HashMap<String, String> props = new HashMap<String, String>();
- props.put("hello", "world");
- props.put("foo", "bar");
- return props;
- }
- };
- }
-
- @Override
- public void close() throws Exception {
-
- }
-}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarSourcePropertyTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarSourcePropertyTest.java
deleted file mode 100644
index 04f97a1..0000000
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarSourcePropertyTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.io;
-
-import lombok.Cleanup;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.functions.FunctionState;
-import org.apache.pulsar.common.policies.data.SinkStatus;
-import org.apache.pulsar.common.policies.data.SourceStatus;
-import org.apache.pulsar.tests.integration.docker.ContainerExecException;
-import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
-import
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
-import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import org.awaitility.Awaitility;
-import org.testng.annotations.Test;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR;
-import static
org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
-/**
- * Source Property related test cases.
- */
-@Slf4j
-public class PulsarSourcePropertyTest extends PulsarStandaloneTestSuite {
- @Test(groups = {"source"})
- public void testSourceProperty() throws Exception {
- String outputTopicName = "test-source-property-input-" + randomName(8);
- String sourceName = "test-source-property-" + randomName(8);
- submitSourceConnector(sourceName, outputTopicName,
"org.apache.pulsar.tests.integration.io.TestPropertySource", JAVAJAR);
-
- // get source info
- getSourceInfoSuccess(sourceName);
-
- // get source status
- getSourceStatus(sourceName);
-
- try (PulsarAdmin admin =
PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
-
- Awaitility.await().ignoreExceptions().untilAsserted(() -> {
- SourceStatus status =
admin.sources().getSourceStatus("public", "default", sourceName);
- assertEquals(status.getInstances().size(), 1);
- assertTrue(status.getInstances().get(0).getStatus().numWritten
> 0);
- });
- }
-
- @Cleanup PulsarClient client = PulsarClient.builder()
- .serviceUrl(container.getPlainTextServiceUrl())
- .build();
- @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
- .topic(outputTopicName)
- .subscriptionType(SubscriptionType.Exclusive)
- .subscriptionName("test-sub")
- .subscribe();
-
- for (int i = 0; i < 10; i++) {
- Message<String> msg = consumer.receive();
- assertEquals(msg.getValue(), "property");
- assertEquals(msg.getProperty("hello"), "world");
- assertEquals(msg.getProperty("foo"), "bar");
- }
-
- // delete source
- deleteSource(sourceName);
-
- getSourceInfoNotFound(sourceName);
- }
-
- private void submitSourceConnector(String sourceName,
- String outputTopicName,
- String className,
- String archive) throws Exception {
- String[] commands = {
- PulsarCluster.ADMIN_SCRIPT,
- "sources", "create",
- "--name", sourceName,
- "--destinationTopicName", outputTopicName,
- "--archive", archive,
- "--classname", className
- };
- log.info("Run command : {}", StringUtils.join(commands, ' '));
- ContainerExecResult result = container.execCmd(commands);
- assertTrue(
- result.getStdout().contains("\"Created successfully\""),
- result.getStdout());
- }
-
- private void getSourceInfoSuccess(String sourceName) throws Exception {
- ContainerExecResult result = container.execCmd(
- PulsarCluster.ADMIN_SCRIPT,
- "sources",
- "get",
- "--tenant", "public",
- "--namespace", "default",
- "--name", sourceName
- );
- assertTrue(result.getStdout().contains("\"name\": \"" + sourceName +
"\""));
- }
-
- private void getSourceStatus(String sourceName) throws Exception {
- ContainerExecResult result = container.execCmd(
- PulsarCluster.ADMIN_SCRIPT,
- "sources",
- "status",
- "--tenant", "public",
- "--namespace", "default",
- "--name", sourceName
- );
- assertTrue(result.getStdout().contains("\"running\" : true"));
- }
-
- private void deleteSource(String sourceName) throws Exception {
- ContainerExecResult result = container.execCmd(
- PulsarCluster.ADMIN_SCRIPT,
- "sources",
- "delete",
- "--tenant", "public",
- "--namespace", "default",
- "--name", sourceName
- );
- assertTrue(result.getStdout().contains("Delete source successfully"));
- result.assertNoStderr();
- }
-
- private void getSourceInfoNotFound(String sourceName) throws Exception {
- try {
- container.execCmd(
- PulsarCluster.ADMIN_SCRIPT,
- "sources",
- "get",
- "--tenant", "public",
- "--namespace", "default",
- "--name", sourceName);
- fail("Command should have exited with non-zero");
- } catch (ContainerExecException e) {
- assertTrue(e.getResult().getStderr().contains("Reason: Source " +
sourceName + " doesn't exist"));
- }
- }
-}
diff --git a/tests/integration/src/test/resources/pulsar-function.xml
b/tests/integration/src/test/resources/pulsar-function.xml
index eeda827..026b77f 100644
--- a/tests/integration/src/test/resources/pulsar-function.xml
+++ b/tests/integration/src/test/resources/pulsar-function.xml
@@ -24,7 +24,6 @@
<classes>
<class
name="org.apache.pulsar.tests.integration.functions.PulsarStateTest" />
<class
name="org.apache.pulsar.tests.integration.io.GenericRecordSourceTest" />
- <class
name="org.apache.pulsar.tests.integration.io.PulsarSourcePropertyTest" />
</classes>
</test>
</suite>