[
https://issues.apache.org/jira/browse/BEAM-3404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16315295#comment-16315295
]
ASF GitHub Bot commented on BEAM-3404:
--------------------------------------
iemejia closed pull request #4338: [BEAM-3404] Update KinesisIO to use AWS SDK
1.11.255 and KCL 1.8.8
URL: https://github.com/apache/beam/pull/4338
This is a PR merged from a forked repository.
Because this pull request came from a fork, GitHub hides the original
diff once it is merged, so the diff is reproduced below for the sake of
provenance:
diff --git a/.gitignore b/.gitignore
index 29a34a2088e..f2f8aaa3663 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,6 +7,7 @@
.gogradle/
build/
vendor/
+.gradletasknamecache
# Ignore files generated by the Maven build process.
bin/
diff --git a/build.gradle b/build.gradle
index 80b46f9c807..98ce50145a0 100644
--- a/build.gradle
+++ b/build.gradle
@@ -120,6 +120,7 @@ ext.library = [
jackson_annotations:
"com.fasterxml.jackson.core:jackson-annotations:$jackson_version",
jackson_core: "com.fasterxml.jackson.core:jackson-core:$jackson_version",
jackson_databind:
"com.fasterxml.jackson.core:jackson-databind:$jackson_version",
+ jackson_dataformat_cbor:
"com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:$jackson_version",
jackson_dataformat_yaml:
"com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$jackson_version",
jackson_module_scala:
"com.fasterxml.jackson.module:jackson-module-scala_2.11:$jackson_version",
joda_time: "joda-time:joda-time:2.4",
@@ -190,6 +191,7 @@ rat {
"**/.gogradle/**",
"**/build/**",
"**/vendor/**",
+ "**/.gradletasknamecache",
// .gitignore: Ignore files generated by the Maven build process
"**/target/**/*",
diff --git a/sdks/java/io/kinesis/build.gradle
b/sdks/java/io/kinesis/build.gradle
index dbf3f9556b0..08ecafc9983 100644
--- a/sdks/java/io/kinesis/build.gradle
+++ b/sdks/java/io/kinesis/build.gradle
@@ -26,7 +26,7 @@ test {
forkEvery 1
}
-def aws_version = "1.11.18"
+def aws_version = "1.11.255"
dependencies {
compile library.java.guava
@@ -34,16 +34,18 @@ dependencies {
shadow library.java.slf4j_api
shadow library.java.joda_time
shadow library.java.findbugs_jsr305
+ shadow library.java.jackson_dataformat_cbor
shadow "com.amazonaws:aws-java-sdk-core:$aws_version"
shadow "com.amazonaws:aws-java-sdk-kinesis:$aws_version"
shadow "com.amazonaws:aws-java-sdk-cloudwatch:$aws_version"
- shadow "com.amazonaws:amazon-kinesis-client:1.6.1"
+ shadow "com.amazonaws:amazon-kinesis-client:1.8.8"
shadow "commons-lang:commons-lang:2.6"
testCompile project(path: ":beam-runners-parent:beam-runners-direct-java",
configuration: "shadow")
testCompile library.java.junit
testCompile library.java.mockito_core
testCompile library.java.guava_testlib
testCompile library.java.hamcrest_core
+ testCompile library.java.slf4j_jdk14
testCompile "org.assertj:assertj-core:2.5.0"
}
diff --git a/sdks/java/io/kinesis/pom.xml b/sdks/java/io/kinesis/pom.xml
index 45a20645506..f1499741b1b 100644
--- a/sdks/java/io/kinesis/pom.xml
+++ b/sdks/java/io/kinesis/pom.xml
@@ -65,7 +65,8 @@
</build>
<properties>
- <aws.version>1.11.18</aws.version>
+ <aws.version>1.11.255</aws.version>
+ <amazon-kinesis-client.version>1.8.8</amazon-kinesis-client.version>
</properties>
<dependencies>
@@ -89,7 +90,14 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-client</artifactId>
- <version>1.6.1</version>
+ <version>${amazon-kinesis-client.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-cbor</artifactId>
+ <version>${jackson.version}</version>
+ <scope>runtime</scope>
</dependency>
<dependency>
@@ -166,5 +174,11 @@
<artifactId>beam-runners-direct-java</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
index 96f7a04be8d..169694110c2 100644
---
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
+++
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
@@ -20,18 +20,16 @@
import static com.google.common.base.Preconditions.checkArgument;
import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.internal.StaticCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
-import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
import com.amazonaws.services.kinesis.AmazonKinesis;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.google.auto.value.AutoValue;
-
import javax.annotation.Nullable;
-
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
import org.apache.beam.sdk.transforms.PTransform;
@@ -266,7 +264,7 @@ private BasicKinesisProvider(String accessKey, String
secretKey, Regions region)
}
private AWSCredentialsProvider getCredentialsProvider() {
- return new StaticCredentialsProvider(new BasicAWSCredentials(
+ return new AWSStaticCredentialsProvider(new BasicAWSCredentials(
accessKey,
secretKey
));
@@ -275,16 +273,18 @@ private AWSCredentialsProvider getCredentialsProvider() {
@Override
public AmazonKinesis getKinesisClient() {
- AmazonKinesisClient client = new
AmazonKinesisClient(getCredentialsProvider());
- client.withRegion(region);
- return client;
+ return AmazonKinesisClientBuilder.standard()
+ .withCredentials(getCredentialsProvider())
+ .withRegion(region)
+ .build();
}
@Override
public AmazonCloudWatch getCloudWatchClient() {
- AmazonCloudWatchClient client = new
AmazonCloudWatchClient(getCredentialsProvider());
- client.withRegion(region);
- return client;
+ return AmazonCloudWatchClientBuilder.standard()
+ .withCredentials(getCredentialsProvider())
+ .withRegion(region)
+ .build();
}
}
}
diff --git
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
index d6e88170050..409ec2f4375 100644
---
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
+++
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
@@ -36,8 +36,12 @@
import
com.amazonaws.services.kinesis.model.DecreaseStreamRetentionPeriodResult;
import com.amazonaws.services.kinesis.model.DeleteStreamRequest;
import com.amazonaws.services.kinesis.model.DeleteStreamResult;
+import com.amazonaws.services.kinesis.model.DescribeLimitsRequest;
+import com.amazonaws.services.kinesis.model.DescribeLimitsResult;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.DescribeStreamSummaryRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamSummaryResult;
import com.amazonaws.services.kinesis.model.DisableEnhancedMonitoringRequest;
import com.amazonaws.services.kinesis.model.DisableEnhancedMonitoringResult;
import com.amazonaws.services.kinesis.model.EnableEnhancedMonitoringRequest;
@@ -65,14 +69,19 @@
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.SplitShardRequest;
import com.amazonaws.services.kinesis.model.SplitShardResult;
+import com.amazonaws.services.kinesis.model.StartStreamEncryptionRequest;
+import com.amazonaws.services.kinesis.model.StartStreamEncryptionResult;
+import com.amazonaws.services.kinesis.model.StopStreamEncryptionRequest;
+import com.amazonaws.services.kinesis.model.StopStreamEncryptionResult;
import com.amazonaws.services.kinesis.model.StreamDescription;
+import com.amazonaws.services.kinesis.model.UpdateShardCountRequest;
+import com.amazonaws.services.kinesis.model.UpdateShardCountResult;
+import com.amazonaws.services.kinesis.waiters.AmazonKinesisWaiters;
import com.google.common.base.Function;
-
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.List;
import javax.annotation.Nullable;
-
import org.apache.commons.lang.builder.EqualsBuilder;
import org.joda.time.Instant;
import org.mockito.Mockito;
@@ -252,6 +261,11 @@ public DeleteStreamResult deleteStream(String streamName) {
throw new RuntimeException("Not implemented");
}
+ @Override
+ public DescribeLimitsResult describeLimits(DescribeLimitsRequest
describeLimitsRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
@Override
public DescribeStreamResult describeStream(DescribeStreamRequest
describeStreamRequest) {
throw new RuntimeException("Not implemented");
@@ -269,6 +283,12 @@ public DescribeStreamResult describeStream(String
streamName,
throw new RuntimeException("Not implemented");
}
+ @Override
+ public DescribeStreamSummaryResult describeStreamSummary(
+ DescribeStreamSummaryRequest describeStreamSummaryRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
@Override
public DisableEnhancedMonitoringResult disableEnhancedMonitoring(
DisableEnhancedMonitoringRequest disableEnhancedMonitoringRequest) {
@@ -377,6 +397,23 @@ public SplitShardResult splitShard(String streamName,
throw new RuntimeException("Not implemented");
}
+ @Override
+ public StartStreamEncryptionResult startStreamEncryption(
+ StartStreamEncryptionRequest startStreamEncryptionRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public StopStreamEncryptionResult stopStreamEncryption(
+ StopStreamEncryptionRequest stopStreamEncryptionRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public UpdateShardCountResult updateShardCount(UpdateShardCountRequest
updateShardCountRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
@Override
public void shutdown() {
@@ -386,4 +423,9 @@ public void shutdown() {
public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest
request) {
throw new RuntimeException("Not implemented");
}
+
+ @Override
+ public AmazonKinesisWaiters waiters() {
+ throw new RuntimeException("Not implemented");
+ }
}
diff --git
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
index 7126594c6f5..88f12cb3d13 100644
---
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
+++
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
@@ -18,21 +18,19 @@
package org.apache.beam.sdk.io.kinesis;
import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.collect.Lists.newArrayList;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.assertj.core.api.Assertions.assertThat;
import com.amazonaws.regions.Regions;
-
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
@@ -43,81 +41,81 @@
import org.apache.commons.lang.RandomStringUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
-import org.junit.Ignore;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
/**
- * Integration test, that reads from the real Kinesis.
- * You need to provide all {@link KinesisTestOptions} in order to run this.
+ * Integration test, that reads from the real Kinesis. You need to provide all
{@link
+ * KinesisTestOptions} in order to run this.
*/
public class KinesisReaderIT {
private static final long PIPELINE_STARTUP_TIME =
TimeUnit.SECONDS.toMillis(10);
+ private static KinesisTestOptions options;
private ExecutorService singleThreadExecutor = newSingleThreadExecutor();
- @Rule
- public final transient TestPipeline p = TestPipeline.create();
+ @Rule public final transient TestPipeline p = TestPipeline.create();
+
+ @BeforeClass
+ public static void setup() {
+ PipelineOptionsFactory.register(KinesisTestOptions.class);
+ options =
TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
+ }
- @Ignore
@Test
public void readsDataFromRealKinesisStream()
throws IOException, InterruptedException, ExecutionException {
- KinesisTestOptions options = readKinesisOptions();
List<String> testData = prepareTestData(1000);
- Future<?> future = startTestPipeline(testData, options);
+ KinesisIO.Read read =
+ KinesisIO.read()
+ .withStreamName(options.getAwsKinesisStream())
+ .withInitialTimestampInStream(Instant.now())
+ .withAWSClientsProvider(
+ options.getAwsAccessKey(),
+ options.getAwsSecretKey(),
+ Regions.fromName(options.getAwsKinesisRegion()))
+ .withMaxReadTime(Duration.standardMinutes(3));
+
+ Future<?> future = runReadTest(read, testData);
KinesisUploader.uploadAll(testData, options);
future.get();
}
- private List<String> prepareTestData(int count) {
- List<String> data = newArrayList();
+ private static List<String> prepareTestData(int count) {
+ List<String> data = new ArrayList<>();
for (int i = 0; i < count; ++i) {
data.add(RandomStringUtils.randomAlphabetic(32));
}
return data;
}
- private Future<?> startTestPipeline(List<String> testData,
KinesisTestOptions options)
+ private Future<?> runReadTest(KinesisIO.Read read, List<String> testData)
throws InterruptedException {
-
- PCollection<String> result = p.
- apply(KinesisIO.read()
- .withStreamName(options.getAwsKinesisStream())
- .withInitialTimestampInStream(Instant.now())
- .withAWSClientsProvider(options.getAwsAccessKey(),
options.getAwsSecretKey(),
- Regions.fromName(options.getAwsKinesisRegion()))
- .withMaxReadTime(Duration.standardMinutes(3))
- ).
- apply(ParDo.of(new RecordDataToString()));
+ PCollection<String> result = p.apply(read).apply(ParDo.of(new
RecordDataToString()));
PAssert.that(result).containsInAnyOrder(testData);
- Future<?> future = singleThreadExecutor.submit(new Callable<Void>() {
-
- @Override
- public Void call() throws Exception {
- PipelineResult result = p.run();
- PipelineResult.State state = result.getState();
- while (state != PipelineResult.State.DONE && state !=
PipelineResult.State.FAILED) {
- Thread.sleep(1000);
- state = result.getState();
- }
- assertThat(state).isEqualTo(PipelineResult.State.DONE);
- return null;
- }
- });
+ Future<?> future =
+ singleThreadExecutor.submit(
+ new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ PipelineResult result = p.run();
+ PipelineResult.State state = result.getState();
+ while (state != PipelineResult.State.DONE && state !=
PipelineResult.State.FAILED) {
+ Thread.sleep(1000);
+ state = result.getState();
+ }
+ assertThat(state).isEqualTo(PipelineResult.State.DONE);
+ return null;
+ }
+ });
Thread.sleep(PIPELINE_STARTUP_TIME);
return future;
}
- private KinesisTestOptions readKinesisOptions() {
- PipelineOptionsFactory.register(KinesisTestOptions.class);
- return TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
- }
-
private static class RecordDataToString extends DoFn<KinesisRecord, String> {
-
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
checkNotNull(c.element(), "Null record given");
diff --git
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
index 7a7cb02202a..8fbafd31b0a 100644
---
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
+++
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
@@ -17,20 +17,18 @@
*/
package org.apache.beam.sdk.io.kinesis;
-import static com.google.common.collect.Lists.newArrayList;
-
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.internal.StaticCredentialsProvider;
-import com.amazonaws.regions.Regions;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;
import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
-
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -41,16 +39,17 @@
public static final int MAX_NUMBER_OF_RECORDS_IN_BATCH = 499;
public static void uploadAll(List<String> data, KinesisTestOptions options) {
- AmazonKinesisClient client = new AmazonKinesisClient(
- new StaticCredentialsProvider(
- new BasicAWSCredentials(
- options.getAwsAccessKey(), options.getAwsSecretKey()))
- ).withRegion(Regions.fromName(options.getAwsKinesisRegion()));
+ AmazonKinesis client =
+ AmazonKinesisClientBuilder.standard()
+ .withCredentials(
+ new AWSStaticCredentialsProvider(
+ new BasicAWSCredentials(options.getAwsAccessKey(),
options.getAwsSecretKey())))
+ .withRegion(options.getAwsKinesisRegion())
+ .build();
List<List<String>> partitions = Lists.partition(data,
MAX_NUMBER_OF_RECORDS_IN_BATCH);
-
for (List<String> partition : partitions) {
- List<PutRecordsRequestEntry> allRecords = newArrayList();
+ List<PutRecordsRequestEntry> allRecords = new ArrayList<>();
for (String row : partition) {
allRecords.add(new PutRecordsRequestEntry().
withData(ByteBuffer.wrap(row.getBytes(Charsets.UTF_8))).
@@ -65,7 +64,7 @@ public static void uploadAll(List<String> data,
KinesisTestOptions options) {
new PutRecordsRequest().
withStreamName(options.getAwsKinesisStream()).
withRecords(allRecords));
- List<PutRecordsRequestEntry> failedRecords = newArrayList();
+ List<PutRecordsRequestEntry> failedRecords = new ArrayList<>();
int i = 0;
for (PutRecordsResultEntry row : result.getRecords()) {
if (row.getErrorCode() != null) {
@@ -79,5 +78,4 @@ public static void uploadAll(List<String> data,
KinesisTestOptions options) {
while (result.getFailedRecordCount() > 0);
}
}
-
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Update KinesisIO to use AWS SDK 1.11.255 and KCL 1.8.8
> ------------------------------------------------------
>
> Key: BEAM-3404
> URL: https://issues.apache.org/jira/browse/BEAM-3404
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-extensions
> Affects Versions: 2.3.0
> Reporter: Ismaël Mejía
> Assignee: Ismaël Mejía
> Priority: Minor
>
> The current version of the AWS SDK that KinesisIO uses does not include the
> new regions/AZs from AWS. This update solves this and also includes the most
> recent fixes from the SDK.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)