This is an automated email from the ASF dual-hosted git repository.
aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3a5b70f [BEAM-7080] Remove unused class KinesisUploader from KinesisIO
new 1a9faf5 Merge pull request #8308: [BEAM-7080] Remove unused class
KinesisUploader from KinesisIO tests
3a5b70f is described below
commit 3a5b70ffa3b873e4f04ba0bccb22b07fc4e4fec3
Author: Ismaël Mejía <[email protected]>
AuthorDate: Mon Apr 15 15:02:40 2019 +0200
[BEAM-7080] Remove unused class KinesisUploader from KinesisIO
---
.../beam/sdk/io/kinesis/KinesisUploader.java | 77 ----------------------
1 file changed, 77 deletions(-)
diff --git
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
deleted file mode 100644
index f33815c..0000000
---
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.kinesis;
-
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.services.kinesis.AmazonKinesis;
-import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
-import com.amazonaws.services.kinesis.model.PutRecordsRequest;
-import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
-import com.amazonaws.services.kinesis.model.PutRecordsResult;
-import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
-
-/** Sends records to Kinesis in reliable way. */
-public class KinesisUploader {
-
- public static final int MAX_NUMBER_OF_RECORDS_IN_BATCH = 499;
-
- public static void uploadAll(List<String> data, KinesisTestOptions options) {
- AmazonKinesis client =
- AmazonKinesisClientBuilder.standard()
- .withCredentials(
- new AWSStaticCredentialsProvider(
- new BasicAWSCredentials(options.getAwsAccessKey(),
options.getAwsSecretKey())))
- .withRegion(options.getAwsKinesisRegion())
- .build();
-
- List<List<String>> partitions = Lists.partition(data,
MAX_NUMBER_OF_RECORDS_IN_BATCH);
- for (List<String> partition : partitions) {
- List<PutRecordsRequestEntry> allRecords = new ArrayList<>();
- for (String row : partition) {
- allRecords.add(
- new PutRecordsRequestEntry()
-
.withData(ByteBuffer.wrap(row.getBytes(StandardCharsets.UTF_8)))
- .withPartitionKey(Integer.toString(row.hashCode())));
- }
-
- PutRecordsResult result;
- do {
- result =
- client.putRecords(
- new PutRecordsRequest()
- .withStreamName(options.getAwsKinesisStream())
- .withRecords(allRecords));
- List<PutRecordsRequestEntry> failedRecords = new ArrayList<>();
- int i = 0;
- for (PutRecordsResultEntry row : result.getRecords()) {
- if (row.getErrorCode() != null) {
- failedRecords.add(allRecords.get(i));
- }
- ++i;
- }
- allRecords = failedRecords;
- } while (result.getFailedRecordCount() > 0);
- }
- }
-}