[ 
https://issues.apache.org/jira/browse/BEAM-6262?focusedWorklogId=177490&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-177490
 ]

ASF GitHub Bot logged work on BEAM-6262:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Dec/18 13:31
            Start Date: 20/Dec/18 13:31
    Worklog Time Spent: 10m 
      Work Description: iemejia closed pull request #7315: [BEAM-6262] 
KinesisIO - gracefully shutdown executor service
URL: https://github.com/apache/beam/pull/7315
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff after the merge, so the diff is reproduced below:

diff --git 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
index a02caf072529..86fda0a48107 100644
--- 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
+++ 
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
@@ -44,6 +44,7 @@
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ShardReadersPool.class);
   private static final int DEFAULT_CAPACITY_PER_SHARD = 10_000;
+  private static final int ATTEMPTS_TO_SHUTDOWN = 3;
 
   /**
    * Executor service for running the threads that read records from shards 
handled by this pool.
@@ -172,17 +173,29 @@ private void readLoop(ShardRecordsIterator 
shardRecordsIterator) {
   void stop() {
     LOG.info("Closing shard iterators pool");
     poolOpened.set(false);
-    executorService.shutdownNow();
-    boolean isShutdown = false;
-    int attemptsLeft = 3;
-    while (!isShutdown && attemptsLeft-- > 0) {
+    executorService.shutdown();
+    awaitTermination();
+    if (!executorService.isTerminated()) {
+      LOG.warn(
+          "Executor service was not completely terminated after {} attempts, 
trying to forcibly stop it.",
+          ATTEMPTS_TO_SHUTDOWN);
+      executorService.shutdownNow();
+      awaitTermination();
+    }
+  }
+
+  private void awaitTermination() {
+    int attemptsLeft = ATTEMPTS_TO_SHUTDOWN;
+    boolean isTerminated = executorService.isTerminated();
+
+    while (!isTerminated && attemptsLeft-- > 0) {
       try {
-        isShutdown = executorService.awaitTermination(10, TimeUnit.SECONDS);
+        isTerminated = executorService.awaitTermination(10, TimeUnit.SECONDS);
       } catch (InterruptedException e) {
         LOG.error("Interrupted while waiting for the executor service to 
shutdown");
         throw new RuntimeException(e);
       }
-      if (!isShutdown && attemptsLeft > 0) {
+      if (!isTerminated && attemptsLeft > 0) {
         LOG.warn(
             "Executor service is taking long time to shutdown, will retry. {} 
attempts left",
             attemptsLeft);
diff --git 
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
index 39abcf99d393..7c43a7876833 100644
--- 
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
+++ 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
@@ -148,7 +148,7 @@ public void shouldInterruptKinesisReadingAndStopShortly()
         .thenAnswer(
             (Answer<List<KinesisRecord>>)
                 invocation -> {
-                  Thread.sleep(TimeUnit.MINUTES.toMillis(1));
+                  Thread.sleep(TIMEOUT_IN_MILLIS / 2);
                   return Collections.emptyList();
                 });
     shardReadersPool.start();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 177490)
    Time Spent: 1h 20m  (was: 1h 10m)

> KinesisIO - ShardReadersPool - Unexpected exception occurred
> ------------------------------------------------------------
>
>                 Key: BEAM-6262
>                 URL: https://issues.apache.org/jira/browse/BEAM-6262
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kinesis
>    Affects Versions: 2.7.0, 2.8.0, 2.9.0
>            Reporter: Alexey Romanenko
>            Assignee: Alexey Romanenko
>            Priority: Minor
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> While reading a stream with a limited number of records or a limited time, 
> KinesisIO throws an exception at the end. However, there seems to be no data 
> loss, so it's a minor issue.
> {noformat}
> INFO: Creating new reader using [Checkpoint AT_TIMESTAMP for stream 
> apache-beam, shard shardId-000000000002: null, Checkpoint AT_TIMESTAMP for 
> stream apache-beam, shard shardId-000000000001: null]
> Dec 18, 2018 6:41:44 PM org.apache.beam.sdk.io.kinesis.KinesisReader start
> INFO: Starting reader using [Checkpoint AT_TIMESTAMP for stream apache-beam, 
> shard shardId-000000000002: null, Checkpoint AT_TIMESTAMP for stream 
> apache-beam, shard shardId-000000000001: null]
> Dec 18, 2018 6:41:46 PM org.apache.beam.sdk.io.kinesis.ShardReadersPool stop
> INFO: Closing shard iterators pool
> Dec 18, 2018 6:41:46 PM org.apache.beam.sdk.io.kinesis.ShardReadersPool 
> readLoop
> SEVERE: Unexpected exception occurred
> java.lang.RuntimeException: Not retryable client failure
>       at 
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:225)
>       at 
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getRecords(SimplifiedKinesisClient.java:134)
>       at 
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getRecords(SimplifiedKinesisClient.java:119)
>       at 
> org.apache.beam.sdk.io.kinesis.ShardRecordsIterator.fetchRecords(ShardRecordsIterator.java:86)
>       at 
> org.apache.beam.sdk.io.kinesis.ShardRecordsIterator.readNextBatch(ShardRecordsIterator.java:76)
>       at 
> org.apache.beam.sdk.io.kinesis.ShardReadersPool.readLoop(ShardReadersPool.java:123)
>       at 
> org.apache.beam.sdk.io.kinesis.ShardReadersPool.lambda$startReadingShards$0(ShardReadersPool.java:114)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: com.amazonaws.AbortedException: 
>       at 
> com.amazonaws.internal.SdkFilterInputStream.abortIfNeeded(SdkFilterInputStream.java:53)
>       at 
> com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:81)
>       at 
> com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
>       at 
> com.fasterxml.jackson.dataformat.cbor.CBORParserBootstrapper.ensureLoaded(CBORParserBootstrapper.java:196)
>       at 
> com.fasterxml.jackson.dataformat.cbor.CBORParserBootstrapper.constructParser(CBORParserBootstrapper.java:90)
>       at 
> com.fasterxml.jackson.dataformat.cbor.CBORFactory._createParser(CBORFactory.java:350)
>       at 
> com.fasterxml.jackson.dataformat.cbor.CBORFactory.createParser(CBORFactory.java:287)
>       at 
> com.fasterxml.jackson.dataformat.cbor.CBORFactory.createParser(CBORFactory.java:26)
>       at 
> com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:109)
>       at 
> com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:43)
>       at 
> com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:70)
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1554)
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1272)
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>       at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>       at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2276)
>       at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2252)
>       at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetRecords(AmazonKinesisClient.java:1062)
>       at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1038)
>       at 
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.lambda$getRecords$2(SimplifiedKinesisClient.java:137)
>       at 
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:210)
>       ... 11 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to