This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6a2db62 Fix: Exception when switch cluster from auth enabled to auth
disabled (#4069)
6a2db62 is described below
commit 6a2db62f20853a4441d7d6c79a20524e6b8b6342
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Wed Apr 17 23:11:38 2019 -0700
Fix: Exception when switch cluster from auth enabled to auth disabled
(#4069)
* fix bug in source and sink cli update
* fix import
* fix logic
* fix bug in auth when spec is not null but auth is turned off
* clean up debug
* cleanup
---
.../main/java/org/apache/pulsar/PulsarStandaloneStarter.java | 8 +++++++-
.../apache/pulsar/functions/runtime/KubernetesRuntime.java | 9 ++++++---
.../pulsar/functions/runtime/KubernetesRuntimeFactory.java | 10 +++++++---
.../org/apache/pulsar/functions/runtime/LocalRunner.java | 2 +-
.../pulsar/functions/runtime/ProcessRuntimeFactory.java | 7 +++++--
.../functions/runtime/KubernetesRuntimeFactoryTest.java | 2 +-
.../pulsar/functions/runtime/KubernetesRuntimeTest.java | 2 +-
.../apache/pulsar/functions/runtime/ProcessRuntimeTest.java | 2 +-
.../org/apache/pulsar/functions/worker/FunctionActioner.java | 12 +++++++-----
.../pulsar/functions/worker/FunctionRuntimeManager.java | 6 ++++--
10 files changed, 40 insertions(+), 20 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
index c9aefa1..feef094 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
@@ -116,6 +116,12 @@ public class PulsarStandaloneStarter extends
PulsarStandalone {
public static void main(String args[]) throws Exception {
// Start standalone
PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(args);
- standalone.start();
+ try {
+ standalone.start();
+ } catch (Throwable th) {
+ log.error("Failed to start pulsar service.", th);
+ Runtime.getRuntime().exit(1);
+ }
+
}
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 6dbb0d3..c3fe161 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -116,6 +116,7 @@ public class KubernetesRuntime implements Runtime {
)
);
private static final long GRPC_TIMEOUT_SECS = 5;
+ private final boolean authenticationEnabled;
// The thread that invokes the function
@Getter
@@ -160,7 +161,8 @@ public class KubernetesRuntime implements Runtime {
SecretsProviderConfigurator secretsProviderConfigurator,
Integer expectedMetricsCollectionInterval,
int percentMemoryPadding,
- KubernetesFunctionAuthProvider
functionAuthDataCacheProvider) throws Exception {
+ KubernetesFunctionAuthProvider
functionAuthDataCacheProvider,
+ boolean authenticationEnabled) throws Exception {
this.appsClient = appsClient;
this.coreClient = coreClient;
this.instanceConfig = instanceConfig;
@@ -174,6 +176,7 @@ public class KubernetesRuntime implements Runtime {
this.pulsarAdminUrl = pulsarAdminUrl;
this.secretsProviderConfigurator = secretsProviderConfigurator;
this.percentMemoryPadding = percentMemoryPadding;
+ this.authenticationEnabled = authenticationEnabled;
String logConfigFile = null;
String secretsProviderClassName =
secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails());
String secretsProviderConfig = null;
@@ -439,7 +442,7 @@ public class KubernetesRuntime implements Runtime {
private void submitStatefulSet() throws Exception {
final V1StatefulSet statefulSet = createStatefulSet();
// Configure function authentication if needed
- if (instanceConfig.getFunctionAuthenticationSpec() != null) {
+ if (authenticationEnabled &&
instanceConfig.getFunctionAuthenticationSpec() != null) {
functionAuthDataCacheProvider.configureAuthDataStatefulSet(
statefulSet,
getFunctionAuthData(instanceConfig.getFunctionAuthenticationSpec()));
}
@@ -755,7 +758,7 @@ public class KubernetesRuntime implements Runtime {
private List<String> getDownloadCommand(String bkPath, String
userCodeFilePath) {
// add auth plugin and parameters if necessary
- if (authConfig != null) {
+ if (authenticationEnabled && authConfig != null) {
if (isNotBlank(authConfig.getClientAuthenticationPlugin())
&&
isNotBlank(authConfig.getClientAuthenticationParameters())) {
return Arrays.asList(
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index 7fa67ce..3aed50e 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -92,6 +92,7 @@ public class KubernetesRuntimeFactory implements
RuntimeFactory {
private AppsV1Api appsClient;
private CoreV1Api coreClient;
private Resources functionInstanceMinResources;
+ private final boolean authenticationEnabled;
@VisibleForTesting
public KubernetesRuntimeFactory(String k8Uri,
@@ -114,7 +115,8 @@ public class KubernetesRuntimeFactory implements
RuntimeFactory {
String changeConfigMap,
String changeConfigMapNamespace,
Resources functionInstanceMinResources,
- SecretsProviderConfigurator
secretsProviderConfigurator) {
+ SecretsProviderConfigurator
secretsProviderConfigurator,
+ boolean authenticationEnabled) {
this.kubernetesInfo = new KubernetesInfo();
this.kubernetesInfo.setK8Uri(k8Uri);
if (!isEmpty(jobNamespace)) {
@@ -165,6 +167,7 @@ public class KubernetesRuntimeFactory implements
RuntimeFactory {
this.expectedMetricsCollectionInterval =
expectedMetricsCollectionInterval == null ? -1 :
expectedMetricsCollectionInterval;
this.secretsProviderConfigurator = secretsProviderConfigurator;
this.functionInstanceMinResources = functionInstanceMinResources;
+ this.authenticationEnabled = authenticationEnabled;
try {
setupClient();
} catch (Exception e) {
@@ -195,7 +198,7 @@ public class KubernetesRuntimeFactory implements
RuntimeFactory {
}
// adjust the auth config to support auth
- if (instanceConfig.getFunctionAuthenticationSpec() != null) {
+ if (authenticationEnabled &&
instanceConfig.getFunctionAuthenticationSpec() != null) {
getAuthProvider().configureAuthenticationConfig(authConfig,
getFunctionAuthData(instanceConfig.getFunctionAuthenticationSpec()));
}
@@ -223,7 +226,8 @@ public class KubernetesRuntimeFactory implements
RuntimeFactory {
secretsProviderConfigurator,
expectedMetricsCollectionInterval,
this.kubernetesInfo.getPercentMemoryPadding(),
- getAuthProvider());
+ getAuthProvider(),
+ authenticationEnabled);
}
@Override
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
index 966f299..37efbee 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
@@ -179,7 +179,7 @@ public class LocalRunner {
null, /* python instance file */
null, /* log directory */
null, /* extra dependencies dir */
- new DefaultSecretsProviderConfigurator())) {
+ new DefaultSecretsProviderConfigurator(), false)) {
List<RuntimeSpawner> spawners = new LinkedList<>();
for (int i = 0; i < parallelism; ++i) {
InstanceConfig instanceConfig = new InstanceConfig();
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
index 7090967..6976df5 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
@@ -39,6 +39,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
private final String pulsarServiceUrl;
private final String stateStorageServiceUrl;
+ private final boolean authenticationEnabled;
private AuthenticationConfig authConfig;
private SecretsProviderConfigurator secretsProviderConfigurator;
private String javaInstanceJarFile;
@@ -54,7 +55,8 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
String pythonInstanceFile,
String logDirectory,
String extraDependenciesDir,
- SecretsProviderConfigurator
secretsProviderConfigurator) {
+ SecretsProviderConfigurator
secretsProviderConfigurator,
+ boolean authenticationEnabled) {
this.pulsarServiceUrl = pulsarServiceUrl;
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.authConfig = authConfig;
@@ -63,6 +65,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
this.pythonInstanceFile = pythonInstanceFile;
this.extraDependenciesDir = extraDependenciesDir;
this.logDirectory = logDirectory;
+ this.authenticationEnabled = authenticationEnabled;
// if things are not specified, try to figure out by env properties
if (this.javaInstanceJarFile == null) {
@@ -129,7 +132,7 @@ public class ProcessRuntimeFactory implements
RuntimeFactory {
}
// configure auth if necessary
- if (instanceConfig.getFunctionAuthenticationSpec() != null) {
+ if (authenticationEnabled &&
instanceConfig.getFunctionAuthenticationSpec() != null) {
getAuthProvider().configureAuthenticationConfig(authConfig,
getFunctionAuthData(instanceConfig.getFunctionAuthenticationSpec()));
}
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java
index 8f1172e..5ea75fa 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java
@@ -145,7 +145,7 @@ public class KubernetesRuntimeFactoryTest {
null,
null,
minResources,
- new TestSecretProviderConfigurator()));
+ new TestSecretProviderConfigurator(), false));
doNothing().when(factory).setupClient();
return factory;
}
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index b1c3dfb..5128ff3 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -163,7 +163,7 @@ public class KubernetesRuntimeTest {
null,
null,
null,
- null, new TestSecretProviderConfigurator()));
+ null, new TestSecretProviderConfigurator(), false));
doNothing().when(factory).setupClient();
return factory;
}
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index e2c0499..842b0a3 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -139,7 +139,7 @@ public class ProcessRuntimeTest {
pythonInstanceFile,
logDirectory,
extraDependenciesDir, /* extra dependencies dir */
- new TestSecretsProviderConfigurator());
+ new TestSecretsProviderConfigurator(), false);
}
FunctionDetails createFunctionDetails(FunctionDetails.Runtime runtime) {
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 175d6e5..7140fdc 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -154,11 +154,13 @@ public class FunctionActioner {
FunctionDetails.Builder functionDetailsBuilder =
FunctionDetails.newBuilder(functionMetaData.getFunctionDetails());
- // check to make sure functionAuthenticationSpec has any data. If not
set to null, since for protobuf,
+ // check to make sure functionAuthenticationSpec has any data and
authentication is enabled.
+ // If not set to null, since for protobuf,
// even if the field is not set its not going to be null. Have to use
the "has" method to check
- Function.FunctionAuthenticationSpec functionAuthenticationSpec
- = instance.getFunctionMetaData().hasFunctionAuthSpec()
- ? instance.getFunctionMetaData().getFunctionAuthSpec() : null;
+ Function.FunctionAuthenticationSpec functionAuthenticationSpec = null;
+ if (workerConfig.isAuthenticationEnabled() &&
instance.getFunctionMetaData().hasFunctionAuthSpec()) {
+ functionAuthenticationSpec =
instance.getFunctionMetaData().getFunctionAuthSpec();
+ }
InstanceConfig instanceConfig =
createInstanceConfig(functionDetailsBuilder.build(),
functionAuthenticationSpec,
@@ -283,7 +285,7 @@ public class FunctionActioner {
functionRuntimeInfo.getRuntimeSpawner().close();
// cleanup any auth data cached
- if
(functionRuntimeInfo.getRuntimeSpawner().getInstanceConfig().getFunctionAuthenticationSpec()
!= null) {
+ if (workerConfig.isAuthenticationEnabled() &&
functionRuntimeInfo.getRuntimeSpawner().getInstanceConfig().getFunctionAuthenticationSpec()
!= null) {
try {
log.info("{}-{} Cleaning up authentication data for
function...", fqfn,functionRuntimeInfo.getFunctionInstance().getInstanceId());
functionRuntimeInfo.getRuntimeSpawner()
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index d55898f..534710d 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -150,7 +150,8 @@ public class FunctionRuntimeManager implements
AutoCloseable{
workerConfig.getProcessContainerFactory().getPythonInstanceLocation(),
workerConfig.getProcessContainerFactory().getLogDirectory(),
workerConfig.getProcessContainerFactory().getExtraFunctionDependenciesDir(),
- secretsProviderConfigurator);
+ secretsProviderConfigurator,
+ workerConfig.isAuthenticationEnabled());
} else if (workerConfig.getKubernetesContainerFactory() != null){
this.runtimeFactory = new KubernetesRuntimeFactory(
workerConfig.getKubernetesContainerFactory().getK8Uri(),
@@ -173,7 +174,8 @@ public class FunctionRuntimeManager implements
AutoCloseable{
workerConfig.getKubernetesContainerFactory().getChangeConfigMap(),
workerConfig.getKubernetesContainerFactory().getChangeConfigMapNamespace(),
workerConfig.getFunctionInstanceMinResources(),
- secretsProviderConfigurator);
+ secretsProviderConfigurator,
+ workerConfig.isAuthenticationEnabled());
} else {
throw new RuntimeException("Either Thread, Process or Kubernetes
Container Factory need to be set");
}