[ 
https://issues.apache.org/jira/browse/BEAM-7589?focusedWorklogId=269241&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-269241
 ]

ASF GitHub Bot logged work on BEAM-7589:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Jun/19 13:02
            Start Date: 28/Jun/19 13:02
    Worklog Time Spent: 10m 
      Work Description: iemejia commented on pull request #8955: [BEAM-7589] Use only one KinesisProducer instance per JVM
URL: https://github.com/apache/beam/pull/8955#discussion_r298576007
 
 

 ##########
 File path: 
sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
 ##########
 @@ -657,67 +661,72 @@ public void processElement(ProcessContext c) throws Exception {
 
         ListenableFuture<UserRecordResult> f =
            producer.addUserRecord(spec.getStreamName(), partitionKey, explicitHashKey, data);
-        Futures.addCallback(f, new UserRecordResultFutureCallback());
+        putFutures.add(f);
       }
 
       @FinishBundle
       public void finishBundle() throws Exception {
-        // Flush all outstanding records, blocking call
-        flushAll();
-
-        checkForFailures();
-      }
-
-      @Teardown
-      public void tearDown() throws Exception {
-        if (producer != null) {
-          producer.destroy();
-          producer = null;
-        }
+        flushBundle();
       }
 
       /**
-       * Flush outstanding records until the total number will be less than required or the number
-       * of retries will be exhausted. The retry timeout starts from 1 second and it doubles on
-       * every iteration.
+       * Flush outstanding records until the total number of failed records will be less than 0 or
+       * the number of retries will be exhausted. The retry timeout starts from 1 second and it
+       * doubles on every iteration.
        */
-      private void flush(int numMax) throws InterruptedException, IOException {
+      private void flushBundle() throws InterruptedException, ExecutionException, IOException {
         int retries = spec.getRetries();
-        int numOutstandingRecords = producer.getOutstandingRecordsCount();
+        int numFailedRecords;
         int retryTimeout = 1000; // initial timeout, 1 sec
+        String message = "";
 
-        while (numOutstandingRecords > numMax && retries-- > 0) {
+        do {
+          numFailedRecords = 0;
           producer.flush();
+
+          // Wait for puts to finish and check the results
+          for (Future<UserRecordResult> f : putFutures) {
+            UserRecordResult result = f.get(); // this does block
+            if (!result.isSuccessful()) {
+              numFailedRecords++;
+            }
+          }
+
           // wait until outstanding records will be flushed
           Thread.sleep(retryTimeout);
-          numOutstandingRecords = producer.getOutstandingRecordsCount();
           retryTimeout *= 2; // exponential backoff
-        }
+        } while (numFailedRecords > 0 && retries-- > 0);
+
+        if (numFailedRecords > 0) {
+          for (Future<UserRecordResult> f : putFutures) {
+            UserRecordResult result = f.get();
+            if (!result.isSuccessful()) {
+              failures.offer(
+                  new KinesisWriteException(
+                      "Put record was not successful.", new UserRecordFailedException(result)));
+            }
+          }
 
-        if (numOutstandingRecords > numMax) {
-          String message =
+          message =
               String.format(
-                  "After [%d] retries, number of outstanding records [%d] is still greater than "
-                      + "required [%d].",
-                  spec.getRetries(), numOutstandingRecords, numMax);
+                  "After [%d] retries, number of failed records [%d] is still greater than 0",
+                  spec.getRetries(), numFailedRecords);
           LOG.error(message);
-          throw new IOException(message);
         }
-      }
 
-      private void flushAll() throws InterruptedException, IOException {
-        flush(0);
+        checkForFailures(message);
       }
 
       /** If any write has asynchronously failed, fail the bundle with a useful error. */
-      private void checkForFailures() throws IOException {
-        // Note that this function is never called by multiple threads and is the only place that
-        // we remove from failures, so this code is safe.
+      private void checkForFailures(String message)
+          throws IOException, InterruptedException, ExecutionException {
         if (failures.isEmpty()) {
 
 Review comment:
   I think it would be clearer to move this validation into the invoking method and
have the `checkForFailures` method just produce the error string and throw the
exception, but it is ok if you prefer to leave the logic as it is.
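
The shape suggested in the review comment above can be sketched as follows. This is a simplified, hypothetical stand-in for the KinesisIO writer (the class name, the `recordFailure` helper, and the plain `Queue` are illustrative, not the actual Beam code): the invoking method owns the "anything to report?" check, and `checkForFailures` only formats the error string and throws.

```java
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class FailureCheckSketch {
  private final Queue<Exception> failures = new ConcurrentLinkedQueue<>();

  void recordFailure(Exception e) {
    failures.offer(e);
  }

  // Invoking method: the validation lives here, so checkForFailures has one job.
  void flushBundle(String retryMessage) throws IOException {
    if (!failures.isEmpty()) {
      checkForFailures(retryMessage);
    }
  }

  // Drains the queue into a single error string and throws. It is only ever
  // called when there is something to report.
  private void checkForFailures(String retryMessage) throws IOException {
    StringBuilder sb = new StringBuilder(retryMessage);
    for (Exception e; (e = failures.poll()) != null; ) {
      sb.append(System.lineSeparator()).append("  ").append(e.getMessage());
    }
    throw new IOException(sb.toString());
  }
}
```

With this split, a caller such as `finishBundle` never needs to know how the message is assembled; it only triggers the flush and lets the exception propagate.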
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 269241)
    Time Spent: 0.5h  (was: 20m)

> Kinesis IO.write throws LimitExceededException
> ----------------------------------------------
>
>                 Key: BEAM-7589
>                 URL: https://issues.apache.org/jira/browse/BEAM-7589
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kinesis
>    Affects Versions: 2.11.0
>            Reporter: Anton Kedin
>            Assignee: Alexey Romanenko
>            Priority: Major
>             Fix For: 2.15.0
>
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Follow up from https://issues.apache.org/jira/browse/BEAM-7357:
>  
> ----
> Brachi Packter added a comment - 13/Jun/19 09:05
>  [~aromanenko] I think I found what triggers the shard map update now.
> You create a producer per bundle (in the setUp function), and if I multiply it
> by the number of workers, this gives a huge number of producers; I believe
> this triggers the "update shard map" call.
> If I copy your code and create *one* producer for every worker, then this
> error disappears.
> Can you just remove the producer creation from the setUp method and move it to
> a static field in the class, created once when the class is initialized?
> See a similar issue with JdbcIO: the connection pool was created per setUp
> method, and we moved it to be a static member, so there is one pool per JVM.
> Ask [~iemejia] for more details.
> ----
> Alexey Romanenko added a comment - 14/Jun/19 14:31 - edited
>   
>  [~brachi_packter] What kind of error do you have in this case? Could you 
> post an error stacktrace / exception message? 
>  Also, it would be helpful (if it's possible) if you could provide more 
> details about your environment and pipeline, like what is your pipeline 
> topology, which runner do you use, number of workers in your cluster, etc. 
>  For now, I can't reproduce it on my side, so all additional info will be 
> helpful.
> ----
> Brachi Packter added a comment - 16/Jun/19 06:44
>  I get the same error:
> {code:java}
> [0x00001728][0x00007f13ed4c4700] [error] [shard_map.cc:150] Shard map update 
> for stream "**" failed. Code: LimitExceededException Message: Rate exceeded 
> for stream poc-test under account **.; retrying in 5062 ms
> {code}
> I'm not seeing the full stack trace, but I can also see this in the log:
> {code:java}
> [2019-06-13 08:29:09.427018] [0x000007e1][0x00007f8d508d3700] [warning] [AWS 
> Log: WARN](AWSErrorMarshaller)Encountered AWSError Throttling Rate exceeded
> {code}
> More details:
>  I'm using the Dataflow runner, Java SDK 2.11.
> 60 workers initially (with autoscaling and also with the flag
> "enableStreamingEngine").
> Normally, I'm producing 4-5k records per second, but when I have latency this
> can be multiplied by 3-4 times.
> When I start the Dataflow job I have latency, so I produce more data, and it
> fails immediately.
> Also, I have consumers (a 3rd-party tool); I know that they call describe
> stream every 30 seconds.
> My pipeline runs on GCP, reading data from PubSub at around 20,000 records
> per second (in regular time; at latency time even 100,000 records per
> second). It does many aggregations and counts based on some dimensions (using
> Beam SQL). This is done over a 1-minute sliding window, and it writes the
> result of the aggregations to a Kinesis stream.
> My stream has 10 shards, and my partition key logic generates a UUID per
> record:
> UUID.randomUUID().toString()
> Hope this gives you some more context on my problem.
> Another suggestion: can you try fixing the issue as I suggest and provide me
> some specific version for testing, without merging it to master? (I would do
> it myself, but I had troubles building the huge Apache Beam repository
> locally.)
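
The one-producer-per-JVM approach suggested in the comments above can be sketched with Java's initialization-on-demand holder idiom, which the JVM guarantees runs the static initializer exactly once, lazily and thread-safely. `KinesisProducerStub` is a hypothetical stand-in for the real KPL `KinesisProducer`, which is not part of the JDK; the counter exists only to make the single-instance behavior observable.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SharedProducerSketch {
  static final AtomicInteger INSTANCES_CREATED = new AtomicInteger();

  // Stand-in for the real KPL producer class.
  static class KinesisProducerStub {
    KinesisProducerStub() {
      INSTANCES_CREATED.incrementAndGet();
    }
  }

  // Initialization-on-demand holder idiom: PRODUCER is created on first
  // access to the Holder class, exactly once per JVM.
  private static class Holder {
    static final KinesisProducerStub PRODUCER = new KinesisProducerStub();
  }

  // Every DoFn instance (and thus every bundle and worker thread in the
  // same JVM) gets the same shared producer.
  static KinesisProducerStub getProducer() {
    return Holder.PRODUCER;
  }
}
```

Compared to creating the producer in `@Setup`, this keeps the number of producers (and hence shard-map update calls) constant regardless of how many bundles or DoFn instances the runner creates.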



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
