This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e616217c10dcf5959a856d5643613d9d0f6a57ef
Author: labuladong <[email protected]>
AuthorDate: Tue Jun 13 16:40:54 2023 +0800

    [fix][broker] Fix `Update contains no change` error when using the 
`--update-auth-data` flag to update a function/sink/source (#19450)
    
    Co-authored-by: tison <[email protected]>
    (cherry picked from commit 2b92ed11051a1c51170bedab07be6acc55cd95c9)
---
 .../functions/utils/FunctionConfigUtils.java       |   4 +-
 .../pulsar/functions/utils/SinkConfigUtils.java    |   2 +-
 .../functions/worker/rest/api/FunctionsImpl.java   |   3 +-
 .../functions/worker/rest/api/SinksImpl.java       |   3 +-
 .../functions/worker/rest/api/SourcesImpl.java     |   3 +-
 .../rest/api/v3/FunctionApiV3ResourceTest.java     |  93 +++++++++++++
 .../worker/rest/api/v3/SinkApiV3ResourceTest.java  | 151 +++++++++++++++------
 .../rest/api/v3/SourceApiV3ResourceTest.java       | 111 ++++++++++++---
 8 files changed, 306 insertions(+), 64 deletions(-)

diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 15119050c57..0a0f732e948 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -754,7 +754,7 @@ public class FunctionConfigUtils {
         }
     }
 
-    private static void doCommonChecks(FunctionConfig functionConfig) {
+    public static void doCommonChecks(FunctionConfig functionConfig) {
         if (isEmpty(functionConfig.getTenant())) {
             throw new IllegalArgumentException("Function tenant cannot be 
null");
         }
@@ -896,7 +896,7 @@ public class FunctionConfigUtils {
         }
     }
 
-    private static Collection<String> collectAllInputTopics(FunctionConfig 
functionConfig) {
+    public static Collection<String> collectAllInputTopics(FunctionConfig 
functionConfig) {
         List<String> retval = new LinkedList<>();
         if (functionConfig.getInputs() != null) {
             retval.addAll(functionConfig.getInputs());
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 7f974bb4b19..1cf07173ec6 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -538,7 +538,7 @@ public class SinkConfigUtils {
         return new ExtractedSinkDetails(sinkClassName, typeArg.getName(), 
functionClassName);
     }
 
-    private static Collection<String> collectAllInputTopics(SinkConfig 
sinkConfig) {
+    public static Collection<String> collectAllInputTopics(SinkConfig 
sinkConfig) {
         List<String> retval = new LinkedList<>();
         if (sinkConfig.getInputs() != null) {
             retval.addAll(sinkConfig.getInputs());
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 715d660ddff..078c47524f8 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -290,7 +290,8 @@ public class FunctionsImpl extends ComponentImpl implements 
Functions<PulsarWork
             throw new RestException(Response.Status.BAD_REQUEST, 
e.getMessage());
         }
 
-        if (existingFunctionConfig.equals(mergedConfig) && 
isBlank(functionPkgUrl) && uploadedInputStream == null) {
+        if (existingFunctionConfig.equals(mergedConfig) && 
isBlank(functionPkgUrl) && uploadedInputStream == null
+                && (updateOptions == null || 
!updateOptions.isUpdateAuthData())) {
             log.error("{}/{}/{} Update contains no changes", tenant, 
namespace, functionName);
             throw new RestException(Response.Status.BAD_REQUEST, "Update 
contains no change");
         }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index 5370fe93a7a..c382ec9e01b 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -295,7 +295,8 @@ public class SinksImpl extends ComponentImpl implements 
Sinks<PulsarWorkerServic
             throw new RestException(Response.Status.BAD_REQUEST, 
e.getMessage());
         }
 
-        if (existingSinkConfig.equals(mergedConfig) && isBlank(sinkPkgUrl) && 
uploadedInputStream == null) {
+        if (existingSinkConfig.equals(mergedConfig) && isBlank(sinkPkgUrl) && 
uploadedInputStream == null
+                && (updateOptions == null || 
!updateOptions.isUpdateAuthData())) {
             log.error("{}/{}/{} Update contains no changes", tenant, 
namespace, sinkName);
             throw new RestException(Response.Status.BAD_REQUEST, "Update 
contains no change");
         }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index 2f491424d65..c55ddf48b06 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -289,7 +289,8 @@ public class SourcesImpl extends ComponentImpl implements 
Sources<PulsarWorkerSe
             throw new RestException(Response.Status.BAD_REQUEST, 
e.getMessage());
         }
 
-        if (existingSourceConfig.equals(mergedConfig) && isBlank(sourcePkgUrl) 
&& uploadedInputStream == null) {
+        if (existingSourceConfig.equals(mergedConfig) && isBlank(sourcePkgUrl) 
&& uploadedInputStream == null
+            && (updateOptions == null || !updateOptions.isUpdateAuthData())) {
             log.error("{}/{}/{} Update contains no changes", tenant, 
namespace, sourceName);
             throw new RestException(Response.Status.BAD_REQUEST, "Update 
contains no change");
         }
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 337999bf2ad..0c20083bb89 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -28,6 +28,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.FileInputStream;
@@ -57,6 +59,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Tenants;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.UpdateOptionsImpl;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -604,6 +607,96 @@ public class FunctionApiV3ResourceTest {
                 null, null);
     }
 
+    @Test
+    public void testUpdateSourceWithNoChange() throws ClassNotFoundException {
+        mockWorkerUtils();
+
+        FunctionDetails functionDetails = createDefaultFunctionDetails();
+        NarClassLoader mockedClassLoader = mock(NarClassLoader.class);
+        mockStatic(FunctionCommon.class, ctx -> {
+            ctx.when(() -> 
FunctionCommon.getFunctionTypes(any(FunctionConfig.class), 
any(Class.class))).thenReturn(new Class[]{String.class, String.class});
+            ctx.when(() -> 
FunctionCommon.convertRuntime(any(FunctionConfig.Runtime.class))).thenCallRealMethod();
+            ctx.when(() -> 
FunctionCommon.isFunctionCodeBuiltin(any())).thenReturn(true);
+            ctx.when(() -> 
FunctionCommon.getClassLoaderFromPackage(any(),any(),any(),any())).thenCallRealMethod();
+            ctx.when(FunctionCommon::createPkgTempFile).thenCallRealMethod();
+        });
+
+        
doReturn(Function.class).when(mockedClassLoader).loadClass(anyString());
+
+        FunctionsManager mockedFunctionsManager = mock(FunctionsManager.class);
+        FunctionArchive functionArchive = FunctionArchive.builder()
+                .classLoader(mockedClassLoader)
+                .build();
+        
when(mockedFunctionsManager.getFunction("exclamation")).thenReturn(functionArchive);
+        
when(mockedFunctionsManager.getFunctionArchive(any())).thenReturn(getPulsarApiExamplesNar().toPath());
+
+        
when(mockedWorkerService.getFunctionsManager()).thenReturn(mockedFunctionsManager);
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(function))).thenReturn(true);
+
+        // The submitted config is identical to the existing one.
+        FunctionConfig funcConfig = createDefaultFunctionConfig();
+        mockStatic(FunctionConfigUtils.class, ctx -> {
+            ctx.when(() -> 
FunctionConfigUtils.convertFromDetails(any())).thenReturn(funcConfig);
+            ctx.when(() -> FunctionConfigUtils.validateUpdate(any(), 
any())).thenCallRealMethod();
+            ctx.when(() -> 
FunctionConfigUtils.convert(any(FunctionConfig.class), 
any(ClassLoader.class))).thenReturn(functionDetails);
+            ctx.when(() -> 
FunctionConfigUtils.convert(any(FunctionConfig.class), 
any(FunctionConfigUtils.ExtractedFunctionDetails.class))).thenReturn(functionDetails);
+            ctx.when(() -> FunctionConfigUtils.validateJavaFunction(any(), 
any())).thenCallRealMethod();
+            ctx.when(() -> 
FunctionConfigUtils.doCommonChecks(any())).thenCallRealMethod();
+            ctx.when(() -> 
FunctionConfigUtils.collectAllInputTopics(any())).thenCallRealMethod();
+            ctx.when(() -> FunctionConfigUtils.doJavaChecks(any(), 
any())).thenCallRealMethod();
+        });
+
+        // config has no changes and auth data is not being updated, should fail
+        try {
+            resource.updateFunction(
+                    funcConfig.getTenant(),
+                    funcConfig.getNamespace(),
+                    funcConfig.getName(),
+                    null,
+                    mockedFormData,
+                    null,
+                    funcConfig,
+                    null,
+                    null);
+            fail("Update without changes should fail");
+        } catch (RestException e) {
+            assertTrue(e.getMessage().contains("Update contains no change"));
+        }
+
+        try {
+            UpdateOptionsImpl updateOptions = new UpdateOptionsImpl();
+            updateOptions.setUpdateAuthData(false);
+            resource.updateFunction(
+                    funcConfig.getTenant(),
+                    funcConfig.getNamespace(),
+                    funcConfig.getName(),
+                    null,
+                    mockedFormData,
+                    null,
+                    funcConfig,
+                    null,
+                    updateOptions);
+            fail("Update without changes should fail");
+        } catch (RestException e) {
+            assertTrue(e.getMessage().contains("Update contains no change"));
+        }
+
+        // no changes but set the auth-update flag to true, should not fail
+        UpdateOptionsImpl updateOptions = new UpdateOptionsImpl();
+        updateOptions.setUpdateAuthData(true);
+        resource.updateFunction(
+                funcConfig.getTenant(),
+                funcConfig.getNamespace(),
+                funcConfig.getName(),
+                null,
+                mockedFormData,
+                null,
+                funcConfig,
+                null,
+                updateOptions);
+    }
+
+
     private void registerDefaultFunction() {
         registerDefaultFunctionWithPackageUrl(null);
     }
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 9f786c5b73a..5dcc795304e 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -18,21 +18,6 @@
  */
 package org.apache.pulsar.functions.worker.rest.api.v3;
 
-import static 
org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATLEAST_ONCE;
-import static org.apache.pulsar.functions.source.TopicSchema.DEFAULT_SERDE;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
 import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.FileInputStream;
@@ -60,6 +45,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Tenants;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.UpdateOptionsImpl;
 import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
@@ -97,6 +83,23 @@ import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
+import static 
org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATLEAST_ONCE;
+import static org.apache.pulsar.functions.source.TopicSchema.DEFAULT_SERDE;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 /**
  * Unit test of {@link SinksApiV3Resource}.
@@ -965,29 +968,7 @@ public class SinkApiV3ResourceTest {
             String className,
             Integer parallelism,
             String expectedError) throws Exception {
-        mockStatic(ConnectorUtils.class, ctx -> {
-            ctx.when(() -> 
ConnectorUtils.getIOSinkClass(any(NarClassLoader.class)))
-                    .thenReturn(CASSANDRA_STRING_SINK);
-        });
-
-        mockStatic(ClassLoaderUtils.class, ctx -> {
-        });
-
-        mockStatic(FunctionCommon.class, ctx -> {
-            ctx.when(() -> 
FunctionCommon.createPkgTempFile()).thenCallRealMethod();
-            ctx.when(() -> FunctionCommon.getClassLoaderFromPackage(any(), 
any(), any(), any())).thenCallRealMethod();
-            ctx.when(() -> 
FunctionCommon.getSinkType(any())).thenReturn(String.class);
-            ctx.when(() -> FunctionCommon.extractNarClassLoader(any(), 
any())).thenReturn(mock(NarClassLoader.class));
-            ctx.when(() -> FunctionCommon
-                    
.convertProcessingGuarantee(eq(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE)))
-                    .thenReturn(ATLEAST_ONCE);
-        });
-
-        this.mockedFunctionMetaData =
-                
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
-        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), 
eq(sink))).thenReturn(mockedFunctionMetaData);
-
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(sink))).thenReturn(true);
+        mockFunctionCommon(tenant, namespace, sink);
 
         SinkConfig sinkConfig = new SinkConfig();
         if (tenant != null) {
@@ -1026,6 +1007,32 @@ public class SinkApiV3ResourceTest {
 
     }
 
+    private void mockFunctionCommon(String tenant, String namespace, String 
sink) throws IOException {
+        mockStatic(ConnectorUtils.class, ctx -> {
+            ctx.when(() -> 
ConnectorUtils.getIOSinkClass(any(NarClassLoader.class)))
+                    .thenReturn(CASSANDRA_STRING_SINK);
+        });
+
+        mockStatic(ClassLoaderUtils.class, ctx -> {
+        });
+
+        mockStatic(FunctionCommon.class, ctx -> {
+            ctx.when(() -> 
FunctionCommon.createPkgTempFile()).thenCallRealMethod();
+            ctx.when(() -> FunctionCommon.getClassLoaderFromPackage(any(), 
any(), any(), any())).thenCallRealMethod();
+            ctx.when(() -> 
FunctionCommon.getSinkType(any())).thenReturn(String.class);
+            ctx.when(() -> FunctionCommon.extractNarClassLoader(any(), 
any())).thenReturn(mock(NarClassLoader.class));
+            ctx.when(() -> FunctionCommon
+                    
.convertProcessingGuarantee(eq(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE)))
+                    .thenReturn(ATLEAST_ONCE);
+        });
+
+        this.mockedFunctionMetaData =
+                
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), 
eq(sink))).thenReturn(mockedFunctionMetaData);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(sink))).thenReturn(true);
+    }
+
     private void updateDefaultSink() throws Exception {
         updateDefaultSinkWithPackageUrl(null);
     }
@@ -1848,4 +1855,72 @@ public class SinkApiV3ResourceTest {
             }
         }
     }
+
+    @Test
+    public void testUpdateSinkWithNoChange() throws IOException {
+        mockWorkerUtils();
+
+        // The submitted config is identical to the existing one.
+        SinkConfig sinkConfig = createDefaultSinkConfig();
+
+        mockStatic(SinkConfigUtils.class, ctx -> {
+            ctx.when(() -> 
SinkConfigUtils.convertFromDetails(any())).thenReturn(sinkConfig);
+            ctx.when(() -> SinkConfigUtils.convert(any(), 
any())).thenCallRealMethod();
+            ctx.when(() -> SinkConfigUtils.validateUpdate(any(), 
any())).thenCallRealMethod();
+            ctx.when(() -> SinkConfigUtils.clone(any())).thenCallRealMethod();
+            ctx.when(() -> 
SinkConfigUtils.collectAllInputTopics(any())).thenCallRealMethod();
+            ctx.when(() -> 
SinkConfigUtils.validateAndExtractDetails(any(),any(),any(),anyBoolean())).thenCallRealMethod();
+        });
+
+        mockFunctionCommon(sinkConfig.getTenant(), sinkConfig.getNamespace(), 
sinkConfig.getName());
+
+        // config has no changes and auth data is not being updated, should fail
+        try {
+            resource.updateSink(
+                    sinkConfig.getTenant(),
+                    sinkConfig.getNamespace(),
+                    sinkConfig.getName(),
+                    null,
+                    mockedFormData,
+                    null,
+                    sinkConfig,
+                    null,
+                    null);
+            fail("Update without changes should fail");
+        } catch (RestException e) {
+            assertTrue(e.getMessage().contains("Update contains no change"));
+        }
+
+        try {
+            UpdateOptionsImpl updateOptions = new UpdateOptionsImpl();
+            updateOptions.setUpdateAuthData(false);
+            resource.updateSink(
+                    sinkConfig.getTenant(),
+                    sinkConfig.getNamespace(),
+                    sinkConfig.getName(),
+                    null,
+                    mockedFormData,
+                    null,
+                    sinkConfig,
+                    null,
+                    updateOptions);
+            fail("Update without changes should fail");
+        } catch (RestException e) {
+            assertTrue(e.getMessage().contains("Update contains no change"));
+        }
+
+        // no changes but set the auth-update flag to true, should not fail
+        UpdateOptionsImpl updateOptions = new UpdateOptionsImpl();
+        updateOptions.setUpdateAuthData(true);
+        resource.updateSink(
+                sinkConfig.getTenant(),
+                sinkConfig.getNamespace(),
+                sinkConfig.getName(),
+                null,
+                mockedFormData,
+                null,
+                sinkConfig,
+                null,
+                updateOptions);
+    }
 }
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index 36be2de7166..eabf954cc77 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -22,6 +22,7 @@ import static 
org.apache.pulsar.functions.worker.rest.api.v3.SinkApiV3ResourceTe
 import static 
org.apache.pulsar.functions.worker.rest.api.v3.SinkApiV3ResourceTest.getPulsarIOInvalidNar;
 import static 
org.apache.pulsar.functions.worker.rest.api.v3.SinkApiV3ResourceTest.getPulsarIOTwitterNar;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.eq;
@@ -33,6 +34,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Lists;
 import java.io.File;
@@ -58,6 +60,7 @@ import org.apache.pulsar.client.admin.Packages;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Tenants;
+import org.apache.pulsar.common.functions.UpdateOptionsImpl;
 import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
@@ -846,6 +849,72 @@ public class SourceApiV3ResourceTest {
                 null);
     }
 
+    @Test
+    public void testUpdateSourceWithNoChange() {
+        mockWorkerUtils();
+
+        // The submitted config is identical to the existing one.
+        SourceConfig sourceConfig = createDefaultSourceConfig();
+        mockStatic(SourceConfigUtils.class, ctx -> {
+            ctx.when(() -> 
SourceConfigUtils.convertFromDetails(any())).thenReturn(sourceConfig);
+            ctx.when(() -> SourceConfigUtils.convert(any(), 
any())).thenCallRealMethod();
+            ctx.when(() -> SourceConfigUtils.validateUpdate(any(), 
any())).thenCallRealMethod();
+            ctx.when(() -> 
SourceConfigUtils.clone(any())).thenCallRealMethod();
+            ctx.when(() -> 
SourceConfigUtils.validateAndExtractDetails(any(),any(),anyBoolean())).thenCallRealMethod();
+        });
+
+        mockFunctionCommon(sourceConfig.getTenant(), 
sourceConfig.getNamespace(), sourceConfig.getName());
+
+        // config has no changes and auth data is not being updated, should fail
+        try {
+            resource.updateSource(
+                    sourceConfig.getTenant(),
+                    sourceConfig.getNamespace(),
+                    sourceConfig.getName(),
+                    null,
+                    mockedFormData,
+                    null,
+                    sourceConfig,
+                    null,
+                    null);
+            fail("Update without changes should fail");
+        } catch (RestException e) {
+            assertTrue(e.getMessage().contains("Update contains no change"));
+        }
+
+        try {
+            UpdateOptionsImpl updateOptions = new UpdateOptionsImpl();
+            updateOptions.setUpdateAuthData(false);
+            resource.updateSource(
+                    sourceConfig.getTenant(),
+                    sourceConfig.getNamespace(),
+                    sourceConfig.getName(),
+                    null,
+                    mockedFormData,
+                    null,
+                    sourceConfig,
+                    null,
+                    updateOptions);
+            fail("Update without changes should fail");
+        } catch (RestException e) {
+            assertTrue(e.getMessage().contains("Update contains no change"));
+        }
+
+        // no changes but set the auth-update flag to true, should not fail
+        UpdateOptionsImpl updateOptions = new UpdateOptionsImpl();
+        updateOptions.setUpdateAuthData(true);
+        resource.updateSource(
+                sourceConfig.getTenant(),
+                sourceConfig.getNamespace(),
+                sourceConfig.getName(),
+                null,
+                mockedFormData,
+                null,
+                sourceConfig,
+                null,
+                updateOptions);
+    }
+
     @Test(expectedExceptions = RestException.class, 
expectedExceptionsMessageRegExp = "Source parallelism must be a "
             + "positive number")
     public void testUpdateSourceZeroParallelism() throws Exception {
@@ -881,26 +950,7 @@ public class SourceApiV3ResourceTest {
             Integer parallelism,
             String expectedError) throws Exception {
 
-        mockStatic(ConnectorUtils.class, c -> {
-        });
-        mockStatic(ClassLoaderUtils.class, c -> {
-        });
-        mockStatic(FunctionCommon.class, ctx -> {
-            ctx.when(() -> 
FunctionCommon.createPkgTempFile()).thenCallRealMethod();
-            ctx.when(() -> FunctionCommon.getClassLoaderFromPackage(any(), 
any(), any(), any())).thenCallRealMethod();
-            ctx.when(() -> FunctionCommon.getSourceType(argThat(clazz -> 
clazz.getName().equals(TWITTER_FIRE_HOSE))))
-                    .thenReturn(String.class);
-            ctx.when(() -> FunctionCommon.extractNarClassLoader(any(), any()))
-                    .thenReturn(narClassLoader);
-
-
-        });
-
-        this.mockedFunctionMetaData =
-                
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
-        when(mockedManager.getFunctionMetaData(any(), any(), 
any())).thenReturn(mockedFunctionMetaData);
-
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(function))).thenReturn(true);
+        mockFunctionCommon(tenant, namespace, function);
 
         SourceConfig sourceConfig = new SourceConfig();
         if (tenant != null) {
@@ -942,6 +992,27 @@ public class SourceApiV3ResourceTest {
 
     }
 
+    private void mockFunctionCommon(String tenant, String namespace, String 
function) {
+        mockStatic(ConnectorUtils.class, c -> {
+        });
+        mockStatic(ClassLoaderUtils.class, c -> {
+        });
+        mockStatic(FunctionCommon.class, ctx -> {
+            ctx.when(() -> 
FunctionCommon.createPkgTempFile()).thenCallRealMethod();
+            ctx.when(() -> FunctionCommon.getClassLoaderFromPackage(any(), 
any(), any(), any())).thenCallRealMethod();
+            ctx.when(() -> FunctionCommon.getSourceType(argThat(clazz -> 
clazz.getName().equals(TWITTER_FIRE_HOSE))))
+                    .thenReturn(String.class);
+            ctx.when(() -> FunctionCommon.extractNarClassLoader(any(), any()))
+                    .thenReturn(narClassLoader);
+        });
+
+        this.mockedFunctionMetaData =
+                
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
+        when(mockedManager.getFunctionMetaData(any(), any(), 
any())).thenReturn(mockedFunctionMetaData);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(function))).thenReturn(true);
+    }
+
     private void updateDefaultSource() throws Exception {
         updateDefaultSourceWithPackageUrl(null);
     }

Reply via email to