[ 
https://issues.apache.org/jira/browse/BEAM-690?focusedWorklogId=143512&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-143512
 ]

ASF GitHub Bot logged work on BEAM-690:
---------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Sep/18 13:42
            Start Date: 12/Sep/18 13:42
    Worklog Time Spent: 10m 
      Work Description: janotav closed pull request #6303: [BEAM-690] Backoff 
in the DirectRunner if no work is available
URL: https://github.com/apache/beam/pull/6303
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 43aa9e7a364..d10906eccc0 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -164,7 +164,7 @@ public void start(DirectGraph graph, RootProviderRegistry 
rootProviderRegistry)
         QuiescenceDriver.create(
             evaluationContext, graph, this, visibleUpdates, 
pendingRootBundles.build());
     executorService.submit(
-        new Runnable() {
+        new Throttleable() {
           @Override
           public void run() {
             DriverState drive = executionDriver.drive();
@@ -178,20 +178,49 @@ public void run() {
                   newPipelineState = State.DONE;
                   break;
                 case CONTINUE:
+                case CONTINUE_THROTTLE:
                   throw new IllegalStateException(
-                      String.format("%s should not be a terminal state", 
DriverState.CONTINUE));
+                      String.format("%s should not be a terminal state", 
drive));
                 default:
                   throw new IllegalArgumentException(
                       String.format("Unknown %s %s", 
DriverState.class.getSimpleName(), drive));
               }
               shutdownIfNecessary(newPipelineState);
             } else {
+              throttle(drive);
               executorService.submit(this);
             }
           }
         });
   }
 
+  abstract static class Throttleable implements Runnable {
+    static final long DELAY_FIRST = 10;
+    static final long DELAY_MAX = 100;
+    long delay;
+
+    Throttleable() {
+      delay = DELAY_FIRST;
+    }
+
+    void sleep() throws InterruptedException {
+      Thread.sleep(delay);
+    }
+
+    void throttle(DriverState state) {
+      if (DriverState.CONTINUE_THROTTLE.equals(state)) {
+        try {
+          sleep();
+          delay = Math.min(delay * 2, DELAY_MAX);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      } else {
+        delay = DELAY_FIRST;
+      }
+    }
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public void process(
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
index 4547a69933c..40bc5d5aa29 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
@@ -94,7 +94,7 @@ private QuiescenceDriver(
 
   @Override
   public DriverState drive() {
-    boolean noWorkOutstanding = outstandingWork.get() == 0L;
+    boolean noWorkOutstanding = noWorkOutstanding();
     ExecutorState startingState = state.get();
     if (startingState == ExecutorState.ACTIVE) {
       // The remainder of this call will add all available work to the 
Executor, and there will
@@ -126,10 +126,18 @@ public DriverState drive() {
     } else if (evaluationContext.isDone()) {
       return DriverState.SHUTDOWN;
     } else {
-      return DriverState.CONTINUE;
+      if (noWorkOutstanding()) {
+        return DriverState.CONTINUE_THROTTLE;
+      } else {
+        return DriverState.CONTINUE;
+      }
     }
   }
 
+  private boolean noWorkOutstanding() {
+    return outstandingWork.get() == 0L;
+  }
+
   private void applyUpdate(
       boolean noWorkOutstanding, ExecutorState startingState, WorkUpdate 
update) {
     LOG.debug("Executor Update: {}", update);
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ThrottleableTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ThrottleableTest.java
new file mode 100644
index 00000000000..e1fa99293ff
--- /dev/null
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ThrottleableTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.runners.local.ExecutionDriver;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test the exponential back-off algorithm as provided by {@link
+ * ExecutorServiceParallelExecutor.Throttleable}.
+ */
+public class ThrottleableTest {
+
+  @Test
+  public void testBackoff() {
+    List<ExecutionDriver.DriverState> states =
+        Lists.newArrayList(
+            ExecutionDriver.DriverState.CONTINUE,
+            ExecutionDriver.DriverState.CONTINUE_THROTTLE,
+            ExecutionDriver.DriverState.CONTINUE,
+            ExecutionDriver.DriverState.CONTINUE_THROTTLE,
+            ExecutionDriver.DriverState.CONTINUE_THROTTLE,
+            ExecutionDriver.DriverState.CONTINUE,
+            ExecutionDriver.DriverState.CONTINUE_THROTTLE,
+            ExecutionDriver.DriverState.CONTINUE_THROTTLE,
+            ExecutionDriver.DriverState.CONTINUE_THROTTLE,
+            ExecutionDriver.DriverState.CONTINUE_THROTTLE,
+            ExecutionDriver.DriverState.CONTINUE_THROTTLE,
+            ExecutionDriver.DriverState.CONTINUE_THROTTLE);
+    List<Long> sleeps = new ArrayList<>();
+    ExecutorServiceParallelExecutor.Throttleable throttleable =
+        new ExecutorServiceParallelExecutor.Throttleable() {
+          @Override
+          public void sleep() {
+            sleeps.add(delay);
+          }
+
+          @Override
+          public void run() {
+            throttle(states.remove(0));
+          }
+        };
+    while (!states.isEmpty()) {
+      throttleable.run();
+    }
+    Assert.assertEquals(
+        Lists.newArrayList(
+            ExecutorServiceParallelExecutor.Throttleable.DELAY_FIRST,
+            ExecutorServiceParallelExecutor.Throttleable.DELAY_FIRST,
+            ExecutorServiceParallelExecutor.Throttleable.DELAY_FIRST * 2,
+            ExecutorServiceParallelExecutor.Throttleable.DELAY_FIRST,
+            ExecutorServiceParallelExecutor.Throttleable.DELAY_FIRST * 2,
+            ExecutorServiceParallelExecutor.Throttleable.DELAY_FIRST * 4,
+            ExecutorServiceParallelExecutor.Throttleable.DELAY_FIRST * 8,
+            ExecutorServiceParallelExecutor.Throttleable.DELAY_MAX,
+            ExecutorServiceParallelExecutor.Throttleable.DELAY_MAX),
+        sleeps);
+  }
+}
diff --git 
a/runners/local-java/src/main/java/org/apache/beam/runners/local/ExecutionDriver.java
 
b/runners/local-java/src/main/java/org/apache/beam/runners/local/ExecutionDriver.java
index 61cf2078ce8..fcbae08f356 100644
--- 
a/runners/local-java/src/main/java/org/apache/beam/runners/local/ExecutionDriver.java
+++ 
b/runners/local-java/src/main/java/org/apache/beam/runners/local/ExecutionDriver.java
@@ -25,6 +25,7 @@
   /** The state of the driver. If the state is terminal, the driver can no 
longer make progress. */
   enum DriverState {
     CONTINUE(false),
+    CONTINUE_THROTTLE(false),
     FAILED(true),
     SHUTDOWN(true);
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 143512)
    Time Spent: 40m  (was: 0.5h)

> Backoff in the DirectRunner Monitor if no work is Available
> -----------------------------------------------------------
>
>                 Key: BEAM-690
>                 URL: https://issues.apache.org/jira/browse/BEAM-690
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>            Reporter: Thomas Groh
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> When a Pipeline has no elements available to process, the Monitor Runnable 
> will be repeatedly scheduled. Given that there is no work to be done, this 
> will loop over the steps in the transform looking for timers, and prompt the 
> sources to perform additional work, even though there is no work to be done. 
> This consumes the entirety of a single core.
> Add a bounded backoff to rescheduling the monitor runnable if no work has 
> been done since it last ran. This will reduce resource consumption on 
> low-throughput Pipelines.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to