rdhabalia closed pull request #1972: Add documentation and flexible
aws-credential plugin to support aws-role
URL: https://github.com/apache/incubator-pulsar/pull/1972
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/pulsar-io/kinesis/pom.xml b/pulsar-io/kinesis/pom.xml
index 7f9f488e29..0dee750dc7 100644
--- a/pulsar-io/kinesis/pom.xml
+++ b/pulsar-io/kinesis/pom.xml
@@ -47,13 +47,16 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
- <version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
- <version>${jackson.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
</dependency>
<!-- kinesis dependencies -->
@@ -110,10 +113,6 @@
<pattern>com.google.protobuf</pattern>
<shadedPattern>org.apache.pulsar.replicator.com.google.protobuf</shadedPattern>
</relocation>
- <relocation>
- <pattern>com.amazonaws</pattern>
-
<shadedPattern>org.apache.pulsar.com.amazonaws</shadedPattern>
- </relocation>
</relocations>
</configuration>
</execution>
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java
index 8c616cc677..7e463bbd50 100644
---
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java
+++
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java
@@ -19,42 +19,32 @@
package org.apache.pulsar.io.kinesis;
+import java.io.Closeable;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicSessionCredentials;
+
/**
* Kinesis source/sink calls credential-provider while refreshing aws
accessKey and secreKey. So, implementation
* AwsCredentialProviderPlugin needs to makes sure to return non-expired keys
when it requires.
*
*/
-public interface AwsCredentialProviderPlugin {
-
+public interface AwsCredentialProviderPlugin extends Closeable {
+
/**
* accepts aws-account related param and initialize credential provider.
- *
+ *
* @param param
*/
void init(String param);
-
- /**
- * Returns the AWS access key ID for this credentials object.
- *
- * @return The AWS access key ID for this credentials object.
- */
- String getAWSAccessKeyId();
/**
- * Returns the AWS secret access key for this credentials object.
+ * Returned {@link AWSCredentialsProvider} can give {@link AWSCredentials}
in case credential belongs to IAM user or
+ * it can return {@link BasicSessionCredentials} if user wants to generate
temporary credential for a given IAM
+ * role.
*
- * @return The AWS secret access key for this credentials object.
- */
- String getAWSSecretKey();
-
- /**
- * Forces this credentials provider to refresh its credentials. For many
- * implementations of credentials provider, this method may simply be a
- * no-op, such as any credentials provider implementation that vends
- * static/non-changing credentials. For other implementations that vend
- * different credentials through out their lifetime, this method should
- * force the credentials provider to refresh its credentials.
+ * @return
*/
- void refresh();
+ AWSCredentialsProvider getCredentialProvider();
}
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index 39af1f300a..5a433122b3 100644
---
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -30,6 +30,9 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.pulsar.io.core.RecordContext;
import org.apache.pulsar.io.core.Sink;
import org.slf4j.Logger;
@@ -37,20 +40,35 @@
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import
com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration.ThreadingModel;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
/**
- * A Kinesis sink
+ * A Kinesis sink which can be configured by {@link KinesisSinkConfig}.
+ * <pre>
+ * {@link KinesisSinkConfig} accepts
+ * 1. <b>awsEndpoint:</b> kinesis end-point url can be found at :
https://docs.aws.amazon.com/general/latest/gr/rande.html
+ * 2. <b>awsRegion:</b> appropriate aws region eg: us-west-1, us-west-2
+ * 3. <b>awsKinesisStreamName:</b> kinesis stream name
+ * 4. <b>awsCredentialPluginName:</b> Fully-Qualified class name of
implementation of {@link AwsCredentialProviderPlugin}.
+ * - It is a factory class which creates an {@link AWSCredentialsProvider}
that will be used by {@link KinesisProducer}
+ * - If it is empty then {@link KinesisSink} creates default {@link
AWSCredentialsProvider}
+ * which accepts json-map of credentials in awsCredentialPluginParam
+ * eg: awsCredentialPluginParam =
{"accessKey":"my-access-key","secretKey":"my-secret-key"}
+ * 5. <b>awsCredentialPluginParam:</b> json-parameters to initialize {@link
AwsCredentialProviderPlugin}
+ *
+ * </pre>
+ *
*/
public class KinesisSink implements Sink<byte[]> {
@@ -59,17 +77,27 @@
private KinesisProducer kinesisProducer;
private KinesisSinkConfig kinesisSinkConfig;
private String streamName;
+ private static final String defaultPartitionedKey = "default";
+ private static final int maxPartitionedKeyLength = 256;
public static final String ACCESS_KEY_NAME = "accessKey";
public static final String SECRET_KEY_NAME = "secretKey";
@Override
public void write(RecordContext inputRecordContext, byte[] value) throws
Exception {
- final String partitionedKey = inputRecordContext.getPartitionId();
+ String partitionedKey =
StringUtils.isNotBlank(inputRecordContext.getPartitionId())
+ ? inputRecordContext.getPartitionId()
+ : defaultPartitionedKey;
+ partitionedKey = partitionedKey.length() > maxPartitionedKeyLength
+ ? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
+ : partitionedKey; // partitionedKey Length must be at least
one, and at most 256
ListenableFuture<UserRecordResult> addRecordResult =
kinesisProducer.addUserRecord(this.streamName,
partitionedKey, ByteBuffer.wrap(value));
addCallback(addRecordResult,
ProducerSendCallback.create(this.streamName,
inputRecordContext, System.nanoTime()), directExecutor());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Published message to kinesis stream {} with size {}",
streamName, value.length);
+ }
}
@Override
@@ -107,10 +135,10 @@ public void open(Map<String, Object> config) throws
Exception {
LOG.info("Kinesis sink started. {}",
(ReflectionToStringBuilder.toString(kinesisConfig,
ToStringStyle.SHORT_PREFIX_STYLE)));
}
- private AWSCredentialsProvider createCredentialProvider(String
awsCredentialPluginName,
+ protected AWSCredentialsProvider createCredentialProvider(String
awsCredentialPluginName,
String awsCredentialPluginParam) {
if (isNotBlank(awsCredentialPluginName)) {
- return createCredentialProvider(awsCredentialPluginName,
awsCredentialPluginParam);
+ return createCredentialProviderWithPlugin(awsCredentialPluginName,
awsCredentialPluginParam);
} else {
return defaultCredentialProvider(awsCredentialPluginParam);
}
@@ -168,33 +196,23 @@ public void onFailure(Throwable exception) {
}
}
- public static AWSCredentialsProvider createCredentialProviderPlugin(String
pluginFQClassName, String param)
+ /**
+ * Creates a instance of credential provider which can return {@link
AWSCredentials} or {@link BasicAWSCredentials}
+ * based on IAM user/roles.
+ *
+ * @param pluginFQClassName
+ * @param param
+ * @return
+ * @throws IllegalArgumentException
+ */
+ public static AWSCredentialsProvider
createCredentialProviderWithPlugin(String pluginFQClassName, String param)
throws IllegalArgumentException {
try {
Class<?> clazz = Class.forName(pluginFQClassName);
Constructor<?> ctor = clazz.getConstructor();
final AwsCredentialProviderPlugin plugin =
(AwsCredentialProviderPlugin) ctor.newInstance(new Object[] {});
plugin.init(param);
- return new AWSCredentialsProvider() {
- @Override
- public AWSCredentials getCredentials() {
- return new AWSCredentials() {
- @Override
- public String getAWSAccessKeyId() {
- return plugin.getAWSAccessKeyId();
- }
-
- @Override
- public String getAWSSecretKey() {
- return plugin.getAWSSecretKey();
- }
- };
- }
- @Override
- public void refresh() {
- plugin.refresh();
- }
- };
+ return plugin.getCredentialProvider();
} catch (Exception e) {
LOG.error("Failed to initialize AwsCredentialProviderPlugin {}",
pluginFQClassName, e);
throw new IllegalArgumentException(
@@ -202,24 +220,22 @@ public void refresh() {
}
}
- private AWSCredentialsProvider defaultCredentialProvider(String
awsCredentialPluginParam) {
- String[] credentials = awsCredentialPluginParam.split(",");
- String accessKey = null;
- String secretKey = null;
- if (credentials.length == 2) {
- for (String credential : credentials) {
- String[] keys = credential.split("=");
- if (keys.length == 2) {
- if (keys[0].equals(ACCESS_KEY_NAME)) {
- accessKey = keys[1];
- } else if (keys[0].equals(SECRET_KEY_NAME)) {
- secretKey = keys[1];
- }
- }
- }
- }
+ /**
+ * It creates a default credential provider which takes accessKey and
secretKey form configuration and creates
+ * {@link AWSCredentials}
+ *
+ * @param awsCredentialPluginParam
+ * @return
+ */
+ protected AWSCredentialsProvider defaultCredentialProvider(String
awsCredentialPluginParam) {
+ Map<String, String> credentialMap = new
Gson().fromJson(awsCredentialPluginParam,
+ new TypeToken<Map<String, String>>() {
+ }.getType());
+ String accessKey = credentialMap.get(ACCESS_KEY_NAME);
+ String secretKey = credentialMap.get(SECRET_KEY_NAME);
checkArgument(isNotBlank(accessKey) && isNotBlank(secretKey),
- String.format("access-key/secret-key not present in param:
format: %s=<access-key>,%s=<secret-key>",
+ String.format(
+ "Default %s and %s must be present into json-map if
AwsCredentialProviderPlugin not provided",
ACCESS_KEY_NAME, SECRET_KEY_NAME));
return defaultCredentialProvider(accessKey, secretKey);
}
diff --git
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkTest.java
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkTest.java
new file mode 100644
index 0000000000..2bd4a17380
--- /dev/null
+++
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkTest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kinesis;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import org.testng.collections.Maps;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicSessionCredentials;
+import com.google.gson.Gson;
+
+/**
+ * Unit test of {@link KinesisSink}.
+ */
+public class KinesisSinkTest {
+
+ @Test
+ public void testDefaultCredentialProvider() throws Exception {
+ KinesisSink sink = new KinesisSink();
+ Map<String, String> credentialParam = Maps.newHashMap();
+ String awsCredentialPluginParam = new Gson().toJson(credentialParam);
+ try {
+ sink.defaultCredentialProvider(awsCredentialPluginParam);
+ Assert.fail("accessKey and SecretKey validation not applied");
+ } catch (IllegalArgumentException ie) {
+ // Ok..
+ }
+
+ final String accesKey = "ak";
+ final String secretKey = "sk";
+ credentialParam.put(KinesisSink.ACCESS_KEY_NAME, accesKey);
+ credentialParam.put(KinesisSink.SECRET_KEY_NAME, secretKey);
+ awsCredentialPluginParam = new Gson().toJson(credentialParam);
+ AWSCredentialsProvider credentialProvider =
sink.defaultCredentialProvider(awsCredentialPluginParam);
+ Assert.assertNotNull(credentialProvider);
+
Assert.assertEquals(credentialProvider.getCredentials().getAWSAccessKeyId(),
accesKey);
+
Assert.assertEquals(credentialProvider.getCredentials().getAWSSecretKey(),
secretKey);
+
+ sink.close();
+ }
+
+ @Test
+ public void testCredentialProvider() throws Exception {
+ KinesisSink sink = new KinesisSink();
+
+ final String accesKey = "ak";
+ final String secretKey = "sk";
+ Map<String, String> credentialParam = Maps.newHashMap();
+ credentialParam.put(KinesisSink.ACCESS_KEY_NAME, accesKey);
+ credentialParam.put(KinesisSink.SECRET_KEY_NAME, secretKey);
+ String awsCredentialPluginParam = new Gson().toJson(credentialParam);
+ AWSCredentialsProvider credentialProvider =
sink.createCredentialProvider(null, awsCredentialPluginParam);
+
Assert.assertEquals(credentialProvider.getCredentials().getAWSAccessKeyId(),
accesKey);
+
Assert.assertEquals(credentialProvider.getCredentials().getAWSSecretKey(),
secretKey);
+
+ credentialProvider =
sink.createCredentialProvider(AwsCredentialProviderPluginImpl.class.getName(),
"{}");
+ Assert.assertNotNull(credentialProvider);
+
Assert.assertEquals(credentialProvider.getCredentials().getAWSAccessKeyId(),
+ AwsCredentialProviderPluginImpl.accessKey);
+
Assert.assertEquals(credentialProvider.getCredentials().getAWSSecretKey(),
+ AwsCredentialProviderPluginImpl.secretKey);
+ Assert.assertEquals(((BasicSessionCredentials)
credentialProvider.getCredentials()).getSessionToken(),
+ AwsCredentialProviderPluginImpl.sessionToken);
+
+ sink.close();
+ }
+
+ @Test
+ public void testCredentialProviderPlugin() throws Exception {
+ KinesisSink sink = new KinesisSink();
+
+ AWSCredentialsProvider credentialProvider = sink
+
.createCredentialProviderWithPlugin(AwsCredentialProviderPluginImpl.class.getName(),
"{}");
+ Assert.assertNotNull(credentialProvider);
+
Assert.assertEquals(credentialProvider.getCredentials().getAWSAccessKeyId(),
+ AwsCredentialProviderPluginImpl.accessKey);
+
Assert.assertEquals(credentialProvider.getCredentials().getAWSSecretKey(),
+ AwsCredentialProviderPluginImpl.secretKey);
+ Assert.assertEquals(((BasicSessionCredentials)
credentialProvider.getCredentials()).getSessionToken(),
+ AwsCredentialProviderPluginImpl.sessionToken);
+
+ sink.close();
+ }
+
+ public static class AwsCredentialProviderPluginImpl implements
AwsCredentialProviderPlugin {
+
+ public final static String accessKey = "ak";
+ public final static String secretKey = "sk";
+ public final static String sessionToken = "st";
+
+ public void init(String param) {
+ // no-op
+ }
+
+ @Override
+ public AWSCredentialsProvider getCredentialProvider() {
+ return new AWSCredentialsProvider() {
+ @Override
+ public AWSCredentials getCredentials() {
+ return new BasicSessionCredentials(accessKey, secretKey,
sessionToken) {
+
+ @Override
+ public String getAWSAccessKeyId() {
+ return accessKey;
+ }
+ @Override
+ public String getAWSSecretKey() {
+ return secretKey;
+ }
+ @Override
+ public String getSessionToken() {
+ return sessionToken;
+ }
+ };
+ }
+ @Override
+ public void refresh() {
+ // TODO Auto-generated method stub
+ }
+ };
+ }
+ @Override
+ public void close() throws IOException {
+ // TODO Auto-generated method stub
+ }
+ }
+
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services