This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e5fbb89   allow users to update output topics for functions and sources (#4092)
e5fbb89 is described below

commit e5fbb8974f11bee0805853c294f988cc3f72864d
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Fri Apr 19 22:31:23 2019 -0700

     allow users to update output topics for functions and sources (#4092)
    
    * allow users to update output topics for functions and sources
---
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    | 90 ++++++++++++++++------
 .../functions/utils/FunctionConfigUtils.java       |  6 +-
 .../pulsar/functions/utils/SourceConfigUtils.java  |  4 +-
 .../functions/utils/FunctionConfigUtilsTest.java   |  7 --
 .../functions/utils/SourceConfigUtilsTest.java     |  7 --
 .../rest/api/v2/FunctionApiV2ResourceTest.java     | 39 ++++------
 .../rest/api/v3/FunctionApiV3ResourceTest.java     | 39 ++++------
 .../rest/api/v3/SourceApiV3ResourceTest.java       | 35 ++++-----
 8 files changed, 120 insertions(+), 107 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index cd7044c..313a5b5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -398,9 +399,10 @@ public class PulsarFunctionE2ETest {
         final String replNamespace = tenant + "/" + namespacePortion;
         final String sourceTopic = "persistent://" + replNamespace + 
"/my-topic1";
         final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String sinkTopic2 = "persistent://" + replNamespace + "/output2";
         final String propertyKey = "key";
         final String propertyValue = "value";
-        final String functionName = "PulsarSink-test";
+        final String functionName = "PulsarFunction-test";
         final String subscriptionName = "test-sub";
         admin.namespaces().createNamespace(replNamespace);
         Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
@@ -408,18 +410,38 @@ public class PulsarFunctionE2ETest {
 
         // create a producer that creates a topic at broker
         Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
-        Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sub").subscribe();
+        Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic2).subscriptionName("sub").subscribe();
 
         FunctionConfig functionConfig = createFunctionConfig(tenant, 
namespacePortion, functionName,
                 "my.*", sinkTopic, subscriptionName);
+        
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
         admin.functions().createFunctionWithUrl(functionConfig, 
jarFilePathUrl);
 
         // try to update function to test: update-function functionality
         functionConfig.setParallelism(2);
+        functionConfig.setOutput(sinkTopic2);
         admin.functions().updateFunctionWithUrl(functionConfig, 
jarFilePathUrl);
 
         retryStrategically((test) -> {
             try {
+                TopicStats topicStats = admin.topics().getStats(sinkTopic2);
+                return topicStats.publishers.size() == 2
+                        && topicStats.publishers.get(0).metadata != null
+                        && 
topicStats.publishers.get(0).metadata.containsKey("id")
+                        && 
topicStats.publishers.get(0).metadata.get("id").equals(String.format("%s/%s/%s",
 tenant, namespacePortion, functionName));
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 50, 150);
+
+        TopicStats topicStats = admin.topics().getStats(sinkTopic2);
+        assertEquals(topicStats.publishers.size(), 2);
+        assertTrue(topicStats.publishers.get(0).metadata != null);
+        assertTrue(topicStats.publishers.get(0).metadata.containsKey("id"));
+        assertEquals(topicStats.publishers.get(0).metadata.get("id"), 
String.format("%s/%s/%s", tenant, namespacePortion, functionName));
+
+        retryStrategically((test) -> {
+            try {
                 return 
admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
             } catch (PulsarAdminException e) {
                 return false;
@@ -712,12 +734,12 @@ public class PulsarFunctionE2ETest {
         final String namespacePortion = "io";
         final String replNamespace = tenant + "/" + namespacePortion;
         final String sinkTopic = "persistent://" + replNamespace + "/output";
-        final String functionName = "PulsarSource-test";
+        final String sourceName = "PulsarSource-test";
         admin.namespaces().createNamespace(replNamespace);
         Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
         admin.namespaces().setNamespaceReplicationClusters(replNamespace, 
clusters);
 
-        SourceConfig sourceConfig = createSourceConfig(tenant, 
namespacePortion, functionName, sinkTopic);
+        SourceConfig sourceConfig = createSourceConfig(tenant, 
namespacePortion, sourceName, sinkTopic);
         admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl);
 
         retryStrategically((test) -> {
@@ -728,16 +750,36 @@ public class PulsarFunctionE2ETest {
             }
         }, 10, 150);
 
+        final String sinkTopic2 = "persistent://" + replNamespace + "/output2";
+        sourceConfig.setTopicName(sinkTopic2);
         admin.source().updateSourceWithUrl(sourceConfig, jarFilePathUrl);
 
         retryStrategically((test) -> {
             try {
-                return (admin.topics().getStats(sinkTopic).publishers.size() 
== 1) && (admin.topics().getInternalStats(sinkTopic).numberOfEntries > 4);
+                TopicStats sourceStats = admin.topics().getStats(sinkTopic2);
+                return sourceStats.publishers.size() == 1
+                        && sourceStats.publishers.get(0).metadata != null
+                        && 
sourceStats.publishers.get(0).metadata.containsKey("id")
+                        && 
sourceStats.publishers.get(0).metadata.get("id").equals(String.format("%s/%s/%s",
 tenant, namespacePortion, sourceName));
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 50, 150);
+
+        TopicStats sourceStats = admin.topics().getStats(sinkTopic2);
+        assertEquals(sourceStats.publishers.size(), 1);
+        assertTrue(sourceStats.publishers.get(0).metadata != null);
+        assertTrue(sourceStats.publishers.get(0).metadata.containsKey("id"));
+        assertEquals(sourceStats.publishers.get(0).metadata.get("id"), 
String.format("%s/%s/%s", tenant, namespacePortion, sourceName));
+
+        retryStrategically((test) -> {
+            try {
+                return (admin.topics().getStats(sinkTopic2).publishers.size() 
== 1) && (admin.topics().getInternalStats(sinkTopic2).numberOfEntries > 4);
             } catch (PulsarAdminException e) {
                 return false;
             }
         }, 50, 150);
-        assertEquals(admin.topics().getStats(sinkTopic).publishers.size(), 1);
+        assertEquals(admin.topics().getStats(sinkTopic2).publishers.size(), 1);
 
         String prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
         log.info("prometheusMetrics: {}", prometheusMetrics);
@@ -746,65 +788,65 @@ public class PulsarFunctionE2ETest {
         Metric m = metrics.get("pulsar_source_received_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sourceName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
-        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
         assertTrue(m.value > 0.0);
         m = metrics.get("pulsar_source_received_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sourceName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
-        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
         assertTrue(m.value > 0.0);
         m = metrics.get("pulsar_source_written_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sourceName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
-        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
         assertTrue(m.value > 0.0);
         m = metrics.get("pulsar_source_written_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sourceName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
-        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
         assertTrue(m.value > 0.0);
         m = metrics.get("pulsar_source_source_exceptions_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sourceName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
-        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_source_source_exceptions_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sourceName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
-        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_source_system_exceptions_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sourceName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
-        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_source_system_exceptions_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sourceName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
-        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_source_last_invocation");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sourceName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
-        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
         assertTrue(m.value > 0.0);
 
         // make sure all temp files are deleted
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index e245216..ae18551 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -657,9 +657,6 @@ public class FunctionConfigUtils {
                 mergedConfig.getInputSpecs().put(topicName, consumerConfig);
             });
         }
-        if (!StringUtils.isEmpty(newConfig.getOutput()) && 
!newConfig.getOutput().equals(existingConfig.getOutput())) {
-            throw new IllegalArgumentException("Output topics differ");
-        }
         if (!StringUtils.isEmpty(newConfig.getOutputSchemaType()) && 
!newConfig.getOutputSchemaType().equals(existingConfig.getOutputSchemaType())) {
             throw new IllegalArgumentException("Output Serde mismatch");
         }
@@ -675,6 +672,9 @@ public class FunctionConfigUtils {
         if (newConfig.getRetainOrdering() != null && 
!newConfig.getRetainOrdering().equals(existingConfig.getRetainOrdering())) {
             throw new IllegalArgumentException("Retain Orderning cannot be 
altered");
         }
+        if (!StringUtils.isEmpty(newConfig.getOutput())) {
+            mergedConfig.setOutput(newConfig.getOutput());
+        }
         if (newConfig.getUserConfig() != null) {
             mergedConfig.setUserConfig(newConfig.getUserConfig());
         }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index f223583..01ea03a 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -297,8 +297,8 @@ public class SourceConfigUtils {
         if (!StringUtils.isEmpty(newConfig.getClassName())) {
             mergedConfig.setClassName(newConfig.getClassName());
         }
-        if (!StringUtils.isEmpty(newConfig.getTopicName()) && 
!newConfig.getTopicName().equals(existingConfig.getTopicName())) {
-            throw new IllegalArgumentException("Destination topics differ");
+        if (!StringUtils.isEmpty(newConfig.getTopicName())) {
+            mergedConfig.setTopicName(newConfig.getTopicName());
         }
         if (!StringUtils.isEmpty(newConfig.getSerdeClassName())) {
             mergedConfig.setSerdeClassName(newConfig.getSerdeClassName());
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index df1d69a..9c3f706 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -183,13 +183,6 @@ public class FunctionConfigUtilsTest {
         assertEquals(mergedConfig.getInputSpecs().get("test-input"), 
newFunctionConfig.getInputSpecs().get("test-input"));
     }
 
-    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Output topics differ")
-    public void testMergeDifferentOutput() {
-        FunctionConfig functionConfig = createFunctionConfig();
-        FunctionConfig newFunctionConfig = 
createUpdatedFunctionConfig("output", "Different");
-        FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
-    }
-
     @Test
     public void testMergeDifferentLogTopic() {
         FunctionConfig functionConfig = createFunctionConfig();
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index ccdbcdc..85911ed 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -119,13 +119,6 @@ public class SourceConfigUtilsTest {
         );
     }
 
-    @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Destination topics differ")
-    public void testMergeDifferentOutput() {
-        SourceConfig sourceConfig = createSourceConfig();
-        SourceConfig newSourceConfig = createUpdatedSourceConfig("topicName", 
"Different");
-        SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
-    }
-
     @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Processing Guarantess cannot be altered")
     public void testMergeDifferentProcessingGuarantees() {
         SourceConfig sourceConfig = createSourceConfig();
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 3b0d7d9..1790fe7 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -837,30 +837,25 @@ public class FunctionApiV2ResourceTest {
         }
     }
 
-    @Test(expectedExceptions = RestException.class, 
expectedExceptionsMessageRegExp = "Output topics differ")
+    @Test
     public void testUpdateFunctionChangedInputs() throws Exception {
-        try {
-            mockStatic(WorkerUtils.class);
-            doNothing().when(WorkerUtils.class);
-            WorkerUtils.downloadFromBookkeeper(any(Namespace.class), 
any(File.class), anyString());
-            PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", 
any()).thenCallRealMethod();
+        mockStatic(WorkerUtils.class);
+        doNothing().when(WorkerUtils.class);
+        WorkerUtils.downloadFromBookkeeper(any(Namespace.class), 
any(File.class), anyString());
+        PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", 
any()).thenCallRealMethod();
 
-            testUpdateFunctionMissingArguments(
-                    tenant,
-                    namespace,
-                    function,
-                    null,
-                    topicsToSerDeClassName,
-                    mockedFormData,
-                    "DifferentOutput",
-                    outputSerdeClassName,
-                    null,
-                    parallelism,
-                    "Output topics differ");
-        } catch (RestException re) {
-            assertEquals(re.getResponse().getStatusInfo(), 
Response.Status.BAD_REQUEST);
-            throw re;
-        }
+        testUpdateFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                null,
+                topicsToSerDeClassName,
+                mockedFormData,
+                "DifferentOutput",
+                outputSerdeClassName,
+                null,
+                parallelism,
+                null);
     }
 
     @Test(expectedExceptions = RestException.class, 
expectedExceptionsMessageRegExp = "Input Topics cannot be altered")
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 85c2f20..85cb431 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -812,30 +812,25 @@ public class FunctionApiV3ResourceTest {
         }
     }
 
-    @Test(expectedExceptions = RestException.class, 
expectedExceptionsMessageRegExp = "Output topics differ")
+    @Test
     public void testUpdateFunctionChangedInputs() throws Exception {
-        try {
-            mockStatic(WorkerUtils.class);
-            doNothing().when(WorkerUtils.class);
-            WorkerUtils.downloadFromBookkeeper(any(Namespace.class), 
any(File.class), anyString());
-            PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", 
any()).thenCallRealMethod();
+        mockStatic(WorkerUtils.class);
+        doNothing().when(WorkerUtils.class);
+        WorkerUtils.downloadFromBookkeeper(any(Namespace.class), 
any(File.class), anyString());
+        PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", 
any()).thenCallRealMethod();
 
-            testUpdateFunctionMissingArguments(
-                tenant,
-                namespace,
-                function,
-                null,
-                topicsToSerDeClassName,
-                mockedFormData,
-                "DifferentOutput",
-                outputSerdeClassName,
-                null,
-                parallelism,
-                "Output topics differ");
-        } catch (RestException re) {
-            assertEquals(re.getResponse().getStatusInfo(), 
Response.Status.BAD_REQUEST);
-            throw re;
-        }
+        testUpdateFunctionMissingArguments(
+            tenant,
+            namespace,
+            function,
+            null,
+            topicsToSerDeClassName,
+            mockedFormData,
+            "DifferentOutput",
+            outputSerdeClassName,
+            null,
+            parallelism,
+            null);
     }
 
     @Test(expectedExceptions = RestException.class, 
expectedExceptionsMessageRegExp = "Input Topics cannot be altered")
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index b2c5f82..33f0d7b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -722,28 +722,23 @@ public class SourceApiV3ResourceTest {
         }
     }
 
-    @Test(expectedExceptions = RestException.class, 
expectedExceptionsMessageRegExp = "Destination topics differ")
+    @Test
     public void testUpdateSourceChangedTopic() throws Exception {
-        try {
-            mockStatic(WorkerUtils.class);
-            doNothing().when(WorkerUtils.class);
-            WorkerUtils.downloadFromBookkeeper(any(Namespace.class), 
any(File.class), anyString());
+        mockStatic(WorkerUtils.class);
+        doNothing().when(WorkerUtils.class);
+        WorkerUtils.downloadFromBookkeeper(any(Namespace.class), 
any(File.class), anyString());
 
-            testUpdateSourceMissingArguments(
-                    tenant,
-                    namespace,
-                    source,
-                    null,
-                    mockedFormData,
-                    "DifferentTopic",
-                    outputSerdeClassName,
-                    className,
-                    parallelism,
-                    "Destination topics differ");
-        } catch (RestException re){
-            assertEquals(re.getResponse().getStatusInfo(), 
Response.Status.BAD_REQUEST);
-            throw re;
-        }
+        testUpdateSourceMissingArguments(
+                tenant,
+                namespace,
+                source,
+                null,
+                mockedFormData,
+                "DifferentTopic",
+                outputSerdeClassName,
+                className,
+                parallelism,
+                null);
     }
 
     @Test(expectedExceptions = RestException.class, 
expectedExceptionsMessageRegExp = "Source parallelism must be a positive 
number")

Reply via email to