This is an automated email from the ASF dual-hosted git repository.
kichan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new a2e8615bd8 Add support for async requests for wasm plugin (#9896)
a2e8615bd8 is described below
commit a2e8615bd816731db7731204265228e6176a4b87
Author: Kit Chan <[email protected]>
AuthorDate: Mon Jul 10 16:28:18 2023 -0700
Add support for async requests for wasm plugin (#9896)
* Add support for async requests for wasm plugin
* Add wasm plugin async example
* fix typo
* fix typo
* fix format issues
* fix format issues
* update documentation
* Documentation change and add function to mark transaction already
reenabled
* fix typo
* simplify the case statements for getting header map
---
doc/admin-guide/plugins/wasm.en.rst | 4 +-
plugins/experimental/wasm/ats_context.cc | 191 ++++++++++++++++++++-
plugins/experimental/wasm/ats_context.h | 71 ++++++++
plugins/experimental/wasm/examples/async/Makefile | 22 +++
plugins/experimental/wasm/examples/async/README.md | 13 ++
plugins/experimental/wasm/examples/async/async.cc | 85 +++++++++
.../experimental/wasm/examples/async/async.yaml | 24 +++
plugins/experimental/wasm/wasm_main.cc | 40 ++++-
8 files changed, 435 insertions(+), 15 deletions(-)
diff --git a/doc/admin-guide/plugins/wasm.en.rst
b/doc/admin-guide/plugins/wasm.en.rst
index 9b83b8b8ee..642c49fb83 100644
--- a/doc/admin-guide/plugins/wasm.en.rst
+++ b/doc/admin-guide/plugins/wasm.en.rst
@@ -102,9 +102,8 @@ The plugin can also take more than one yaml file as
arguments and can thus load
TODO
====
-* Currently only the WAMR and WasmEdge runtime is supported. We should also
support V8 and Wasmtime.
+* Currently only the WAMR and WasmEdge runtimes are supported. We should also
support V8 and Wasmtime later.
* Need to support functionality for retrieving and setting request/response
body
-* Need to support functionality for making async request call
* Need to support L4 lifecycle handler functions
Limitations
@@ -114,5 +113,6 @@ The plugin will not support the following functionality as
specified in Proxy-Wa
* Getting and setting trailer request and response header
* Getting and setting HTTP/2 frame meta data
+* Making asynchronous request calls in the start handler function of the
plugin lifecycle
* Support on Grpc lifecycle handler functions
diff --git a/plugins/experimental/wasm/ats_context.cc
b/plugins/experimental/wasm/ats_context.cc
index e69099cb33..b3de641091 100644
--- a/plugins/experimental/wasm/ats_context.cc
+++ b/plugins/experimental/wasm/ats_context.cc
@@ -33,6 +33,83 @@
namespace ats_wasm
{
+
+static int
+async_handler(TSCont cont, TSEvent event, void *edata)
+{
+ // information for the handler
+ TSHttpTxn txn = static_cast<TSHttpTxn>(edata);
+ AsyncInfo *ai = (AsyncInfo *)TSContDataGet(cont);
+ uint32_t token = ai->token;
+ Context *root_context = ai->root_context;
+ Wasm *wasm = root_context->wasm();
+
+ // variables to be used in handler
+ TSEvent result = (TSEvent)(FETCH_EVENT_ID_BASE + 1);
+ const void *body = nullptr;
+ size_t body_size = 0;
+ TSMBuffer hdr_buf = nullptr;
+ TSMLoc hdr_loc = nullptr;
+ int header_size = 0;
+
+ TSMutexLock(wasm->mutex());
+ // filling in variables for a successful fetch
+ if (event == static_cast<TSEvent>(FETCH_EVENT_ID_BASE)) {
+ int data_len;
+ const char *data_start = TSFetchRespGet(txn, &data_len);
+ if (data_start && (data_len > 0)) {
+ const char *data_end = data_start + data_len;
+ TSHttpParser parser = TSHttpParserCreate();
+ hdr_buf = TSMBufferCreate();
+ hdr_loc = TSHttpHdrCreate(hdr_buf);
+ TSHttpHdrTypeSet(hdr_buf, hdr_loc, TS_HTTP_TYPE_RESPONSE);
+ if (TSHttpHdrParseResp(parser, hdr_buf, hdr_loc, &data_start, data_end)
== TS_PARSE_DONE) {
+ TSHttpStatus status = TSHttpHdrStatusGet(hdr_buf, hdr_loc);
+ header_size = TSMimeHdrFieldsCount(hdr_buf, hdr_loc);
+ body = data_start; // data_start will now be pointing
to body
+ body_size = data_end - data_start;
+ TSDebug(WASM_DEBUG_TAG, "[%s] Fetch result had a status code of %d
with a body length of %ld", __FUNCTION__, status,
+ body_size);
+ } else {
+ TSError("[wasm][%s] Unable to parse call response", __FUNCTION__);
+ event = static_cast<TSEvent>(FETCH_EVENT_ID_BASE + 1);
+ }
+ TSHttpParserDestroy(parser);
+ } else {
+ TSError("[wasm][%s] Successful fetch did not result in any content.
Assuming failure", __FUNCTION__);
+ event = static_cast<TSEvent>(FETCH_EVENT_ID_BASE + 1);
+ }
+ result = event;
+ }
+
+ // callback function
+ TSDebug(WASM_DEBUG_TAG, "[%s] setting root context call result",
__FUNCTION__);
+ root_context->setHttpCallResult(hdr_buf, hdr_loc, body, body_size, result);
+ TSDebug(WASM_DEBUG_TAG, "[%s] trigger root context function, token: %d",
__FUNCTION__, token);
+ root_context->onHttpCallResponse(token, header_size, body_size, 0);
+ TSDebug(WASM_DEBUG_TAG, "[%s] resetting root context call result",
__FUNCTION__);
+ root_context->resetHttpCallResult();
+
+ // cleaning up
+ if (hdr_loc) {
+ TSMLoc null_parent_loc = nullptr;
+ TSHandleMLocRelease(hdr_buf, null_parent_loc, hdr_loc);
+ }
+ if (hdr_buf) {
+ TSMBufferDestroy(hdr_buf);
+ }
+
+ TSMutexUnlock(wasm->mutex());
+
+ TSDebug(WASM_DEBUG_TAG, "[%s] delete async info and continuation",
__FUNCTION__);
+ // delete the Async Info
+ delete ai;
+ // delete continuation
+ TSContDestroy(cont);
+
+ return 0;
+}
+
// utility function for properties
static void
print_address(struct sockaddr const *ip, std::string *result)
@@ -221,11 +298,10 @@ Context::Context(Wasm *wasm) : ContextBase(wasm) {}
Context::Context(Wasm *wasm, const std::shared_ptr<PluginBase> &plugin) :
ContextBase(wasm, plugin) {}
// NB: wasm can be nullptr if it failed to be created successfully.
-Context::Context(Wasm *wasm, uint32_t parent_context_id, const
std::shared_ptr<PluginBase> &plugin) : ContextBase(wasm)
+Context::Context(Wasm *wasm, uint32_t parent_context_id, const
std::shared_ptr<PluginBase> &plugin) : ContextBase(wasm, plugin)
{
- id_ = (wasm_ != nullptr) ? wasm_->allocContextId() : 0;
+ // setting up parent context
parent_context_id_ = parent_context_id;
- plugin_ = plugin;
if (wasm_ != nullptr) {
parent_context_ = wasm_->getContext(parent_context_id_);
}
@@ -430,12 +506,16 @@ Context::getBuffer(WasmBufferType type)
return buffer_.set(wasm_->vm_configuration());
case WasmBufferType::PluginConfiguration:
return buffer_.set(plugin_->plugin_configuration_);
+ case WasmBufferType::HttpCallResponseBody:
+ if (cr_body_ != nullptr) {
+ return buffer_.set(std::string(static_cast<const char *>(cr_body_),
cr_body_size_));
+ }
+ return buffer_.set("");
case WasmBufferType::CallData:
case WasmBufferType::HttpRequestBody:
case WasmBufferType::HttpResponseBody:
case WasmBufferType::NetworkDownstreamData:
case WasmBufferType::NetworkUpstreamData:
- case WasmBufferType::HttpCallResponseBody:
case WasmBufferType::GrpcReceiveBuffer:
default:
unimplemented();
@@ -443,6 +523,63 @@ Context::getBuffer(WasmBufferType type)
}
}
+WasmResult
+Context::httpCall(std::string_view target, const Pairs &request_headers,
std::string_view request_body,
+ const Pairs &request_trailers, int timeout_millisconds,
uint32_t *token_ptr)
+{
+ Wasm *wasm = this->wasm();
+ Context *root_context = this->root_context();
+
+ TSCont contp;
+ std::string request, method, path, authority;
+
+ // setup local address for API call
+ struct sockaddr_in addr;
+ addr.sin_family = AF_INET;
+ addr.sin_addr.s_addr = LOCAL_IP_ADDRESS;
+ addr.sin_port = LOCAL_PORT;
+
+ for (const auto &p : request_headers) {
+ std::string key(p.first);
+ std::string value(p.second);
+
+ if (key == ":method") {
+ method = value;
+ } else if (key == ":path") {
+ path = value;
+ } else if (key == ":authority") {
+ authority = value;
+ }
+ }
+
+ /* request */
+ request = method + " https://" + authority + path + " HTTP/1.1\r\n";
+ for (const auto &p : request_headers) {
+ std::string key(p.first);
+ std::string value(p.second);
+ request += key + ": " + value + "\r\n";
+ }
+ request += "\r\n";
+ request += request_body;
+
+ TSFetchEvent event_ids;
+ event_ids.success_event_id = FETCH_EVENT_ID_BASE;
+ event_ids.failure_event_id = FETCH_EVENT_ID_BASE + 1;
+ event_ids.timeout_event_id = FETCH_EVENT_ID_BASE + 2;
+
+ contp = TSContCreate(async_handler, TSMutexCreate());
+ AsyncInfo *ai = new AsyncInfo();
+ ai->token = wasm->nextHttpCallId();
+ ai->root_context = root_context;
+ *token_ptr = ai->token; // to be returned to the caller
+ TSContDataSet(contp, ai);
+
+ // API call for async fetch
+ TSFetchUrl(request.c_str(), request.size(), reinterpret_cast<struct sockaddr
const *>(&addr), contp, AFTER_BODY, event_ids);
+
+ return WasmResult::Ok;
+}
+
// Metrics
WasmResult
Context::defineMetric(uint32_t metric_type, std::string_view name, uint32_t
*metric_id_ptr)
@@ -1295,6 +1432,44 @@ Context::setProperty(std::string_view key,
std::string_view serialized_value)
return WasmResult::Ok;
}
+WasmResult
+Context::continueStream(WasmStreamType /* stream_type */)
+{
+ if (reenable_txn_) {
+ TSError("[wasm][%s] transaction already reenabled", __FUNCTION__);
+ return WasmResult::Ok;
+ }
+
+ if (txnp_ == nullptr) {
+ TSError("[wasm][%s] Can't continue stream without a transaction",
__FUNCTION__);
+ return WasmResult::InternalFailure;
+ } else {
+ TSDebug(WASM_DEBUG_TAG, "[%s] continuing txn for context %d",
__FUNCTION__, id());
+ reenable_txn_ = true;
+ TSHttpTxnReenable(txnp_, TS_EVENT_HTTP_CONTINUE);
+ return WasmResult::Ok;
+ }
+}
+
+WasmResult
+Context::closeStream(WasmStreamType /* stream_type */)
+{
+ if (reenable_txn_) {
+ TSError("[wasm][%s] transaction already reenabled", __FUNCTION__);
+ return WasmResult::Ok;
+ }
+
+ if (txnp_ == nullptr) {
+ TSError("[wasm][%s] Can't continue stream without a transaction",
__FUNCTION__);
+ return WasmResult::InternalFailure;
+ } else {
+ TSDebug(WASM_DEBUG_TAG, "[%s] continue txn for context %d with error",
__FUNCTION__, id());
+ reenable_txn_ = true;
+ TSHttpTxnReenable(txnp_, TS_EVENT_HTTP_ERROR);
+ return WasmResult::Ok;
+ }
+}
+
// send pre-made response
WasmResult
Context::sendLocalResponse(uint32_t response_code, std::string_view body_text,
Pairs additional_headers,
@@ -1533,10 +1708,16 @@ Context::getHeaderMap(WasmHeaderMapType type)
return {};
case WasmHeaderMapType::ResponseTrailers:
return {};
+ case WasmHeaderMapType::HttpCallResponseHeaders:
+ if (cr_hdr_buf_ == nullptr || cr_hdr_loc_ == nullptr) {
+ return {};
+ }
+ map.bufp = cr_hdr_buf_;
+ map.hdr_loc = cr_hdr_loc_;
+ return map;
default:
case WasmHeaderMapType::GrpcReceiveTrailingMetadata:
case WasmHeaderMapType::GrpcReceiveInitialMetadata:
- case WasmHeaderMapType::HttpCallResponseHeaders:
case WasmHeaderMapType::HttpCallResponseTrailers:
return {};
}
diff --git a/plugins/experimental/wasm/ats_context.h
b/plugins/experimental/wasm/ats_context.h
index b516c79679..fdf18ddbe9 100644
--- a/plugins/experimental/wasm/ats_context.h
+++ b/plugins/experimental/wasm/ats_context.h
@@ -41,9 +41,14 @@
namespace ats_wasm
{
+const unsigned int LOCAL_IP_ADDRESS = 0x0100007f;
+const int LOCAL_PORT = 8080;
+const int FETCH_EVENT_ID_BASE = 10000;
+
using proxy_wasm::ContextBase;
using proxy_wasm::PluginBase;
using proxy_wasm::WasmResult;
+using proxy_wasm::WasmStreamType;
using proxy_wasm::BufferInterface;
using proxy_wasm::BufferBase;
using proxy_wasm::WasmHeaderMapType;
@@ -173,6 +178,30 @@ public:
BufferInterface *getBuffer(WasmBufferType type) override;
+ WasmResult httpCall(std::string_view target, const Pairs &request_headers,
std::string_view request_body,
+ const Pairs &request_trailers, int timeout_millisconds,
uint32_t *token_ptr) override;
+
+ // Call result functions
+ void
+ setHttpCallResult(TSMBuffer buf, TSMLoc loc, const void *body, size_t size,
TSEvent result)
+ {
+ cr_hdr_buf_ = buf;
+ cr_hdr_loc_ = loc;
+ cr_body_ = body;
+ cr_body_size_ = size;
+ cr_result_ = result;
+ }
+
+ void
+ resetHttpCallResult()
+ {
+ cr_hdr_buf_ = nullptr;
+ cr_hdr_loc_ = nullptr;
+ cr_body_ = nullptr;
+ cr_body_size_ = 0;
+ cr_result_ = (TSEvent)(FETCH_EVENT_ID_BASE + 1);
+ }
+
// Metrics
WasmResult defineMetric(uint32_t metric_type, std::string_view name,
uint32_t *metric_id_ptr) override;
WasmResult incrementMetric(uint32_t metric_id, int64_t offset) override;
@@ -185,9 +214,33 @@ public:
WasmResult setProperty(std::string_view key, std::string_view
serialized_value) override;
// send a premade response
+ WasmResult continueStream(WasmStreamType stream_type) override;
+ WasmResult closeStream(WasmStreamType stream_type) override;
WasmResult sendLocalResponse(uint32_t response_code, std::string_view
body_text, Pairs additional_headers,
GrpcStatusCode /* grpc_status */,
std::string_view details) override;
+ // check stream
+ bool
+ isTxnReenable()
+ {
+ return reenable_txn_;
+ }
+ void
+ setTxnReenable()
+ {
+ reenable_txn_ = true;
+ }
+ void
+ resetTxnReenable()
+ {
+ reenable_txn_ = false;
+ }
+ bool
+ isLocalReply()
+ {
+ return local_reply_;
+ }
+
WasmResult getSharedData(std::string_view key, std::pair<std::string,
uint32_t /* cas */> *data) override;
// Header/Trailer/Metadata Maps
@@ -208,11 +261,29 @@ private:
TSHttpTxn txnp_{nullptr};
TSCont scheduler_cont_{nullptr};
+ // continue/close stream?
+ bool reenable_txn_ = false;
+
+ // local reply
Pairs local_reply_headers_{};
std::string local_reply_details_ = "";
bool local_reply_ = false;
+ // buffer for result (don't set to null as default)
BufferBase buffer_;
+
+ // Call result
+ TSEvent cr_result_ = (TSEvent)(FETCH_EVENT_ID_BASE + 1);
+ const void *cr_body_ = nullptr;
+ size_t cr_body_size_ = 0;
+ TSMBuffer cr_hdr_buf_ = nullptr;
+ TSMLoc cr_hdr_loc_ = nullptr;
+};
+
+// local struct representing info for async transaction
+struct AsyncInfo {
+ uint32_t token;
+ Context *root_context;
};
} // namespace ats_wasm
diff --git a/plugins/experimental/wasm/examples/async/Makefile
b/plugins/experimental/wasm/examples/async/Makefile
new file mode 100644
index 0000000000..a9a553e5fd
--- /dev/null
+++ b/plugins/experimental/wasm/examples/async/Makefile
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+PROXY_WASM_CPP_SDK=/sdk
+
+all: async.wasm
+
+include ${PROXY_WASM_CPP_SDK}/Makefile.base_lite
diff --git a/plugins/experimental/wasm/examples/async/README.md
b/plugins/experimental/wasm/examples/async/README.md
new file mode 100644
index 0000000000..7dbca88263
--- /dev/null
+++ b/plugins/experimental/wasm/examples/async/README.md
@@ -0,0 +1,13 @@
+CPP example for making asynchronous requests for ATS Wasm Plugin
+
+To compile `async.wasm`
+* Create a docker image following instructions here -
https://github.com/proxy-wasm/proxy-wasm-cpp-sdk/blob/master/README.md#docker
+* In this directory run `docker run -v $PWD:/work -w /work wasmsdk:v2
/build_wasm.sh`
+
+Copy the yaml in this directory and the generated `async.wasm` to
`/usr/local/var/wasm/` and activate the plugin in `plugin.config`
+* `wasm.so /usr/local/var/wasm/async.yaml`
+
+Make sure you have mapping rules in remap.config like below
+* `map https://www.google.com/ https://www.google.com/`
+
+You can trigger the asynchronous request by sending a request with the User-Agent header set
to `test`.
diff --git a/plugins/experimental/wasm/examples/async/async.cc
b/plugins/experimental/wasm/examples/async/async.cc
new file mode 100644
index 0000000000..a5e3712929
--- /dev/null
+++ b/plugins/experimental/wasm/examples/async/async.cc
@@ -0,0 +1,85 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+#include <string>
+#include <unordered_map>
+
+#include "proxy_wasm_intrinsics.h"
+
+class ExampleRootContext : public RootContext
+{
+public:
+ explicit ExampleRootContext(uint32_t id, std::string_view root_id) :
RootContext(id, root_id) {}
+
+ bool onStart(size_t) override;
+};
+
+class ExampleContext : public Context
+{
+public:
+ explicit ExampleContext(uint32_t id, RootContext *root) : Context(id, root)
{}
+
+ FilterHeadersStatus onRequestHeaders(uint32_t headers, bool end_of_stream)
override;
+};
+static RegisterContextFactory
register_ExampleContext(CONTEXT_FACTORY(ExampleContext),
ROOT_FACTORY(ExampleRootContext),
+ "myproject");
+
+bool
+ExampleRootContext::onStart(size_t)
+{
+ logInfo(std::string("onStart"));
+ return true;
+}
+
+FilterHeadersStatus
+ExampleContext::onRequestHeaders(uint32_t headers, bool end_of_stream)
+{
+ // print UA
+ auto ua = getRequestHeader("User-Agent");
+ logInfo(std::string("UA ") + std::string(ua->view()));
+
+ auto context_id = id();
+ auto callback = [context_id](uint32_t, size_t body_size, uint32_t) {
+ logInfo("async call done");
+ if (body_size == 0) {
+ logInfo("async_call failed");
+ return;
+ }
+ auto response_headers =
getHeaderMapPairs(WasmHeaderMapType::HttpCallResponseHeaders);
+ auto body =
getBufferBytes(WasmBufferType::HttpCallResponseBody, 0, body_size);
+ for (auto &p : response_headers->pairs()) {
+ logInfo(std::string(p.first) + std::string(" -> ") +
std::string(p.second));
+ }
+ logInfo(std::string(body->view()));
+
+ getContext(context_id)->setEffectiveContext();
+ logInfo("continueRequest");
+ continueRequest();
+ };
+ std::string test = std::string(ua->view());
+ if (test == "test") {
+ root()->httpCall("cluster",
+ {
+ {":method", "GET" },
+ {":path", "/.well-known/security.txt"},
+ {":authority", "www.google.com" }
+ },
+ "", {}, 10000, callback);
+ return FilterHeadersStatus::StopIteration;
+ }
+
+ return FilterHeadersStatus::Continue;
+}
diff --git a/plugins/experimental/wasm/examples/async/async.yaml
b/plugins/experimental/wasm/examples/async/async.yaml
new file mode 100644
index 0000000000..e97eb3f398
--- /dev/null
+++ b/plugins/experimental/wasm/examples/async/async.yaml
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+config:
+ name: test
+ rootId: myproject
+ vmConfig:
+ code:
+ local:
+ filename: /usr/local/var/wasm/async.wasm
+ runtime: ats.wasm.runtime.wamr
+ vmId: test
diff --git a/plugins/experimental/wasm/wasm_main.cc
b/plugins/experimental/wasm/wasm_main.cc
index 5267914db5..4a52268ab0 100644
--- a/plugins/experimental/wasm/wasm_main.cc
+++ b/plugins/experimental/wasm/wasm_main.cc
@@ -127,6 +127,9 @@ http_event_handler(TSCont contp, TSEvent event, void *data)
int result = -1;
auto *context = static_cast<ats_wasm::Context *>(TSContDataGet(contp));
auto *old_wasm = static_cast<ats_wasm::Wasm *>(context->wasm());
+
+ context->resetTxnReenable();
+
TSMutexLock(old_wasm->mutex());
std::shared_ptr<ats_wasm::Wasm> temp = nullptr;
auto *txnp = static_cast<TSHttpTxn>(data);
@@ -144,6 +147,7 @@ http_event_handler(TSCont contp, TSEvent event, void *data)
TSError("[wasm][%s] cannot retrieve client request", __FUNCTION__);
TSMutexUnlock(old_wasm->mutex());
TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE);
+ context->setTxnReenable();
return 0;
}
count = TSMimeHdrFieldsCount(buf, hdr_loc);
@@ -166,6 +170,7 @@ http_event_handler(TSCont contp, TSEvent event, void *data)
TSError("[wasm][%s] cannot retrieve server response", __FUNCTION__);
TSMutexUnlock(old_wasm->mutex());
TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE);
+ context->setTxnReenable();
return 0;
}
count = TSMimeHdrFieldsCount(buf, hdr_loc);
@@ -243,15 +248,34 @@ http_event_handler(TSCont contp, TSEvent event, void
*data)
TSMutexUnlock(old_wasm->mutex());
- if (result == 0) {
- TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE);
- } else if (result < 0) {
- TSHttpTxnReenable(txnp, TS_EVENT_HTTP_ERROR);
+ // check if we have reenable transaction already or not
+ if ((context == nullptr) || (!context->isTxnReenable())) {
+ TSDebug(WASM_DEBUG_TAG, "[%s] no context or not yet reenabled
transaction", __FUNCTION__);
+
+ if (result == 0) {
+ TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE);
+ if (context != nullptr) {
+ context->setTxnReenable();
+ }
+ } else if (result < 0) {
+ TSDebug(WASM_DEBUG_TAG, "[%s] abnormal event, continue with error",
__FUNCTION__);
+ TSHttpTxnReenable(txnp, TS_EVENT_HTTP_ERROR);
+ if (context != nullptr) {
+ context->setTxnReenable();
+ }
+ } else {
+ if (context->isLocalReply()) {
+ TSDebug(WASM_DEBUG_TAG, "[%s] abnormal return, continue with error due
to local reply", __FUNCTION__);
+ TSHttpTxnReenable(txnp, TS_EVENT_HTTP_ERROR);
+ if (context != nullptr) {
+ context->setTxnReenable();
+ }
+ } else {
+ TSDebug(WASM_DEBUG_TAG, "[%s] abnormal return, no continue, context
id: %d", __FUNCTION__, context->id());
+ }
+ }
} else {
- // TODO: wait for async operation
- // Temporarily resume with error
- TSDebug(WASM_DEBUG_TAG, "[%s] result > 0, continue with error for now",
__FUNCTION__);
- TSHttpTxnReenable(txnp, TS_EVENT_HTTP_ERROR);
+ TSDebug(WASM_DEBUG_TAG, "[%s] transaction already reenabled",
__FUNCTION__);
}
return 0;
}