This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1d5c418 [pulsar-storm] Fix: Authentication is failing with storm
adapter (#6782)
1d5c418 is described below
commit 1d5c418ccf3f2dc0844e735e5ff4bf0dd2f5738f
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Mon May 18 21:53:36 2020 -0700
[pulsar-storm] Fix: Authentication is failing with storm adapter (#6782)
### Motivation
In #4284, made
[Authentication](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java#L33)
a transient under ClientConfigurationData. `Authentication` is a serializable
class because ClientConfig is used in Storm and Spark adapter to pass client
configuration and Storm serializes and deserializes spout and bolt while
executing them in topology. Now, after making `Authentication` transient
variable storm always des [...]
### Modification
Keep Authentication param serializable.
### Result
It fixes pulsar-storm with authentication enabled.
---
.../java/org/apache/pulsar/client/impl/ClientBuilderImpl.java | 6 ++++++
.../java/org/apache/pulsar/client/impl/PulsarClientImpl.java | 9 +++++++--
.../apache/pulsar/client/impl/auth/AuthenticationDisabled.java | 1 +
.../org/apache/pulsar/client/impl/auth/AuthenticationToken.java | 3 ++-
.../apache/pulsar/client/impl/conf/ClientConfigurationData.java | 4 +++-
.../streaming/connectors/pulsar/CachedPulsarClientTest.java | 4 ++--
6 files changed, 21 insertions(+), 6 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 3283166..8d3fdbe 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -109,6 +109,9 @@ public class ClientBuilderImpl implements ClientBuilder {
@Override
public ClientBuilder authentication(String authPluginClassName, String
authParamsString)
throws UnsupportedAuthenticationException {
+ conf.setAuthPluginClassName(authPluginClassName);
+ conf.setAuthParams(authParamsString);
+ conf.setAuthParamMap(null);
conf.setAuthentication(AuthenticationFactory.create(authPluginClassName,
authParamsString));
return this;
}
@@ -116,6 +119,9 @@ public class ClientBuilderImpl implements ClientBuilder {
@Override
public ClientBuilder authentication(String authPluginClassName,
Map<String, String> authParams)
throws UnsupportedAuthenticationException {
+ conf.setAuthPluginClassName(authPluginClassName);
+ conf.setAuthParamMap(authParams);
+ conf.setAuthParams(null);
conf.setAuthentication(AuthenticationFactory.create(authPluginClassName,
authParams));
return this;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 91ca682..bcb0f51 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -151,11 +151,16 @@ public class PulsarClientImpl implements PulsarClient {
}
private void setAuth(ClientConfigurationData conf) throws
PulsarClientException {
- if (StringUtils.isBlank(conf.getAuthPluginClassName()) ||
StringUtils.isBlank( conf.getAuthParams())) {
+ if (StringUtils.isBlank(conf.getAuthPluginClassName())
+ || (StringUtils.isBlank(conf.getAuthParams()) &&
conf.getAuthParamMap() == null)) {
return;
}
-
conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(),
conf.getAuthParams()));
+ if (StringUtils.isNotBlank(conf.getAuthParams())) {
+
conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(),
conf.getAuthParams()));
+ } else if (conf.getAuthParamMap() != null) {
+
conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(),
conf.getAuthParamMap()));
+ }
}
public ClientConfigurationData getConfiguration() {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDisabled.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDisabled.java
index c74a38e..5ee7bcb 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDisabled.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDisabled.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
public class AuthenticationDisabled implements Authentication,
EncodedAuthenticationParameterSupport {
protected final AuthenticationDataProvider nullData = new
AuthenticationDataNull();
+ public static final AuthenticationDisabled INSTANCE = new
AuthenticationDisabled();
/**
*
*/
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java
index bd13401..4ae3d3b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java
@@ -38,7 +38,8 @@ import org.apache.pulsar.client.api.PulsarClientException;
*/
public class AuthenticationToken implements Authentication,
EncodedAuthenticationParameterSupport {
- private Supplier<String> tokenSupplier;
+ private static final long serialVersionUID = 1L;
+ private transient Supplier<String> tokenSupplier;
public AuthenticationToken() {
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 0cf496a..980630c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.client.api.ServiceUrlProvider;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import java.io.Serializable;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
@@ -46,9 +47,10 @@ public class ClientConfigurationData implements
Serializable, Cloneable {
private transient ServiceUrlProvider serviceUrlProvider;
@JsonIgnore
- private transient Authentication authentication = new
AuthenticationDisabled();
+ private Authentication authentication = AuthenticationDisabled.INSTANCE;
private String authPluginClassName;
private String authParams;
+ private Map<String, String> authParamMap;
private long operationTimeoutMs = 30000;
private long statsIntervalSeconds = 60;
diff --git
a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java
b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java
index 39cdca1..0c81c4a 100644
---
a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java
+++
b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java
@@ -27,7 +27,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
-import org.testng.annotations.BeforeTest;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
@@ -37,7 +37,7 @@ public class CachedPulsarClientTest {
private static final String SERVICE_URL = "pulsar://localhost:6650";
- @BeforeTest
+ @BeforeMethod
public void clearCache() {
CachedPulsarClient.clear();
}