[
https://issues.apache.org/jira/browse/BEAM-3819?focusedWorklogId=85335&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85335
]
ASF GitHub Bot logged work on BEAM-3819:
----------------------------------------
Author: ASF GitHub Bot
Created on: 28/Mar/18 17:22
Start Date: 28/Mar/18 17:22
Worklog Time Spent: 10m
Work Description: iemejia closed pull request #4851: [BEAM-3819] Add
withRequestRecordsLimit() option to KinesisIO
URL: https://github.com/apache/beam/pull/4851
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
index 0fe51406502..899a2bc38d2 100644
---
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
+++
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
@@ -215,6 +215,9 @@ public static Write write() {
abstract Duration getUpToDateThreshold();
+ @Nullable
+ abstract Integer getRequestRecordsLimit();
+
abstract Builder toBuilder();
@AutoValue.Builder
@@ -232,6 +235,8 @@ public static Write write() {
abstract Builder setUpToDateThreshold(Duration upToDateThreshold);
+ abstract Builder setRequestRecordsLimit(Integer limit);
+
abstract Read build();
}
@@ -319,6 +324,18 @@ public Read withUpToDateThreshold(Duration
upToDateThreshold) {
return toBuilder().setUpToDateThreshold(upToDateThreshold).build();
}
+ /**
+ * Specifies the maximum number of records in GetRecordsResult returned by
GetRecords call which
+ * is limited by 10K records. If should be adjusted according to average
size of data record to
+ * prevent shard overloading. More details can be found here:
+ * <a
href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html">API_GetRecords</a>
+ */
+ public Read withRequestRecordsLimit(int limit) {
+ checkArgument(limit > 0, "limit must be positive, but was: %s", limit);
+ checkArgument(limit <= 10_000, "limit must be up to 10,000, but was:
%s", limit);
+ return toBuilder().setRequestRecordsLimit(limit).build();
+ }
+
@Override
public PCollection<KinesisRecord> expand(PBegin input) {
Unbounded<KinesisRecord> unbounded =
@@ -327,7 +344,8 @@ public Read withUpToDateThreshold(Duration
upToDateThreshold) {
getAWSClientsProvider(),
getStreamName(),
getInitialPosition(),
- getUpToDateThreshold()));
+ getUpToDateThreshold(),
+ getRequestRecordsLimit()));
PTransform<PBegin, PCollection<KinesisRecord>> transform = unbounded;
diff --git
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
index 88cfd472fc5..b5924e4b46a 100644
---
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
+++
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
@@ -40,20 +40,22 @@
private final String streamName;
private final Duration upToDateThreshold;
private CheckpointGenerator initialCheckpointGenerator;
+ private final Integer limit;
KinesisSource(AWSClientsProvider awsClientsProvider, String streamName,
- StartingPoint startingPoint, Duration upToDateThreshold) {
+ StartingPoint startingPoint, Duration upToDateThreshold, Integer limit) {
this(awsClientsProvider, new DynamicCheckpointGenerator(streamName,
startingPoint), streamName,
- upToDateThreshold);
+ upToDateThreshold, limit);
}
private KinesisSource(AWSClientsProvider awsClientsProvider,
- CheckpointGenerator initialCheckpoint, String streamName,
- Duration upToDateThreshold) {
+ CheckpointGenerator initialCheckpoint, String streamName, Duration
upToDateThreshold,
+ Integer limit) {
this.awsClientsProvider = awsClientsProvider;
this.initialCheckpointGenerator = initialCheckpoint;
this.streamName = streamName;
this.upToDateThreshold = upToDateThreshold;
+ this.limit = limit;
validate();
}
@@ -65,8 +67,8 @@ private KinesisSource(AWSClientsProvider awsClientsProvider,
@Override
public List<KinesisSource> split(int desiredNumSplits,
PipelineOptions options) throws Exception {
- KinesisReaderCheckpoint checkpoint =
-
initialCheckpointGenerator.generate(SimplifiedKinesisClient.from(awsClientsProvider));
+ KinesisReaderCheckpoint checkpoint = initialCheckpointGenerator
+ .generate(SimplifiedKinesisClient.from(awsClientsProvider, limit));
List<KinesisSource> sources = newArrayList();
@@ -75,7 +77,8 @@ private KinesisSource(AWSClientsProvider awsClientsProvider,
awsClientsProvider,
new StaticCheckpointGenerator(partition),
streamName,
- upToDateThreshold));
+ upToDateThreshold,
+ limit));
}
return sources;
}
@@ -98,7 +101,7 @@ private KinesisSource(AWSClientsProvider awsClientsProvider,
LOG.info("Creating new reader using {}", checkpointGenerator);
return new KinesisReader(
- SimplifiedKinesisClient.from(awsClientsProvider),
+ SimplifiedKinesisClient.from(awsClientsProvider, limit),
checkpointGenerator,
this,
upToDateThreshold);
diff --git
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
index 7cd2fd41312..5af0e5531a6 100644
---
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
+++
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
@@ -58,15 +58,18 @@
private static final String STREAM_NAME_DIMENSION = "StreamName";
private final AmazonKinesis kinesis;
private final AmazonCloudWatch cloudWatch;
+ private final Integer limit;
- public SimplifiedKinesisClient(AmazonKinesis kinesis, AmazonCloudWatch
cloudWatch) {
+ public SimplifiedKinesisClient(AmazonKinesis kinesis, AmazonCloudWatch
cloudWatch,
+ Integer limit) {
this.kinesis = checkNotNull(kinesis, "kinesis");
this.cloudWatch = checkNotNull(cloudWatch, "cloudWatch");
+ this.limit = limit;
}
- public static SimplifiedKinesisClient from(AWSClientsProvider provider) {
+ public static SimplifiedKinesisClient from(AWSClientsProvider provider,
Integer limit) {
return new SimplifiedKinesisClient(provider.getKinesisClient(),
- provider.getCloudWatchClient());
+ provider.getCloudWatchClient(), limit);
}
public String getShardIterator(final String streamName, final String shardId,
@@ -113,7 +116,7 @@ public String getShardIterator(final String streamName,
final String shardId,
*/
public GetKinesisRecordsResult getRecords(String shardIterator, String
streamName,
String shardId) throws TransientKinesisException {
- return getRecords(shardIterator, streamName, shardId, null);
+ return getRecords(shardIterator, streamName, shardId, limit);
}
/**
diff --git
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
index 6b8cbdfbeb2..c439393549c 100644
---
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
+++
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
@@ -86,7 +86,8 @@ public void testWriteThenRead() throws Exception {
// to prevent endless running in case of error
.withMaxReadTime(Duration.standardMinutes(5))
.withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
- .withInitialTimestampInStream(now))
+ .withInitialTimestampInStream(now)
+ .withRequestRecordsLimit(1000))
.apply(
ParDo.of(
new DoFn<KinesisRecord, byte[]>() {
diff --git
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
index 43993f410fe..a03ef4ef4c3 100644
---
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
+++
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
@@ -20,6 +20,8 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
import static org.mockito.BDDMockito.given;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verifyZeroInteractions;
@@ -32,13 +34,19 @@
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.LimitExceededException;
import
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamDescription;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import org.joda.time.Instant;
import org.joda.time.Minutes;
@@ -47,6 +55,7 @@
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
/***
*/
@@ -329,4 +338,29 @@ private AmazonServiceException
newAmazonServiceException(ErrorType errorType) {
exception.setErrorType(errorType);
return exception;
}
+
+ @Test
+ public void shouldReturnLimitedNumberOfRecords() throws Exception {
+ final Integer limit = 100;
+
+ doAnswer((Answer<GetRecordsResult>) invocation -> {
+ GetRecordsRequest request = (GetRecordsRequest)
invocation.getArguments()[0];
+ List<Record> records = generateRecords(request.getLimit());
+ return new
GetRecordsResult().withRecords(records).withMillisBehindLatest(1000L);
+ }).when(kinesis).getRecords(any(GetRecordsRequest.class));
+
+ GetKinesisRecordsResult result = underTest.getRecords(SHARD_ITERATOR,
STREAM, SHARD_1, limit);
+ assertThat(result.getRecords().size()).isEqualTo(limit);
+ }
+
+ private List<Record> generateRecords(int num) {
+ List<Record> records = new ArrayList<>();
+ for (int i = 0; i < num; i++) {
+ byte[] value = new byte[1024];
+ Arrays.fill(value, (byte) i);
+ records.add(new
Record().withSequenceNumber(String.valueOf(i)).withPartitionKey("key")
+ .withData(ByteBuffer.wrap(value)));
+ }
+ return records;
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 85335)
Time Spent: 2h 20m (was: 2h 10m)
> Add withRequestRecordsLimit option to KinesisIO
> -----------------------------------------------
>
> Key: BEAM-3819
> URL: https://issues.apache.org/jira/browse/BEAM-3819
> Project: Beam
> Issue Type: Improvement
> Components: io-java-kinesis
> Reporter: Jean-Baptiste Onofré
> Assignee: Alexey Romanenko
> Priority: Major
> Time Spent: 2h 20m
> Remaining Estimate: 0h
>
> In some cases, the user might need to set the {{limit}} on the
> {{SimplifiedKinesisClient}}, especially for performance reasons, depending on
> the number of records.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)