This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bfdcb77 Adding timeout to open table call for function state (#9006)
bfdcb77 is described below
commit bfdcb77808c7245546dba47aabfb8579c3dfa3a7
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Sat Dec 19 09:10:28 2020 -0800
Adding timeout to open table call for function state (#9006)
Co-authored-by: Jerry Peng <[email protected]>
---
.../pulsar/functions/instance/state/BKStateStoreProviderImpl.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java
index 88960d6..f531ea5 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java
@@ -27,6 +27,8 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.api.StorageClient;
import org.apache.bookkeeper.api.kv.Table;
@@ -151,11 +153,13 @@ public class BKStateStoreProviderImpl implements
StateStoreProvider {
Stopwatch openSw = Stopwatch.createStarted();
while (openSw.elapsed(TimeUnit.MINUTES) < 1) {
try {
- return result(client.openTable(name));
+ return result(client.openTable(name), 1, TimeUnit.MINUTES);
} catch (InternalServerException ise) {
log.warn("Encountered internal server on opening state table
'{}/{}/{}', re-attempt in 100 milliseconds : {}",
tenant, namespace, name, ise.getMessage());
TimeUnit.MILLISECONDS.sleep(100);
+ } catch (TimeoutException e) {
+ throw new RuntimeException("Failed to open state table for
function " + tenant + "/" + namespace + "/" + name + " within timeout period",
e);
}
}
throw new IOException("Failed to open state table for function " +
tenant + "/" + namespace + "/" + name);