[ 
https://issues.apache.org/jira/browse/BEAM-3373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316407#comment-16316407
 ] 

ASF GitHub Bot commented on BEAM-3373:
--------------------------------------

jbonofre closed pull request #4358: [BEAM-3373] Add serviceEndpoint parameter 
to KinesisIO
URL: https://github.com/apache/beam/pull/4358
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
index 169694110c2..6a93e1babc4 100644
--- 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
+++ 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
@@ -22,6 +22,7 @@
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
 import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
@@ -197,12 +198,26 @@ public Read withAWSClientsProvider(AWSClientsProvider 
awsClientsProvider) {
     }
 
     /**
-     * Specify credential details and region to be used to read from Kinesis.
-     * If you need more sophisticated credential protocol, then you should 
look at
-     * {@link Read#withAWSClientsProvider(AWSClientsProvider)}.
+     * Specify credential details and region to be used to read from Kinesis. 
If you need more
+     * sophisticated credential protocol, then you should look at {@link
+     * Read#withAWSClientsProvider(AWSClientsProvider)}.
      */
     public Read withAWSClientsProvider(String awsAccessKey, String 
awsSecretKey, Regions region) {
-      return withAWSClientsProvider(new BasicKinesisProvider(awsAccessKey, 
awsSecretKey, region));
+      return withAWSClientsProvider(awsAccessKey, awsSecretKey, region, null);
+    }
+
+    /**
+     * Specify credential details and region to be used to read from Kinesis. 
If you need more
+     * sophisticated credential protocol, then you should look at {@link
+     * Read#withAWSClientsProvider(AWSClientsProvider)}.
+     *
+     * <p>The {@code serviceEndpoint} sets an alternative service host. This 
is useful to execute
+     * the tests with a kinesis service emulator.
+     */
+    public Read withAWSClientsProvider(
+        String awsAccessKey, String awsSecretKey, Regions region, String 
serviceEndpoint) {
+      return withAWSClientsProvider(
+          new BasicKinesisProvider(awsAccessKey, awsSecretKey, region, 
serviceEndpoint));
     }
 
     /** Specifies to read at most a given number of records. */
@@ -249,42 +264,50 @@ public Read withUpToDateThreshold(Duration 
upToDateThreshold) {
     }
 
     private static final class BasicKinesisProvider implements 
AWSClientsProvider {
-
       private final String accessKey;
       private final String secretKey;
       private final Regions region;
+      @Nullable private final String serviceEndpoint;
 
-      private BasicKinesisProvider(String accessKey, String secretKey, Regions 
region) {
+      private BasicKinesisProvider(
+          String accessKey, String secretKey, Regions region, @Nullable String 
serviceEndpoint) {
         checkArgument(accessKey != null, "accessKey can not be null");
         checkArgument(secretKey != null, "secretKey can not be null");
         checkArgument(region != null, "region can not be null");
         this.accessKey = accessKey;
         this.secretKey = secretKey;
         this.region = region;
+        this.serviceEndpoint = serviceEndpoint;
       }
 
       private AWSCredentialsProvider getCredentialsProvider() {
-        return new AWSStaticCredentialsProvider(new BasicAWSCredentials(
-            accessKey,
-            secretKey
-        ));
-
+        return new AWSStaticCredentialsProvider(new 
BasicAWSCredentials(accessKey, secretKey));
       }
 
       @Override
       public AmazonKinesis getKinesisClient() {
-        return AmazonKinesisClientBuilder.standard()
-            .withCredentials(getCredentialsProvider())
-            .withRegion(region)
-            .build();
+        AmazonKinesisClientBuilder clientBuilder =
+            
AmazonKinesisClientBuilder.standard().withCredentials(getCredentialsProvider());
+        if (serviceEndpoint == null) {
+          clientBuilder.withRegion(region);
+        } else {
+          clientBuilder.withEndpointConfiguration(
+              new AwsClientBuilder.EndpointConfiguration(serviceEndpoint, 
region.getName()));
+        }
+        return clientBuilder.build();
       }
 
       @Override
       public AmazonCloudWatch getCloudWatchClient() {
-        return AmazonCloudWatchClientBuilder.standard()
-            .withCredentials(getCredentialsProvider())
-            .withRegion(region)
-            .build();
+        AmazonCloudWatchClientBuilder clientBuilder =
+            
AmazonCloudWatchClientBuilder.standard().withCredentials(getCredentialsProvider());
+        if (serviceEndpoint == null) {
+          clientBuilder.withRegion(region);
+        } else {
+          clientBuilder.withEndpointConfiguration(
+              new AwsClientBuilder.EndpointConfiguration(serviceEndpoint, 
region.getName()));
+        }
+        return clientBuilder.build();
       }
     }
   }
diff --git 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
index 665b8972184..754acd13323 100644
--- 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
+++ 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
@@ -116,7 +116,7 @@ public boolean start() throws IOException {
 
   /**
    * Moves to the next record in one of the shards.
-   * If current shard iterator can be move forward (i.e. there's a record 
present) then we do it.
+   * If current shard iterator can be moved forward (i.e. there's a record 
present) then we do it.
    * If not, we iterate over shards in a round-robin manner.
    */
   @Override


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Add serviceEndpoint parameter to KinesisIO
> ------------------------------------------
>
>                 Key: BEAM-3373
>                 URL: https://issues.apache.org/jira/browse/BEAM-3373
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-extensions
>            Reporter: Ismaël Mejía
>            Assignee: Ismaël Mejía
>            Priority: Trivial
>             Fix For: 2.3.0
>
>
> KinesisClient can be instantiated with a different serviceEndpoint to the 
> official Amazon one. This allows users to test KinesisIO locally by 
> overwriting the endpointUrl and pointing to an emulator like 
> https://github.com/localstack/localstack or 
> https://github.com/mhart/kinesalite



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to