This is an automated email from the ASF dual-hosted git repository.

kichan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new a2e8615bd8 Add support for async requests for wasm plugin (#9896)
a2e8615bd8 is described below

commit a2e8615bd816731db7731204265228e6176a4b87
Author: Kit Chan <[email protected]>
AuthorDate: Mon Jul 10 16:28:18 2023 -0700

    Add support for async requests for wasm plugin (#9896)
    
    * Add support for async requests for wasm plugin
    
    * Add wasm plugin async example
    
    * fix typo
    
    * fix typo
    
    * fix format issues
    
    * fix format issues
    
    * update documentation
    
    * Documentation change and add function to mark transaction already 
reenabled
    
    * fix typo
    
    * simplify the case statements for getting header map
---
 doc/admin-guide/plugins/wasm.en.rst                |   4 +-
 plugins/experimental/wasm/ats_context.cc           | 191 ++++++++++++++++++++-
 plugins/experimental/wasm/ats_context.h            |  71 ++++++++
 plugins/experimental/wasm/examples/async/Makefile  |  22 +++
 plugins/experimental/wasm/examples/async/README.md |  13 ++
 plugins/experimental/wasm/examples/async/async.cc  |  85 +++++++++
 .../experimental/wasm/examples/async/async.yaml    |  24 +++
 plugins/experimental/wasm/wasm_main.cc             |  40 ++++-
 8 files changed, 435 insertions(+), 15 deletions(-)

diff --git a/doc/admin-guide/plugins/wasm.en.rst 
b/doc/admin-guide/plugins/wasm.en.rst
index 9b83b8b8ee..642c49fb83 100644
--- a/doc/admin-guide/plugins/wasm.en.rst
+++ b/doc/admin-guide/plugins/wasm.en.rst
@@ -102,9 +102,8 @@ The plugin can also take more than one yaml file as 
arguments and can thus load
 TODO
 ====
 
-* Currently only the WAMR and WasmEdge runtime is supported. We should also 
support V8 and Wasmtime.
+* Currently only the WAMR and WasmEdge runtime is supported. We should also 
support V8 and Wasmtime later.
 * Need to support functionality for retrieving and setting request/response 
body
-* Need to support functionality for making async request call
 * Need to support L4 lifecycle handler functions
 
 Limitations
@@ -114,5 +113,6 @@ The plugin will not support the following functionality as 
specified in Proxy-Wa
 
 * Getting and setting trailer request and response header
 * Getting and setting HTTP/2 frame meta data
+* Support asynchronous request call in the start handler function of the 
plugin lifecycle
 * Support on Grpc lifecycle handler functions
 
diff --git a/plugins/experimental/wasm/ats_context.cc 
b/plugins/experimental/wasm/ats_context.cc
index e69099cb33..b3de641091 100644
--- a/plugins/experimental/wasm/ats_context.cc
+++ b/plugins/experimental/wasm/ats_context.cc
@@ -33,6 +33,83 @@
 
 namespace ats_wasm
 {
+
+static int
+async_handler(TSCont cont, TSEvent event, void *edata)
+{
+  // information for the handler
+  TSHttpTxn txn         = static_cast<TSHttpTxn>(edata);
+  AsyncInfo *ai         = (AsyncInfo *)TSContDataGet(cont);
+  uint32_t token        = ai->token;
+  Context *root_context = ai->root_context;
+  Wasm *wasm            = root_context->wasm();
+
+  // variables to be used in handler
+  TSEvent result    = (TSEvent)(FETCH_EVENT_ID_BASE + 1);
+  const void *body  = nullptr;
+  size_t body_size  = 0;
+  TSMBuffer hdr_buf = nullptr;
+  TSMLoc hdr_loc    = nullptr;
+  int header_size   = 0;
+
+  TSMutexLock(wasm->mutex());
+  // filling in variables for a successful fetch
+  if (event == static_cast<TSEvent>(FETCH_EVENT_ID_BASE)) {
+    int data_len;
+    const char *data_start = TSFetchRespGet(txn, &data_len);
+    if (data_start && (data_len > 0)) {
+      const char *data_end = data_start + data_len;
+      TSHttpParser parser  = TSHttpParserCreate();
+      hdr_buf              = TSMBufferCreate();
+      hdr_loc              = TSHttpHdrCreate(hdr_buf);
+      TSHttpHdrTypeSet(hdr_buf, hdr_loc, TS_HTTP_TYPE_RESPONSE);
+      if (TSHttpHdrParseResp(parser, hdr_buf, hdr_loc, &data_start, data_end) 
== TS_PARSE_DONE) {
+        TSHttpStatus status = TSHttpHdrStatusGet(hdr_buf, hdr_loc);
+        header_size         = TSMimeHdrFieldsCount(hdr_buf, hdr_loc);
+        body                = data_start; // data_start will now be pointing 
to body
+        body_size           = data_end - data_start;
+        TSDebug(WASM_DEBUG_TAG, "[%s] Fetch result had a status code of %d 
with a body length of %ld", __FUNCTION__, status,
+                body_size);
+      } else {
+        TSError("[wasm][%s] Unable to parse call response", __FUNCTION__);
+        event = static_cast<TSEvent>(FETCH_EVENT_ID_BASE + 1);
+      }
+      TSHttpParserDestroy(parser);
+    } else {
+      TSError("[wasm][%s] Successful fetch did not result in any content. 
Assuming failure", __FUNCTION__);
+      event = static_cast<TSEvent>(FETCH_EVENT_ID_BASE + 1);
+    }
+    result = event;
+  }
+
+  // callback function
+  TSDebug(WASM_DEBUG_TAG, "[%s] setting root context call result", 
__FUNCTION__);
+  root_context->setHttpCallResult(hdr_buf, hdr_loc, body, body_size, result);
+  TSDebug(WASM_DEBUG_TAG, "[%s] trigger root context function, token:  %d", 
__FUNCTION__, token);
+  root_context->onHttpCallResponse(token, header_size, body_size, 0);
+  TSDebug(WASM_DEBUG_TAG, "[%s] resetting root context call result", 
__FUNCTION__);
+  root_context->resetHttpCallResult();
+
+  // cleaning up
+  if (hdr_loc) {
+    TSMLoc null_parent_loc = nullptr;
+    TSHandleMLocRelease(hdr_buf, null_parent_loc, hdr_loc);
+  }
+  if (hdr_buf) {
+    TSMBufferDestroy(hdr_buf);
+  }
+
+  TSMutexUnlock(wasm->mutex());
+
+  TSDebug(WASM_DEBUG_TAG, "[%s] delete async info and continuation", 
__FUNCTION__);
+  // delete the Async Info
+  delete ai;
+  // delete continuation
+  TSContDestroy(cont);
+
+  return 0;
+}
+
 // utiltiy function for properties
 static void
 print_address(struct sockaddr const *ip, std::string *result)
@@ -221,11 +298,10 @@ Context::Context(Wasm *wasm) : ContextBase(wasm) {}
 Context::Context(Wasm *wasm, const std::shared_ptr<PluginBase> &plugin) : 
ContextBase(wasm, plugin) {}
 
 // NB: wasm can be nullptr if it failed to be created successfully.
-Context::Context(Wasm *wasm, uint32_t parent_context_id, const 
std::shared_ptr<PluginBase> &plugin) : ContextBase(wasm)
+Context::Context(Wasm *wasm, uint32_t parent_context_id, const 
std::shared_ptr<PluginBase> &plugin) : ContextBase(wasm, plugin)
 {
-  id_                = (wasm_ != nullptr) ? wasm_->allocContextId() : 0;
+  // setting up parent context
   parent_context_id_ = parent_context_id;
-  plugin_            = plugin;
   if (wasm_ != nullptr) {
     parent_context_ = wasm_->getContext(parent_context_id_);
   }
@@ -430,12 +506,16 @@ Context::getBuffer(WasmBufferType type)
     return buffer_.set(wasm_->vm_configuration());
   case WasmBufferType::PluginConfiguration:
     return buffer_.set(plugin_->plugin_configuration_);
+  case WasmBufferType::HttpCallResponseBody:
+    if (cr_body_ != nullptr) {
+      return buffer_.set(std::string(static_cast<const char *>(cr_body_), 
cr_body_size_));
+    }
+    return buffer_.set("");
   case WasmBufferType::CallData:
   case WasmBufferType::HttpRequestBody:
   case WasmBufferType::HttpResponseBody:
   case WasmBufferType::NetworkDownstreamData:
   case WasmBufferType::NetworkUpstreamData:
-  case WasmBufferType::HttpCallResponseBody:
   case WasmBufferType::GrpcReceiveBuffer:
   default:
     unimplemented();
@@ -443,6 +523,63 @@ Context::getBuffer(WasmBufferType type)
   }
 }
 
+WasmResult
+Context::httpCall(std::string_view target, const Pairs &request_headers, 
std::string_view request_body,
+                  const Pairs &request_trailers, int timeout_millisconds, 
uint32_t *token_ptr)
+{
+  Wasm *wasm            = this->wasm();
+  Context *root_context = this->root_context();
+
+  TSCont contp;
+  std::string request, method, path, authority;
+
+  // setup local address for API call
+  struct sockaddr_in addr;
+  addr.sin_family      = AF_INET;
+  addr.sin_addr.s_addr = LOCAL_IP_ADDRESS;
+  addr.sin_port        = LOCAL_PORT;
+
+  for (const auto &p : request_headers) {
+    std::string key(p.first);
+    std::string value(p.second);
+
+    if (key == ":method") {
+      method = value;
+    } else if (key == ":path") {
+      path = value;
+    } else if (key == ":authority") {
+      authority = value;
+    }
+  }
+
+  /* request */
+  request = method + " https://"; + authority + path + " HTTP/1.1\r\n";
+  for (const auto &p : request_headers) {
+    std::string key(p.first);
+    std::string value(p.second);
+    request += key + ": " + value + "\r\n";
+  }
+  request += "\r\n";
+  request += request_body;
+
+  TSFetchEvent event_ids;
+  event_ids.success_event_id = FETCH_EVENT_ID_BASE;
+  event_ids.failure_event_id = FETCH_EVENT_ID_BASE + 1;
+  event_ids.timeout_event_id = FETCH_EVENT_ID_BASE + 2;
+
+  contp            = TSContCreate(async_handler, TSMutexCreate());
+  AsyncInfo *ai    = new AsyncInfo();
+  ai->token        = wasm->nextHttpCallId();
+  ai->root_context = root_context;
+  *token_ptr       = ai->token; // to be returned to the caller
+  TSContDataSet(contp, ai);
+
+  // API call for async fetch
+  TSFetchUrl(request.c_str(), request.size(), reinterpret_cast<struct sockaddr 
const *>(&addr), contp, AFTER_BODY, event_ids);
+
+  return WasmResult::Ok;
+}
+
 // Metrics
 WasmResult
 Context::defineMetric(uint32_t metric_type, std::string_view name, uint32_t 
*metric_id_ptr)
@@ -1295,6 +1432,44 @@ Context::setProperty(std::string_view key, 
std::string_view serialized_value)
   return WasmResult::Ok;
 }
 
+WasmResult
+Context::continueStream(WasmStreamType /* stream_type */)
+{
+  if (reenable_txn_) {
+    TSError("[wasm][%s] transaction already reenabled", __FUNCTION__);
+    return WasmResult::Ok;
+  }
+
+  if (txnp_ == nullptr) {
+    TSError("[wasm][%s] Can't continue stream without a transaction", 
__FUNCTION__);
+    return WasmResult::InternalFailure;
+  } else {
+    TSDebug(WASM_DEBUG_TAG, "[%s] continuing txn for context %d", 
__FUNCTION__, id());
+    reenable_txn_ = true;
+    TSHttpTxnReenable(txnp_, TS_EVENT_HTTP_CONTINUE);
+    return WasmResult::Ok;
+  }
+}
+
+WasmResult
+Context::closeStream(WasmStreamType /* stream_type */)
+{
+  if (reenable_txn_) {
+    TSError("[wasm][%s] transaction already reenabled", __FUNCTION__);
+    return WasmResult::Ok;
+  }
+
+  if (txnp_ == nullptr) {
+    TSError("[wasm][%s] Can't continue stream without a transaction", 
__FUNCTION__);
+    return WasmResult::InternalFailure;
+  } else {
+    TSDebug(WASM_DEBUG_TAG, "[%s] continue txn for context %d with error", 
__FUNCTION__, id());
+    reenable_txn_ = true;
+    TSHttpTxnReenable(txnp_, TS_EVENT_HTTP_ERROR);
+    return WasmResult::Ok;
+  }
+}
+
 // send pre-made response
 WasmResult
 Context::sendLocalResponse(uint32_t response_code, std::string_view body_text, 
Pairs additional_headers,
@@ -1533,10 +1708,16 @@ Context::getHeaderMap(WasmHeaderMapType type)
     return {};
   case WasmHeaderMapType::ResponseTrailers:
     return {};
+  case WasmHeaderMapType::HttpCallResponseHeaders:
+    if (cr_hdr_buf_ == nullptr || cr_hdr_loc_ == nullptr) {
+      return {};
+    }
+    map.bufp    = cr_hdr_buf_;
+    map.hdr_loc = cr_hdr_loc_;
+    return map;
   default:
   case WasmHeaderMapType::GrpcReceiveTrailingMetadata:
   case WasmHeaderMapType::GrpcReceiveInitialMetadata:
-  case WasmHeaderMapType::HttpCallResponseHeaders:
   case WasmHeaderMapType::HttpCallResponseTrailers:
     return {};
   }
diff --git a/plugins/experimental/wasm/ats_context.h 
b/plugins/experimental/wasm/ats_context.h
index b516c79679..fdf18ddbe9 100644
--- a/plugins/experimental/wasm/ats_context.h
+++ b/plugins/experimental/wasm/ats_context.h
@@ -41,9 +41,14 @@
 
 namespace ats_wasm
 {
+const unsigned int LOCAL_IP_ADDRESS = 0x0100007f;
+const int LOCAL_PORT                = 8080;
+const int FETCH_EVENT_ID_BASE       = 10000;
+
 using proxy_wasm::ContextBase;
 using proxy_wasm::PluginBase;
 using proxy_wasm::WasmResult;
+using proxy_wasm::WasmStreamType;
 using proxy_wasm::BufferInterface;
 using proxy_wasm::BufferBase;
 using proxy_wasm::WasmHeaderMapType;
@@ -173,6 +178,30 @@ public:
 
   BufferInterface *getBuffer(WasmBufferType type) override;
 
+  WasmResult httpCall(std::string_view target, const Pairs &request_headers, 
std::string_view request_body,
+                      const Pairs &request_trailers, int timeout_millisconds, 
uint32_t *token_ptr) override;
+
+  // Call result functions
+  void
+  setHttpCallResult(TSMBuffer buf, TSMLoc loc, const void *body, size_t size, 
TSEvent result)
+  {
+    cr_hdr_buf_   = buf;
+    cr_hdr_loc_   = loc;
+    cr_body_      = body;
+    cr_body_size_ = size;
+    cr_result_    = result;
+  }
+
+  void
+  resetHttpCallResult()
+  {
+    cr_hdr_buf_   = nullptr;
+    cr_hdr_loc_   = nullptr;
+    cr_body_      = nullptr;
+    cr_body_size_ = 0;
+    cr_result_    = (TSEvent)(FETCH_EVENT_ID_BASE + 1);
+  }
+
   // Metrics
   WasmResult defineMetric(uint32_t metric_type, std::string_view name, 
uint32_t *metric_id_ptr) override;
   WasmResult incrementMetric(uint32_t metric_id, int64_t offset) override;
@@ -185,9 +214,33 @@ public:
   WasmResult setProperty(std::string_view key, std::string_view 
serialized_value) override;
 
   // send a premade response
+  WasmResult continueStream(WasmStreamType stream_type) override;
+  WasmResult closeStream(WasmStreamType stream_type) override;
   WasmResult sendLocalResponse(uint32_t response_code, std::string_view 
body_text, Pairs additional_headers,
                                GrpcStatusCode /* grpc_status */, 
std::string_view details) override;
 
+  // check stream
+  bool
+  isTxnReenable()
+  {
+    return reenable_txn_;
+  }
+  void
+  setTxnReenable()
+  {
+    reenable_txn_ = true;
+  }
+  void
+  resetTxnReenable()
+  {
+    reenable_txn_ = false;
+  }
+  bool
+  isLocalReply()
+  {
+    return local_reply_;
+  }
+
   WasmResult getSharedData(std::string_view key, std::pair<std::string, 
uint32_t /* cas */> *data) override;
 
   // Header/Trailer/Metadata Maps
@@ -208,11 +261,29 @@ private:
   TSHttpTxn txnp_{nullptr};
   TSCont scheduler_cont_{nullptr};
 
+  // continue/close stream?
+  bool reenable_txn_ = false;
+
+  // local reply
   Pairs local_reply_headers_{};
   std::string local_reply_details_ = "";
   bool local_reply_                = false;
 
+  // buffer for result (don't set to null as default)
   BufferBase buffer_;
+
+  // Call result
+  TSEvent cr_result_    = (TSEvent)(FETCH_EVENT_ID_BASE + 1);
+  const void *cr_body_  = nullptr;
+  size_t cr_body_size_  = 0;
+  TSMBuffer cr_hdr_buf_ = nullptr;
+  TSMLoc cr_hdr_loc_    = nullptr;
+};
+
+// local struct representing info for async transaction
+struct AsyncInfo {
+  uint32_t token;
+  Context *root_context;
 };
 
 } // namespace ats_wasm
diff --git a/plugins/experimental/wasm/examples/async/Makefile 
b/plugins/experimental/wasm/examples/async/Makefile
new file mode 100644
index 0000000000..a9a553e5fd
--- /dev/null
+++ b/plugins/experimental/wasm/examples/async/Makefile
@@ -0,0 +1,22 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+PROXY_WASM_CPP_SDK=/sdk
+
+all: async.wasm
+
+include ${PROXY_WASM_CPP_SDK}/Makefile.base_lite
diff --git a/plugins/experimental/wasm/examples/async/README.md 
b/plugins/experimental/wasm/examples/async/README.md
new file mode 100644
index 0000000000..7dbca88263
--- /dev/null
+++ b/plugins/experimental/wasm/examples/async/README.md
@@ -0,0 +1,13 @@
+CPP example for making asynchronous requests for ATS Wasm Plugin
+
+To compile `async.wasm`
+* Create a docker image following instructions here - 
https://github.com/proxy-wasm/proxy-wasm-cpp-sdk/blob/master/README.md#docker
+* In this directory run `docker run -v $PWD:/work -w /work  wasmsdk:v2 
/build_wasm.sh`
+
+Copy the yaml in this directory and the generated `async.wasm` to 
`/usr/local/var/wasm/` and activate the plugin in `plugin.config`
+* `wasm.so /usr/local/var/wasm/async.yaml`
+
+Make sure you have mapping rules in remap.config like below
+* `map https://www.google.com/ https://www.google.com/
+
+You can trigger the asynchronous requests with a request with User-Agent set 
to `test`
diff --git a/plugins/experimental/wasm/examples/async/async.cc 
b/plugins/experimental/wasm/examples/async/async.cc
new file mode 100644
index 0000000000..a5e3712929
--- /dev/null
+++ b/plugins/experimental/wasm/examples/async/async.cc
@@ -0,0 +1,85 @@
+/**
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+#include <string>
+#include <unordered_map>
+
+#include "proxy_wasm_intrinsics.h"
+
+class ExampleRootContext : public RootContext
+{
+public:
+  explicit ExampleRootContext(uint32_t id, std::string_view root_id) : 
RootContext(id, root_id) {}
+
+  bool onStart(size_t) override;
+};
+
+class ExampleContext : public Context
+{
+public:
+  explicit ExampleContext(uint32_t id, RootContext *root) : Context(id, root) 
{}
+
+  FilterHeadersStatus onRequestHeaders(uint32_t headers, bool end_of_stream) 
override;
+};
+static RegisterContextFactory 
register_ExampleContext(CONTEXT_FACTORY(ExampleContext), 
ROOT_FACTORY(ExampleRootContext),
+                                                      "myproject");
+
+bool
+ExampleRootContext::onStart(size_t)
+{
+  logInfo(std::string("onStart"));
+  return true;
+}
+
+FilterHeadersStatus
+ExampleContext::onRequestHeaders(uint32_t headers, bool end_of_stream)
+{
+  // print UA
+  auto ua = getRequestHeader("User-Agent");
+  logInfo(std::string("UA ") + std::string(ua->view()));
+
+  auto context_id = id();
+  auto callback   = [context_id](uint32_t, size_t body_size, uint32_t) {
+    logInfo("async call done");
+    if (body_size == 0) {
+      logInfo("async_call failed");
+      return;
+    }
+    auto response_headers = 
getHeaderMapPairs(WasmHeaderMapType::HttpCallResponseHeaders);
+    auto body             = 
getBufferBytes(WasmBufferType::HttpCallResponseBody, 0, body_size);
+    for (auto &p : response_headers->pairs()) {
+      logInfo(std::string(p.first) + std::string(" -> ") + 
std::string(p.second));
+    }
+    logInfo(std::string(body->view()));
+
+    getContext(context_id)->setEffectiveContext();
+    logInfo("continueRequest");
+    continueRequest();
+  };
+  std::string test = std::string(ua->view());
+  if (test == "test") {
+    root()->httpCall("cluster",
+                     {
+                       {":method",    "GET"                      },
+                       {":path",      "/.well-known/security.txt"},
+                       {":authority", "www.google.com"           }
+    },
+                     "", {}, 10000, callback);
+    return FilterHeadersStatus::StopIteration;
+  }
+
+  return FilterHeadersStatus::Continue;
+}
diff --git a/plugins/experimental/wasm/examples/async/async.yaml 
b/plugins/experimental/wasm/examples/async/async.yaml
new file mode 100644
index 0000000000..e97eb3f398
--- /dev/null
+++ b/plugins/experimental/wasm/examples/async/async.yaml
@@ -0,0 +1,24 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+config:
+  name: test
+  rootId: myproject
+  vmConfig:
+    code:
+      local:
+        filename: /usr/local/var/wasm/async.wasm
+    runtime: ats.wasm.runtime.wamr
+    vmId: test
diff --git a/plugins/experimental/wasm/wasm_main.cc 
b/plugins/experimental/wasm/wasm_main.cc
index 5267914db5..4a52268ab0 100644
--- a/plugins/experimental/wasm/wasm_main.cc
+++ b/plugins/experimental/wasm/wasm_main.cc
@@ -127,6 +127,9 @@ http_event_handler(TSCont contp, TSEvent event, void *data)
   int result     = -1;
   auto *context  = static_cast<ats_wasm::Context *>(TSContDataGet(contp));
   auto *old_wasm = static_cast<ats_wasm::Wasm *>(context->wasm());
+
+  context->resetTxnReenable();
+
   TSMutexLock(old_wasm->mutex());
   std::shared_ptr<ats_wasm::Wasm> temp = nullptr;
   auto *txnp                           = static_cast<TSHttpTxn>(data);
@@ -144,6 +147,7 @@ http_event_handler(TSCont contp, TSEvent event, void *data)
       TSError("[wasm][%s] cannot retrieve client request", __FUNCTION__);
       TSMutexUnlock(old_wasm->mutex());
       TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE);
+      context->setTxnReenable();
       return 0;
     }
     count = TSMimeHdrFieldsCount(buf, hdr_loc);
@@ -166,6 +170,7 @@ http_event_handler(TSCont contp, TSEvent event, void *data)
       TSError("[wasm][%s] cannot retrieve server response", __FUNCTION__);
       TSMutexUnlock(old_wasm->mutex());
       TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE);
+      context->setTxnReenable();
       return 0;
     }
     count = TSMimeHdrFieldsCount(buf, hdr_loc);
@@ -243,15 +248,34 @@ http_event_handler(TSCont contp, TSEvent event, void 
*data)
 
   TSMutexUnlock(old_wasm->mutex());
 
-  if (result == 0) {
-    TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE);
-  } else if (result < 0) {
-    TSHttpTxnReenable(txnp, TS_EVENT_HTTP_ERROR);
+  // check if we have reenable transaction already or not
+  if ((context == nullptr) || (!context->isTxnReenable())) {
+    TSDebug(WASM_DEBUG_TAG, "[%s] no context or not yet reenabled 
transaction", __FUNCTION__);
+
+    if (result == 0) {
+      TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE);
+      if (context != nullptr) {
+        context->setTxnReenable();
+      }
+    } else if (result < 0) {
+      TSDebug(WASM_DEBUG_TAG, "[%s] abnormal event, continue with error", 
__FUNCTION__);
+      TSHttpTxnReenable(txnp, TS_EVENT_HTTP_ERROR);
+      if (context != nullptr) {
+        context->setTxnReenable();
+      }
+    } else {
+      if (context->isLocalReply()) {
+        TSDebug(WASM_DEBUG_TAG, "[%s] abnormal return, continue with error due 
to local reply", __FUNCTION__);
+        TSHttpTxnReenable(txnp, TS_EVENT_HTTP_ERROR);
+        if (context != nullptr) {
+          context->setTxnReenable();
+        }
+      } else {
+        TSDebug(WASM_DEBUG_TAG, "[%s] abnormal return, no continue, context 
id: %d", __FUNCTION__, context->id());
+      }
+    }
   } else {
-    // TODO: wait for async operation
-    // Temporarily resume with error
-    TSDebug(WASM_DEBUG_TAG, "[%s] result > 0, continue with error for now", 
__FUNCTION__);
-    TSHttpTxnReenable(txnp, TS_EVENT_HTTP_ERROR);
+    TSDebug(WASM_DEBUG_TAG, "[%s] transaction already reenabled", 
__FUNCTION__);
   }
   return 0;
 }

Reply via email to