This is an automated email from the ASF dual-hosted git repository.
kichan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new 5992676e6a Add support for request/response transform for Wasm plugin
(#10004)
5992676e6a is described below
commit 5992676e6a3902052c8166c45b40cc123bacfb2f
Author: Kit Chan <[email protected]>
AuthorDate: Fri Jul 28 11:41:07 2023 -0700
Add support for request/response transform for Wasm plugin (#10004)
* Add support for request/response transform
* fix clang format error
* fix clang format error
* fix clang format error
* use bool and fix a redundant statement
* fix transform_handler() signature and add comments on EOS handling
---
doc/admin-guide/plugins/wasm.en.rst | 1 -
plugins/experimental/wasm/ats_context.cc | 18 +-
plugins/experimental/wasm/ats_context.h | 99 ++++++++
.../experimental/wasm/examples/transform/Makefile | 22 ++
.../experimental/wasm/examples/transform/README.md | 9 +
.../wasm/examples/transform/transform.cc | 70 ++++++
.../wasm/examples/transform/transform.yaml | 24 ++
plugins/experimental/wasm/wasm_main.cc | 263 +++++++++++++++++++++
8 files changed, 504 insertions(+), 2 deletions(-)
diff --git a/doc/admin-guide/plugins/wasm.en.rst
b/doc/admin-guide/plugins/wasm.en.rst
index 642c49fb83..cf92af6157 100644
--- a/doc/admin-guide/plugins/wasm.en.rst
+++ b/doc/admin-guide/plugins/wasm.en.rst
@@ -103,7 +103,6 @@ TODO
====
* Currently only the WAMR and WasmEdge runtime is supported. We should also
support V8 and Wasmtime later.
-* Need to support functionality for retrieving and setting request/response
body
* Need to support L4 lifecycle handler functions
Limitations
diff --git a/plugins/experimental/wasm/ats_context.cc
b/plugins/experimental/wasm/ats_context.cc
index b3de641091..63a44e0f7c 100644
--- a/plugins/experimental/wasm/ats_context.cc
+++ b/plugins/experimental/wasm/ats_context.cc
@@ -291,6 +291,20 @@ set_header(TSMBuffer bufp, TSMLoc hdr_loc,
std::string_view v, std::string_view
}
}
+// Buffer copyTo
+//
+// Hands the byte range [start, start + length) of this buffer to
+// wasm->copyToPointerSize() using ptr_ptr / size_ptr.
+//
+// While owned_data_str_ has content it is the source of the copy and the
+// requested range is clamped to it, so a request for a sub-range yields that
+// sub-range and not the whole string. With no owned content the call is
+// delegated to BufferBase::copyTo().
+//
+// Returns WasmResult::InvalidMemoryAccess when copyToPointerSize() fails,
+// WasmResult::Ok otherwise (or whatever BufferBase::copyTo() returns).
+WasmResult
+Buffer::copyTo(WasmBase *wasm, size_t start, size_t length, uint64_t ptr_ptr, uint64_t size_ptr) const
+{
+  if (!owned_data_str_.empty()) {
+    const std::string_view whole(owned_data_str_);
+    // substr() throws when start is past the end, so clamp start first;
+    // substr() itself clamps length to what is left.
+    const size_t begin           = start < whole.size() ? start : whole.size();
+    const std::string_view slice = whole.substr(begin, length);
+    if (!wasm->copyToPointerSize(slice, ptr_ptr, size_ptr)) {
+      return WasmResult::InvalidMemoryAccess;
+    }
+    return WasmResult::Ok;
+  }
+  return BufferBase::copyTo(wasm, start, length, ptr_ptr, size_ptr);
+}
+
Context::Context() : ContextBase() {}
Context::Context(Wasm *wasm) : ContextBase(wasm) {}
@@ -511,9 +525,11 @@ Context::getBuffer(WasmBufferType type)
return buffer_.set(std::string(static_cast<const char *>(cr_body_),
cr_body_size_));
}
return buffer_.set("");
- case WasmBufferType::CallData:
case WasmBufferType::HttpRequestBody:
case WasmBufferType::HttpResponseBody:
+ // return transform result
+ return &transform_result_;
+ case WasmBufferType::CallData:
case WasmBufferType::NetworkDownstreamData:
case WasmBufferType::NetworkUpstreamData:
case WasmBufferType::GrpcReceiveBuffer:
diff --git a/plugins/experimental/wasm/ats_context.h
b/plugins/experimental/wasm/ats_context.h
index fdf18ddbe9..96a8a7b9c5 100644
--- a/plugins/experimental/wasm/ats_context.h
+++ b/plugins/experimental/wasm/ats_context.h
@@ -47,6 +47,7 @@ const int FETCH_EVENT_ID_BASE = 10000;
using proxy_wasm::ContextBase;
using proxy_wasm::PluginBase;
+using proxy_wasm::WasmBase;
using proxy_wasm::WasmResult;
using proxy_wasm::WasmStreamType;
using proxy_wasm::BufferInterface;
@@ -138,6 +139,55 @@ struct HeaderMap {
}
};
+// extended BufferBase
+//
+// Adds a std::string (owned_data_str_) owned by the object. While that string
+// has content, size() and copyTo() report it; when it is empty they fall back
+// to the BufferBase implementation.
+class Buffer : public BufferBase
+{
+public:
+  Buffer()           = default; // owned_data_str_ starts out empty
+  ~Buffer() override = default;
+
+  // Size of the owned string when it has content, otherwise BufferBase::size().
+  size_t
+  size() const override
+  {
+    if (!owned_data_str_.empty()) {
+      return owned_data_str_.size();
+    }
+    return BufferBase::size();
+  }
+
+  WasmResult copyTo(WasmBase *wasm, size_t start, size_t length, uint64_t ptr_ptr, uint64_t size_ptr) const override;
+
+  // Replace up to length bytes starting at start with data.
+  // std::string::replace() throws std::out_of_range when start lies beyond the
+  // end of the string; such a request is rejected with BadArgument so that no
+  // exception escapes from here.
+  WasmResult
+  copyFrom(size_t start, size_t length, std::string_view data) override
+  {
+    if (start > owned_data_str_.size()) {
+      return WasmResult::BadArgument;
+    }
+    owned_data_str_.replace(start, length, data);
+    return WasmResult::Ok;
+  }
+
+  // Empty the owned string and clear the BufferBase state.
+  void
+  clear() override
+  {
+    owned_data_str_.clear();
+    BufferBase::clear();
+  }
+
+  // Append data to the owned string. Despite the name, existing content is
+  // kept; call clear() first to start over.
+  BufferBase *
+  set(std::string data)
+  {
+    owned_data_str_ += data;
+    return this;
+  }
+
+  // Read access to the owned string. The reference is valid as long as this
+  // Buffer lives and is not modified; callers that assign it to a std::string
+  // still get their own copy.
+  const std::string &
+  get() const
+  {
+    return owned_data_str_;
+  }
+
+private:
+  std::string owned_data_str_;
+};
+
class Context : public ContextBase
{
public:
@@ -202,6 +252,33 @@ public:
cr_result_ = (TSEvent)(FETCH_EVENT_ID_BASE + 1);
}
+  // transform result functions
+
+  // Drop whatever is currently held in the transform result buffer.
+  void
+  clearTransformResult()
+  {
+    this->transform_result_.clear();
+  }
+
+  // Pass body_size bytes starting at body to transform_result_.set().
+  // A null pointer or a zero size is passed on as an empty string.
+  void
+  setTransformResult(const char *body, size_t body_size)
+  {
+    const bool has_data = (body != nullptr) && (body_size != 0);
+    transform_result_.set(has_data ? std::string(body, body_size) : std::string(""));
+  }
+
+  // Expose the current transform result as a pointer/length pair.
+  //
+  // *body_size receives the number of bytes. The returned pointer refers to
+  // transform_result_copy_, a member refreshed on every call, so it stays
+  // valid until the next call to getTransformResult() or until this Context
+  // is destroyed. Returning c_str() of a function-local std::string instead
+  // would hand the caller a pointer into storage that is already released.
+  const char *
+  getTransformResult(size_t *body_size)
+  {
+    transform_result_copy_ = transform_result_.get();
+    *body_size             = transform_result_copy_.size();
+    return transform_result_copy_.c_str();
+  }
+
+private:
+  // Backing storage for the pointer returned by getTransformResult().
+  std::string transform_result_copy_;
+
+public:
+
// Metrics
WasmResult defineMetric(uint32_t metric_type, std::string_view name,
uint32_t *metric_id_ptr) override;
WasmResult incrementMetric(uint32_t metric_id, int64_t offset) override;
@@ -278,6 +355,9 @@ private:
size_t cr_body_size_ = 0;
TSMBuffer cr_hdr_buf_ = nullptr;
TSMLoc cr_hdr_loc_ = nullptr;
+
+ // transform result
+ Buffer transform_result_;
};
// local struct representing info for async transaction
@@ -286,4 +366,23 @@ struct AsyncInfo {
Context *root_context;
};
+// local struct representing info for transform
+// Every member carries a default so that an instance is fully initialised
+// however it is created.
+struct TransformInfo {
+  TSVIO output_vio               = nullptr; // VIO returned by TSVConnWrite() on the transform output connection
+  TSIOBuffer output_buffer       = nullptr; // holds the data written towards the output connection
+  TSIOBufferReader output_reader = nullptr; // reader on output_buffer
+
+  TSVIO reserved_vio               = nullptr; // NOTE(review): not referenced by the handler code visible here
+  TSIOBuffer reserved_buffer       = nullptr; // input is copied here before it is passed to the wasm callbacks
+  TSIOBufferReader reserved_reader = nullptr; // reader on reserved_buffer
+
+  int64_t upstream_bytes   = 0; // nbytes of the input VIO, recorded when the buffers are created
+  int64_t downstream_bytes = 0; // nbytes used for the output VIO while the final size is not known
+  int64_t total            = 0; // number of bytes written to output_buffer so far
+
+  Context *context = nullptr; // context whose onRequestBody()/onResponseBody() is called
+
+  bool request = false; // true for the request body transform, false for the response body transform
+};
+
} // namespace ats_wasm
diff --git a/plugins/experimental/wasm/examples/transform/Makefile
b/plugins/experimental/wasm/examples/transform/Makefile
new file mode 100644
index 0000000000..2c3d6f6013
--- /dev/null
+++ b/plugins/experimental/wasm/examples/transform/Makefile
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+PROXY_WASM_CPP_SDK=/sdk
+
+all: transform.wasm
+
+include ${PROXY_WASM_CPP_SDK}/Makefile.base_lite
diff --git a/plugins/experimental/wasm/examples/transform/README.md
b/plugins/experimental/wasm/examples/transform/README.md
new file mode 100644
index 0000000000..db01a38669
--- /dev/null
+++ b/plugins/experimental/wasm/examples/transform/README.md
@@ -0,0 +1,9 @@
+CPP example for handling request/response transform for ATS Wasm Plugin
+
+To compile `transform.wasm`
+* Create a docker image following instructions here -
https://github.com/proxy-wasm/proxy-wasm-cpp-sdk/blob/master/README.md#docker
+* In this directory run `docker run -v $PWD:/work -w /work wasmsdk:v2 /build_wasm.sh`
+
+Copy the yaml in this directory and the generated `transform.wasm` to
`/usr/local/var/wasm/` and activate the plugin in `plugin.config`
+* `wasm.so /usr/local/var/wasm/transform.yaml`
+
diff --git a/plugins/experimental/wasm/examples/transform/transform.cc
b/plugins/experimental/wasm/examples/transform/transform.cc
new file mode 100644
index 0000000000..ffb537404f
--- /dev/null
+++ b/plugins/experimental/wasm/examples/transform/transform.cc
@@ -0,0 +1,70 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+#include <string>
+#include <unordered_map>
+
+#include "proxy_wasm_intrinsics.h"
+
+// Root context of the example; its only override is onStart().
+class ExampleRootContext : public RootContext
+{
+public:
+  explicit ExampleRootContext(uint32_t id, std::string_view root_id) : RootContext(id, root_id) {}
+
+  bool onStart(size_t) override;
+};
+
+// Stream context of the example; overrides the request header and the
+// response body callbacks.
+class ExampleContext : public Context
+{
+public:
+  explicit ExampleContext(uint32_t id, RootContext *root) : Context(id, root) {}
+
+  FilterHeadersStatus onRequestHeaders(uint32_t headers, bool end_of_stream) override;
+  FilterDataStatus onResponseBody(size_t body_buffer_length, bool end_of_stream) override;
+};
+// Registers both context factories; "myproject" is the same value as rootId in transform.yaml.
+static RegisterContextFactory register_ExampleContext(CONTEXT_FACTORY(ExampleContext), ROOT_FACTORY(ExampleRootContext),
+                                                      "myproject");
+
+// Logs "onStart" at info level and reports success.
+bool
+ExampleRootContext::onStart(size_t)
+{
+  const std::string message("onStart");
+  logInfo(message);
+  return true;
+}
+
+// Logs the User-Agent request header at info level and lets processing continue.
+FilterHeadersStatus
+ExampleContext::onRequestHeaders(uint32_t headers, bool end_of_stream)
+{
+  // print UA
+  auto user_agent = getRequestHeader("User-Agent");
+  std::string line("UA ");
+  line += std::string(user_agent->view());
+  logInfo(line);
+
+  return FilterHeadersStatus::Continue;
+}
+
+// Logs the response body currently available in the HttpResponseBody buffer
+// and lets processing continue.
+FilterDataStatus
+ExampleContext::onResponseBody(size_t body_buffer_length, bool end_of_stream)
+{
+  logInfo(std::string("inside onResponseBody"));
+
+  // Initialised so that no indeterminate length is ever used below.
+  size_t buffered_size = 0;
+  uint32_t flags       = 0;
+  if (getBufferStatus(WasmBufferType::HttpResponseBody, &buffered_size, &flags) != WasmResult::Ok) {
+    // Without a valid size there is nothing meaningful to read.
+    return FilterDataStatus::Continue;
+  }
+
+  auto body = getBufferBytes(WasmBufferType::HttpResponseBody, 0, buffered_size);
+  logError(std::string("onBody ") + std::string(body->view()));
+
+  return FilterDataStatus::Continue;
+}
diff --git a/plugins/experimental/wasm/examples/transform/transform.yaml
b/plugins/experimental/wasm/examples/transform/transform.yaml
new file mode 100644
index 0000000000..2069bd1fbc
--- /dev/null
+++ b/plugins/experimental/wasm/examples/transform/transform.yaml
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+config:
+  name: test
+  rootId: myproject
+  vmConfig:
+    code:
+      local:
+        filename: /usr/local/var/wasm/transform.wasm
+    runtime: ats.wasm.runtime.wamr
+    vmId: test
diff --git a/plugins/experimental/wasm/wasm_main.cc
b/plugins/experimental/wasm/wasm_main.cc
index 4a52268ab0..ddcba2b7bb 100644
--- a/plugins/experimental/wasm/wasm_main.cc
+++ b/plugins/experimental/wasm/wasm_main.cc
@@ -46,6 +46,252 @@ struct WasmInstanceConfig {
static std::unique_ptr<WasmInstanceConfig> wasm_config = nullptr;
+// handler for transform event: copies available input into ti->reserved_buffer, passes it block by block to the context onRequestBody/onResponseBody callback, writes the context transform result to the output connection and signals the input VIO continuation; always returns 0
+static int
+transform_handler(TSCont contp, ats_wasm::TransformInfo *ti)
+{
+ TSVConn output_conn;
+ TSVIO input_vio;
+ TSIOBufferReader input_reader;
+ TSIOBufferBlock blk;
+ int64_t toread, towrite, blk_len, upstream_done, input_avail;
+ const char *start;
+ const char *res;
+ size_t res_len;
+ bool eos, write_down, empty_input;
+
+ ats_wasm::Context *c;
+
+ TSDebug(WASM_DEBUG_TAG, "[%s] transform handler begins", __FUNCTION__);
+ c = ti->context;
+
+ output_conn = TSTransformOutputVConnGet(contp);
+ input_vio = TSVConnWriteVIOGet(contp);
+
+ empty_input = false;
+
+ TSDebug(WASM_DEBUG_TAG, "[%s] cheking input VIO", __FUNCTION__);
+ if (!TSVIOBufferGet(input_vio)) { // the input VIO has no buffer
+ if (ti->output_vio) {
+ TSDebug(WASM_DEBUG_TAG, "[%s] reenabling output VIO after input VIO does
not exist", __FUNCTION__);
+ TSVIONBytesSet(ti->output_vio, ti->total);
+ TSVIOReenable(ti->output_vio);
+ return 0; // output size fixed to what was written so far, nothing else to do
+ } else {
+ TSDebug(WASM_DEBUG_TAG, "[%s] no input VIO and output VIO",
__FUNCTION__);
+ empty_input = true;
+ }
+ }
+
+ if (!empty_input) {
+ input_reader = TSVIOReaderGet(input_vio);
+ }
+
+ TSDebug(WASM_DEBUG_TAG, "[%s] creating buffer and reader", __FUNCTION__);
+ if (!ti->output_buffer) { // buffers and readers are created on the first pass only
+ ti->output_buffer = TSIOBufferCreate();
+ ti->output_reader = TSIOBufferReaderAlloc(ti->output_buffer);
+
+ ti->reserved_buffer = TSIOBufferCreate();
+ ti->reserved_reader = TSIOBufferReaderAlloc(ti->reserved_buffer);
+
+ if (!empty_input) {
+ ti->upstream_bytes = TSVIONBytesGet(input_vio);
+ } else {
+ ti->upstream_bytes = 0;
+ }
+
+ ti->downstream_bytes = INT64_MAX; // used as nbytes for the output VIO until the last chunk is seen
+ }
+
+ TSDebug(WASM_DEBUG_TAG, "[%s] init variables inside handler", __FUNCTION__);
+ if (!empty_input) {
+ input_avail = TSIOBufferReaderAvail(input_reader);
+ upstream_done = TSVIONDoneGet(input_vio);
+ toread = TSVIONTodoGet(input_vio);
+
+ if (toread <= input_avail) { // upstream finished
+ eos = true;
+ } else {
+ eos = false;
+ }
+ } else {
+ input_avail = 0;
+ upstream_done = 0;
+ toread = 0;
+ eos = true;
+ }
+
+ if (input_avail > 0) {
+ // move to the reserved.buffer
+ TSIOBufferCopy(ti->reserved_buffer, input_reader, input_avail, 0);
+
+ // reset input
+ TSIOBufferReaderConsume(input_reader, input_avail);
+ TSVIONDoneSet(input_vio, upstream_done + input_avail);
+ }
+
+ write_down = false;
+ if (!empty_input) {
+ towrite = TSIOBufferReaderAvail(ti->reserved_reader);
+ } else {
+ towrite = 0;
+ }
+
+ do {
+ TSDebug(WASM_DEBUG_TAG, "[%s] inside transform handler loop",
__FUNCTION__);
+ proxy_wasm::FilterDataStatus status =
proxy_wasm::FilterDataStatus::Continue;
+
+ if (towrite == 0 && !empty_input) { // nothing is waiting in the reserved buffer
+ break;
+ }
+
+ TSDebug(WASM_DEBUG_TAG, "[%s] retrieving text and calling the wasm handler
function", __FUNCTION__);
+ if (!empty_input) {
+ blk = TSIOBufferReaderStart(ti->reserved_reader);
+ start = TSIOBufferBlockReadStart(blk, ti->reserved_reader, &blk_len);
+
+ int size = 0;
+ if (towrite > blk_len) { // more than one block pending: hand over the current block only
+ c->setTransformResult(start, blk_len);
+ towrite -= blk_len;
+ TSIOBufferReaderConsume(ti->reserved_reader, blk_len);
+ size = blk_len;
+ } else {
+ c->setTransformResult(start, towrite);
+ TSIOBufferReaderConsume(ti->reserved_reader, towrite);
+ size = towrite;
+ towrite = 0;
+ }
+
+ if (!towrite && eos) { // last chunk: the callback is told this is the end of the stream
+ if (ti->request) {
+ status = c->onRequestBody(size, true);
+ } else {
+ status = c->onResponseBody(size, true);
+ }
+ } else {
+ if (ti->request) {
+ status = c->onRequestBody(size, false);
+ } else {
+ status = c->onResponseBody(size, false);
+ }
+ }
+ } else {
+ c->setTransformResult(nullptr, 0);
+ if (ti->request) {
+ status = c->onRequestBody(0, true);
+ } else {
+ status = c->onResponseBody(0, true);
+ }
+ }
+
+ TSDebug(WASM_DEBUG_TAG, "[%s] retrieving returns from wasm handler
function and pass back to ATS", __FUNCTION__);
+ if ((status == proxy_wasm::FilterDataStatus::Continue) ||
+ ((status == proxy_wasm::FilterDataStatus::StopIterationAndBuffer ||
+ status == proxy_wasm::FilterDataStatus::StopIterationAndWatermark) &&
+ eos && !towrite)) { // forward on Continue, or on the last chunk even when buffering was requested
+ res = c->getTransformResult(&res_len);
+
+ if (res && res_len > 0) {
+ if (!ti->output_vio) {
+ if (eos && !towrite) {
+ ti->output_vio = TSVConnWrite(output_conn, contp,
ti->output_reader, res_len); // HttpSM go on
+ } else {
+ ti->output_vio = TSVConnWrite(output_conn, contp,
ti->output_reader, ti->downstream_bytes); // HttpSM go on
+ }
+ }
+
+ TSIOBufferWrite(ti->output_buffer, res, res_len);
+ ti->total += res_len;
+ write_down = true;
+ }
+
+ c->clearTransformResult(); // the result was handled in this pass, start empty for the next chunk
+ }
+
+ if (status == proxy_wasm::FilterDataStatus::StopIterationNoBuffer) { // this chunk is dropped without being written
+ c->clearTransformResult();
+ }
+
+ if (eos && !towrite) { // EOS
+ break;
+ }
+
+ } while (towrite > 0);
+
+ if (eos && !ti->output_vio) { // no output VIO was created in the loop: create one with nbytes 0
+ ti->output_vio = TSVConnWrite(output_conn, contp, ti->output_reader, 0);
+ }
+
+ if (write_down || eos) {
+ TSVIOReenable(ti->output_vio);
+ }
+
+ if (toread > input_avail) { // upstream not finished.
+ if (eos) {
+ // this should not happen because eos is set to true if toread <=
input_avail
+ // we are, though, expecting that eos may be set by the wasm module
function in the future
+ TSVIONBytesSet(ti->output_vio, ti->total);
+ if (!empty_input) {
+ TSContCall(TSVIOContGet(input_vio), TS_EVENT_VCONN_EOS, input_vio);
+ }
+ } else {
+ if (!empty_input) {
+ TSContCall(TSVIOContGet(input_vio), TS_EVENT_VCONN_WRITE_READY,
input_vio);
+ }
+ }
+ } else { // upstream is finished.
+ TSVIONBytesSet(ti->output_vio, ti->total); // nbytes of the output VIO becomes the number of bytes written
+ if (!empty_input) {
+ TSContCall(TSVIOContGet(input_vio), TS_EVENT_VCONN_WRITE_COMPLETE,
input_vio);
+ }
+ }
+
+ return 0;
+}
+
+// Event entry point of the transform continuation.
+//
+// contp is the transform continuation; its data pointer is the TransformInfo
+// of this transform. ev selects the action: TS_EVENT_ERROR is passed on to
+// the continuation of the input VIO, TS_EVENT_VCONN_WRITE_COMPLETE shuts down
+// the write side of the output connection, and every other event runs
+// transform_handler(). The event data pointer is not used. Always returns 0.
+//
+// When the connection is reported closed, all per-transform state is
+// released: the readers and IO buffers created in transform_handler() are not
+// freed anywhere else in the code visible here, so they are freed together
+// with the TransformInfo before the continuation is destroyed.
+static int
+transform_entry(TSCont contp, TSEvent ev, void * /*edata*/)
+{
+  TSVIO input_vio;
+
+  const int event             = static_cast<int>(ev);
+  ats_wasm::TransformInfo *ti = static_cast<ats_wasm::TransformInfo *>(TSContDataGet(contp));
+
+  TSDebug(WASM_DEBUG_TAG, "[%s] begin transform entry", __FUNCTION__);
+  if (TSVConnClosedGet(contp)) {
+    if (ti != nullptr) {
+      // Free each reader before the buffer it reads from.
+      if (ti->output_reader != nullptr) {
+        TSIOBufferReaderFree(ti->output_reader);
+      }
+      if (ti->output_buffer != nullptr) {
+        TSIOBufferDestroy(ti->output_buffer);
+      }
+      if (ti->reserved_reader != nullptr) {
+        TSIOBufferReaderFree(ti->reserved_reader);
+      }
+      if (ti->reserved_buffer != nullptr) {
+        TSIOBufferDestroy(ti->reserved_buffer);
+      }
+      delete ti;
+    }
+    TSContDestroy(contp);
+    return 0;
+  }
+
+  TSDebug(WASM_DEBUG_TAG, "[%s] checking event inside transform entry", __FUNCTION__);
+  switch (event) {
+  case TS_EVENT_ERROR:
+    TSDebug(WASM_DEBUG_TAG, "[%s] event error", __FUNCTION__);
+    input_vio = TSVConnWriteVIOGet(contp);
+    TSContCall(TSVIOContGet(input_vio), TS_EVENT_ERROR, input_vio);
+    break;
+
+  // we should handle TS_EVENT_VCONN_EOS similarly here if we support setting EOS from wasm module
+  case TS_EVENT_VCONN_WRITE_COMPLETE:
+    TSDebug(WASM_DEBUG_TAG, "[%s] event vconn write complete", __FUNCTION__);
+    TSVConnShutdown(TSTransformOutputVConnGet(contp), 0, 1);
+    break;
+
+  case TS_EVENT_VCONN_WRITE_READY:
+  default:
+    TSDebug(WASM_DEBUG_TAG, "[%s] event vconn write ready/default", __FUNCTION__);
+    transform_handler(contp, ti);
+    break;
+  }
+
+  return 0;
+}
+
// handler for timer event
static int
schedule_handler(TSCont contp, TSEvent /*event*/, void * /*data*/)
@@ -305,6 +551,23 @@ global_hook_handler(TSCont /*contp*/, TSEvent /*event*/,
void *data)
TSHttpTxnHookAdd(txnp, TS_HTTP_SEND_RESPONSE_HDR_HOOK, txn_contp);
TSContDataSet(txn_contp, context);
+
+ // create transform items
+ TSDebug(WASM_DEBUG_TAG, "[%s] creating transform info, continuation and
hook", __FUNCTION__);
+ ats_wasm::TransformInfo *reqbody_ti = new ats_wasm::TransformInfo();
+ reqbody_ti->request = true;
+ reqbody_ti->context = context;
+ ats_wasm::TransformInfo *respbody_ti = new ats_wasm::TransformInfo();
+ respbody_ti->request = false;
+ respbody_ti->context = context;
+
+ TSVConn reqbody_connp = TSTransformCreate(transform_entry, txnp);
+ TSContDataSet(reqbody_connp, reqbody_ti);
+ TSVConn respbody_connp = TSTransformCreate(transform_entry, txnp);
+ TSContDataSet(respbody_connp, respbody_ti);
+
+ TSHttpTxnHookAdd(txnp, TS_HTTP_REQUEST_TRANSFORM_HOOK, reqbody_connp);
+ TSHttpTxnHookAdd(txnp, TS_HTTP_RESPONSE_TRANSFORM_HOOK, respbody_connp);
}
TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE);