This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 3c4c288e0bae52b8161e90df4e412c48e3a108d2 Author: Andrea Cosentino <[email protected]> AuthorDate: Thu Feb 27 17:14:33 2020 +0100 CAMEL-14520 - Create an AWS-Kinesis component based on SDK v2, fixed CS --- .../aws2/firehose/KinesisFirehose2Component.java | 14 ++++---- .../firehose/KinesisFirehose2Configuration.java | 10 +++--- .../aws2/firehose/KinesisFirehose2Endpoint.java | 19 ++++++----- .../aws2/firehose/KinesisFirehose2Producer.java | 5 ++- .../component/aws2/kinesis/Kinesis2Component.java | 14 ++++---- .../aws2/kinesis/Kinesis2Configuration.java | 16 +++++----- .../component/aws2/kinesis/Kinesis2Constants.java | 3 +- .../component/aws2/kinesis/Kinesis2Consumer.java | 26 +++++++-------- .../component/aws2/kinesis/Kinesis2Endpoint.java | 14 ++++---- .../component/aws2/kinesis/Kinesis2Producer.java | 4 +-- .../kinesis/Kinesis2ShardClosedStrategyEnum.java | 4 +-- .../KinesisFirehoseComponentConfigurationTest.java | 24 +++++++------- .../aws2/firehose/KinesisFirehoseEndpointTest.java | 12 +++---- .../KinesisFirehoseComponentIntegrationTest.java | 6 ++-- .../kinesis/KinesisComponentConfigurationTest.java | 21 ++++++------ .../KinesisConsumerClosedShardWithFailTest.java | 1 - .../KinesisConsumerClosedShardWithSilentTest.java | 37 +++++----------------- .../aws2/kinesis/KinesisEndpointTest.java | 29 ++++------------- .../aws2/kinesis/RecordStringConverterTest.java | 4 +-- 19 files changed, 109 insertions(+), 154 deletions(-) diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Component.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Component.java index a9a25b9..61e716c 100644 --- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Component.java +++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Component.java @@ -38,16 +38,16 @@ public class KinesisFirehose2Component extends DefaultComponent { private String secretKey; @Metadata private String region; - @Metadata(label = "advanced") + @Metadata(label = "advanced") private KinesisFirehose2Configuration configuration; - + public KinesisFirehose2Component() { this(null); } public KinesisFirehose2Component(CamelContext context) { super(context); - + registerExtension(new KinesisFirehose2ComponentVerifierExtension()); } @@ -66,7 +66,7 @@ public class KinesisFirehose2Component extends DefaultComponent { } return endpoint; } - + public KinesisFirehose2Configuration getConfiguration() { return configuration; } @@ -77,7 +77,7 @@ public class KinesisFirehose2Component extends DefaultComponent { public void setConfiguration(KinesisFirehose2Configuration configuration) { this.configuration = configuration; } - + public String getAccessKey() { return accessKey; } @@ -99,7 +99,7 @@ public class KinesisFirehose2Component extends DefaultComponent { public void setSecretKey(String secretKey) { this.secretKey = secretKey; } - + public String getRegion() { return region; } @@ -110,7 +110,7 @@ public class KinesisFirehose2Component extends DefaultComponent { public void setRegion(String region) { this.region = region; } - + private void checkAndSetRegistryClient(KinesisFirehose2Configuration configuration) { Set<FirehoseClient> clients = getCamelContext().getRegistry().findByType(FirehoseClient.class); if (clients.size() == 1) { diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Configuration.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Configuration.java index 82cab1f..67724ba 100644 --- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Configuration.java +++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Configuration.java @@ -36,8 +36,8 @@ public class KinesisFirehose2Configuration implements Cloneable { private String accessKey; @UriParam(label = "security", secret = true, description = "Amazon AWS Secret Key") private String secretKey; - @UriParam(description = "The region in which Kinesis client needs to work. When using this parameter, the configuration will expect the capitalized name of the region (for example AP_EAST_1)" - + "You'll need to use the name Regions.EU_WEST_1.name()") + @UriParam(description = "The region in which Kinesis client needs to work. When using this parameter, the configuration will expect the capitalized name of the region (for example AP_EAST_1)" + + "You'll need to use the name Regions.EU_WEST_1.name()") private String region; @UriParam(description = "Amazon Kinesis Firehose client to use for all requests for this endpoint") private FirehoseClient amazonKinesisFirehoseClient; @@ -47,7 +47,7 @@ public class KinesisFirehose2Configuration implements Cloneable { private String proxyHost; @UriParam(description = "To define a proxy port when instantiating the Kinesis Firehose client") private Integer proxyPort; - + public void setAmazonKinesisFirehoseClient(FirehoseClient client) { this.amazonKinesisFirehoseClient = client; } @@ -94,7 +94,7 @@ public class KinesisFirehose2Configuration implements Cloneable { public void setProxyProtocol(Protocol proxyProtocol) { this.proxyProtocol = proxyProtocol; - } + } public String getProxyHost() { return proxyHost; @@ -111,7 +111,7 @@ public class KinesisFirehose2Configuration implements Cloneable { public void setProxyPort(Integer proxyPort) { this.proxyPort = proxyPort; } - + // ************************************************* // // ************************************************* diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Endpoint.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Endpoint.java index 8a85366..7fd05f7 100644 --- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Endpoint.java +++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Endpoint.java @@ -37,15 +37,15 @@ import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.KinesisClientBuilder; /** - * The aws-kinesis-firehose component is used for producing Amazon's Kinesis Firehose streams. + * The aws-kinesis-firehose component is used for producing Amazon's Kinesis + * Firehose streams. */ -@UriEndpoint(firstVersion = "3.2.0", scheme = "aws2-kinesis-firehose", title = "AWS 2 Kinesis Firehose", syntax = "aws2-kinesis-firehose:streamName", - producerOnly = true, label = "cloud,messaging") +@UriEndpoint(firstVersion = "3.2.0", scheme = "aws2-kinesis-firehose", title = "AWS 2 Kinesis Firehose", syntax = "aws2-kinesis-firehose:streamName", producerOnly = true, label = "cloud,messaging") public class KinesisFirehose2Endpoint extends DefaultEndpoint { @UriParam private KinesisFirehose2Configuration configuration; - + private FirehoseClient kinesisFirehoseClient; public KinesisFirehose2Endpoint(String uri, KinesisFirehose2Configuration configuration, KinesisFirehose2Component component) { @@ -62,15 +62,14 @@ public class KinesisFirehose2Endpoint extends DefaultEndpoint { public Consumer createConsumer(Processor processor) throws Exception { throw new UnsupportedOperationException("You cannot consume messages from this endpoint"); } - + @Override protected void doStart() throws Exception { super.doStart(); - kinesisFirehoseClient = configuration.getAmazonKinesisFirehoseClient() != null ? configuration.getAmazonKinesisFirehoseClient() - : createKinesisFirehoseClient(); - + kinesisFirehoseClient = configuration.getAmazonKinesisFirehoseClient() != null ? configuration.getAmazonKinesisFirehoseClient() : createKinesisFirehoseClient(); + } - + @Override public void doStop() throws Exception { if (ObjectHelper.isEmpty(configuration.getAmazonKinesisFirehoseClient())) { @@ -80,7 +79,7 @@ public class KinesisFirehose2Endpoint extends DefaultEndpoint { } super.doStop(); } - + FirehoseClient createKinesisFirehoseClient() { FirehoseClient client = null; FirehoseClientBuilder clientBuilder = FirehoseClient.builder(); diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java index 7f26ec8..0fe76b1 100644 --- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java +++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java @@ -18,7 +18,6 @@ package org.apache.camel.component.aws2.firehose; import java.nio.ByteBuffer; - import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.support.DefaultProducer; @@ -40,7 +39,7 @@ public class KinesisFirehose2Producer extends DefaultProducer { @Override public KinesisFirehose2Endpoint getEndpoint() { - return (KinesisFirehose2Endpoint) super.getEndpoint(); + return (KinesisFirehose2Endpoint)super.getEndpoint(); } @Override @@ -63,7 +62,7 @@ public class KinesisFirehose2Producer extends DefaultProducer { putRecordRequest.record(record.build()); return putRecordRequest.build(); } - + public static Message getMessageForResponse(final Exchange exchange) { return exchange.getMessage(); } diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java index 36dee4f..a4ec5da 100644 --- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java +++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java @@ -36,7 +36,7 @@ public class Kinesis2Component extends DefaultComponent { private String secretKey; @Metadata private String region; - @Metadata(label = "advanced") + @Metadata(label = "advanced") private Kinesis2Configuration configuration; public Kinesis2Component() { @@ -45,7 +45,7 @@ public class Kinesis2Component extends DefaultComponent { public Kinesis2Component(CamelContext context) { super(context); - + registerExtension(new Kinesis2ComponentVerifierExtension()); } @@ -61,10 +61,10 @@ public class Kinesis2Component extends DefaultComponent { checkAndSetRegistryClient(configuration); if (configuration.getAmazonKinesisClient() == null && (configuration.getAccessKey() == null || configuration.getSecretKey() == null)) { throw new IllegalArgumentException("amazonKinesisClient or accessKey and secretKey must be specified"); - } + } return endpoint; } - + public Kinesis2Configuration getConfiguration() { return configuration; } @@ -75,7 +75,7 @@ public class Kinesis2Component extends DefaultComponent { public void setConfiguration(Kinesis2Configuration configuration) { this.configuration = configuration; } - + public String getAccessKey() { return accessKey; } @@ -97,7 +97,7 @@ public class Kinesis2Component extends DefaultComponent { public void setSecretKey(String secretKey) { this.secretKey = secretKey; } - + public String getRegion() { return region; } @@ -108,7 +108,7 @@ public class Kinesis2Component extends DefaultComponent { public void setRegion(String region) { this.region = region; } - + private void checkAndSetRegistryClient(Kinesis2Configuration configuration) { Set<KinesisClient> clients = getCamelContext().getRegistry().findByType(KinesisClient.class); if (clients.size() == 1) { diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java index 575f057..0dca1a2 100644 --- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java +++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java @@ -28,7 +28,7 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; @UriParams public class Kinesis2Configuration implements Cloneable { - + @UriPath(description = "Name of the stream") @Metadata(required = true) private String streamName; @@ -36,8 +36,8 @@ public class Kinesis2Configuration implements Cloneable { private String accessKey; @UriParam(label = "security", secret = true, description = "Amazon AWS Secret Key") private String secretKey; - @UriParam(description = "The region in which Kinesis client needs to work. When using this parameter, the configuration will expect the capitalized name of the region (for example AP_EAST_1)" - + "You'll need to use the name Regions.EU_WEST_1.name()") + @UriParam(description = "The region in which Kinesis client needs to work. When using this parameter, the configuration will expect the capitalized name of the region (for example AP_EAST_1)" + + "You'll need to use the name Regions.EU_WEST_1.name()") private String region; @UriParam(description = "Amazon Kinesis client to use for all requests for this endpoint") private KinesisClient amazonKinesisClient; @@ -116,7 +116,7 @@ public class Kinesis2Configuration implements Cloneable { public void setShardClosed(Kinesis2ShardClosedStrategyEnum shardClosed) { this.shardClosed = shardClosed; } - + public String getAccessKey() { return accessKey; } @@ -140,14 +140,14 @@ public class Kinesis2Configuration implements Cloneable { public void setRegion(String region) { this.region = region; } - + public Protocol getProxyProtocol() { return proxyProtocol; } public void setProxyProtocol(Protocol proxyProtocol) { this.proxyProtocol = proxyProtocol; - } + } public String getProxyHost() { return proxyHost; @@ -163,8 +163,8 @@ public class Kinesis2Configuration implements Cloneable { public void setProxyPort(Integer proxyPort) { this.proxyPort = proxyPort; - } - + } + // ************************************************* // // ************************************************* diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java index b09aac9..d6e4f56 100644 --- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java +++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java @@ -23,7 +23,8 @@ public interface Kinesis2Constants { String PARTITION_KEY = "CamelAwsKinesisPartitionKey"; /** - * in a Kinesis Record object, the shard ID is used on writes to indicate where the data was stored + * in a Kinesis Record object, the shard ID is used on writes to indicate + * where the data was stored */ String SHARD_ID = "CamelAwsKinesisShardId"; } diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java index e080083..0ad92fa 100644 --- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java +++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java @@ -67,16 +67,16 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer { currentShardIterator = result.nextShardIterator(); if (isShardClosed) { switch (getEndpoint().getConfiguration().getShardClosed()) { - case ignore: - LOG.warn("The shard {} is in closed state", currentShardIterator); - break; - case silent: - break; - case fail: - LOG.info("Shard Iterator reaches CLOSE status:{} {}", getEndpoint().getConfiguration().getStreamName(), getEndpoint().getConfiguration().getShardId()); - throw new ReachedClosedStatusException(getEndpoint().getConfiguration().getStreamName(), getEndpoint().getConfiguration().getShardId()); - default: - throw new IllegalArgumentException("Unsupported shard closed strategy"); + case ignore: + LOG.warn("The shard {} is in closed state", currentShardIterator); + break; + case silent: + break; + case fail: + LOG.info("Shard Iterator reaches CLOSE status:{} {}", getEndpoint().getConfiguration().getStreamName(), getEndpoint().getConfiguration().getShardId()); + throw new ReachedClosedStatusException(getEndpoint().getConfiguration().getStreamName(), getEndpoint().getConfiguration().getShardId()); + default: + throw new IllegalArgumentException("Unsupported shard closed strategy"); } } @@ -136,7 +136,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer { LOG.debug("ShardId is: {}", shardId); GetShardIteratorRequest.Builder req = GetShardIteratorRequest.builder().streamName(getEndpoint().getConfiguration().getStreamName()).shardId(shardId) - .shardIteratorType(getEndpoint().getConfiguration().getIteratorType()); + .shardIteratorType(getEndpoint().getConfiguration().getIteratorType()); if (hasSequenceNumber()) { req.startingSequenceNumber(getEndpoint().getConfiguration().getSequenceNumber()); @@ -159,7 +159,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer { private boolean hasSequenceNumber() { return !getEndpoint().getConfiguration().getSequenceNumber().isEmpty() - && (getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) - || getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER)); + && (getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) + || getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER)); } } diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java index 187efbd..e9cc6e3 100644 --- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java +++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java @@ -46,9 +46,9 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint { @UriParam private Kinesis2Configuration configuration; - + private KinesisClient kinesisClient; - + public Kinesis2Endpoint(String uri, Kinesis2Configuration configuration, Kinesis2Component component) { super(uri, component); this.configuration = configuration; @@ -57,16 +57,14 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint { @Override protected void doStart() throws Exception { super.doStart(); - kinesisClient = configuration.getAmazonKinesisClient() != null ? configuration.getAmazonKinesisClient() - : createKinesisClient(); - - + kinesisClient = configuration.getAmazonKinesisClient() != null ? configuration.getAmazonKinesisClient() : createKinesisClient(); + if ((configuration.getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || configuration.getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) && configuration.getSequenceNumber().isEmpty()) { throw new IllegalArgumentException("Sequence Number must be specified with iterator Types AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER"); } } - + @Override public void doStop() throws Exception { if (ObjectHelper.isEmpty(configuration.getAmazonKinesisClient())) { @@ -106,7 +104,7 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint { public Kinesis2Configuration getConfiguration() { return configuration; } - + KinesisClient createKinesisClient() { KinesisClient client = null; KinesisClientBuilder clientBuilder = KinesisClient.builder(); diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java index 56a3d96..5c10228 100644 --- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java +++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java @@ -34,7 +34,7 @@ public class Kinesis2Producer extends DefaultProducer { @Override public Kinesis2Endpoint getEndpoint() { - return (Kinesis2Endpoint) super.getEndpoint(); + return (Kinesis2Endpoint)super.getEndpoint(); } @Override @@ -60,7 +60,7 @@ public class Kinesis2Producer extends DefaultProducer { } return putRecordRequest.build(); } - + public static Message getMessageForResponse(final Exchange exchange) { return exchange.getMessage(); } diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2ShardClosedStrategyEnum.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2ShardClosedStrategyEnum.java index 166334e..c98a7e0 100644 --- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2ShardClosedStrategyEnum.java +++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2ShardClosedStrategyEnum.java @@ -18,7 +18,5 @@ package org.apache.camel.component.aws2.kinesis; public enum Kinesis2ShardClosedStrategyEnum { - ignore, - fail, - silent + ignore, fail, silent } diff --git a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseComponentConfigurationTest.java b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseComponentConfigurationTest.java index 7d4b2df..80fa545 100644 --- a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseComponentConfigurationTest.java +++ b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseComponentConfigurationTest.java @@ -25,51 +25,53 @@ import software.amazon.awssdk.core.Protocol; import software.amazon.awssdk.regions.Region; public class KinesisFirehoseComponentConfigurationTest extends CamelTestSupport { - + @Test public void createEndpointWithAccessAndSecretKey() throws Exception { KinesisFirehose2Component component = context.getComponent("aws2-kinesis-firehose", KinesisFirehose2Component.class); KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint)component.createEndpoint("aws2-kinesis-firehose://some_stream_name?accessKey=xxxxx&secretKey=yyyyy"); - + assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName()); assertEquals("xxxxx", endpoint.getConfiguration().getAccessKey()); - assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey()); + assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey()); } - + @Test public void createEndpointWithComponentElements() throws Exception { KinesisFirehose2Component component = context.getComponent("aws2-kinesis-firehose", KinesisFirehose2Component.class); component.setAccessKey("XXX"); component.setSecretKey("YYY"); KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint)component.createEndpoint("aws2-kinesis-firehose://some_stream_name"); - + assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName()); assertEquals("XXX", endpoint.getConfiguration().getAccessKey()); assertEquals("YYY", endpoint.getConfiguration().getSecretKey()); } - + @Test public void createEndpointWithComponentAndEndpointElements() throws Exception { KinesisFirehose2Component component = context.getComponent("aws2-kinesis-firehose", KinesisFirehose2Component.class); component.setAccessKey("XXX"); component.setSecretKey("YYY"); component.setRegion(Region.US_WEST_1.toString()); - KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint)component.createEndpoint("aws2-kinesis-firehose://some_stream_name?accessKey=xxxxxx&secretKey=yyyyy®ion=US_EAST_1"); - + KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint)component + .createEndpoint("aws2-kinesis-firehose://some_stream_name?accessKey=xxxxxx&secretKey=yyyyy®ion=US_EAST_1"); + assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName()); assertEquals("xxxxxx", endpoint.getConfiguration().getAccessKey()); assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey()); assertEquals("US_EAST_1", endpoint.getConfiguration().getRegion()); } - + @Test public void createEndpointWithComponentEndpointElementsAndProxy() throws Exception { KinesisFirehose2Component component = context.getComponent("aws2-kinesis-firehose", KinesisFirehose2Component.class); component.setAccessKey("XXX"); component.setSecretKey("YYY"); component.setRegion(Region.US_WEST_1.toString()); - KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint)component.createEndpoint("aws2-kinesis-firehose://label?accessKey=xxxxxx&secretKey=yyyyy®ion=US_EAST_1&proxyHost=localhost&proxyPort=9000&proxyProtocol=HTTP"); - + KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint)component + .createEndpoint("aws2-kinesis-firehose://label?accessKey=xxxxxx&secretKey=yyyyy®ion=US_EAST_1&proxyHost=localhost&proxyPort=9000&proxyProtocol=HTTP"); + assertEquals("xxxxxx", endpoint.getConfiguration().getAccessKey()); assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey()); assertEquals("US_EAST_1", endpoint.getConfiguration().getRegion()); diff --git a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseEndpointTest.java b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseEndpointTest.java index c3afb12..d3c6477 100644 --- a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseEndpointTest.java +++ b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseEndpointTest.java @@ -49,20 +49,18 @@ public class KinesisFirehoseEndpointTest { @Test public void allEndpointParams() throws Exception { - KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint) camelContext.getEndpoint("aws2-kinesis-firehose://some_stream_name" - + "?amazonKinesisFirehoseClient=#firehoseClient" - ); + KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint)camelContext + .getEndpoint("aws2-kinesis-firehose://some_stream_name" + "?amazonKinesisFirehoseClient=#firehoseClient"); endpoint.start(); assertThat(endpoint.getClient(), is(amazonKinesisFirehoseClient)); assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name")); } - + @Test public void allClientCreationParams() throws Exception { - KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint) camelContext.getEndpoint("aws2-kinesis-firehose://some_stream_name" - + "?accessKey=xxx&secretKey=yyy®ion=us-east-1" - ); + KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint)camelContext + .getEndpoint("aws2-kinesis-firehose://some_stream_name" + "?accessKey=xxx&secretKey=yyy®ion=us-east-1"); assertThat(endpoint.getConfiguration().getRegion(), is(Region.US_EAST_1.id())); assertThat(endpoint.getConfiguration().getAccessKey(), is("xxx")); diff --git a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/integration/KinesisFirehoseComponentIntegrationTest.java b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/integration/KinesisFirehoseComponentIntegrationTest.java index 1240902..3bac64a9 100644 --- a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/integration/KinesisFirehoseComponentIntegrationTest.java +++ b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/integration/KinesisFirehoseComponentIntegrationTest.java @@ -33,7 +33,7 @@ public class KinesisFirehoseComponentIntegrationTest extends CamelTestSupport { @BindToRegistry("FirehoseClient") FirehoseClient client = FirehoseClient.builder().build(); - + @Test public void testFirehoseRouting() throws Exception { Exchange exchange = template.send("direct:start", ExchangePattern.InOnly, new Processor() { @@ -49,10 +49,8 @@ public class KinesisFirehoseComponentIntegrationTest extends CamelTestSupport { return new RouteBuilder() { @Override public void configure() throws Exception { - from("direct:start") - .to("aws2-kinesis-firehose://mystream?amazonKinesisFirehoseClient=#FirehoseClient"); + from("direct:start").to("aws2-kinesis-firehose://mystream?amazonKinesisFirehoseClient=#FirehoseClient"); } }; } } - diff --git a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisComponentConfigurationTest.java b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisComponentConfigurationTest.java index 25dc454..b6e47c4 100644 --- a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisComponentConfigurationTest.java +++ b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisComponentConfigurationTest.java @@ -25,29 +25,29 @@ import software.amazon.awssdk.core.Protocol; import software.amazon.awssdk.regions.Region; public class KinesisComponentConfigurationTest extends CamelTestSupport { - + @Test public void createEndpointWithAccessAndSecretKey() throws Exception { Kinesis2Component component = context.getComponent("aws2-kinesis", Kinesis2Component.class); Kinesis2Endpoint endpoint = (Kinesis2Endpoint)component.createEndpoint("aws2-kinesis://some_stream_name?accessKey=xxxxx&secretKey=yyyyy"); - + assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName()); assertEquals("xxxxx", endpoint.getConfiguration().getAccessKey()); - assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey()); + assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey()); } - + @Test public void createEndpointWithComponentElements() throws Exception { Kinesis2Component component = context.getComponent("aws2-kinesis", Kinesis2Component.class); component.setAccessKey("XXX"); component.setSecretKey("YYY"); Kinesis2Endpoint endpoint = (Kinesis2Endpoint)component.createEndpoint("aws2-kinesis://some_stream_name"); - + assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName()); assertEquals("XXX", endpoint.getConfiguration().getAccessKey()); assertEquals("YYY", endpoint.getConfiguration().getSecretKey()); } - + @Test public void createEndpointWithComponentAndEndpointElements() throws Exception { Kinesis2Component component = context.getComponent("aws2-kinesis", Kinesis2Component.class); @@ -55,21 +55,22 @@ public class KinesisComponentConfigurationTest extends CamelTestSupport { component.setSecretKey("YYY"); component.setRegion(Region.US_WEST_1.toString()); Kinesis2Endpoint endpoint = (Kinesis2Endpoint)component.createEndpoint("aws2-kinesis://some_stream_name?accessKey=xxxxxx&secretKey=yyyyy®ion=US_EAST_1"); - + assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName()); assertEquals("xxxxxx", endpoint.getConfiguration().getAccessKey()); assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey()); assertEquals("US_EAST_1", endpoint.getConfiguration().getRegion()); } - + @Test public void createEndpointWithComponentEndpointElementsAndProxy() throws Exception { Kinesis2Component component = context.getComponent("aws2-kinesis", Kinesis2Component.class); component.setAccessKey("XXX"); component.setSecretKey("YYY"); component.setRegion(Region.US_WEST_1.toString()); - Kinesis2Endpoint endpoint = (Kinesis2Endpoint)component.createEndpoint("aws2-kinesis://label?accessKey=xxxxxx&secretKey=yyyyy®ion=US_EAST_1&proxyHost=localhost&proxyPort=9000&proxyProtocol=HTTP"); - + Kinesis2Endpoint endpoint = (Kinesis2Endpoint)component + .createEndpoint("aws2-kinesis://label?accessKey=xxxxxx&secretKey=yyyyy®ion=US_EAST_1&proxyHost=localhost&proxyPort=9000&proxyProtocol=HTTP"); + assertEquals("xxxxxx", endpoint.getConfiguration().getAccessKey()); assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey()); assertEquals("US_EAST_1", endpoint.getConfiguration().getRegion()); diff --git a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java index 38e6450..c713e91 100644 --- a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java +++ b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java @@ -75,7 +75,6 @@ public class KinesisConsumerClosedShardWithFailTest { Kinesis2Endpoint endpoint = new Kinesis2Endpoint(null, configuration, component); endpoint.start(); undertest = new Kinesis2Consumer(endpoint, processor); - SequenceNumberRange range = SequenceNumberRange.builder().endingSequenceNumber("20").build(); Shard shard = Shard.builder().shardId("shardId").sequenceNumberRange(range).build(); diff --git a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java index bccdd37..24c75d6 100644 --- a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java +++ b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java @@ -81,27 +81,16 @@ public class KinesisConsumerClosedShardWithSilentTest { Kinesis2Endpoint endpoint = new Kinesis2Endpoint(null, configuration, component); endpoint.start(); undertest = new Kinesis2Consumer(endpoint, processor); - + SequenceNumberRange range = SequenceNumberRange.builder().endingSequenceNumber("20").build(); Shard shard = Shard.builder().shardId("shardId").sequenceNumberRange(range).build(); ArrayList<Shard> shardList = new ArrayList<>(); shardList.add(shard); - - when(kinesisClient.getRecords(any(GetRecordsRequest.class))) - .thenReturn(GetRecordsResponse.builder() - .nextShardIterator("nextShardIterator").build() - ); + when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(GetRecordsResponse.builder().nextShardIterator("nextShardIterator").build()); when(kinesisClient.describeStream(any(DescribeStreamRequest.class))) - .thenReturn(DescribeStreamResponse.builder() - .streamDescription(StreamDescription.builder() - .shards(shardList).build() - ).build() - ); - when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))) - .thenReturn(GetShardIteratorResponse.builder() - .shardIterator("shardIterator").build() - ); + .thenReturn(DescribeStreamResponse.builder().streamDescription(StreamDescription.builder().shards(shardList).build()).build()); + when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(GetShardIteratorResponse.builder().shardIterator("shardIterator").build()); } @Test @@ -181,11 +170,8 @@ public class KinesisConsumerClosedShardWithSilentTest { @Test public void recordsAreSentToTheProcessor() throws Exception { - when(kinesisClient.getRecords(any(GetRecordsRequest.class))) - .thenReturn(GetRecordsResponse.builder() - .nextShardIterator("nextShardIterator") - .records(Record.builder().sequenceNumber("1").build(), Record.builder().sequenceNumber("2").build()).build() - ); + when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(GetRecordsResponse.builder().nextShardIterator("nextShardIterator") + .records(Record.builder().sequenceNumber("1").build(), Record.builder().sequenceNumber("2").build()).build()); int messageCount = undertest.poll(); @@ -201,15 +187,8 @@ public class KinesisConsumerClosedShardWithSilentTest { public void exchangePropertiesAreSet() throws Exception { String partitionKey = "partitionKey"; String sequenceNumber = "1"; - when(kinesisClient.getRecords(any(GetRecordsRequest.class))) - .thenReturn(GetRecordsResponse.builder() - .nextShardIterator("nextShardIterator") - .records(Record.builder() - .sequenceNumber(sequenceNumber) - .approximateArrivalTimestamp(Instant.now()) - .partitionKey(partitionKey).build() - ).build() - ); + when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(GetRecordsResponse.builder().nextShardIterator("nextShardIterator") + .records(Record.builder().sequenceNumber(sequenceNumber).approximateArrivalTimestamp(Instant.now()).partitionKey(partitionKey).build()).build()); undertest.poll(); diff --git a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisEndpointTest.java b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisEndpointTest.java index 413c107..3b58993 100644 --- a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisEndpointTest.java +++ b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisEndpointTest.java @@ -49,13 +49,8 @@ public class KinesisEndpointTest { @Test public void allTheEndpointParams() throws Exception { - Kinesis2Endpoint endpoint = (Kinesis2Endpoint) camelContext.getEndpoint("aws2-kinesis://some_stream_name" - + "?amazonKinesisClient=#kinesisClient" - + "&maxResultsPerRequest=101" - + "&iteratorType=latest" - + "&shardId=abc" - + "&sequenceNumber=123" - ); + Kinesis2Endpoint endpoint = (Kinesis2Endpoint)camelContext.getEndpoint("aws2-kinesis://some_stream_name" + "?amazonKinesisClient=#kinesisClient" + + "&maxResultsPerRequest=101" + "&iteratorType=latest" + "&shardId=abc" + "&sequenceNumber=123"); assertThat(endpoint.getConfiguration().getAmazonKinesisClient(), is(amazonKinesisClient)); assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name")); @@ -67,9 +62,7 @@ public class KinesisEndpointTest { @Test public void onlyRequiredEndpointParams() throws Exception { - Kinesis2Endpoint endpoint = (Kinesis2Endpoint) camelContext.getEndpoint("aws2-kinesis://some_stream_name" - + "?amazonKinesisClient=#kinesisClient" - ); + Kinesis2Endpoint endpoint = (Kinesis2Endpoint)camelContext.getEndpoint("aws2-kinesis://some_stream_name" + "?amazonKinesisClient=#kinesisClient"); assertThat(endpoint.getConfiguration().getAmazonKinesisClient(), is(amazonKinesisClient)); assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name")); @@ -79,12 +72,8 @@ public class KinesisEndpointTest { @Test public void afterSequenceNumberRequiresSequenceNumber() throws Exception { - Kinesis2Endpoint endpoint = (Kinesis2Endpoint) camelContext.getEndpoint("aws2-kinesis://some_stream_name" - + "?amazonKinesisClient=#kinesisClient" - + "&iteratorType=AFTER_SEQUENCE_NUMBER" - + "&shardId=abc" - + "&sequenceNumber=123" - ); + Kinesis2Endpoint endpoint = (Kinesis2Endpoint)camelContext.getEndpoint("aws2-kinesis://some_stream_name" + "?amazonKinesisClient=#kinesisClient" + + "&iteratorType=AFTER_SEQUENCE_NUMBER" + "&shardId=abc" + "&sequenceNumber=123"); assertThat(endpoint.getConfiguration().getAmazonKinesisClient(), is(amazonKinesisClient)); assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name")); @@ -95,12 +84,8 @@ public class KinesisEndpointTest { @Test public void atSequenceNumberRequiresSequenceNumber() throws Exception { - Kinesis2Endpoint endpoint = (Kinesis2Endpoint) camelContext.getEndpoint("aws2-kinesis://some_stream_name" - + "?amazonKinesisClient=#kinesisClient" - + "&iteratorType=AT_SEQUENCE_NUMBER" - + "&shardId=abc" - + "&sequenceNumber=123" - ); + Kinesis2Endpoint endpoint = (Kinesis2Endpoint)camelContext + .getEndpoint("aws2-kinesis://some_stream_name" + "?amazonKinesisClient=#kinesisClient" + "&iteratorType=AT_SEQUENCE_NUMBER" + "&shardId=abc" + "&sequenceNumber=123"); assertThat(endpoint.getConfiguration().getAmazonKinesisClient(), is(amazonKinesisClient)); assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name")); diff --git a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/RecordStringConverterTest.java b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/RecordStringConverterTest.java index e368f87..6515b9d 100644 --- a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/RecordStringConverterTest.java +++ b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/RecordStringConverterTest.java @@ -32,9 +32,7 @@ public class RecordStringConverterTest { @Test public void convertRecordToString() throws Exception { - Record record = Record.builder() - .sequenceNumber("1") - .data(SdkBytes.fromByteBuffer(ByteBuffer.wrap("this is a String".getBytes(Charset.forName("UTF-8"))))).build(); + Record record = Record.builder().sequenceNumber("1").data(SdkBytes.fromByteBuffer(ByteBuffer.wrap("this is a String".getBytes(Charset.forName("UTF-8"))))).build(); String result = RecordStringConverter.toString(record); assertThat(result, is("this is a String"));
