[ 
https://issues.apache.org/jira/browse/BEAM-3819?focusedWorklogId=85335&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85335
 ]

ASF GitHub Bot logged work on BEAM-3819:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Mar/18 17:22
            Start Date: 28/Mar/18 17:22
    Worklog Time Spent: 10m 
      Work Description: iemejia closed pull request #4851: [BEAM-3819] Add 
withRequestRecordsLimit() option to KinesisIO
URL: https://github.com/apache/beam/pull/4851
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
index 0fe51406502..899a2bc38d2 100644
--- 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
+++ 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
@@ -215,6 +215,9 @@ public static Write write() {
 
     abstract Duration getUpToDateThreshold();
 
+    @Nullable
+    abstract Integer getRequestRecordsLimit();
+
     abstract Builder toBuilder();
 
     @AutoValue.Builder
@@ -232,6 +235,8 @@ public static Write write() {
 
       abstract Builder setUpToDateThreshold(Duration upToDateThreshold);
 
+      abstract Builder setRequestRecordsLimit(Integer limit);
+
       abstract Read build();
     }
 
@@ -319,6 +324,18 @@ public Read withUpToDateThreshold(Duration 
upToDateThreshold) {
       return toBuilder().setUpToDateThreshold(upToDateThreshold).build();
     }
 
+    /**
+     * Specifies the maximum number of records in GetRecordsResult returned by 
GetRecords call which
+     * is limited by 10K records. It should be adjusted according to average 
size of data record to
+     * prevent shard overloading. More details can be found here:
+     * <a 
href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html">API_GetRecords</a>
+     */
+    public Read withRequestRecordsLimit(int limit) {
+      checkArgument(limit > 0, "limit must be positive, but was: %s", limit);
+      checkArgument(limit <= 10_000, "limit must be up to 10,000, but was: 
%s", limit);
+      return toBuilder().setRequestRecordsLimit(limit).build();
+    }
+
     @Override
     public PCollection<KinesisRecord> expand(PBegin input) {
       Unbounded<KinesisRecord> unbounded =
@@ -327,7 +344,8 @@ public Read withUpToDateThreshold(Duration 
upToDateThreshold) {
                   getAWSClientsProvider(),
                   getStreamName(),
                   getInitialPosition(),
-                  getUpToDateThreshold()));
+                  getUpToDateThreshold(),
+                  getRequestRecordsLimit()));
 
       PTransform<PBegin, PCollection<KinesisRecord>> transform = unbounded;
 
diff --git 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
index 88cfd472fc5..b5924e4b46a 100644
--- 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
+++ 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
@@ -40,20 +40,22 @@
   private final String streamName;
   private final Duration upToDateThreshold;
   private CheckpointGenerator initialCheckpointGenerator;
+  private final Integer limit;
 
   KinesisSource(AWSClientsProvider awsClientsProvider, String streamName,
-      StartingPoint startingPoint, Duration upToDateThreshold) {
+      StartingPoint startingPoint, Duration upToDateThreshold, Integer limit) {
     this(awsClientsProvider, new DynamicCheckpointGenerator(streamName, 
startingPoint), streamName,
-        upToDateThreshold);
+        upToDateThreshold, limit);
   }
 
   private KinesisSource(AWSClientsProvider awsClientsProvider,
-      CheckpointGenerator initialCheckpoint, String streamName,
-      Duration upToDateThreshold) {
+      CheckpointGenerator initialCheckpoint, String streamName, Duration 
upToDateThreshold,
+      Integer limit) {
     this.awsClientsProvider = awsClientsProvider;
     this.initialCheckpointGenerator = initialCheckpoint;
     this.streamName = streamName;
     this.upToDateThreshold = upToDateThreshold;
+    this.limit = limit;
     validate();
   }
 
@@ -65,8 +67,8 @@ private KinesisSource(AWSClientsProvider awsClientsProvider,
   @Override
   public List<KinesisSource> split(int desiredNumSplits,
       PipelineOptions options) throws Exception {
-    KinesisReaderCheckpoint checkpoint =
-        
initialCheckpointGenerator.generate(SimplifiedKinesisClient.from(awsClientsProvider));
+    KinesisReaderCheckpoint checkpoint = initialCheckpointGenerator
+        .generate(SimplifiedKinesisClient.from(awsClientsProvider, limit));
 
     List<KinesisSource> sources = newArrayList();
 
@@ -75,7 +77,8 @@ private KinesisSource(AWSClientsProvider awsClientsProvider,
           awsClientsProvider,
           new StaticCheckpointGenerator(partition),
           streamName,
-          upToDateThreshold));
+          upToDateThreshold,
+          limit));
     }
     return sources;
   }
@@ -98,7 +101,7 @@ private KinesisSource(AWSClientsProvider awsClientsProvider,
     LOG.info("Creating new reader using {}", checkpointGenerator);
 
     return new KinesisReader(
-        SimplifiedKinesisClient.from(awsClientsProvider),
+        SimplifiedKinesisClient.from(awsClientsProvider, limit),
         checkpointGenerator,
         this,
         upToDateThreshold);
diff --git 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
index 7cd2fd41312..5af0e5531a6 100644
--- 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
+++ 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
@@ -58,15 +58,18 @@
   private static final String STREAM_NAME_DIMENSION = "StreamName";
   private final AmazonKinesis kinesis;
   private final AmazonCloudWatch cloudWatch;
+  private final Integer limit;
 
-  public SimplifiedKinesisClient(AmazonKinesis kinesis, AmazonCloudWatch 
cloudWatch) {
+  public SimplifiedKinesisClient(AmazonKinesis kinesis, AmazonCloudWatch 
cloudWatch,
+      Integer limit) {
     this.kinesis = checkNotNull(kinesis, "kinesis");
     this.cloudWatch = checkNotNull(cloudWatch, "cloudWatch");
+    this.limit = limit;
   }
 
-  public static SimplifiedKinesisClient from(AWSClientsProvider provider) {
+  public static SimplifiedKinesisClient from(AWSClientsProvider provider, 
Integer limit) {
     return new SimplifiedKinesisClient(provider.getKinesisClient(),
-        provider.getCloudWatchClient());
+        provider.getCloudWatchClient(), limit);
   }
 
   public String getShardIterator(final String streamName, final String shardId,
@@ -113,7 +116,7 @@ public String getShardIterator(final String streamName, 
final String shardId,
    */
   public GetKinesisRecordsResult getRecords(String shardIterator, String 
streamName,
       String shardId) throws TransientKinesisException {
-    return getRecords(shardIterator, streamName, shardId, null);
+    return getRecords(shardIterator, streamName, shardId, limit);
   }
 
   /**
diff --git 
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
index 6b8cbdfbeb2..c439393549c 100644
--- 
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
+++ 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
@@ -86,7 +86,8 @@ public void testWriteThenRead() throws Exception {
                     // to prevent endless running in case of error
                     .withMaxReadTime(Duration.standardMinutes(5))
                     
.withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
-                    .withInitialTimestampInStream(now))
+                    .withInitialTimestampInStream(now)
+                    .withRequestRecordsLimit(1000))
             .apply(
                 ParDo.of(
                     new DoFn<KinesisRecord, byte[]>() {
diff --git 
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
index 43993f410fe..a03ef4ef4c3 100644
--- 
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
+++ 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
@@ -20,6 +20,8 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
 import static org.mockito.BDDMockito.given;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verifyZeroInteractions;
 
@@ -32,13 +34,19 @@
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
 import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
 import com.amazonaws.services.kinesis.model.LimitExceededException;
 import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import com.amazonaws.services.kinesis.model.StreamDescription;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import org.joda.time.Instant;
 import org.joda.time.Minutes;
@@ -47,6 +55,7 @@
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
 
 /***
  */
@@ -329,4 +338,29 @@ private AmazonServiceException 
newAmazonServiceException(ErrorType errorType) {
     exception.setErrorType(errorType);
     return exception;
   }
+
+  @Test
+  public void shouldReturnLimitedNumberOfRecords() throws Exception {
+    final Integer limit = 100;
+
+    doAnswer((Answer<GetRecordsResult>) invocation -> {
+      GetRecordsRequest request = (GetRecordsRequest) 
invocation.getArguments()[0];
+      List<Record> records = generateRecords(request.getLimit());
+      return new 
GetRecordsResult().withRecords(records).withMillisBehindLatest(1000L);
+    }).when(kinesis).getRecords(any(GetRecordsRequest.class));
+
+    GetKinesisRecordsResult result = underTest.getRecords(SHARD_ITERATOR, 
STREAM, SHARD_1, limit);
+    assertThat(result.getRecords().size()).isEqualTo(limit);
+  }
+
+  private List<Record> generateRecords(int num) {
+    List<Record> records = new ArrayList<>();
+    for (int i = 0; i < num; i++) {
+      byte[] value = new byte[1024];
+      Arrays.fill(value, (byte) i);
+      records.add(new 
Record().withSequenceNumber(String.valueOf(i)).withPartitionKey("key")
+          .withData(ByteBuffer.wrap(value)));
+    }
+    return records;
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 85335)
    Time Spent: 2h 20m  (was: 2h 10m)

> Add withRequestRecordsLimit option to KinesisIO
> -----------------------------------------------
>
>                 Key: BEAM-3819
>                 URL: https://issues.apache.org/jira/browse/BEAM-3819
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kinesis
>            Reporter: Jean-Baptiste Onofré
>            Assignee: Alexey Romanenko
>            Priority: Major
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> In some cases, the user might need to set the {{limit}} on the 
> {{SimplifiedKinesisClient}}, especially for performance reasons, depending on 
> the number of records.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to