This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f692af  Make authentication parameters configurable on the 
pulsar-client-kafka (#2373)
6f692af is described below

commit 6f692af11d9215256df37619cd7f6f17c6138baa
Author: ssunorz <[email protected]>
AuthorDate: Tue Aug 21 02:40:39 2018 +0900

    Make authentication parameters configurable on the pulsar-client-kafka 
(#2373)
    
    ### Motivation
    
    Make authentication parameters configurable on the pulsar-client-kafka.
    
    ### Modifications
    
    Add authentication parameters to the PulsarClientKafkaConfig.
    
    ### Result
    
    Users can set authentication parameters on the pulsar-client-kafka.
---
 .../client/kafka/compat/PulsarClientKafkaConfig.java  | 19 +++++++++++++++----
 site2/docs/adaptors-kafka.md                          |  4 +++-
 2 files changed, 18 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
index ca57f5b..c38d93d 100644
--- 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
+++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.kafka.compat;
 
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -29,6 +30,8 @@ public class PulsarClientKafkaConfig {
 
     /// Config variables
     public static final String AUTHENTICATION_CLASS = 
"pulsar.authentication.class";
+    public static final String AUTHENTICATION_PARAMS_MAP = 
"pulsar.authentication.params.map";
+    public static final String AUTHENTICATION_PARAMS_STRING = 
"pulsar.authentication.params.string";
     public static final String USE_TLS = "pulsar.use.tls";
     public static final String TLS_TRUST_CERTS_FILE_PATH = 
"pulsar.tls.trust.certs.file.path";
     public static final String TLS_ALLOW_INSECURE_CONNECTION = 
"pulsar.tls.allow.insecure.connection";
@@ -51,10 +54,18 @@ public class PulsarClientKafkaConfig {
         if (properties.containsKey(AUTHENTICATION_CLASS)) {
             String className = properties.getProperty(AUTHENTICATION_CLASS);
             try {
-                @SuppressWarnings("unchecked")
-                Class<Authentication> clazz = (Class<Authentication>) 
Class.forName(className);
-                Authentication auth = clazz.newInstance();
-                clientBuilder.authentication(auth);
+                if (properties.containsKey(AUTHENTICATION_PARAMS_STRING)) {
+                    String authParamsString = (String) 
properties.get(AUTHENTICATION_PARAMS_STRING);
+                    clientBuilder.authentication(className, authParamsString);
+                } else if (properties.containsKey(AUTHENTICATION_PARAMS_MAP)) {
+                    Map<String, String> authParams = (Map<String, String>) 
properties.get(AUTHENTICATION_PARAMS_MAP);
+                    clientBuilder.authentication(className, authParams);
+                } else {
+                    @SuppressWarnings("unchecked")
+                    Class<Authentication> clazz = (Class<Authentication>) 
Class.forName(className);
+                    Authentication auth = clazz.newInstance();
+                    clientBuilder.authentication(auth);
+                }
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
diff --git a/site2/docs/adaptors-kafka.md b/site2/docs/adaptors-kafka.md
index 99f547f..809eae3 100644
--- a/site2/docs/adaptors-kafka.md
+++ b/site2/docs/adaptors-kafka.md
@@ -224,7 +224,9 @@ You can configure Pulsar authentication provider directly 
from the Kafka propert
 
 | Config property                        | Default | Notes                     
                                                             |
 
|:---------------------------------------|:--------|:---------------------------------------------------------------------------------------|
-| `pulsar.authentication.class`          |         | Configure to auth 
provider. Eg. `org.apache.pulsar.client.impl.auth.AuthenticationTls` |
+| 
[`pulsar.authentication.class`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setAuthentication-org.apache.pulsar.client.api.Authentication-)
          |         | Configure to auth provider. Eg. 
`org.apache.pulsar.client.impl.auth.AuthenticationTls` |
+| 
[`pulsar.authentication.params.map`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setAuthentication-java.lang.String-java.util.Map-)
          |         | Map which represents parameters for the 
Authentication-Plugin |
+| 
[`pulsar.authentication.params.string`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setAuthentication-java.lang.String-java.lang.String-)
          |         | String which represents parameters for the 
Authentication-Plugin, Eg. `key1:val1,key2:val2` |
 | 
[`pulsar.use.tls`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setUseTls-boolean-)
                       | `false` | Enable TLS transport encryption              
                                          |
 | 
[`pulsar.tls.trust.certs.file.path`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setTlsTrustCertsFilePath-java.lang.String-)
   |         | Path for the TLS trust certificate store                         
                      |
 | 
[`pulsar.tls.allow.insecure.connection`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setTlsAllowInsecureConnection-boolean-)
 | `false` | Accept self-signed certificates from brokers                       
                    |

Reply via email to