This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f89977  [state] make setting and opening state table more robust (#3029)
9f89977 is described below

commit 9f8997761da7ca4ce2e3b2abde2998069de2c35f
Author: Sijie Guo <[email protected]>
AuthorDate: Sun Nov 25 11:42:00 2018 -0800

    [state] make setting and opening state table more robust (#3029)
---
 .../functions/instance/JavaInstanceRunnable.java   | 37 ++++++++++++++++++++--
 1 file changed, 35 insertions(+), 2 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index fd3cf95..72dbb9f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -19,10 +19,12 @@
 
 package org.apache.pulsar.functions.instance;
 
+import com.google.common.base.Stopwatch;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import io.netty.buffer.ByteBuf;
 import io.prometheus.client.CollectorRegistry;
+import java.util.concurrent.TimeUnit;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -32,8 +34,15 @@ import org.apache.bookkeeper.api.kv.Table;
 import org.apache.bookkeeper.clients.StorageClientBuilder;
 import org.apache.bookkeeper.clients.admin.StorageAdminClient;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.clients.exceptions.ClientException;
+import org.apache.bookkeeper.clients.exceptions.InternalServerException;
 import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
 import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
+import org.apache.bookkeeper.common.util.Backoff;
+import org.apache.bookkeeper.common.util.Backoff.Jitter;
+import org.apache.bookkeeper.common.util.Backoff.Jitter.Type;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
+import org.apache.bookkeeper.common.util.Retries;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.StorageType;
 import org.apache.bookkeeper.stream.proto.StreamConfiguration;
@@ -314,7 +323,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                 .setMinNumRanges(4)
                 .setStorageType(StorageType.TABLE)
                 .build();
-            while (true) {
+            Stopwatch elapsedWatch = Stopwatch.createStarted();
+            while (elapsedWatch.elapsed(TimeUnit.MINUTES) < 1) {
                 try {
                     result(storageAdminClient.getStream(tableNs, tableName));
                     return;
@@ -335,6 +345,10 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                         // there might be two client conflicting at creating 
table, so let's retrieve it to make
                         // sure the table is created.
                     }
+                } catch (ClientException ce) {
+                    log.warn("Encountered issue on fetching state stable 
metadata, re-attempting in 100 milliseconds",
+                        ce.getMessage());
+                    TimeUnit.MILLISECONDS.sleep(100);
                 }
             }
         }
@@ -354,6 +368,13 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         StorageClientSettings settings = StorageClientSettings.newBuilder()
                 .serviceUri(stateStorageServiceUrl)
                 .clientName("function-" + tableNs + "/" + tableName)
+                // configure a maximum 2 minutes jitter backoff for accessing 
table service
+                .backoffPolicy(Jitter.of(
+                    Type.EXPONENTIAL,
+                    100,
+                    2000,
+                    60
+                ))
                 .build();
 
         // we defer creation of the state table until a java instance is 
running here.
@@ -364,7 +385,19 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                 .withSettings(settings)
                 .withNamespace(tableNs)
                 .build();
-        this.stateTable = result(storageClient.openTable(tableName));
+        // NOTE: this is a workaround until we bump bk version to 4.9.0
+        // table might just be created above, so it might not be ready for 
serving traffic
+        Stopwatch openSw = Stopwatch.createStarted();
+        while (openSw.elapsed(TimeUnit.MINUTES) < 1) {
+            try {
+                this.stateTable = result(storageClient.openTable(tableName));
+                break;
+            } catch (InternalServerException ise) {
+                log.warn("Encountered internal server on opening table '{}', 
re-attempt in 100 milliseconds : {}",
+                    tableName, ise.getMessage());
+                TimeUnit.MILLISECONDS.sleep(100);
+            }
+        }
     }
 
     private void processResult(Record srcRecord,

Reply via email to