This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e82ff7e  adding max retries for assignment write wait (#1348)
e82ff7e is described below

commit e82ff7e3d5afc736e1ec4e39e165b062255b025a
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Tue Mar 6 16:01:28 2018 -0800

    adding max retries for assignment write wait (#1348)
    
    * adding max retries for assignment write wait.  Also fix issue with SchedulerAssignmentTest
    
    * cleaning up tests
    
    * cleaning up formatting
---
 conf/functions_worker.yml                          |  1 +
 .../pulsar/functions/worker/SchedulerManager.java  |  6 +++
 .../pulsar/functions/worker/WorkerConfig.java      |  1 +
 .../functions/worker/SchedulerManagerTest.java     | 44 ++++++++--------------
 4 files changed, 23 insertions(+), 29 deletions(-)

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index c2c4091..daaeae9 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -44,3 +44,4 @@ functionAssignmentTopicName: "assignments"
 failureCheckFreqMs: 30000
 rescheduleTimeoutMs: 60000
 initialBrokerReconnectMaxRetries: 60
+assignmentWriteMaxRetries: 60
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index de0228d..fd61161 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -164,13 +164,19 @@ public class SchedulerManager implements AutoCloseable {
         }
 
         // wait for assignment update to go throw the pipeline
+        int retries = 0;
         while (this.functionRuntimeManager.getCurrentAssignmentVersion() < 
assignmentVersion) {
+            if (retries >= this.workerConfig.getAssignmentWriteMaxRetries()) {
+                log.warn("Max number of retries reached for waiting for 
assignment to propagate. Will continue now.");
+                break;
+            }
             log.info("Waiting for assignments to propagate...");
             try {
                 Thread.sleep(500);
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             }
+            retries++;
         }
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 84f2438..579572a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -57,6 +57,7 @@ public class WorkerConfig implements Serializable {
     private long failureCheckFreqMs;
     private long rescheduleTimeoutMs;
     private int initialBrokerReconnectMaxRetries;
+    private int assignmentWriteMaxRetries;
 
     @Data
     @Setter
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index f9a03db..54a245f 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -41,9 +41,8 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doReturn;
@@ -74,6 +73,7 @@ public class SchedulerManagerTest {
         workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig()
                 
.setMetricsSinkClassName(FunctionRuntimeManagerTest.TestSink.class.getName()));
         
workerConfig.setSchedulerClassName(RoundRobinScheduler.class.getName());
+        workerConfig.setAssignmentWriteMaxRetries(0);
 
         producer = mock(Producer.class);
         completableFuture = spy(new CompletableFuture<>());
@@ -94,7 +94,8 @@ public class SchedulerManagerTest {
     }
 
     @Test
-    public void testSchedule() throws PulsarClientException, 
NoSuchMethodException, InterruptedException {
+    public void testSchedule() throws PulsarClientException, 
NoSuchMethodException, InterruptedException,
+            TimeoutException, ExecutionException {
 
         List<Function.FunctionMetaData> functionMetaDataList = new 
LinkedList<>();
         long version = 5;
@@ -139,7 +140,7 @@ public class SchedulerManagerTest {
 
     @Test
     public void testNothingNewToSchedule() throws InterruptedException, 
ExecutionException, NoSuchMethodException,
-            InvalidProtocolBufferException {
+            InvalidProtocolBufferException, TimeoutException {
 
         List<Function.FunctionMetaData> functionMetaDataList = new 
LinkedList<>();
         long version = 5;
@@ -191,7 +192,7 @@ public class SchedulerManagerTest {
 
     @Test
     public void testAddingFunctions() throws NoSuchMethodException, 
InterruptedException,
-            InvalidProtocolBufferException {
+            InvalidProtocolBufferException, TimeoutException, 
ExecutionException {
         List<Function.FunctionMetaData> functionMetaDataList = new 
LinkedList<>();
         long version = 5;
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder()
@@ -255,7 +256,7 @@ public class SchedulerManagerTest {
 
     @Test
     public void testDeletingFunctions() throws NoSuchMethodException, 
InterruptedException,
-            InvalidProtocolBufferException {
+            InvalidProtocolBufferException, TimeoutException, 
ExecutionException {
         List<Function.FunctionMetaData> functionMetaDataList = new 
LinkedList<>();
         long version = 5;
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder()
@@ -321,7 +322,8 @@ public class SchedulerManagerTest {
     }
 
     @Test
-    public void testScalingUp() throws NoSuchMethodException, 
InterruptedException, InvalidProtocolBufferException, PulsarClientException {
+    public void testScalingUp() throws NoSuchMethodException, 
InterruptedException, InvalidProtocolBufferException,
+            PulsarClientException, TimeoutException, ExecutionException {
         List<Function.FunctionMetaData> functionMetaDataList = new 
LinkedList<>();
         long version = 5;
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder()
@@ -431,7 +433,7 @@ public class SchedulerManagerTest {
 
     @Test
     public void testScalingDown() throws PulsarClientException, 
NoSuchMethodException, InterruptedException,
-            InvalidProtocolBufferException {
+            InvalidProtocolBufferException, TimeoutException, 
ExecutionException {
         List<Function.FunctionMetaData> functionMetaDataList = new 
LinkedList<>();
         long version = 5;
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder()
@@ -542,7 +544,7 @@ public class SchedulerManagerTest {
 
     @Test
     public void testUpdate() throws PulsarClientException, 
NoSuchMethodException, InterruptedException,
-            InvalidProtocolBufferException {
+            InvalidProtocolBufferException, TimeoutException, 
ExecutionException {
         List<Function.FunctionMetaData> functionMetaDataList = new 
LinkedList<>();
         long version = 5;
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder()
@@ -666,29 +668,13 @@ public class SchedulerManagerTest {
         );
     }
 
-    private void callSchedule() throws NoSuchMethodException, 
InterruptedException {
+    private void callSchedule() throws NoSuchMethodException, 
InterruptedException,
+            TimeoutException, ExecutionException {
         long intialVersion = 
functionRuntimeManager.getCurrentAssignmentVersion();
-        int initalCount = getMethodInvocationDetails(completableFuture,
-                CompletableFuture.class.getMethod("get")).size();
-        log.info("initalCount: {}", initalCount);
         Future<?> complete = schedulerManager.schedule();
-        int count = 0;
-        while (!complete.isDone()) {
 
-            int invocationCount = getMethodInvocationDetails(completableFuture,
-                    CompletableFuture.class.getMethod("get")).size();
-            log.info("invocationCount: {}", invocationCount);
-
-            if (invocationCount >= initalCount + 1) {
-                doReturn(intialVersion + 
1).when(functionRuntimeManager).getCurrentAssignmentVersion();
-            }
-
-            if (count > 100) {
-                Assert.fail("Scheduler failed to terminate!");
-            }
-            Thread.sleep(100);
-            count++;
-        }
+        complete.get(30, TimeUnit.SECONDS);
+        doReturn(intialVersion + 
1).when(functionRuntimeManager).getCurrentAssignmentVersion();
     }
 
     private List<Invocation> getMethodInvocationDetails(Object o, Method 
method) throws NoSuchMethodException {

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to