This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ba8acd  Add documentation and flexible aws-credential plugin to 
support aws-role (#1972)
8ba8acd is described below

commit 8ba8acdbe812725df6739c5d19db3e47a553f351
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Mon Jun 18 12:15:28 2018 -0700

    Add documentation and flexible aws-credential plugin to support aws-role 
(#1972)
    
    * Add documentation and flexible aws-credential plugin to support aws-role
    
    * add protobuf relocation
---
 pulsar-io/kinesis/pom.xml                          |  11 +-
 .../io/kinesis/AwsCredentialProviderPlugin.java    |  36 ++---
 .../org/apache/pulsar/io/kinesis/KinesisSink.java  | 104 ++++++++------
 .../apache/pulsar/io/kinesis/KinesisSinkTest.java  | 149 +++++++++++++++++++++
 4 files changed, 227 insertions(+), 73 deletions(-)

diff --git a/pulsar-io/kinesis/pom.xml b/pulsar-io/kinesis/pom.xml
index 7f9f488..0dee750 100644
--- a/pulsar-io/kinesis/pom.xml
+++ b/pulsar-io/kinesis/pom.xml
@@ -47,13 +47,16 @@
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
-      <version>${jackson.version}</version>
     </dependency>
 
     <dependency>
       <groupId>com.fasterxml.jackson.dataformat</groupId>
       <artifactId>jackson-dataformat-yaml</artifactId>
-      <version>${jackson.version}</version>
+    </dependency>
+    
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
     </dependency>
 
     <!-- kinesis dependencies -->
@@ -110,10 +113,6 @@
                   <pattern>com.google.protobuf</pattern>
                   
<shadedPattern>org.apache.pulsar.replicator.com.google.protobuf</shadedPattern>
                 </relocation>
-                <relocation>
-                  <pattern>com.amazonaws</pattern>
-                  
<shadedPattern>org.apache.pulsar.com.amazonaws</shadedPattern>
-                </relocation>
               </relocations>
             </configuration>
           </execution>
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java
 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java
index 8c616cc..7e463bb 100644
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java
@@ -19,42 +19,32 @@
 
 package org.apache.pulsar.io.kinesis;
 
+import java.io.Closeable;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicSessionCredentials;
+
 /**
  * Kinesis source/sink calls credential-provider while refreshing aws 
accessKey and secretKey. So, implementation
  * AwsCredentialProviderPlugin needs to make sure to return non-expired keys 
when it requires.
  *
  */
-public interface AwsCredentialProviderPlugin {
-    
+public interface AwsCredentialProviderPlugin extends Closeable {
+
     /**
      * accepts aws-account related param and initialize credential provider.
-     *  
+     * 
      * @param param
      */
     void init(String param);
-    
-    /**
-     * Returns the AWS access key ID for this credentials object.
-     * 
-     * @return The AWS access key ID for this credentials object.
-     */
-    String getAWSAccessKeyId();
 
     /**
-     * Returns the AWS secret access key for this credentials object.
+     * Returned {@link AWSCredentialsProvider} can give {@link AWSCredentials} 
in case credential belongs to IAM user or
+     * it can return {@link BasicSessionCredentials} if user wants to generate 
temporary credential for a given IAM
+     * role.
      * 
-     * @return The AWS secret access key for this credentials object.
-     */
-    String getAWSSecretKey();
-    
-    /**
-     * Forces this credentials provider to refresh its credentials. For many
-     * implementations of credentials provider, this method may simply be a
-     * no-op, such as any credentials provider implementation that vends
-     * static/non-changing credentials. For other implementations that vend
-     * different credentials through out their lifetime, this method should
-     * force the credentials provider to refresh its credentials.
+     * @return
      */
-    void refresh();
+    AWSCredentialsProvider getCredentialProvider();
 
 }
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index 39af1f3..5a43312 100644
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -30,6 +30,9 @@ import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.pulsar.io.core.RecordContext;
 import org.apache.pulsar.io.core.Sink;
 import org.slf4j.Logger;
@@ -37,20 +40,35 @@ import org.slf4j.LoggerFactory;
 
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.services.kinesis.producer.KinesisProducer;
 import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
 import 
com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration.ThreadingModel;
 import com.amazonaws.services.kinesis.producer.UserRecordResult;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 
 /**
- * A Kinesis sink
+ * A Kinesis sink which can be configured by {@link KinesisSinkConfig}. 
+ * <pre> 
+ * {@link KinesisSinkConfig} accepts
+ * 1. <b>awsEndpoint:</b> kinesis end-point url can be found at : 
https://docs.aws.amazon.com/general/latest/gr/rande.html
+ * 2. <b>awsRegion:</b> appropriate aws region eg: us-west-1, us-west-2
+ * 3. <b>awsKinesisStreamName:</b> kinesis stream name
+ * 4. <b>awsCredentialPluginName:</b> Fully-Qualified class name of 
implementation of {@link AwsCredentialProviderPlugin}.
+ *    - It is a factory class which creates an {@link AWSCredentialsProvider} 
that will be used by {@link KinesisProducer}
+ *    - If it is empty then {@link KinesisSink} creates default {@link 
AWSCredentialsProvider} 
+ *      which accepts json-map of credentials in awsCredentialPluginParam 
+ *      eg: awsCredentialPluginParam = 
{"accessKey":"my-access-key","secretKey":"my-secret-key"}
+ * 5. <b>awsCredentialPluginParam:</b> json-parameters to initialize {@link 
AwsCredentialProviderPlugin}
+ * 
+ * </pre>
+ * 
  */
 public class KinesisSink implements Sink<byte[]> {
 
@@ -59,17 +77,27 @@ public class KinesisSink implements Sink<byte[]> {
     private KinesisProducer kinesisProducer;
     private KinesisSinkConfig kinesisSinkConfig;
     private String streamName;
+    private static final String defaultPartitionedKey = "default";
+    private static final int maxPartitionedKeyLength = 256;
 
     public static final String ACCESS_KEY_NAME = "accessKey";
     public static final String SECRET_KEY_NAME = "secretKey";
 
     @Override
     public void write(RecordContext inputRecordContext, byte[] value) throws 
Exception {
-        final String partitionedKey = inputRecordContext.getPartitionId();
+        String partitionedKey = 
StringUtils.isNotBlank(inputRecordContext.getPartitionId())
+                ? inputRecordContext.getPartitionId()
+                : defaultPartitionedKey;
+        partitionedKey = partitionedKey.length() > maxPartitionedKeyLength
+                ? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
+                : partitionedKey; // partitionedKey Length must be at least 
one, and at most 256
         ListenableFuture<UserRecordResult> addRecordResult = 
kinesisProducer.addUserRecord(this.streamName,
                 partitionedKey, ByteBuffer.wrap(value));
         addCallback(addRecordResult,
                 ProducerSendCallback.create(this.streamName, 
inputRecordContext, System.nanoTime()), directExecutor());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Published message to kinesis stream {} with size {}", 
streamName, value.length);
+        }
     }
 
     @Override
@@ -107,10 +135,10 @@ public class KinesisSink implements Sink<byte[]> {
         LOG.info("Kinesis sink started. {}", 
(ReflectionToStringBuilder.toString(kinesisConfig, 
ToStringStyle.SHORT_PREFIX_STYLE)));
     }
 
-    private AWSCredentialsProvider createCredentialProvider(String 
awsCredentialPluginName,
+    protected AWSCredentialsProvider createCredentialProvider(String 
awsCredentialPluginName,
             String awsCredentialPluginParam) {
         if (isNotBlank(awsCredentialPluginName)) {
-            return createCredentialProvider(awsCredentialPluginName, 
awsCredentialPluginParam);
+            return createCredentialProviderWithPlugin(awsCredentialPluginName, 
awsCredentialPluginParam);
         } else {
             return defaultCredentialProvider(awsCredentialPluginParam);        
    
         }
@@ -168,33 +196,23 @@ public class KinesisSink implements Sink<byte[]> {
         }
     }
 
-    public static AWSCredentialsProvider createCredentialProviderPlugin(String 
pluginFQClassName, String param)
+    /**
+     * Creates an instance of credential provider which can return {@link 
AWSCredentials} or {@link BasicAWSCredentials}
+     * based on IAM user/roles.
+     * 
+     * @param pluginFQClassName
+     * @param param
+     * @return
+     * @throws IllegalArgumentException
+     */
+    public static AWSCredentialsProvider 
createCredentialProviderWithPlugin(String pluginFQClassName, String param)
             throws IllegalArgumentException {
         try {
             Class<?> clazz = Class.forName(pluginFQClassName);
             Constructor<?> ctor = clazz.getConstructor();
             final AwsCredentialProviderPlugin plugin = 
(AwsCredentialProviderPlugin) ctor.newInstance(new Object[] {});
             plugin.init(param);
-            return new AWSCredentialsProvider() {
-                @Override
-                public AWSCredentials getCredentials() {
-                    return new AWSCredentials() {
-                        @Override
-                        public String getAWSAccessKeyId() {
-                            return plugin.getAWSAccessKeyId();
-                        }
-
-                        @Override
-                        public String getAWSSecretKey() {
-                            return plugin.getAWSSecretKey();
-                        }
-                    };
-                }
-                @Override
-                public void refresh() {
-                    plugin.refresh();
-                }
-            };
+            return plugin.getCredentialProvider();
         } catch (Exception e) {
             LOG.error("Failed to initialize AwsCredentialProviderPlugin {}", 
pluginFQClassName, e);
             throw new IllegalArgumentException(
@@ -202,24 +220,22 @@ public class KinesisSink implements Sink<byte[]> {
         }
     }
 
-    private AWSCredentialsProvider defaultCredentialProvider(String 
awsCredentialPluginParam) {
-        String[] credentials = awsCredentialPluginParam.split(",");
-        String accessKey = null;
-        String secretKey = null;
-        if (credentials.length == 2) {
-            for (String credential : credentials) {
-                String[] keys = credential.split("=");
-                if (keys.length == 2) {
-                    if (keys[0].equals(ACCESS_KEY_NAME)) {
-                        accessKey = keys[1];
-                    } else if (keys[0].equals(SECRET_KEY_NAME)) {
-                        secretKey = keys[1];
-                    }
-                }
-            }
-        }
+    /**
+     * It creates a default credential provider which takes accessKey and 
secretKey from configuration and creates
+     * {@link AWSCredentials}
+     * 
+     * @param awsCredentialPluginParam
+     * @return
+     */
+    protected AWSCredentialsProvider defaultCredentialProvider(String 
awsCredentialPluginParam) {
+        Map<String, String> credentialMap = new 
Gson().fromJson(awsCredentialPluginParam,
+                new TypeToken<Map<String, String>>() {
+                }.getType());
+        String accessKey = credentialMap.get(ACCESS_KEY_NAME);
+        String secretKey = credentialMap.get(SECRET_KEY_NAME);
         checkArgument(isNotBlank(accessKey) && isNotBlank(secretKey),
-                String.format("access-key/secret-key not present in param: 
format: %s=<access-key>,%s=<secret-key>",
+                String.format(
+                        "Default %s and %s must be present into json-map if 
AwsCredentialProviderPlugin not provided",
                         ACCESS_KEY_NAME, SECRET_KEY_NAME));
         return defaultCredentialProvider(accessKey, secretKey);
     }
diff --git 
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkTest.java
 
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkTest.java
new file mode 100644
index 0000000..2bd4a17
--- /dev/null
+++ 
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkTest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kinesis;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import org.testng.collections.Maps;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicSessionCredentials;
+import com.google.gson.Gson;
+
+/**
+ * Unit test of {@link KinesisSink}.
+ */
+public class KinesisSinkTest {
+
+    @Test
+    public void testDefaultCredentialProvider() throws Exception {
+        KinesisSink sink = new KinesisSink();
+        Map<String, String> credentialParam = Maps.newHashMap();
+        String awsCredentialPluginParam = new Gson().toJson(credentialParam);
+        try {
+            sink.defaultCredentialProvider(awsCredentialPluginParam);
+            Assert.fail("accessKey and SecretKey validation not applied");
+        } catch (IllegalArgumentException ie) {
+            // Ok..
+        }
+
+        final String accesKey = "ak";
+        final String secretKey = "sk";
+        credentialParam.put(KinesisSink.ACCESS_KEY_NAME, accesKey);
+        credentialParam.put(KinesisSink.SECRET_KEY_NAME, secretKey);
+        awsCredentialPluginParam = new Gson().toJson(credentialParam);
+        AWSCredentialsProvider credentialProvider = 
sink.defaultCredentialProvider(awsCredentialPluginParam);
+        Assert.assertNotNull(credentialProvider);
+        
Assert.assertEquals(credentialProvider.getCredentials().getAWSAccessKeyId(), 
accesKey);
+        
Assert.assertEquals(credentialProvider.getCredentials().getAWSSecretKey(), 
secretKey);
+
+        sink.close();
+    }
+
+    @Test
+    public void testCredentialProvider() throws Exception {
+        KinesisSink sink = new KinesisSink();
+
+        final String accesKey = "ak";
+        final String secretKey = "sk";
+        Map<String, String> credentialParam = Maps.newHashMap();
+        credentialParam.put(KinesisSink.ACCESS_KEY_NAME, accesKey);
+        credentialParam.put(KinesisSink.SECRET_KEY_NAME, secretKey);
+        String awsCredentialPluginParam = new Gson().toJson(credentialParam);
+        AWSCredentialsProvider credentialProvider = 
sink.createCredentialProvider(null, awsCredentialPluginParam);
+        
Assert.assertEquals(credentialProvider.getCredentials().getAWSAccessKeyId(), 
accesKey);
+        
Assert.assertEquals(credentialProvider.getCredentials().getAWSSecretKey(), 
secretKey);
+
+        credentialProvider = 
sink.createCredentialProvider(AwsCredentialProviderPluginImpl.class.getName(), 
"{}");
+        Assert.assertNotNull(credentialProvider);
+        
Assert.assertEquals(credentialProvider.getCredentials().getAWSAccessKeyId(),
+                AwsCredentialProviderPluginImpl.accessKey);
+        
Assert.assertEquals(credentialProvider.getCredentials().getAWSSecretKey(),
+                AwsCredentialProviderPluginImpl.secretKey);
+        Assert.assertEquals(((BasicSessionCredentials) 
credentialProvider.getCredentials()).getSessionToken(),
+                AwsCredentialProviderPluginImpl.sessionToken);
+
+        sink.close();
+    }
+
+    @Test
+    public void testCredentialProviderPlugin() throws Exception {
+        KinesisSink sink = new KinesisSink();
+
+        AWSCredentialsProvider credentialProvider = sink
+                
.createCredentialProviderWithPlugin(AwsCredentialProviderPluginImpl.class.getName(),
 "{}");
+        Assert.assertNotNull(credentialProvider);
+        
Assert.assertEquals(credentialProvider.getCredentials().getAWSAccessKeyId(),
+                AwsCredentialProviderPluginImpl.accessKey);
+        
Assert.assertEquals(credentialProvider.getCredentials().getAWSSecretKey(),
+                AwsCredentialProviderPluginImpl.secretKey);
+        Assert.assertEquals(((BasicSessionCredentials) 
credentialProvider.getCredentials()).getSessionToken(),
+                AwsCredentialProviderPluginImpl.sessionToken);
+
+        sink.close();
+    }
+
+    public static class AwsCredentialProviderPluginImpl implements 
AwsCredentialProviderPlugin {
+
+        public final static String accessKey = "ak";
+        public final static String secretKey = "sk";
+        public final static String sessionToken = "st";
+
+        public void init(String param) {
+            // no-op
+        }
+
+        @Override
+        public AWSCredentialsProvider getCredentialProvider() {
+            return new AWSCredentialsProvider() {
+                @Override
+                public AWSCredentials getCredentials() {
+                    return new BasicSessionCredentials(accessKey, secretKey, 
sessionToken) {
+
+                        @Override
+                        public String getAWSAccessKeyId() {
+                            return accessKey;
+                        }
+                        @Override
+                        public String getAWSSecretKey() {
+                            return secretKey;
+                        }
+                        @Override
+                        public String getSessionToken() {
+                            return sessionToken;
+                        }
+                    };
+                }
+                @Override
+                public void refresh() {
+                    // TODO Auto-generated method stub
+                }
+            };
+        }
+        @Override
+        public void close() throws IOException {
+            // TODO Auto-generated method stub
+        }
+    }
+
+}

-- 
To stop receiving notification emails like this one, please contact
rdhaba...@apache.org.

Reply via email to