This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d5c418  [pulsar-storm] Fix: Authentication is failing with storm 
adapter (#6782)
1d5c418 is described below

commit 1d5c418ccf3f2dc0844e735e5ff4bf0dd2f5738f
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Mon May 18 21:53:36 2020 -0700

    [pulsar-storm] Fix: Authentication is failing with storm adapter (#6782)
    
    ### Motivation
    In #4284, made 
[Authentication](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java#L33)
 a transient under ClientConfigurationData. `Authentication` is a serializable 
class because ClientConfig is used in Storm and Spark adapter to pass client 
configuration and Storm serializes and deserializes spout and bolt while 
executing them in topology. Now, after making `Authentication` transient 
variable storm always des [...]
    
    ### Modification
    Keep Authentication param serializable.
    
    ### Result
    It fixes pulsar-storm with authentication enabled.
---
 .../java/org/apache/pulsar/client/impl/ClientBuilderImpl.java    | 6 ++++++
 .../java/org/apache/pulsar/client/impl/PulsarClientImpl.java     | 9 +++++++--
 .../apache/pulsar/client/impl/auth/AuthenticationDisabled.java   | 1 +
 .../org/apache/pulsar/client/impl/auth/AuthenticationToken.java  | 3 ++-
 .../apache/pulsar/client/impl/conf/ClientConfigurationData.java  | 4 +++-
 .../streaming/connectors/pulsar/CachedPulsarClientTest.java      | 4 ++--
 6 files changed, 21 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 3283166..8d3fdbe 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -109,6 +109,9 @@ public class ClientBuilderImpl implements ClientBuilder {
     @Override
     public ClientBuilder authentication(String authPluginClassName, String 
authParamsString)
             throws UnsupportedAuthenticationException {
+        conf.setAuthPluginClassName(authPluginClassName);
+        conf.setAuthParams(authParamsString);
+        conf.setAuthParamMap(null);
         
conf.setAuthentication(AuthenticationFactory.create(authPluginClassName, 
authParamsString));
         return this;
     }
@@ -116,6 +119,9 @@ public class ClientBuilderImpl implements ClientBuilder {
     @Override
     public ClientBuilder authentication(String authPluginClassName, 
Map<String, String> authParams)
             throws UnsupportedAuthenticationException {
+        conf.setAuthPluginClassName(authPluginClassName);
+        conf.setAuthParamMap(authParams);
+        conf.setAuthParams(null);
         
conf.setAuthentication(AuthenticationFactory.create(authPluginClassName, 
authParams));
         return this;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 91ca682..bcb0f51 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -151,11 +151,16 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     private void setAuth(ClientConfigurationData conf) throws 
PulsarClientException {
-        if (StringUtils.isBlank(conf.getAuthPluginClassName()) || 
StringUtils.isBlank( conf.getAuthParams())) {
+        if (StringUtils.isBlank(conf.getAuthPluginClassName())
+                || (StringUtils.isBlank(conf.getAuthParams()) && 
conf.getAuthParamMap() == null)) {
             return;
         }
 
-        
conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(),
 conf.getAuthParams()));
+        if (StringUtils.isNotBlank(conf.getAuthParams())) {
+            
conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(),
 conf.getAuthParams()));
+        } else if (conf.getAuthParamMap() != null) {
+            
conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(),
 conf.getAuthParamMap()));
+        }
     }
 
     public ClientConfigurationData getConfiguration() {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDisabled.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDisabled.java
index c74a38e..5ee7bcb 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDisabled.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDisabled.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 public class AuthenticationDisabled implements Authentication, 
EncodedAuthenticationParameterSupport {
 
     protected final AuthenticationDataProvider nullData = new 
AuthenticationDataNull();
+    public static final AuthenticationDisabled INSTANCE = new 
AuthenticationDisabled();
     /**
      *
      */
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java
index bd13401..4ae3d3b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java
@@ -38,7 +38,8 @@ import org.apache.pulsar.client.api.PulsarClientException;
  */
 public class AuthenticationToken implements Authentication, 
EncodedAuthenticationParameterSupport {
 
-    private Supplier<String> tokenSupplier;
+    private static final long serialVersionUID = 1L;
+    private transient Supplier<String> tokenSupplier;
 
     public AuthenticationToken() {
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 0cf496a..980630c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.client.api.ServiceUrlProvider;
 import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 
 import java.io.Serializable;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -46,9 +47,10 @@ public class ClientConfigurationData implements 
Serializable, Cloneable {
     private transient ServiceUrlProvider serviceUrlProvider;
 
     @JsonIgnore
-    private transient Authentication authentication = new 
AuthenticationDisabled();
+    private Authentication authentication = AuthenticationDisabled.INSTANCE;
     private String authPluginClassName;
     private String authParams;
+    private Map<String, String> authParamMap;
 
     private long operationTimeoutMs = 30000;
     private long statsIntervalSeconds = 60;
diff --git 
a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java
 
b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java
index 39cdca1..0c81c4a 100644
--- 
a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java
+++ 
b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java
@@ -27,7 +27,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
-import org.testng.annotations.BeforeTest;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 /**
@@ -37,7 +37,7 @@ public class CachedPulsarClientTest {
 
     private static final String SERVICE_URL = "pulsar://localhost:6650";
 
-    @BeforeTest
+    @BeforeMethod
     public void clearCache() {
         CachedPulsarClient.clear();
     }

Reply via email to