This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 8ba8acd Add documentation and flexible aws-credential plugin to support aws-role (#1972) 8ba8acd is described below commit 8ba8acdbe812725df6739c5d19db3e47a553f351 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Mon Jun 18 12:15:28 2018 -0700 Add documentation and flexible aws-credential plugin to support aws-role (#1972) * Add documentation and flexible aws-credential plugin to support aws-role * add protobuf relocation --- pulsar-io/kinesis/pom.xml | 11 +- .../io/kinesis/AwsCredentialProviderPlugin.java | 36 ++--- .../org/apache/pulsar/io/kinesis/KinesisSink.java | 104 ++++++++------ .../apache/pulsar/io/kinesis/KinesisSinkTest.java | 149 +++++++++++++++++++++ 4 files changed, 227 insertions(+), 73 deletions(-) diff --git a/pulsar-io/kinesis/pom.xml b/pulsar-io/kinesis/pom.xml index 7f9f488..0dee750 100644 --- a/pulsar-io/kinesis/pom.xml +++ b/pulsar-io/kinesis/pom.xml @@ -47,13 +47,16 @@ <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> - <version>${jackson.version}</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-yaml</artifactId> - <version>${jackson.version}</version> + </dependency> + + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> </dependency> <!-- kinesis dependencies --> @@ -110,10 +113,6 @@ <pattern>com.google.protobuf</pattern> <shadedPattern>org.apache.pulsar.replicator.com.google.protobuf</shadedPattern> </relocation> - <relocation> - <pattern>com.amazonaws</pattern> - <shadedPattern>org.apache.pulsar.com.amazonaws</shadedPattern> - </relocation> </relocations> </configuration> </execution> diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java index 8c616cc..7e463bb 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java @@ -19,42 +19,32 @@ package org.apache.pulsar.io.kinesis; +import java.io.Closeable; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.BasicSessionCredentials; + /** * Kinesis source/sink calls credential-provider while refreshing aws accessKey and secreKey. So, implementation * AwsCredentialProviderPlugin needs to makes sure to return non-expired keys when it requires. * */ -public interface AwsCredentialProviderPlugin { - +public interface AwsCredentialProviderPlugin extends Closeable { + /** * accepts aws-account related param and initialize credential provider. - * + * * @param param */ void init(String param); - - /** - * Returns the AWS access key ID for this credentials object. - * - * @return The AWS access key ID for this credentials object. - */ - String getAWSAccessKeyId(); /** - * Returns the AWS secret access key for this credentials object. + * Returned {@link AWSCredentialsProvider} can give {@link AWSCredentials} in case credential belongs to IAM user or + * it can return {@link BasicSessionCredentials} if user wants to generate temporary credential for a given IAM + * role. * - * @return The AWS secret access key for this credentials object. - */ - String getAWSSecretKey(); - - /** - * Forces this credentials provider to refresh its credentials. For many - * implementations of credentials provider, this method may simply be a - * no-op, such as any credentials provider implementation that vends - * static/non-changing credentials. For other implementations that vend - * different credentials through out their lifetime, this method should - * force the credentials provider to refresh its credentials. + * @return */ - void refresh(); + AWSCredentialsProvider getCredentialProvider(); } diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java index 39af1f3..5a43312 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java @@ -30,6 +30,9 @@ import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.builder.ReflectionToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.pulsar.io.core.RecordContext; import org.apache.pulsar.io.core.Sink; import org.slf4j.Logger; @@ -37,20 +40,35 @@ import org.slf4j.LoggerFactory; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.services.kinesis.producer.KinesisProducer; import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration.ThreadingModel; import com.amazonaws.services.kinesis.producer.UserRecordResult; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; -import org.apache.commons.lang3.builder.ReflectionToStringBuilder; -import org.apache.commons.lang3.builder.ToStringStyle; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; /** - * A Kinesis sink + * A Kinesis sink which can be configured by {@link KinesisSinkConfig}. + * <pre> + * {@link KinesisSinkConfig} accepts + * 1. <b>awsEndpoint:</b> kinesis end-point url can be found at : https://docs.aws.amazon.com/general/latest/gr/rande.html + * 2. <b>awsRegion:</b> appropriate aws region eg: us-west-1, us-west-2 + * 3. <b>awsKinesisStreamName:</b> kinesis stream name + * 4. <b>awsCredentialPluginName:</b> Fully-Qualified class name of implementation of {@link AwsCredentialProviderPlugin}. + * - It is a factory class which creates an {@link AWSCredentialsProvider} that will be used by {@link KinesisProducer} + * - If it is empty then {@link KinesisSink} creates default {@link AWSCredentialsProvider} + * which accepts json-map of credentials in awsCredentialPluginParam + * eg: awsCredentialPluginParam = {"accessKey":"my-access-key","secretKey":"my-secret-key"} + * 5. <b>awsCredentialPluginParam:</b> json-parameters to initialize {@link AwsCredentialProviderPlugin} + * + * </pre> + * */ public class KinesisSink implements Sink<byte[]> { @@ -59,17 +77,27 @@ public class KinesisSink implements Sink<byte[]> { private KinesisProducer kinesisProducer; private KinesisSinkConfig kinesisSinkConfig; private String streamName; + private static final String defaultPartitionedKey = "default"; + private static final int maxPartitionedKeyLength = 256; public static final String ACCESS_KEY_NAME = "accessKey"; public static final String SECRET_KEY_NAME = "secretKey"; @Override public void write(RecordContext inputRecordContext, byte[] value) throws Exception { - final String partitionedKey = inputRecordContext.getPartitionId(); + String partitionedKey = StringUtils.isNotBlank(inputRecordContext.getPartitionId()) + ? inputRecordContext.getPartitionId() + : defaultPartitionedKey; + partitionedKey = partitionedKey.length() > maxPartitionedKeyLength + ? partitionedKey.substring(0, maxPartitionedKeyLength - 1) + : partitionedKey; // partitionedKey Length must be at least one, and at most 256 ListenableFuture<UserRecordResult> addRecordResult = kinesisProducer.addUserRecord(this.streamName, partitionedKey, ByteBuffer.wrap(value)); addCallback(addRecordResult, ProducerSendCallback.create(this.streamName, inputRecordContext, System.nanoTime()), directExecutor()); + if (LOG.isDebugEnabled()) { + LOG.debug("Published message to kinesis stream {} with size {}", streamName, value.length); + } } @Override @@ -107,10 +135,10 @@ public class KinesisSink implements Sink<byte[]> { LOG.info("Kinesis sink started. {}", (ReflectionToStringBuilder.toString(kinesisConfig, ToStringStyle.SHORT_PREFIX_STYLE))); } - private AWSCredentialsProvider createCredentialProvider(String awsCredentialPluginName, + protected AWSCredentialsProvider createCredentialProvider(String awsCredentialPluginName, String awsCredentialPluginParam) { if (isNotBlank(awsCredentialPluginName)) { - return createCredentialProvider(awsCredentialPluginName, awsCredentialPluginParam); + return createCredentialProviderWithPlugin(awsCredentialPluginName, awsCredentialPluginParam); } else { return defaultCredentialProvider(awsCredentialPluginParam); } @@ -168,33 +196,23 @@ public class KinesisSink implements Sink<byte[]> { } } - public static AWSCredentialsProvider createCredentialProviderPlugin(String pluginFQClassName, String param) + /** + * Creates a instance of credential provider which can return {@link AWSCredentials} or {@link BasicAWSCredentials} + * based on IAM user/roles. + * + * @param pluginFQClassName + * @param param + * @return + * @throws IllegalArgumentException + */ + public static AWSCredentialsProvider createCredentialProviderWithPlugin(String pluginFQClassName, String param) throws IllegalArgumentException { try { Class<?> clazz = Class.forName(pluginFQClassName); Constructor<?> ctor = clazz.getConstructor(); final AwsCredentialProviderPlugin plugin = (AwsCredentialProviderPlugin) ctor.newInstance(new Object[] {}); plugin.init(param); - return new AWSCredentialsProvider() { - @Override - public AWSCredentials getCredentials() { - return new AWSCredentials() { - @Override - public String getAWSAccessKeyId() { - return plugin.getAWSAccessKeyId(); - } - - @Override - public String getAWSSecretKey() { - return plugin.getAWSSecretKey(); - } - }; - } - @Override - public void refresh() { - plugin.refresh(); - } - }; + return plugin.getCredentialProvider(); } catch (Exception e) { LOG.error("Failed to initialize AwsCredentialProviderPlugin {}", pluginFQClassName, e); throw new IllegalArgumentException( @@ -202,24 +220,22 @@ public class KinesisSink implements Sink<byte[]> { } } - private AWSCredentialsProvider defaultCredentialProvider(String awsCredentialPluginParam) { - String[] credentials = awsCredentialPluginParam.split(","); - String accessKey = null; - String secretKey = null; - if (credentials.length == 2) { - for (String credential : credentials) { - String[] keys = credential.split("="); - if (keys.length == 2) { - if (keys[0].equals(ACCESS_KEY_NAME)) { - accessKey = keys[1]; - } else if (keys[0].equals(SECRET_KEY_NAME)) { - secretKey = keys[1]; - } - } - } - } + /** + * It creates a default credential provider which takes accessKey and secretKey form configuration and creates + * {@link AWSCredentials} + * + * @param awsCredentialPluginParam + * @return + */ + protected AWSCredentialsProvider defaultCredentialProvider(String awsCredentialPluginParam) { + Map<String, String> credentialMap = new Gson().fromJson(awsCredentialPluginParam, + new TypeToken<Map<String, String>>() { + }.getType()); + String accessKey = credentialMap.get(ACCESS_KEY_NAME); + String secretKey = credentialMap.get(SECRET_KEY_NAME); checkArgument(isNotBlank(accessKey) && isNotBlank(secretKey), - String.format("access-key/secret-key not present in param: format: %s=<access-key>,%s=<secret-key>", + String.format( + "Default %s and %s must be present into json-map if AwsCredentialProviderPlugin not provided", ACCESS_KEY_NAME, SECRET_KEY_NAME)); return defaultCredentialProvider(accessKey, secretKey); } diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkTest.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkTest.java new file mode 100644 index 0000000..2bd4a17 --- /dev/null +++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkTest.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.kinesis; + +import java.io.IOException; +import java.util.Map; + +import org.testng.Assert; +import org.testng.annotations.Test; +import org.testng.collections.Maps; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.BasicSessionCredentials; +import com.google.gson.Gson; + +/** + * Unit test of {@link KinesisSink}. + */ +public class KinesisSinkTest { + + @Test + public void testDefaultCredentialProvider() throws Exception { + KinesisSink sink = new KinesisSink(); + Map<String, String> credentialParam = Maps.newHashMap(); + String awsCredentialPluginParam = new Gson().toJson(credentialParam); + try { + sink.defaultCredentialProvider(awsCredentialPluginParam); + Assert.fail("accessKey and SecretKey validation not applied"); + } catch (IllegalArgumentException ie) { + // Ok.. + } + + final String accesKey = "ak"; + final String secretKey = "sk"; + credentialParam.put(KinesisSink.ACCESS_KEY_NAME, accesKey); + credentialParam.put(KinesisSink.SECRET_KEY_NAME, secretKey); + awsCredentialPluginParam = new Gson().toJson(credentialParam); + AWSCredentialsProvider credentialProvider = sink.defaultCredentialProvider(awsCredentialPluginParam); + Assert.assertNotNull(credentialProvider); + Assert.assertEquals(credentialProvider.getCredentials().getAWSAccessKeyId(), accesKey); + Assert.assertEquals(credentialProvider.getCredentials().getAWSSecretKey(), secretKey); + + sink.close(); + } + + @Test + public void testCredentialProvider() throws Exception { + KinesisSink sink = new KinesisSink(); + + final String accesKey = "ak"; + final String secretKey = "sk"; + Map<String, String> credentialParam = Maps.newHashMap(); + credentialParam.put(KinesisSink.ACCESS_KEY_NAME, accesKey); + credentialParam.put(KinesisSink.SECRET_KEY_NAME, secretKey); + String awsCredentialPluginParam = new Gson().toJson(credentialParam); + AWSCredentialsProvider credentialProvider = sink.createCredentialProvider(null, awsCredentialPluginParam); + Assert.assertEquals(credentialProvider.getCredentials().getAWSAccessKeyId(), accesKey); + Assert.assertEquals(credentialProvider.getCredentials().getAWSSecretKey(), secretKey); + + credentialProvider = sink.createCredentialProvider(AwsCredentialProviderPluginImpl.class.getName(), "{}"); + Assert.assertNotNull(credentialProvider); + Assert.assertEquals(credentialProvider.getCredentials().getAWSAccessKeyId(), + AwsCredentialProviderPluginImpl.accessKey); + Assert.assertEquals(credentialProvider.getCredentials().getAWSSecretKey(), + AwsCredentialProviderPluginImpl.secretKey); + Assert.assertEquals(((BasicSessionCredentials) credentialProvider.getCredentials()).getSessionToken(), + AwsCredentialProviderPluginImpl.sessionToken); + + sink.close(); + } + + @Test + public void testCredentialProviderPlugin() throws Exception { + KinesisSink sink = new KinesisSink(); + + AWSCredentialsProvider credentialProvider = sink + .createCredentialProviderWithPlugin(AwsCredentialProviderPluginImpl.class.getName(), "{}"); + Assert.assertNotNull(credentialProvider); + Assert.assertEquals(credentialProvider.getCredentials().getAWSAccessKeyId(), + AwsCredentialProviderPluginImpl.accessKey); + Assert.assertEquals(credentialProvider.getCredentials().getAWSSecretKey(), + AwsCredentialProviderPluginImpl.secretKey); + Assert.assertEquals(((BasicSessionCredentials) credentialProvider.getCredentials()).getSessionToken(), + AwsCredentialProviderPluginImpl.sessionToken); + + sink.close(); + } + + public static class AwsCredentialProviderPluginImpl implements AwsCredentialProviderPlugin { + + public final static String accessKey = "ak"; + public final static String secretKey = "sk"; + public final static String sessionToken = "st"; + + public void init(String param) { + // no-op + } + + @Override + public AWSCredentialsProvider getCredentialProvider() { + return new AWSCredentialsProvider() { + @Override + public AWSCredentials getCredentials() { + return new BasicSessionCredentials(accessKey, secretKey, sessionToken) { + + @Override + public String getAWSAccessKeyId() { + return accessKey; + } + @Override + public String getAWSSecretKey() { + return secretKey; + } + @Override + public String getSessionToken() { + return sessionToken; + } + }; + } + @Override + public void refresh() { + // TODO Auto-generated method stub + } + }; + } + @Override + public void close() throws IOException { + // TODO Auto-generated method stub + } + } + +} -- To stop receiving notification emails like this one, please contact rdhaba...@apache.org.