piotr-szuberski commented on a change in pull request #12422:
URL: https://github.com/apache/beam/pull/12422#discussion_r466621223
##########
File path:
sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -95,32 +123,114 @@ private void runRead() {
PCollection<KinesisRecord> output =
pipelineRead.apply(
KinesisIO.read()
- .withStreamName(options.getAwsKinesisStream())
+ .withStreamName(streamName)
.withAWSClientsProvider(
options.getAwsAccessKey(),
options.getAwsSecretKey(),
- Regions.fromName(options.getAwsKinesisRegion()))
- .withMaxNumRecords(numberOfRows)
+ Regions.fromName(options.getAwsKinesisRegion()),
+ options.getAwsServiceEndpoint(),
+ options.getAwsVerifyCertificate())
+ .withMaxNumRecords(options.getNumberOfRecords())
// to prevent endless running in case of error
- .withMaxReadTime(Duration.standardMinutes(10))
-
.withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
- .withInitialTimestampInStream(now)
+ .withMaxReadTime(Duration.standardMinutes(10L))
+
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
.withRequestRecordsLimit(1000));
PAssert.thatSingleton(output.apply("Count All", Count.globally()))
- .isEqualTo((long) numberOfRows);
+ .isEqualTo((long) options.getNumberOfRecords());
PCollection<String> consolidatedHashcode =
output
.apply(ParDo.of(new ExtractDataValues()))
.apply("Hash row contents", Combine.globally(new
HashingFn()).withoutDefaults());
PAssert.that(consolidatedHashcode)
- .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
+
.containsInAnyOrder(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));
pipelineRead.run().waitUntilFinish();
}
+ /** Necessary setup for localstack environment. */
+ private static void setupLocalstack() {
+
System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY,
"true");
+
System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY,
"true");
+
+ localstackContainer =
+ new LocalStackContainer(LOCALSTACK_VERSION)
+ .withServices(LocalStackContainer.Service.KINESIS)
+ .withEnv("USE_SSL", "true")
+ .withStartupAttempts(3);
+ localstackContainer.start();
+
+ options.setAwsServiceEndpoint(
+ localstackContainer
+ .getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
+ .getServiceEndpoint()
+ .replace("http", "https"));
+ options.setAwsKinesisRegion(
+ localstackContainer
+ .getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
+ .getSigningRegion());
+ options.setAwsAccessKey(
+
localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSAccessKeyId());
+ options.setAwsSecretKey(
+
localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSSecretKey());
+ options.setNumberOfRecords(1000);
+ options.setNumberOfShards(1);
+ options.setAwsKinesisStream("beam_kinesis_test");
+ options.setAwsVerifyCertificate(false);
+ }
+
+ private static AmazonKinesis createKinesisClient() {
+ AmazonKinesisClientBuilder clientBuilder =
AmazonKinesisClientBuilder.standard();
+
+ AWSCredentialsProvider credentialsProvider =
+ new AWSStaticCredentialsProvider(
+ new BasicAWSCredentials(options.getAwsAccessKey(),
options.getAwsSecretKey()));
+ clientBuilder.setCredentials(credentialsProvider);
+
+ if (options.getAwsServiceEndpoint() != null) {
+ AwsClientBuilder.EndpointConfiguration endpointConfiguration =
+ new AwsClientBuilder.EndpointConfiguration(
+ options.getAwsServiceEndpoint(), options.getAwsKinesisRegion());
+ clientBuilder.setEndpointConfiguration(endpointConfiguration);
+ } else {
+ clientBuilder.setRegion(options.getAwsKinesisRegion());
+ }
+
+ return clientBuilder.build();
+ }
+
+ private static void createStream() throws Exception {
+ kinesisClient.createStream(streamName, 1);
+ int repeats = 10;
+ for (int i = 0; i <= repeats; ++i) {
+ String streamStatus =
+
kinesisClient.describeStream(streamName).getStreamDescription().getStreamStatus();
+ if ("ACTIVE".equals(streamStatus)) {
+ break;
+ }
+ if (i == repeats) {
+ throw new RuntimeException("Unable to initialize stream");
+ }
+ Thread.sleep(1000L);
+ }
+ }
+
+ /** Check whether pipeline options were provided. If not, use localstack
container. */
+ private static boolean doUseLocalstack() {
Review comment:
Yeah, definitely.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]