This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9f89977 [state] make setting and opening state table more robust (#3029)
9f89977 is described below
commit 9f8997761da7ca4ce2e3b2abde2998069de2c35f
Author: Sijie Guo <[email protected]>
AuthorDate: Sun Nov 25 11:42:00 2018 -0800
[state] make setting and opening state table more robust (#3029)
---
.../functions/instance/JavaInstanceRunnable.java | 37 ++++++++++++++++++++--
1 file changed, 35 insertions(+), 2 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index fd3cf95..72dbb9f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -19,10 +19,12 @@
package org.apache.pulsar.functions.instance;
+import com.google.common.base.Stopwatch;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.netty.buffer.ByteBuf;
import io.prometheus.client.CollectorRegistry;
+import java.util.concurrent.TimeUnit;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -32,8 +34,15 @@ import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.bookkeeper.clients.admin.StorageAdminClient;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.clients.exceptions.ClientException;
+import org.apache.bookkeeper.clients.exceptions.InternalServerException;
import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
+import org.apache.bookkeeper.common.util.Backoff;
+import org.apache.bookkeeper.common.util.Backoff.Jitter;
+import org.apache.bookkeeper.common.util.Backoff.Jitter.Type;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
+import org.apache.bookkeeper.common.util.Retries;
import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.bookkeeper.stream.proto.StorageType;
import org.apache.bookkeeper.stream.proto.StreamConfiguration;
@@ -314,7 +323,8 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
.setMinNumRanges(4)
.setStorageType(StorageType.TABLE)
.build();
- while (true) {
+ Stopwatch elapsedWatch = Stopwatch.createStarted();
+ while (elapsedWatch.elapsed(TimeUnit.MINUTES) < 1) {
try {
result(storageAdminClient.getStream(tableNs, tableName));
return;
@@ -335,6 +345,10 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
// there might be two client conflicting at creating
table, so let's retrieve it to make
// sure the table is created.
}
+ } catch (ClientException ce) {
+ log.warn("Encountered issue on fetching state stable
metadata, re-attempting in 100 milliseconds",
+ ce.getMessage());
+ TimeUnit.MILLISECONDS.sleep(100);
}
}
}
@@ -354,6 +368,13 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
StorageClientSettings settings = StorageClientSettings.newBuilder()
.serviceUri(stateStorageServiceUrl)
.clientName("function-" + tableNs + "/" + tableName)
+ // configure a maximum 2 minutes jitter backoff for accessing
table service
+ .backoffPolicy(Jitter.of(
+ Type.EXPONENTIAL,
+ 100,
+ 2000,
+ 60
+ ))
.build();
// we defer creation of the state table until a java instance is
running here.
@@ -364,7 +385,19 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
.withSettings(settings)
.withNamespace(tableNs)
.build();
- this.stateTable = result(storageClient.openTable(tableName));
+ // NOTE: this is a workaround until we bump bk version to 4.9.0
+ // table might just be created above, so it might not be ready for
serving traffic
+ Stopwatch openSw = Stopwatch.createStarted();
+ while (openSw.elapsed(TimeUnit.MINUTES) < 1) {
+ try {
+ this.stateTable = result(storageClient.openTable(tableName));
+ break;
+ } catch (InternalServerException ise) {
+ log.warn("Encountered internal server on opening table '{}',
re-attempt in 100 milliseconds : {}",
+ tableName, ise.getMessage());
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+ }
}
private void processResult(Record srcRecord,