This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8ba8acd Add documentation and flexible aws-credential plugin to
support aws-role (#1972)
8ba8acd is described below
commit 8ba8acdbe812725df6739c5d19db3e47a553f351
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Mon Jun 18 12:15:28 2018 -0700
Add documentation and flexible aws-credential plugin to support aws-role
(#1972)
* Add documentation and flexible aws-credential plugin to support aws-role
* add protobuf relocation
---
pulsar-io/kinesis/pom.xml | 11 +-
.../io/kinesis/AwsCredentialProviderPlugin.java | 36 ++---
.../org/apache/pulsar/io/kinesis/KinesisSink.java | 104 ++++++++------
.../apache/pulsar/io/kinesis/KinesisSinkTest.java | 149 +++++++++++++++++++++
4 files changed, 227 insertions(+), 73 deletions(-)
diff --git a/pulsar-io/kinesis/pom.xml b/pulsar-io/kinesis/pom.xml
index 7f9f488..0dee750 100644
--- a/pulsar-io/kinesis/pom.xml
+++ b/pulsar-io/kinesis/pom.xml
@@ -47,13 +47,16 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
- <version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
- <version>${jackson.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
</dependency>
<!-- kinesis dependencies -->
@@ -110,10 +113,6 @@
<pattern>com.google.protobuf</pattern>
<shadedPattern>org.apache.pulsar.replicator.com.google.protobuf</shadedPattern>
</relocation>
- <relocation>
- <pattern>com.amazonaws</pattern>
- <shadedPattern>org.apache.pulsar.com.amazonaws</shadedPattern>
- </relocation>
</relocations>
</configuration>
</execution>
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java
index 8c616cc..7e463bb 100644
---
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java
+++
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java
@@ -19,42 +19,32 @@
package org.apache.pulsar.io.kinesis;
+import java.io.Closeable;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicSessionCredentials;
+
/**
* Kinesis source/sink calls credential-provider while refreshing aws accessKey and secretKey. So, implementation
* AwsCredentialProviderPlugin needs to make sure to return non-expired keys when required.
*
*/
-public interface AwsCredentialProviderPlugin {
-
+public interface AwsCredentialProviderPlugin extends Closeable {
+
/**
* accepts aws-account related param and initialize credential provider.
- *
+ *
* @param param
*/
void init(String param);
-
- /**
- * Returns the AWS access key ID for this credentials object.
- *
- * @return The AWS access key ID for this credentials object.
- */
- String getAWSAccessKeyId();
/**
- * Returns the AWS secret access key for this credentials object.
+ * Returns an {@link AWSCredentialsProvider} that supplies {@link AWSCredentials} when the credential belongs to an
+ * IAM user, or {@link BasicSessionCredentials} when the user wants to generate a temporary credential for a given
+ * IAM role.
*
- * @return The AWS secret access key for this credentials object.
- */
- String getAWSSecretKey();
-
- /**
- * Forces this credentials provider to refresh its credentials. For many
- * implementations of credentials provider, this method may simply be a
- * no-op, such as any credentials provider implementation that vends
- * static/non-changing credentials. For other implementations that vend
- * different credentials through out their lifetime, this method should
- * force the credentials provider to refresh its credentials.
+ * @return
*/
- void refresh();
+ AWSCredentialsProvider getCredentialProvider();
}
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index 39af1f3..5a43312 100644
---
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -30,6 +30,9 @@ import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.pulsar.io.core.RecordContext;
import org.apache.pulsar.io.core.Sink;
import org.slf4j.Logger;
@@ -37,20 +40,35 @@ import org.slf4j.LoggerFactory;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import
com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration.ThreadingModel;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
/**
- * A Kinesis sink
+ * A Kinesis sink which can be configured by {@link KinesisSinkConfig}.
+ * <pre>
+ * {@link KinesisSinkConfig} accepts
+ * 1. <b>awsEndpoint:</b> kinesis end-point url can be found at :
https://docs.aws.amazon.com/general/latest/gr/rande.html
+ * 2. <b>awsRegion:</b> appropriate aws region eg: us-west-1, us-west-2
+ * 3. <b>awsKinesisStreamName:</b> kinesis stream name
+ * 4. <b>awsCredentialPluginName:</b> Fully-Qualified class name of
implementation of {@link AwsCredentialProviderPlugin}.
+ * - It is a factory class which creates an {@link AWSCredentialsProvider}
that will be used by {@link KinesisProducer}
+ * - If it is empty then {@link KinesisSink} creates default {@link
AWSCredentialsProvider}
+ * which accepts json-map of credentials in awsCredentialPluginParam
+ * eg: awsCredentialPluginParam =
{"accessKey":"my-access-key","secretKey":"my-secret-key"}
+ * 5. <b>awsCredentialPluginParam:</b> json-parameters to initialize {@link
AwsCredentialProviderPlugin}
+ *
+ * </pre>
+ *
*/
public class KinesisSink implements Sink<byte[]> {
@@ -59,17 +77,27 @@ public class KinesisSink implements Sink<byte[]> {
private KinesisProducer kinesisProducer;
private KinesisSinkConfig kinesisSinkConfig;
private String streamName;
+ private static final String defaultPartitionedKey = "default";
+ private static final int maxPartitionedKeyLength = 256;
public static final String ACCESS_KEY_NAME = "accessKey";
public static final String SECRET_KEY_NAME = "secretKey";
@Override
public void write(RecordContext inputRecordContext, byte[] value) throws
Exception {
- final String partitionedKey = inputRecordContext.getPartitionId();
+ String partitionedKey =
StringUtils.isNotBlank(inputRecordContext.getPartitionId())
+ ? inputRecordContext.getPartitionId()
+ : defaultPartitionedKey;
+ partitionedKey = partitionedKey.length() > maxPartitionedKeyLength
+ ? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
+ : partitionedKey; // partitionedKey Length must be at least
one, and at most 256
ListenableFuture<UserRecordResult> addRecordResult =
kinesisProducer.addUserRecord(this.streamName,
partitionedKey, ByteBuffer.wrap(value));
addCallback(addRecordResult,
ProducerSendCallback.create(this.streamName,
inputRecordContext, System.nanoTime()), directExecutor());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Published message to kinesis stream {} with size {}",
streamName, value.length);
+ }
}
@Override
@@ -107,10 +135,10 @@ public class KinesisSink implements Sink<byte[]> {
LOG.info("Kinesis sink started. {}",
(ReflectionToStringBuilder.toString(kinesisConfig,
ToStringStyle.SHORT_PREFIX_STYLE)));
}
- private AWSCredentialsProvider createCredentialProvider(String
awsCredentialPluginName,
+ protected AWSCredentialsProvider createCredentialProvider(String
awsCredentialPluginName,
String awsCredentialPluginParam) {
if (isNotBlank(awsCredentialPluginName)) {
- return createCredentialProvider(awsCredentialPluginName,
awsCredentialPluginParam);
+ return createCredentialProviderWithPlugin(awsCredentialPluginName,
awsCredentialPluginParam);
} else {
return defaultCredentialProvider(awsCredentialPluginParam);
}
@@ -168,33 +196,23 @@ public class KinesisSink implements Sink<byte[]> {
}
}
- public static AWSCredentialsProvider createCredentialProviderPlugin(String
pluginFQClassName, String param)
+ /**
+ * Creates an instance of credential provider which can return {@link AWSCredentials} or {@link BasicAWSCredentials}
+ * based on IAM user/roles.
+ *
+ * @param pluginFQClassName
+ * @param param
+ * @return
+ * @throws IllegalArgumentException
+ */
+ public static AWSCredentialsProvider
createCredentialProviderWithPlugin(String pluginFQClassName, String param)
throws IllegalArgumentException {
try {
Class<?> clazz = Class.forName(pluginFQClassName);
Constructor<?> ctor = clazz.getConstructor();
final AwsCredentialProviderPlugin plugin =
(AwsCredentialProviderPlugin) ctor.newInstance(new Object[] {});
plugin.init(param);
- return new AWSCredentialsProvider() {
- @Override
- public AWSCredentials getCredentials() {
- return new AWSCredentials() {
- @Override
- public String getAWSAccessKeyId() {
- return plugin.getAWSAccessKeyId();
- }
-
- @Override
- public String getAWSSecretKey() {
- return plugin.getAWSSecretKey();
- }
- };
- }
- @Override
- public void refresh() {
- plugin.refresh();
- }
- };
+ return plugin.getCredentialProvider();
} catch (Exception e) {
LOG.error("Failed to initialize AwsCredentialProviderPlugin {}",
pluginFQClassName, e);
throw new IllegalArgumentException(
@@ -202,24 +220,22 @@ public class KinesisSink implements Sink<byte[]> {
}
}
- private AWSCredentialsProvider defaultCredentialProvider(String
awsCredentialPluginParam) {
- String[] credentials = awsCredentialPluginParam.split(",");
- String accessKey = null;
- String secretKey = null;
- if (credentials.length == 2) {
- for (String credential : credentials) {
- String[] keys = credential.split("=");
- if (keys.length == 2) {
- if (keys[0].equals(ACCESS_KEY_NAME)) {
- accessKey = keys[1];
- } else if (keys[0].equals(SECRET_KEY_NAME)) {
- secretKey = keys[1];
- }
- }
- }
- }
+ /**
+ * It creates a default credential provider which takes accessKey and secretKey from configuration and creates
+ * {@link AWSCredentials}
+ *
+ * @param awsCredentialPluginParam
+ * @return
+ */
+ protected AWSCredentialsProvider defaultCredentialProvider(String
awsCredentialPluginParam) {
+ Map<String, String> credentialMap = new
Gson().fromJson(awsCredentialPluginParam,
+ new TypeToken<Map<String, String>>() {
+ }.getType());
+ String accessKey = credentialMap.get(ACCESS_KEY_NAME);
+ String secretKey = credentialMap.get(SECRET_KEY_NAME);
checkArgument(isNotBlank(accessKey) && isNotBlank(secretKey),
- String.format("access-key/secret-key not present in param:
format: %s=<access-key>,%s=<secret-key>",
+ String.format(
+ "Default %s and %s must be present into json-map if
AwsCredentialProviderPlugin not provided",
ACCESS_KEY_NAME, SECRET_KEY_NAME));
return defaultCredentialProvider(accessKey, secretKey);
}
diff --git
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkTest.java
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkTest.java
new file mode 100644
index 0000000..2bd4a17
--- /dev/null
+++
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkTest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kinesis;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import org.testng.collections.Maps;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicSessionCredentials;
+import com.google.gson.Gson;
+
+/**
+ * Unit test of {@link KinesisSink}.
+ */
+public class KinesisSinkTest {
+
+ @Test
+ public void testDefaultCredentialProvider() throws Exception {
+ KinesisSink sink = new KinesisSink();
+ Map<String, String> credentialParam = Maps.newHashMap();
+ String awsCredentialPluginParam = new Gson().toJson(credentialParam);
+ try {
+ sink.defaultCredentialProvider(awsCredentialPluginParam);
+ Assert.fail("accessKey and SecretKey validation not applied");
+ } catch (IllegalArgumentException ie) {
+ // Ok..
+ }
+
+ final String accesKey = "ak";
+ final String secretKey = "sk";
+ credentialParam.put(KinesisSink.ACCESS_KEY_NAME, accesKey);
+ credentialParam.put(KinesisSink.SECRET_KEY_NAME, secretKey);
+ awsCredentialPluginParam = new Gson().toJson(credentialParam);
+ AWSCredentialsProvider credentialProvider =
sink.defaultCredentialProvider(awsCredentialPluginParam);
+ Assert.assertNotNull(credentialProvider);
+
Assert.assertEquals(credentialProvider.getCredentials().getAWSAccessKeyId(),
accesKey);
+
Assert.assertEquals(credentialProvider.getCredentials().getAWSSecretKey(),
secretKey);
+
+ sink.close();
+ }
+
+ @Test
+ public void testCredentialProvider() throws Exception {
+ KinesisSink sink = new KinesisSink();
+
+ final String accesKey = "ak";
+ final String secretKey = "sk";
+ Map<String, String> credentialParam = Maps.newHashMap();
+ credentialParam.put(KinesisSink.ACCESS_KEY_NAME, accesKey);
+ credentialParam.put(KinesisSink.SECRET_KEY_NAME, secretKey);
+ String awsCredentialPluginParam = new Gson().toJson(credentialParam);
+ AWSCredentialsProvider credentialProvider =
sink.createCredentialProvider(null, awsCredentialPluginParam);
+
Assert.assertEquals(credentialProvider.getCredentials().getAWSAccessKeyId(),
accesKey);
+
Assert.assertEquals(credentialProvider.getCredentials().getAWSSecretKey(),
secretKey);
+
+ credentialProvider =
sink.createCredentialProvider(AwsCredentialProviderPluginImpl.class.getName(),
"{}");
+ Assert.assertNotNull(credentialProvider);
+
Assert.assertEquals(credentialProvider.getCredentials().getAWSAccessKeyId(),
+ AwsCredentialProviderPluginImpl.accessKey);
+
Assert.assertEquals(credentialProvider.getCredentials().getAWSSecretKey(),
+ AwsCredentialProviderPluginImpl.secretKey);
+ Assert.assertEquals(((BasicSessionCredentials)
credentialProvider.getCredentials()).getSessionToken(),
+ AwsCredentialProviderPluginImpl.sessionToken);
+
+ sink.close();
+ }
+
+ @Test
+ public void testCredentialProviderPlugin() throws Exception {
+ KinesisSink sink = new KinesisSink();
+
+ AWSCredentialsProvider credentialProvider = sink
+
.createCredentialProviderWithPlugin(AwsCredentialProviderPluginImpl.class.getName(),
"{}");
+ Assert.assertNotNull(credentialProvider);
+
Assert.assertEquals(credentialProvider.getCredentials().getAWSAccessKeyId(),
+ AwsCredentialProviderPluginImpl.accessKey);
+
Assert.assertEquals(credentialProvider.getCredentials().getAWSSecretKey(),
+ AwsCredentialProviderPluginImpl.secretKey);
+ Assert.assertEquals(((BasicSessionCredentials)
credentialProvider.getCredentials()).getSessionToken(),
+ AwsCredentialProviderPluginImpl.sessionToken);
+
+ sink.close();
+ }
+
+ public static class AwsCredentialProviderPluginImpl implements
AwsCredentialProviderPlugin {
+
+ public final static String accessKey = "ak";
+ public final static String secretKey = "sk";
+ public final static String sessionToken = "st";
+
+ public void init(String param) {
+ // no-op
+ }
+
+ @Override
+ public AWSCredentialsProvider getCredentialProvider() {
+ return new AWSCredentialsProvider() {
+ @Override
+ public AWSCredentials getCredentials() {
+ return new BasicSessionCredentials(accessKey, secretKey,
sessionToken) {
+
+ @Override
+ public String getAWSAccessKeyId() {
+ return accessKey;
+ }
+ @Override
+ public String getAWSSecretKey() {
+ return secretKey;
+ }
+ @Override
+ public String getSessionToken() {
+ return sessionToken;
+ }
+ };
+ }
+ @Override
+ public void refresh() {
+ // TODO Auto-generated method stub
+ }
+ };
+ }
+ @Override
+ public void close() throws IOException {
+ // TODO Auto-generated method stub
+ }
+ }
+
+}
--
To stop receiving notification emails like this one, please contact
[email protected].