sijie closed pull request #3029: [state] make setting and opening state table more robust
URL: https://github.com/apache/pulsar/pull/3029
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 70ed588e7e..91363046bd 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -19,10 +19,12 @@
 
 package org.apache.pulsar.functions.instance;
 
+import com.google.common.base.Stopwatch;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import io.netty.buffer.ByteBuf;
 import io.prometheus.client.CollectorRegistry;
+import java.util.concurrent.TimeUnit;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -32,8 +34,15 @@
 import org.apache.bookkeeper.clients.StorageClientBuilder;
 import org.apache.bookkeeper.clients.admin.StorageAdminClient;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.clients.exceptions.ClientException;
+import org.apache.bookkeeper.clients.exceptions.InternalServerException;
 import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
 import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
+import org.apache.bookkeeper.common.util.Backoff;
+import org.apache.bookkeeper.common.util.Backoff.Jitter;
+import org.apache.bookkeeper.common.util.Backoff.Jitter.Type;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
+import org.apache.bookkeeper.common.util.Retries;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.StorageType;
 import org.apache.bookkeeper.stream.proto.StreamConfiguration;
@@ -314,7 +323,8 @@ private void createStateTable(String tableNs, String tableName, StorageClientSet
                 .setMinNumRanges(4)
                 .setStorageType(StorageType.TABLE)
                 .build();
-            while (true) {
+            Stopwatch elapsedWatch = Stopwatch.createStarted();
+            while (elapsedWatch.elapsed(TimeUnit.MINUTES) < 1) {
                 try {
                     result(storageAdminClient.getStream(tableNs, tableName));
                     return;
@@ -335,6 +345,10 @@ private void createStateTable(String tableNs, String tableName, StorageClientSet
                         // there might be two client conflicting at creating table, so let's retrieve it to make
                         // sure the table is created.
                     }
+                } catch (ClientException ce) {
+                    log.warn("Encountered issue on fetching state stable metadata, re-attempting in 100 milliseconds",
+                        ce.getMessage());
+                    TimeUnit.MILLISECONDS.sleep(100);
                 }
             }
         }
@@ -354,6 +368,13 @@ private void setupStateTable() throws Exception {
         StorageClientSettings settings = StorageClientSettings.newBuilder()
                 .serviceUri(stateStorageServiceUrl)
                 .clientName("function-" + tableNs + "/" + tableName)
+                // configure a maximum 2 minutes jitter backoff for accessing table service
+                .backoffPolicy(Jitter.of(
+                    Type.EXPONENTIAL,
+                    100,
+                    2000,
+                    60
+                ))
                 .build();
 
         // we defer creation of the state table until a java instance is running here.
@@ -364,7 +385,19 @@ private void setupStateTable() throws Exception {
                 .withSettings(settings)
                 .withNamespace(tableNs)
                 .build();
-        this.stateTable = result(storageClient.openTable(tableName));
+        // NOTE: this is a workaround until we bump bk version to 4.9.0
+        // table might just be created above, so it might not be ready for serving traffic
+        Stopwatch openSw = Stopwatch.createStarted();
+        while (openSw.elapsed(TimeUnit.MINUTES) < 1) {
+            try {
+                this.stateTable = result(storageClient.openTable(tableName));
+                break;
+            } catch (InternalServerException ise) {
+                log.warn("Encountered internal server on opening table '{}', re-attempt in 100 milliseconds : {}",
+                    tableName, ise.getMessage());
+                TimeUnit.MILLISECONDS.sleep(100);
+            }
+        }
     }
 
     private void processResult(Record srcRecord,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to