This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e5fbb89 allow users to update output topics for functions and
sources (#4092)
e5fbb89 is described below
commit e5fbb8974f11bee0805853c294f988cc3f72864d
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Fri Apr 19 22:31:23 2019 -0700
allow users to update output topics for functions and sources (#4092)
* allow users to update output topics for functions and sources
---
.../apache/pulsar/io/PulsarFunctionE2ETest.java | 90 ++++++++++++++++------
.../functions/utils/FunctionConfigUtils.java | 6 +-
.../pulsar/functions/utils/SourceConfigUtils.java | 4 +-
.../functions/utils/FunctionConfigUtilsTest.java | 7 --
.../functions/utils/SourceConfigUtilsTest.java | 7 --
.../rest/api/v2/FunctionApiV2ResourceTest.java | 39 ++++------
.../rest/api/v3/FunctionApiV3ResourceTest.java | 39 ++++------
.../rest/api/v3/SourceApiV3ResourceTest.java | 35 ++++-----
8 files changed, 120 insertions(+), 107 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index cd7044c..313a5b5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -398,9 +399,10 @@ public class PulsarFunctionE2ETest {
final String replNamespace = tenant + "/" + namespacePortion;
final String sourceTopic = "persistent://" + replNamespace +
"/my-topic1";
final String sinkTopic = "persistent://" + replNamespace + "/output";
+ final String sinkTopic2 = "persistent://" + replNamespace + "/output2";
final String propertyKey = "key";
final String propertyValue = "value";
- final String functionName = "PulsarSink-test";
+ final String functionName = "PulsarFunction-test";
final String subscriptionName = "test-sub";
admin.namespaces().createNamespace(replNamespace);
Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
@@ -408,18 +410,38 @@ public class PulsarFunctionE2ETest {
// create a producer that creates a topic at broker
Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
- Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sub").subscribe();
+ Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic2).subscriptionName("sub").subscribe();
FunctionConfig functionConfig = createFunctionConfig(tenant,
namespacePortion, functionName,
"my.*", sinkTopic, subscriptionName);
+
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
admin.functions().createFunctionWithUrl(functionConfig,
jarFilePathUrl);
// try to update function to test: update-function functionality
functionConfig.setParallelism(2);
+ functionConfig.setOutput(sinkTopic2);
admin.functions().updateFunctionWithUrl(functionConfig,
jarFilePathUrl);
retryStrategically((test) -> {
try {
+ TopicStats topicStats = admin.topics().getStats(sinkTopic2);
+ return topicStats.publishers.size() == 2
+ && topicStats.publishers.get(0).metadata != null
+ &&
topicStats.publishers.get(0).metadata.containsKey("id")
+ &&
topicStats.publishers.get(0).metadata.get("id").equals(String.format("%s/%s/%s",
tenant, namespacePortion, functionName));
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 50, 150);
+
+ TopicStats topicStats = admin.topics().getStats(sinkTopic2);
+ assertEquals(topicStats.publishers.size(), 2);
+ assertTrue(topicStats.publishers.get(0).metadata != null);
+ assertTrue(topicStats.publishers.get(0).metadata.containsKey("id"));
+ assertEquals(topicStats.publishers.get(0).metadata.get("id"),
String.format("%s/%s/%s", tenant, namespacePortion, functionName));
+
+ retryStrategically((test) -> {
+ try {
return
admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
} catch (PulsarAdminException e) {
return false;
@@ -712,12 +734,12 @@ public class PulsarFunctionE2ETest {
final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sinkTopic = "persistent://" + replNamespace + "/output";
- final String functionName = "PulsarSource-test";
+ final String sourceName = "PulsarSource-test";
admin.namespaces().createNamespace(replNamespace);
Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
admin.namespaces().setNamespaceReplicationClusters(replNamespace,
clusters);
- SourceConfig sourceConfig = createSourceConfig(tenant,
namespacePortion, functionName, sinkTopic);
+ SourceConfig sourceConfig = createSourceConfig(tenant,
namespacePortion, sourceName, sinkTopic);
admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl);
retryStrategically((test) -> {
@@ -728,16 +750,36 @@ public class PulsarFunctionE2ETest {
}
}, 10, 150);
+ final String sinkTopic2 = "persistent://" + replNamespace + "/output2";
+ sourceConfig.setTopicName(sinkTopic2);
admin.source().updateSourceWithUrl(sourceConfig, jarFilePathUrl);
retryStrategically((test) -> {
try {
- return (admin.topics().getStats(sinkTopic).publishers.size()
== 1) && (admin.topics().getInternalStats(sinkTopic).numberOfEntries > 4);
+ TopicStats sourceStats = admin.topics().getStats(sinkTopic2);
+ return sourceStats.publishers.size() == 1
+ && sourceStats.publishers.get(0).metadata != null
+ &&
sourceStats.publishers.get(0).metadata.containsKey("id")
+ &&
sourceStats.publishers.get(0).metadata.get("id").equals(String.format("%s/%s/%s",
tenant, namespacePortion, sourceName));
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 50, 150);
+
+ TopicStats sourceStats = admin.topics().getStats(sinkTopic2);
+ assertEquals(sourceStats.publishers.size(), 1);
+ assertTrue(sourceStats.publishers.get(0).metadata != null);
+ assertTrue(sourceStats.publishers.get(0).metadata.containsKey("id"));
+ assertEquals(sourceStats.publishers.get(0).metadata.get("id"),
String.format("%s/%s/%s", tenant, namespacePortion, sourceName));
+
+ retryStrategically((test) -> {
+ try {
+ return (admin.topics().getStats(sinkTopic2).publishers.size()
== 1) && (admin.topics().getInternalStats(sinkTopic2).numberOfEntries > 4);
} catch (PulsarAdminException e) {
return false;
}
}, 50, 150);
- assertEquals(admin.topics().getStats(sinkTopic).publishers.size(), 1);
+ assertEquals(admin.topics().getStats(sinkTopic2).publishers.size(), 1);
String prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
log.info("prometheusMetrics: {}", prometheusMetrics);
@@ -746,65 +788,65 @@ public class PulsarFunctionE2ETest {
Metric m = metrics.get("pulsar_source_received_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sourceName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
- assertEquals(m.tags.get("fqfn"),
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"),
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
assertTrue(m.value > 0.0);
m = metrics.get("pulsar_source_received_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sourceName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
- assertEquals(m.tags.get("fqfn"),
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"),
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
assertTrue(m.value > 0.0);
m = metrics.get("pulsar_source_written_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sourceName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
- assertEquals(m.tags.get("fqfn"),
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"),
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
assertTrue(m.value > 0.0);
m = metrics.get("pulsar_source_written_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sourceName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
- assertEquals(m.tags.get("fqfn"),
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"),
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
assertTrue(m.value > 0.0);
m = metrics.get("pulsar_source_source_exceptions_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sourceName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
- assertEquals(m.tags.get("fqfn"),
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"),
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_source_source_exceptions_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sourceName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
- assertEquals(m.tags.get("fqfn"),
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"),
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_source_system_exceptions_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sourceName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
- assertEquals(m.tags.get("fqfn"),
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"),
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_source_system_exceptions_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sourceName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
- assertEquals(m.tags.get("fqfn"),
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"),
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_source_last_invocation");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sourceName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant,
namespacePortion));
- assertEquals(m.tags.get("fqfn"),
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"),
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
assertTrue(m.value > 0.0);
// make sure all temp files are deleted
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index e245216..ae18551 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -657,9 +657,6 @@ public class FunctionConfigUtils {
mergedConfig.getInputSpecs().put(topicName, consumerConfig);
});
}
- if (!StringUtils.isEmpty(newConfig.getOutput()) &&
!newConfig.getOutput().equals(existingConfig.getOutput())) {
- throw new IllegalArgumentException("Output topics differ");
- }
if (!StringUtils.isEmpty(newConfig.getOutputSchemaType()) &&
!newConfig.getOutputSchemaType().equals(existingConfig.getOutputSchemaType())) {
throw new IllegalArgumentException("Output Serde mismatch");
}
@@ -675,6 +672,9 @@ public class FunctionConfigUtils {
if (newConfig.getRetainOrdering() != null &&
!newConfig.getRetainOrdering().equals(existingConfig.getRetainOrdering())) {
throw new IllegalArgumentException("Retain Orderning cannot be
altered");
}
+ if (!StringUtils.isEmpty(newConfig.getOutput())) {
+ mergedConfig.setOutput(newConfig.getOutput());
+ }
if (newConfig.getUserConfig() != null) {
mergedConfig.setUserConfig(newConfig.getUserConfig());
}
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index f223583..01ea03a 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -297,8 +297,8 @@ public class SourceConfigUtils {
if (!StringUtils.isEmpty(newConfig.getClassName())) {
mergedConfig.setClassName(newConfig.getClassName());
}
- if (!StringUtils.isEmpty(newConfig.getTopicName()) &&
!newConfig.getTopicName().equals(existingConfig.getTopicName())) {
- throw new IllegalArgumentException("Destination topics differ");
+ if (!StringUtils.isEmpty(newConfig.getTopicName())) {
+ mergedConfig.setTopicName(newConfig.getTopicName());
}
if (!StringUtils.isEmpty(newConfig.getSerdeClassName())) {
mergedConfig.setSerdeClassName(newConfig.getSerdeClassName());
diff --git
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index df1d69a..9c3f706 100644
---
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -183,13 +183,6 @@ public class FunctionConfigUtilsTest {
assertEquals(mergedConfig.getInputSpecs().get("test-input"),
newFunctionConfig.getInputSpecs().get("test-input"));
}
- @Test(expectedExceptions = IllegalArgumentException.class,
expectedExceptionsMessageRegExp = "Output topics differ")
- public void testMergeDifferentOutput() {
- FunctionConfig functionConfig = createFunctionConfig();
- FunctionConfig newFunctionConfig =
createUpdatedFunctionConfig("output", "Different");
- FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
- }
-
@Test
public void testMergeDifferentLogTopic() {
FunctionConfig functionConfig = createFunctionConfig();
diff --git
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index ccdbcdc..85911ed 100644
---
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -119,13 +119,6 @@ public class SourceConfigUtilsTest {
);
}
- @Test(expectedExceptions = IllegalArgumentException.class,
expectedExceptionsMessageRegExp = "Destination topics differ")
- public void testMergeDifferentOutput() {
- SourceConfig sourceConfig = createSourceConfig();
- SourceConfig newSourceConfig = createUpdatedSourceConfig("topicName",
"Different");
- SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
- }
-
@Test(expectedExceptions = IllegalArgumentException.class,
expectedExceptionsMessageRegExp = "Processing Guarantess cannot be altered")
public void testMergeDifferentProcessingGuarantees() {
SourceConfig sourceConfig = createSourceConfig();
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 3b0d7d9..1790fe7 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -837,30 +837,25 @@ public class FunctionApiV2ResourceTest {
}
}
- @Test(expectedExceptions = RestException.class,
expectedExceptionsMessageRegExp = "Output topics differ")
+ @Test
public void testUpdateFunctionChangedInputs() throws Exception {
- try {
- mockStatic(WorkerUtils.class);
- doNothing().when(WorkerUtils.class);
- WorkerUtils.downloadFromBookkeeper(any(Namespace.class),
any(File.class), anyString());
- PowerMockito.when(WorkerUtils.class, "dumpToTmpFile",
any()).thenCallRealMethod();
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.downloadFromBookkeeper(any(Namespace.class),
any(File.class), anyString());
+ PowerMockito.when(WorkerUtils.class, "dumpToTmpFile",
any()).thenCallRealMethod();
- testUpdateFunctionMissingArguments(
- tenant,
- namespace,
- function,
- null,
- topicsToSerDeClassName,
- mockedFormData,
- "DifferentOutput",
- outputSerdeClassName,
- null,
- parallelism,
- "Output topics differ");
- } catch (RestException re) {
- assertEquals(re.getResponse().getStatusInfo(),
Response.Status.BAD_REQUEST);
- throw re;
- }
+ testUpdateFunctionMissingArguments(
+ tenant,
+ namespace,
+ function,
+ null,
+ topicsToSerDeClassName,
+ mockedFormData,
+ "DifferentOutput",
+ outputSerdeClassName,
+ null,
+ parallelism,
+ null);
}
@Test(expectedExceptions = RestException.class,
expectedExceptionsMessageRegExp = "Input Topics cannot be altered")
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 85c2f20..85cb431 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -812,30 +812,25 @@ public class FunctionApiV3ResourceTest {
}
}
- @Test(expectedExceptions = RestException.class,
expectedExceptionsMessageRegExp = "Output topics differ")
+ @Test
public void testUpdateFunctionChangedInputs() throws Exception {
- try {
- mockStatic(WorkerUtils.class);
- doNothing().when(WorkerUtils.class);
- WorkerUtils.downloadFromBookkeeper(any(Namespace.class),
any(File.class), anyString());
- PowerMockito.when(WorkerUtils.class, "dumpToTmpFile",
any()).thenCallRealMethod();
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.downloadFromBookkeeper(any(Namespace.class),
any(File.class), anyString());
+ PowerMockito.when(WorkerUtils.class, "dumpToTmpFile",
any()).thenCallRealMethod();
- testUpdateFunctionMissingArguments(
- tenant,
- namespace,
- function,
- null,
- topicsToSerDeClassName,
- mockedFormData,
- "DifferentOutput",
- outputSerdeClassName,
- null,
- parallelism,
- "Output topics differ");
- } catch (RestException re) {
- assertEquals(re.getResponse().getStatusInfo(),
Response.Status.BAD_REQUEST);
- throw re;
- }
+ testUpdateFunctionMissingArguments(
+ tenant,
+ namespace,
+ function,
+ null,
+ topicsToSerDeClassName,
+ mockedFormData,
+ "DifferentOutput",
+ outputSerdeClassName,
+ null,
+ parallelism,
+ null);
}
@Test(expectedExceptions = RestException.class,
expectedExceptionsMessageRegExp = "Input Topics cannot be altered")
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index b2c5f82..33f0d7b 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -722,28 +722,23 @@ public class SourceApiV3ResourceTest {
}
}
- @Test(expectedExceptions = RestException.class,
expectedExceptionsMessageRegExp = "Destination topics differ")
+ @Test
public void testUpdateSourceChangedTopic() throws Exception {
- try {
- mockStatic(WorkerUtils.class);
- doNothing().when(WorkerUtils.class);
- WorkerUtils.downloadFromBookkeeper(any(Namespace.class),
any(File.class), anyString());
+ mockStatic(WorkerUtils.class);
+ doNothing().when(WorkerUtils.class);
+ WorkerUtils.downloadFromBookkeeper(any(Namespace.class),
any(File.class), anyString());
- testUpdateSourceMissingArguments(
- tenant,
- namespace,
- source,
- null,
- mockedFormData,
- "DifferentTopic",
- outputSerdeClassName,
- className,
- parallelism,
- "Destination topics differ");
- } catch (RestException re){
- assertEquals(re.getResponse().getStatusInfo(),
Response.Status.BAD_REQUEST);
- throw re;
- }
+ testUpdateSourceMissingArguments(
+ tenant,
+ namespace,
+ source,
+ null,
+ mockedFormData,
+ "DifferentTopic",
+ outputSerdeClassName,
+ className,
+ parallelism,
+ null);
}
@Test(expectedExceptions = RestException.class,
expectedExceptionsMessageRegExp = "Source parallelism must be a positive
number")