[
https://issues.apache.org/jira/browse/BEAM-3373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316407#comment-16316407
]
ASF GitHub Bot commented on BEAM-3373:
--------------------------------------
jbonofre closed pull request #4358: [BEAM-3373] Add serviceEndpoint parameter
to KinesisIO
URL: https://github.com/apache/beam/pull/4358
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:
diff --git
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
index 169694110c2..6a93e1babc4 100644
---
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
+++
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
@@ -22,6 +22,7 @@
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
@@ -197,12 +198,26 @@ public Read withAWSClientsProvider(AWSClientsProvider
awsClientsProvider) {
}
/**
- * Specify credential details and region to be used to read from Kinesis.
- * If you need more sophisticated credential protocol, then you should
look at
- * {@link Read#withAWSClientsProvider(AWSClientsProvider)}.
+ * Specify credential details and region to be used to read from Kinesis.
If you need more
+ * sophisticated credential protocol, then you should look at {@link
+ * Read#withAWSClientsProvider(AWSClientsProvider)}.
*/
public Read withAWSClientsProvider(String awsAccessKey, String
awsSecretKey, Regions region) {
- return withAWSClientsProvider(new BasicKinesisProvider(awsAccessKey,
awsSecretKey, region));
+ return withAWSClientsProvider(awsAccessKey, awsSecretKey, region, null);
+ }
+
+ /**
+ * Specify credential details and region to be used to read from Kinesis.
If you need more
+ * sophisticated credential protocol, then you should look at {@link
+ * Read#withAWSClientsProvider(AWSClientsProvider)}.
+ *
+ * <p>The {@code serviceEndpoint} sets an alternative service host. This
is useful to execute
+ * the tests with a kinesis service emulator.
+ */
+ public Read withAWSClientsProvider(
+ String awsAccessKey, String awsSecretKey, Regions region, String
serviceEndpoint) {
+ return withAWSClientsProvider(
+ new BasicKinesisProvider(awsAccessKey, awsSecretKey, region,
serviceEndpoint));
}
/** Specifies to read at most a given number of records. */
@@ -249,42 +264,50 @@ public Read withUpToDateThreshold(Duration
upToDateThreshold) {
}
private static final class BasicKinesisProvider implements
AWSClientsProvider {
-
private final String accessKey;
private final String secretKey;
private final Regions region;
+ @Nullable private final String serviceEndpoint;
- private BasicKinesisProvider(String accessKey, String secretKey, Regions
region) {
+ private BasicKinesisProvider(
+ String accessKey, String secretKey, Regions region, @Nullable String
serviceEndpoint) {
checkArgument(accessKey != null, "accessKey can not be null");
checkArgument(secretKey != null, "secretKey can not be null");
checkArgument(region != null, "region can not be null");
this.accessKey = accessKey;
this.secretKey = secretKey;
this.region = region;
+ this.serviceEndpoint = serviceEndpoint;
}
private AWSCredentialsProvider getCredentialsProvider() {
- return new AWSStaticCredentialsProvider(new BasicAWSCredentials(
- accessKey,
- secretKey
- ));
-
+ return new AWSStaticCredentialsProvider(new
BasicAWSCredentials(accessKey, secretKey));
}
@Override
public AmazonKinesis getKinesisClient() {
- return AmazonKinesisClientBuilder.standard()
- .withCredentials(getCredentialsProvider())
- .withRegion(region)
- .build();
+ AmazonKinesisClientBuilder clientBuilder =
+
AmazonKinesisClientBuilder.standard().withCredentials(getCredentialsProvider());
+ if (serviceEndpoint == null) {
+ clientBuilder.withRegion(region);
+ } else {
+ clientBuilder.withEndpointConfiguration(
+ new AwsClientBuilder.EndpointConfiguration(serviceEndpoint,
region.getName()));
+ }
+ return clientBuilder.build();
}
@Override
public AmazonCloudWatch getCloudWatchClient() {
- return AmazonCloudWatchClientBuilder.standard()
- .withCredentials(getCredentialsProvider())
- .withRegion(region)
- .build();
+ AmazonCloudWatchClientBuilder clientBuilder =
+
AmazonCloudWatchClientBuilder.standard().withCredentials(getCredentialsProvider());
+ if (serviceEndpoint == null) {
+ clientBuilder.withRegion(region);
+ } else {
+ clientBuilder.withEndpointConfiguration(
+ new AwsClientBuilder.EndpointConfiguration(serviceEndpoint,
region.getName()));
+ }
+ return clientBuilder.build();
}
}
}
diff --git
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
index 665b8972184..754acd13323 100644
---
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
+++
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
@@ -116,7 +116,7 @@ public boolean start() throws IOException {
/**
* Moves to the next record in one of the shards.
- * If current shard iterator can be move forward (i.e. there's a record
present) then we do it.
+ * If current shard iterator can be moved forward (i.e. there's a record
present) then we do it.
* If not, we iterate over shards in a round-robin manner.
*/
@Override
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Add serviceEndpoint parameter to KinesisIO
> ------------------------------------------
>
> Key: BEAM-3373
> URL: https://issues.apache.org/jira/browse/BEAM-3373
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-extensions
> Reporter: Ismaël Mejía
> Assignee: Ismaël Mejía
> Priority: Trivial
> Fix For: 2.3.0
>
>
> KinesisClient can be instantiated with a serviceEndpoint different from the
> official Amazon one. This allows users to test KinesisIO locally by
> overriding the endpointUrl and pointing it at an emulator such as
> https://github.com/localstack/localstack or
> https://github.com/mhart/kinesalite
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)