This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e616217c10dcf5959a856d5643613d9d0f6a57ef Author: labuladong <[email protected]> AuthorDate: Tue Jun 13 16:40:54 2023 +0800 [fix][broker] fix `Update contains no change` error when use `--update-auth-data` flag to update function/sink/source (#19450) Co-authored-by: tison <[email protected]> (cherry picked from commit 2b92ed11051a1c51170bedab07be6acc55cd95c9) --- .../functions/utils/FunctionConfigUtils.java | 4 +- .../pulsar/functions/utils/SinkConfigUtils.java | 2 +- .../functions/worker/rest/api/FunctionsImpl.java | 3 +- .../functions/worker/rest/api/SinksImpl.java | 3 +- .../functions/worker/rest/api/SourcesImpl.java | 3 +- .../rest/api/v3/FunctionApiV3ResourceTest.java | 93 +++++++++++++ .../worker/rest/api/v3/SinkApiV3ResourceTest.java | 151 +++++++++++++++------ .../rest/api/v3/SourceApiV3ResourceTest.java | 111 ++++++++++++--- 8 files changed, 306 insertions(+), 64 deletions(-) diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index 15119050c57..0a0f732e948 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -754,7 +754,7 @@ public class FunctionConfigUtils { } } - private static void doCommonChecks(FunctionConfig functionConfig) { + public static void doCommonChecks(FunctionConfig functionConfig) { if (isEmpty(functionConfig.getTenant())) { throw new IllegalArgumentException("Function tenant cannot be null"); } @@ -896,7 +896,7 @@ public class FunctionConfigUtils { } } - private static Collection<String> collectAllInputTopics(FunctionConfig functionConfig) { + public static Collection<String> collectAllInputTopics(FunctionConfig functionConfig) { List<String> retval = new LinkedList<>(); if (functionConfig.getInputs() != null) { retval.addAll(functionConfig.getInputs()); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index 7f974bb4b19..1cf07173ec6 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -538,7 +538,7 @@ public class SinkConfigUtils { return new ExtractedSinkDetails(sinkClassName, typeArg.getName(), functionClassName); } - private static Collection<String> collectAllInputTopics(SinkConfig sinkConfig) { + public static Collection<String> collectAllInputTopics(SinkConfig sinkConfig) { List<String> retval = new LinkedList<>(); if (sinkConfig.getInputs() != null) { retval.addAll(sinkConfig.getInputs()); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 715d660ddff..078c47524f8 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -290,7 +290,8 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork throw new RestException(Response.Status.BAD_REQUEST, e.getMessage()); } - if (existingFunctionConfig.equals(mergedConfig) && isBlank(functionPkgUrl) && uploadedInputStream == null) { + if (existingFunctionConfig.equals(mergedConfig) && isBlank(functionPkgUrl) && uploadedInputStream == null + && (updateOptions == null || !updateOptions.isUpdateAuthData())) { log.error("{}/{}/{} Update contains no changes", tenant, namespace, functionName); throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change"); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java index 5370fe93a7a..c382ec9e01b 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java @@ -295,7 +295,8 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic throw new RestException(Response.Status.BAD_REQUEST, e.getMessage()); } - if (existingSinkConfig.equals(mergedConfig) && isBlank(sinkPkgUrl) && uploadedInputStream == null) { + if (existingSinkConfig.equals(mergedConfig) && isBlank(sinkPkgUrl) && uploadedInputStream == null + && (updateOptions == null || !updateOptions.isUpdateAuthData())) { log.error("{}/{}/{} Update contains no changes", tenant, namespace, sinkName); throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change"); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java index 2f491424d65..c55ddf48b06 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java @@ -289,7 +289,8 @@ public class SourcesImpl extends ComponentImpl implements Sources<PulsarWorkerSe throw new RestException(Response.Status.BAD_REQUEST, e.getMessage()); } - if (existingSourceConfig.equals(mergedConfig) && isBlank(sourcePkgUrl) && uploadedInputStream == null) { + if (existingSourceConfig.equals(mergedConfig) && isBlank(sourcePkgUrl) && uploadedInputStream == null + && (updateOptions == null || !updateOptions.isUpdateAuthData())) { log.error("{}/{}/{} Update contains no changes", tenant, namespace, sourceName); throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change"); } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java index 337999bf2ad..0c20083bb89 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java @@ -28,6 +28,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import com.google.common.collect.Lists; import java.io.File; import java.io.FileInputStream; @@ -57,6 +59,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Tenants; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.functions.UpdateOptionsImpl; import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.FutureUtil; @@ -604,6 +607,96 @@ public class FunctionApiV3ResourceTest { null, null); } + @Test + public void testUpdateSourceWithNoChange() throws ClassNotFoundException { + mockWorkerUtils(); + + FunctionDetails functionDetails = createDefaultFunctionDetails(); + NarClassLoader mockedClassLoader = mock(NarClassLoader.class); + mockStatic(FunctionCommon.class, ctx -> { + ctx.when(() -> FunctionCommon.getFunctionTypes(any(FunctionConfig.class), any(Class.class))).thenReturn(new Class[]{String.class, String.class}); + ctx.when(() -> FunctionCommon.convertRuntime(any(FunctionConfig.Runtime.class))).thenCallRealMethod(); + ctx.when(() -> FunctionCommon.isFunctionCodeBuiltin(any())).thenReturn(true); + ctx.when(() -> FunctionCommon.getClassLoaderFromPackage(any(),any(),any(),any())).thenCallRealMethod(); + ctx.when(FunctionCommon::createPkgTempFile).thenCallRealMethod(); + }); + + doReturn(Function.class).when(mockedClassLoader).loadClass(anyString()); + + FunctionsManager mockedFunctionsManager = mock(FunctionsManager.class); + FunctionArchive functionArchive = FunctionArchive.builder() + .classLoader(mockedClassLoader) + .build(); + when(mockedFunctionsManager.getFunction("exclamation")).thenReturn(functionArchive); + when(mockedFunctionsManager.getFunctionArchive(any())).thenReturn(getPulsarApiExamplesNar().toPath()); + + when(mockedWorkerService.getFunctionsManager()).thenReturn(mockedFunctionsManager); + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true); + + // No change on config, + FunctionConfig funcConfig = createDefaultFunctionConfig(); + mockStatic(FunctionConfigUtils.class, ctx -> { + ctx.when(() -> FunctionConfigUtils.convertFromDetails(any())).thenReturn(funcConfig); + ctx.when(() -> FunctionConfigUtils.validateUpdate(any(), any())).thenCallRealMethod(); + ctx.when(() -> FunctionConfigUtils.convert(any(FunctionConfig.class), any(ClassLoader.class))).thenReturn(functionDetails); + ctx.when(() -> FunctionConfigUtils.convert(any(FunctionConfig.class), any(FunctionConfigUtils.ExtractedFunctionDetails.class))).thenReturn(functionDetails); + ctx.when(() -> FunctionConfigUtils.validateJavaFunction(any(), any())).thenCallRealMethod(); + ctx.when(() -> FunctionConfigUtils.doCommonChecks(any())).thenCallRealMethod(); + ctx.when(() -> FunctionConfigUtils.collectAllInputTopics(any())).thenCallRealMethod(); + ctx.when(() -> FunctionConfigUtils.doJavaChecks(any(), any())).thenCallRealMethod(); + }); + + // config has not changes and don't update auth, should fail + try { + resource.updateFunction( + funcConfig.getTenant(), + funcConfig.getNamespace(), + funcConfig.getName(), + null, + mockedFormData, + null, + funcConfig, + null, + null); + fail("Update without changes should fail"); + } catch (RestException e) { + assertTrue(e.getMessage().contains("Update contains no change")); + } + + try { + UpdateOptionsImpl updateOptions = new UpdateOptionsImpl(); + updateOptions.setUpdateAuthData(false); + resource.updateFunction( + funcConfig.getTenant(), + funcConfig.getNamespace(), + funcConfig.getName(), + null, + mockedFormData, + null, + funcConfig, + null, + updateOptions); + fail("Update without changes should fail"); + } catch (RestException e) { + assertTrue(e.getMessage().contains("Update contains no change")); + } + + // no changes but set the auth-update flag to true, should not fail + UpdateOptionsImpl updateOptions = new UpdateOptionsImpl(); + updateOptions.setUpdateAuthData(true); + resource.updateFunction( + funcConfig.getTenant(), + funcConfig.getNamespace(), + funcConfig.getName(), + null, + mockedFormData, + null, + funcConfig, + null, + updateOptions); + } + + private void registerDefaultFunction() { registerDefaultFunctionWithPackageUrl(null); } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java index 9f786c5b73a..5dcc795304e 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java @@ -18,21 +18,6 @@ */ package org.apache.pulsar.functions.worker.rest.api.v3; -import static org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATLEAST_ONCE; -import static org.apache.pulsar.functions.source.TopicSchema.DEFAULT_SERDE; -import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyString; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.when; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; import com.google.common.collect.Lists; import java.io.File; import java.io.FileInputStream; @@ -60,6 +45,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Tenants; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.functions.UpdateOptionsImpl; import org.apache.pulsar.common.functions.Utils; import org.apache.pulsar.common.io.SinkConfig; import org.apache.pulsar.common.nar.NarClassLoader; @@ -97,6 +83,23 @@ import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import static org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATLEAST_ONCE; +import static org.apache.pulsar.functions.source.TopicSchema.DEFAULT_SERDE; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; /** * Unit test of {@link SinksApiV3Resource}. @@ -965,29 +968,7 @@ public class SinkApiV3ResourceTest { String className, Integer parallelism, String expectedError) throws Exception { - mockStatic(ConnectorUtils.class, ctx -> { - ctx.when(() -> ConnectorUtils.getIOSinkClass(any(NarClassLoader.class))) - .thenReturn(CASSANDRA_STRING_SINK); - }); - - mockStatic(ClassLoaderUtils.class, ctx -> { - }); - - mockStatic(FunctionCommon.class, ctx -> { - ctx.when(() -> FunctionCommon.createPkgTempFile()).thenCallRealMethod(); - ctx.when(() -> FunctionCommon.getClassLoaderFromPackage(any(), any(), any(), any())).thenCallRealMethod(); - ctx.when(() -> FunctionCommon.getSinkType(any())).thenReturn(String.class); - ctx.when(() -> FunctionCommon.extractNarClassLoader(any(), any())).thenReturn(mock(NarClassLoader.class)); - ctx.when(() -> FunctionCommon - .convertProcessingGuarantee(eq(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE))) - .thenReturn(ATLEAST_ONCE); - }); - - this.mockedFunctionMetaData = - FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build(); - when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink))).thenReturn(mockedFunctionMetaData); - - when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true); + mockFunctionCommon(tenant, namespace, sink); SinkConfig sinkConfig = new SinkConfig(); if (tenant != null) { @@ -1026,6 +1007,32 @@ public class SinkApiV3ResourceTest { } + private void mockFunctionCommon(String tenant, String namespace, String sink) throws IOException { + mockStatic(ConnectorUtils.class, ctx -> { + ctx.when(() -> ConnectorUtils.getIOSinkClass(any(NarClassLoader.class))) + .thenReturn(CASSANDRA_STRING_SINK); + }); + + mockStatic(ClassLoaderUtils.class, ctx -> { + }); + + mockStatic(FunctionCommon.class, ctx -> { + ctx.when(() -> FunctionCommon.createPkgTempFile()).thenCallRealMethod(); + ctx.when(() -> FunctionCommon.getClassLoaderFromPackage(any(), any(), any(), any())).thenCallRealMethod(); + ctx.when(() -> FunctionCommon.getSinkType(any())).thenReturn(String.class); + ctx.when(() -> FunctionCommon.extractNarClassLoader(any(), any())).thenReturn(mock(NarClassLoader.class)); + ctx.when(() -> FunctionCommon + .convertProcessingGuarantee(eq(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE))) + .thenReturn(ATLEAST_ONCE); + }); + + this.mockedFunctionMetaData = + FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build(); + when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink))).thenReturn(mockedFunctionMetaData); + + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true); + } + private void updateDefaultSink() throws Exception { updateDefaultSinkWithPackageUrl(null); } @@ -1848,4 +1855,72 @@ public class SinkApiV3ResourceTest { } } } + + @Test + public void testUpdateSinkWithNoChange() throws IOException { + mockWorkerUtils(); + + // No change on config, + SinkConfig sinkConfig = createDefaultSinkConfig(); + + mockStatic(SinkConfigUtils.class, ctx -> { + ctx.when(() -> SinkConfigUtils.convertFromDetails(any())).thenReturn(sinkConfig); + ctx.when(() -> SinkConfigUtils.convert(any(), any())).thenCallRealMethod(); + ctx.when(() -> SinkConfigUtils.validateUpdate(any(), any())).thenCallRealMethod(); + ctx.when(() -> SinkConfigUtils.clone(any())).thenCallRealMethod(); + ctx.when(() -> SinkConfigUtils.collectAllInputTopics(any())).thenCallRealMethod(); + ctx.when(() -> SinkConfigUtils.validateAndExtractDetails(any(),any(),any(),anyBoolean())).thenCallRealMethod(); + }); + + mockFunctionCommon(sinkConfig.getTenant(), sinkConfig.getNamespace(), sinkConfig.getName()); + + // config has not changes and don't update auth, should fail + try { + resource.updateSink( + sinkConfig.getTenant(), + sinkConfig.getNamespace(), + sinkConfig.getName(), + null, + mockedFormData, + null, + sinkConfig, + null, + null); + fail("Update without changes should fail"); + } catch (RestException e) { + assertTrue(e.getMessage().contains("Update contains no change")); + } + + try { + UpdateOptionsImpl updateOptions = new UpdateOptionsImpl(); + updateOptions.setUpdateAuthData(false); + resource.updateSink( + sinkConfig.getTenant(), + sinkConfig.getNamespace(), + sinkConfig.getName(), + null, + mockedFormData, + null, + sinkConfig, + null, + updateOptions); + fail("Update without changes should fail"); + } catch (RestException e) { + assertTrue(e.getMessage().contains("Update contains no change")); + } + + // no changes but set the auth-update flag to true, should not fail + UpdateOptionsImpl updateOptions = new UpdateOptionsImpl(); + updateOptions.setUpdateAuthData(true); + resource.updateSink( + sinkConfig.getTenant(), + sinkConfig.getNamespace(), + sinkConfig.getName(), + null, + mockedFormData, + null, + sinkConfig, + null, + updateOptions); + } } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java index 36be2de7166..eabf954cc77 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java @@ -22,6 +22,7 @@ import static org.apache.pulsar.functions.worker.rest.api.v3.SinkApiV3ResourceTe import static org.apache.pulsar.functions.worker.rest.api.v3.SinkApiV3ResourceTest.getPulsarIOInvalidNar; import static org.apache.pulsar.functions.worker.rest.api.v3.SinkApiV3ResourceTest.getPulsarIOTwitterNar; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; @@ -33,6 +34,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Lists; import java.io.File; @@ -58,6 +60,7 @@ import org.apache.pulsar.client.admin.Packages; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Tenants; +import org.apache.pulsar.common.functions.UpdateOptionsImpl; import org.apache.pulsar.common.functions.Utils; import org.apache.pulsar.common.io.SourceConfig; import org.apache.pulsar.common.nar.NarClassLoader; @@ -846,6 +849,72 @@ public class SourceApiV3ResourceTest { null); } + @Test + public void testUpdateSourceWithNoChange() { + mockWorkerUtils(); + + // No change on config, + SourceConfig sourceConfig = createDefaultSourceConfig(); + mockStatic(SourceConfigUtils.class, ctx -> { + ctx.when(() -> SourceConfigUtils.convertFromDetails(any())).thenReturn(sourceConfig); + ctx.when(() -> SourceConfigUtils.convert(any(), any())).thenCallRealMethod(); + ctx.when(() -> SourceConfigUtils.validateUpdate(any(), any())).thenCallRealMethod(); + ctx.when(() -> SourceConfigUtils.clone(any())).thenCallRealMethod(); + ctx.when(() -> SourceConfigUtils.validateAndExtractDetails(any(),any(),anyBoolean())).thenCallRealMethod(); + }); + + mockFunctionCommon(sourceConfig.getTenant(), sourceConfig.getNamespace(), sourceConfig.getName()); + + // config has not changes and don't update auth, should fail + try { + resource.updateSource( + sourceConfig.getTenant(), + sourceConfig.getNamespace(), + sourceConfig.getName(), + null, + mockedFormData, + null, + sourceConfig, + null, + null); + fail("Update without changes should fail"); + } catch (RestException e) { + assertTrue(e.getMessage().contains("Update contains no change")); + } + + try { + UpdateOptionsImpl updateOptions = new UpdateOptionsImpl(); + updateOptions.setUpdateAuthData(false); + resource.updateSource( + sourceConfig.getTenant(), + sourceConfig.getNamespace(), + sourceConfig.getName(), + null, + mockedFormData, + null, + sourceConfig, + null, + updateOptions); + fail("Update without changes should fail"); + } catch (RestException e) { + assertTrue(e.getMessage().contains("Update contains no change")); + } + + // no changes but set the auth-update flag to true, should not fail + UpdateOptionsImpl updateOptions = new UpdateOptionsImpl(); + updateOptions.setUpdateAuthData(true); + resource.updateSource( + sourceConfig.getTenant(), + sourceConfig.getNamespace(), + sourceConfig.getName(), + null, + mockedFormData, + null, + sourceConfig, + null, + updateOptions); + } + @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source parallelism must be a " + "positive number") public void testUpdateSourceZeroParallelism() throws Exception { @@ -881,26 +950,7 @@ public class SourceApiV3ResourceTest { Integer parallelism, String expectedError) throws Exception { - mockStatic(ConnectorUtils.class, c -> { - }); - mockStatic(ClassLoaderUtils.class, c -> { - }); - mockStatic(FunctionCommon.class, ctx -> { - ctx.when(() -> FunctionCommon.createPkgTempFile()).thenCallRealMethod(); - ctx.when(() -> FunctionCommon.getClassLoaderFromPackage(any(), any(), any(), any())).thenCallRealMethod(); - ctx.when(() -> FunctionCommon.getSourceType(argThat(clazz -> clazz.getName().equals(TWITTER_FIRE_HOSE)))) - .thenReturn(String.class); - ctx.when(() -> FunctionCommon.extractNarClassLoader(any(), any())) - .thenReturn(narClassLoader); - - - }); - - this.mockedFunctionMetaData = - FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build(); - when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData); - - when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true); + mockFunctionCommon(tenant, namespace, function); SourceConfig sourceConfig = new SourceConfig(); if (tenant != null) { @@ -942,6 +992,27 @@ public class SourceApiV3ResourceTest { } + private void mockFunctionCommon(String tenant, String namespace, String function) { + mockStatic(ConnectorUtils.class, c -> { + }); + mockStatic(ClassLoaderUtils.class, c -> { + }); + mockStatic(FunctionCommon.class, ctx -> { + ctx.when(() -> FunctionCommon.createPkgTempFile()).thenCallRealMethod(); + ctx.when(() -> FunctionCommon.getClassLoaderFromPackage(any(), any(), any(), any())).thenCallRealMethod(); + ctx.when(() -> FunctionCommon.getSourceType(argThat(clazz -> clazz.getName().equals(TWITTER_FIRE_HOSE)))) + .thenReturn(String.class); + ctx.when(() -> FunctionCommon.extractNarClassLoader(any(), any())) + .thenReturn(narClassLoader); + }); + + this.mockedFunctionMetaData = + FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build(); + when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData); + + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true); + } + private void updateDefaultSource() throws Exception { updateDefaultSourceWithPackageUrl(null); }
