This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 3e8e44d59563532ba770c5b26426f66cee40a937
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Fri Jan 19 14:04:13 2018 +0100

    CAMEL-12164 - Camel-AWS Kinesis: Expose options to avoid a required client 
in the registry
---
 .../src/main/docs/aws-kinesis-component.adoc       |  9 +++-
 .../component/aws/kinesis/KinesisComponent.java    |  1 -
 .../aws/kinesis/KinesisConfiguration.java          | 53 ++++++++++++++++++++--
 .../component/aws/kinesis/KinesisEndpoint.java     | 52 +++++++++++++++++++--
 .../KinesisComponentConfigurationTest.java}        | 37 ++++++---------
 .../KinesisConsumerClosedShardWithFailTest.java    |  2 +
 .../KinesisConsumerClosedShardWithSilentTest.java  |  1 +
 .../component/aws/kinesis/KinesisConsumerTest.java |  1 +
 .../component/aws/kinesis/KinesisEndpointTest.java |  8 ++--
 9 files changed, 127 insertions(+), 37 deletions(-)

diff --git a/components/camel-aws/src/main/docs/aws-kinesis-component.adoc 
b/components/camel-aws/src/main/docs/aws-kinesis-component.adoc
index 5ec4b7d..559a89c 100644
--- a/components/camel-aws/src/main/docs/aws-kinesis-component.adoc
+++ b/components/camel-aws/src/main/docs/aws-kinesis-component.adoc
@@ -52,12 +52,15 @@ with the following path and query parameters:
 | *streamName* | *Required* Name of the stream |  | String
 |===
 
-==== Query Parameters (25 parameters):
+==== Query Parameters (30 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
-| *amazonKinesisClient* (common) | *Required* Amazon Kinesis client to use for 
all requests for this endpoint |  | AmazonKinesis
+| *amazonKinesisClient* (common) | Amazon Kinesis client to use for all 
requests for this endpoint |  | AmazonKinesis
+| *proxyHost* (common) | To define a proxy host when instantiating the 
DDBStreams client |  | String
+| *proxyPort* (common) | To define a proxy port when instantiating the 
DDBStreams client |  | Integer
+| *region* (common) | The region in which Kinesis client needs to work |  | 
String
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the 
Camel routing Error Handler which mean any exceptions occurred while the 
consumer is trying to pickup incoming messages or the likes will now be 
processed as a message and handled by the routing Error Handler. By default the 
consumer will use the org.apache.camel.spi.ExceptionHandler to deal with 
exceptions that will be logged at WARN or ERROR level and ignored. | false | 
boolean
 | *iteratorType* (consumer) | Defines where in the Kinesis stream to start 
getting records | TRIM_HORIZON | ShardIteratorType
 | *maxResultsPerRequest* (consumer) | Maximum number of records that will be 
fetched in each poll | 1 | int
@@ -82,6 +85,8 @@ with the following path and query parameters:
 | *startScheduler* (scheduler) | Whether the scheduler should be auto started. 
| true | boolean
 | *timeUnit* (scheduler) | Time unit for initialDelay and delay options. | 
MILLISECONDS | TimeUnit
 | *useFixedDelay* (scheduler) | Controls if fixed delay or fixed rate is used. 
See ScheduledExecutorService in JDK for details. | true | boolean
+| *accessKey* (security) | Amazon AWS Access Key |  | String
+| *secretKey* (security) | Amazon AWS Secret Key |  | String
 |===
 // endpoint options: END
 
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java
index a264d3c..34789af 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java
@@ -39,7 +39,6 @@ public class KinesisComponent extends DefaultComponent {
         setProperties(configuration, parameters);
         
         KinesisEndpoint endpoint = new KinesisEndpoint(uri, configuration, 
this);
-        setProperties(endpoint, parameters);
         return endpoint;
     }
 }
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java
index c42e5ef..a6d38e3 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java
@@ -29,8 +29,13 @@ public class KinesisConfiguration {
     @UriPath(description = "Name of the stream")
     @Metadata(required = "true")
     private String streamName;
+    @UriParam(label = "security", secret = true, description = "Amazon AWS 
Access Key")
+    private String accessKey;
+    @UriParam(label = "security", secret = true, description = "Amazon AWS 
Secret Key")
+    private String secretKey;
+    @UriParam(description = "The region in which Kinesis client needs to work")
+    private String region;
     @UriParam(description = "Amazon Kinesis client to use for all requests for 
this endpoint")
-    @Metadata(required = "true")
     private AmazonKinesis amazonKinesisClient;
     @UriParam(label = "consumer", description = "Maximum number of records 
that will be fetched in each poll", defaultValue = "1")
     private int maxResultsPerRequest = 1;
@@ -45,8 +50,11 @@ public class KinesisConfiguration {
                                                                          + "in 
case of silent there will be no logging and the consumer will start from the 
beginning,"
                                                                          + "in 
case of fail a ReachedClosedStateException will be raised")
     private KinesisShardClosedStrategyEnum shardClosed;
-    
-    // required for injection.
+    @UriParam(description = "To define a proxy host when instantiating the 
DDBStreams client")
+    private String proxyHost;
+    @UriParam(description = "To define a proxy port when instantiating the 
DDBStreams client")
+    private Integer proxyPort;
+
     public AmazonKinesis getAmazonKinesisClient() {
         return amazonKinesisClient;
     }
@@ -102,5 +110,44 @@ public class KinesisConfiguration {
     public void setShardClosed(KinesisShardClosedStrategyEnum shardClosed) {
         this.shardClosed = shardClosed;
     }
+    
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+
+    public String getRegion() {
+        return region;
+    }
+
+    public void setRegion(String region) {
+        this.region = region;
+    }
+
+    public String getProxyHost() {
+        return proxyHost;
+    }
+
+    public void setProxyHost(String proxyHost) {
+        this.proxyHost = proxyHost;
+    }
+
+    public Integer getProxyPort() {
+        return proxyPort;
+    }
 
+    public void setProxyPort(Integer proxyPort) {
+        this.proxyPort = proxyPort;
+    }   
 }
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
index 1a7e79b..127cdd6 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
@@ -16,7 +16,13 @@
  */
 package org.apache.camel.component.aws.kinesis;
 
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
 import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
 
@@ -27,6 +33,7 @@ import org.apache.camel.Producer;
 import org.apache.camel.impl.ScheduledPollEndpoint;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
+import org.apache.camel.util.ObjectHelper;
 
 /**
  * The aws-kinesis component is for consuming and producing records from Amazon
@@ -37,7 +44,9 @@ public class KinesisEndpoint extends ScheduledPollEndpoint {
 
     @UriParam
     private KinesisConfiguration configuration;
-
+    
+    private AmazonKinesis kinesisClient;
+    
     public KinesisEndpoint(String uri, KinesisConfiguration configuration, 
KinesisComponent component) {
         super(uri, component);
         this.configuration = configuration;
@@ -45,11 +54,15 @@ public class KinesisEndpoint extends ScheduledPollEndpoint {
 
     @Override
     protected void doStart() throws Exception {
+        super.doStart();
+        kinesisClient = configuration.getAmazonKinesisClient() != null ? 
configuration.getAmazonKinesisClient()
+            : createKinesisClient();
+       
+        
         if 
((configuration.getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
 || 
configuration.getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER))
             && configuration.getSequenceNumber().isEmpty()) {
             throw new IllegalArgumentException("Sequence Number must be 
specified with iterator Types AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER");
         }
-        super.doStart();
     }
 
     @Override
@@ -80,10 +93,43 @@ public class KinesisEndpoint extends ScheduledPollEndpoint {
     }
 
     public AmazonKinesis getClient() {
-        return configuration.getAmazonKinesisClient();
+        return kinesisClient;
     }
 
     public KinesisConfiguration getConfiguration() {
         return configuration;
     }
+    
+    AmazonKinesis createKinesisClient() {
+        AmazonKinesis client = null;
+        ClientConfiguration clientConfiguration = null;
+        AmazonKinesisClientBuilder clientBuilder = null;
+        boolean isClientConfigFound = false;
+        if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && 
ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
+            clientConfiguration = new ClientConfiguration();
+            clientConfiguration.setProxyHost(configuration.getProxyHost());
+            clientConfiguration.setProxyPort(configuration.getProxyPort());
+            isClientConfigFound = true;
+        }
+        if (configuration.getAccessKey() != null && 
configuration.getSecretKey() != null) {
+            AWSCredentials credentials = new 
BasicAWSCredentials(configuration.getAccessKey(), configuration.getSecretKey());
+            AWSCredentialsProvider credentialsProvider = new 
AWSStaticCredentialsProvider(credentials);
+            if (isClientConfigFound) {
+                clientBuilder = 
AmazonKinesisClientBuilder.standard().withClientConfiguration(clientConfiguration).withCredentials(credentialsProvider);
+            } else {
+                clientBuilder = 
AmazonKinesisClientBuilder.standard().withCredentials(credentialsProvider);
+            }
+        } else {
+            if (isClientConfigFound) {
+                clientBuilder = AmazonKinesisClientBuilder.standard();
+            } else {
+                clientBuilder = 
AmazonKinesisClientBuilder.standard().withClientConfiguration(clientConfiguration);
+            }
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
+            clientBuilder = 
clientBuilder.withRegion(configuration.getRegion());
+        }
+        client = clientBuilder.build();
+        return client;
+    }
 }
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisComponentConfigurationTest.java
similarity index 52%
copy from 
components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java
copy to 
components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisComponentConfigurationTest.java
index a264d3c..0e45169 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisComponentConfigurationTest.java
@@ -16,30 +16,19 @@
  */
 package org.apache.camel.component.aws.kinesis;
 
-import java.util.Map;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
 
-import org.apache.camel.CamelContext;
-import org.apache.camel.Endpoint;
-import org.apache.camel.impl.DefaultComponent;
-
-public class KinesisComponent extends DefaultComponent {
-
-    public KinesisComponent() {
-        this(null);
-    }
-
-    public KinesisComponent(CamelContext context) {
-        super(context);
-    }
-
-    @Override
-    protected Endpoint createEndpoint(String uri, String remaining, 
Map<String, Object> parameters) throws Exception {
-        KinesisConfiguration configuration = new KinesisConfiguration();
-        configuration.setStreamName(remaining);
-        setProperties(configuration, parameters);
+public class KinesisComponentConfigurationTest extends CamelTestSupport {
+    
+    @Test
+    public void createEndpointWithAccessAndSecretKey() throws Exception {
+        KinesisComponent component = new KinesisComponent(context);
+        KinesisEndpoint endpoint = 
(KinesisEndpoint)component.createEndpoint("aws-kinesis://some_stream_name?accessKey=xxxxx&secretKey=yyyyy");
         
-        KinesisEndpoint endpoint = new KinesisEndpoint(uri, configuration, 
this);
-        setProperties(endpoint, parameters);
-        return endpoint;
+        assertEquals("some_stream_name", 
endpoint.getConfiguration().getStreamName());
+        assertEquals("xxxxx", endpoint.getConfiguration().getAccessKey());
+        assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());    
     }
-}
+    
+}
\ No newline at end of file
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithFailTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithFailTest.java
index 6f0b7e9..eb1d24f 100644
--- 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithFailTest.java
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithFailTest.java
@@ -67,7 +67,9 @@ public class KinesisConsumerClosedShardWithFailTest {
         configuration.setShardClosed(KinesisShardClosedStrategyEnum.fail);
         configuration.setStreamName("streamName");
         KinesisEndpoint endpoint = new KinesisEndpoint(null, configuration, 
component);
+        endpoint.start();
         undertest = new KinesisConsumer(endpoint, processor);
+        
 
         SequenceNumberRange range = new 
SequenceNumberRange().withEndingSequenceNumber("20");
         Shard shard = new 
Shard().withShardId("shardId").withSequenceNumberRange(range);
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithSilentTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithSilentTest.java
index 91e5b7c..7815002 100644
--- 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithSilentTest.java
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithSilentTest.java
@@ -71,6 +71,7 @@ public class KinesisConsumerClosedShardWithSilentTest {
         configuration.setShardClosed(KinesisShardClosedStrategyEnum.silent);
         configuration.setStreamName("streamName");
         KinesisEndpoint endpoint = new KinesisEndpoint(null, configuration, 
component);
+        endpoint.start();
         undertest = new KinesisConsumer(endpoint, processor);
         
         SequenceNumberRange range = new 
SequenceNumberRange().withEndingSequenceNumber("20");
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
index 6653025..9f990da 100644
--- 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
@@ -72,6 +72,7 @@ public class KinesisConsumerTest {
         configuration.setShardClosed(KinesisShardClosedStrategyEnum.silent);
         configuration.setStreamName("streamName");
         KinesisEndpoint endpoint = new KinesisEndpoint(null, configuration, 
component);
+        endpoint.start();
         undertest = new KinesisConsumer(endpoint, processor);
         
         SequenceNumberRange range = new 
SequenceNumberRange().withEndingSequenceNumber(null);
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java
index a49e074..4f02ec4 100644
--- 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java
@@ -55,7 +55,7 @@ public class KinesisEndpointTest {
                 + "&sequenceNumber=123"
         );
 
-        assertThat(endpoint.getClient(), is(amazonKinesisClient));
+        assertThat(endpoint.getConfiguration().getAmazonKinesisClient(), 
is(amazonKinesisClient));
         assertThat(endpoint.getConfiguration().getStreamName(), 
is("some_stream_name"));
         assertThat(endpoint.getConfiguration().getIteratorType(), 
is(ShardIteratorType.LATEST));
         assertThat(endpoint.getConfiguration().getMaxResultsPerRequest(), 
is(101));
@@ -69,7 +69,7 @@ public class KinesisEndpointTest {
                 + "?amazonKinesisClient=#kinesisClient"
         );
 
-        assertThat(endpoint.getClient(), is(amazonKinesisClient));
+        assertThat(endpoint.getConfiguration().getAmazonKinesisClient(), 
is(amazonKinesisClient));
         assertThat(endpoint.getConfiguration().getStreamName(), 
is("some_stream_name"));
         assertThat(endpoint.getConfiguration().getIteratorType(), 
is(ShardIteratorType.TRIM_HORIZON));
         assertThat(endpoint.getConfiguration().getMaxResultsPerRequest(), 
is(1));
@@ -84,7 +84,7 @@ public class KinesisEndpointTest {
                 + "&sequenceNumber=123"
         );
 
-        assertThat(endpoint.getClient(), is(amazonKinesisClient));
+        assertThat(endpoint.getConfiguration().getAmazonKinesisClient(), 
is(amazonKinesisClient));
         assertThat(endpoint.getConfiguration().getStreamName(), 
is("some_stream_name"));
         assertThat(endpoint.getConfiguration().getIteratorType(), 
is(ShardIteratorType.AFTER_SEQUENCE_NUMBER));
         assertThat(endpoint.getConfiguration().getShardId(), is("abc"));
@@ -100,7 +100,7 @@ public class KinesisEndpointTest {
                 + "&sequenceNumber=123"
         );
 
-        assertThat(endpoint.getClient(), is(amazonKinesisClient));
+        assertThat(endpoint.getConfiguration().getAmazonKinesisClient(), 
is(amazonKinesisClient));
         assertThat(endpoint.getConfiguration().getStreamName(), 
is("some_stream_name"));
         assertThat(endpoint.getConfiguration().getIteratorType(), 
is(ShardIteratorType.AT_SEQUENCE_NUMBER));
         assertThat(endpoint.getConfiguration().getShardId(), is("abc"));

-- 
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <commits@camel.apache.org>.

Reply via email to