Repository: trafficserver Updated Branches: refs/heads/oschaaf-ipro-flow [created] 006c91e9e
Add support for In-Place-Resource-Optimization/OptimizeForBandwidth Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/006c91e9 Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/006c91e9 Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/006c91e9 Branch: refs/heads/oschaaf-ipro-flow Commit: 006c91e9e05c2e25f9dc1a384a61f06e322bb3ff Parents: 7bb2070 Author: Otto van der Schaaf <[email protected]> Authored: Thu Aug 14 17:44:22 2014 +0200 Committer: Otto van der Schaaf <[email protected]> Committed: Thu Aug 14 17:44:22 2014 +0200 ---------------------------------------------------------------------- .../experimental/ats_speed/ats_base_fetch.cc | 92 +++++++- plugins/experimental/ats_speed/ats_base_fetch.h | 12 +- .../ats_speed/ats_resource_intercept.cc | 42 ++-- .../ats_speed/ats_resource_intercept.h | 40 ++++ plugins/experimental/ats_speed/ats_speed.cc | 225 +++++++++++++++++-- plugins/experimental/ats_speed/ats_speed.h | 9 + 6 files changed, 363 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafficserver/blob/006c91e9/plugins/experimental/ats_speed/ats_base_fetch.cc ---------------------------------------------------------------------- diff --git a/plugins/experimental/ats_speed/ats_base_fetch.cc b/plugins/experimental/ats_speed/ats_base_fetch.cc index 769e79b..e6bd94a 100644 --- a/plugins/experimental/ats_speed/ats_base_fetch.cc +++ b/plugins/experimental/ats_speed/ats_base_fetch.cc @@ -26,7 +26,9 @@ #include <ts/ts.h> #include "ats_server_context.h" +#include "ats_resource_intercept.h" +#include "net/instaweb/http/public/cache_url_async_fetcher.h" #include "net/instaweb/util/public/string_util.h" #include "net/instaweb/util/public/string_writer.h" #include "net/instaweb/util/public/google_message_handler.h" @@ -43,11 +45,17 @@ 
AtsBaseFetch::AtsBaseFetch(AtsServerContext* server_context, done_called_(false), last_buf_sent_(false), references_(2), + // downstream_vio is NULL for the IPRO lookup downstream_vio_(downstream_vio), downstream_buffer_(downstream_buffer), is_resource_fetch_(is_resource_fetch), downstream_length_(0), - txn_mutex_(TSVIOMutexGet(downstream_vio)) { + txn_mutex_(downstream_vio ? TSVIOMutexGet(downstream_vio) : NULL), + // TODO(oschaaf): check and use handle_error_. + handle_error_(false), + is_ipro_(false), + ctx_(NULL), + ipro_callback_(NULL) { buffer_.reserve(1024 * 32); } @@ -85,7 +93,7 @@ void AtsBaseFetch::HandleHeadersComplete() { // This implies that we can't support convert_meta_tags TSDebug("ats-speed", "HeadersComplete()!"); // For resource fetches, we need to output the headers in raw HTTP format. - if (is_resource_fetch_) { + if (is_resource_fetch_ || is_ipro_) { GoogleMessageHandler mh; GoogleString s; StringWriter string_writer(&s); @@ -96,6 +104,11 @@ void AtsBaseFetch::HandleHeadersComplete() { } void AtsBaseFetch::ForwardData(const StringPiece& sp, bool reenable, bool last) { + if (is_ipro_) { + TSDebug("ats-speed", "ipro forwarddata: %.*s", (int)sp.size(), sp.data()); + buffer_.append(sp.data(), sp.size()); + return; + } TSIOBufferBlock downstream_blkp; char *downstream_buffer; int64_t downstream_length; @@ -123,15 +136,84 @@ void AtsBaseFetch::ForwardData(const StringPiece& sp, bool reenable, bool last) Unlock(); } -void AtsBaseFetch::HandleDone(bool success) { +void AtsBaseFetch::HandleDone(bool success) { + // When this is an IPRO lookup: + // if we've got a 200 result, store the data and setup an intercept + // to write it out. + // Regardless, re-enable the transaction at this point. + + //TODO(oschaaf): what about no success? 
+ if (is_ipro_) { + TSDebug("ats-speed", "ipro lookup base fetch done()"); + done_called_ = true; + + int status_code = response_headers()->status_code(); + bool status_ok = (status_code != 0) && (status_code < 400); + if (status_code == CacheUrlAsyncFetcher::kNotInCacheStatus) { + TSDebug("ats-speed", "ipro lookup base fetch -> not found in cache"); + ctx_->record_in_place = true; + TSHttpTxnReenable(ctx_->txn, TS_EVENT_HTTP_CONTINUE); + ctx_ = NULL; + DecrefAndDeleteIfUnreferenced(); + return; + } else if (!status_ok) { + TSDebug("ats-speed", "ipro lookup base fetch -> ipro cache entry says not applicable"); + TSHttpTxnReenable(ctx_->txn, TS_EVENT_HTTP_CONTINUE); + ctx_ = NULL; + DecrefAndDeleteIfUnreferenced(); + return; + } + ctx_->serve_in_place = true; + TransformCtx* ctx = ctx_; + TSHttpTxn txn = ctx_->txn; + // TODO(oschaaf): deduplicate with code that hooks the resource intercept + TSHttpTxnRespCacheableSet(txn, 0); + TSHttpTxnReqCacheableSet(txn, 0); + + TSMBuffer reqp; + TSMLoc req_hdr_loc; + if (TSHttpTxnClientReqGet(ctx->txn, &reqp, &req_hdr_loc) != TS_SUCCESS) { + TSError("Error TSHttpTxnClientReqGet for resource!"); + ctx_ = NULL; + DecrefAndDeleteIfUnreferenced(); + TSHttpTxnReenable(txn, TS_EVENT_HTTP_CONTINUE); + return; + } + + TSCont interceptCont = TSContCreate((int (*)(tsapi_cont*, TSEvent, void*))ipro_callback_, TSMutexCreate()); + InterceptCtx *intercept_ctx = new InterceptCtx(); + intercept_ctx->request_ctx = ctx; + intercept_ctx->request_headers = new RequestHeaders(); + intercept_ctx->response->append(buffer_); + copy_request_headers_to_psol(reqp, req_hdr_loc, intercept_ctx->request_headers); + TSHandleMLocRelease(reqp, TS_NULL_MLOC, req_hdr_loc); + + + TSContDataSet(interceptCont, intercept_ctx); + // TODO(oschaaf): when we serve an IPRO optimized asset, that will be handled + // by the resource intercept. Which means we should set TXN_INDEX_OWNED_ARG to + // unset (the intercept now own the context. 
+ TSHttpTxnServerIntercept(interceptCont, txn); + // TODO(oschaaf): I don't think we need to lock here, but double check that + // to make sure. + ctx_->base_fetch = NULL; + ctx_ = NULL; + DecrefAndDeleteIfUnreferenced(); + TSHttpTxnReenable(txn, TS_EVENT_HTTP_CONTINUE); + return; + } + + + TSDebug("ats-speed", "Done()!"); CHECK(!done_called_); CHECK(downstream_vio_); - TSDebug("ats-speed", "Done()!"); - Lock(); done_called_ = true; ForwardData("", true, true); + DecrefAndDeleteIfUnreferenced(); + // TODO(oschaaf): we aren't safe to touch the associated mutex, + // right? FIX. Unlock(); } http://git-wip-us.apache.org/repos/asf/trafficserver/blob/006c91e9/plugins/experimental/ats_speed/ats_base_fetch.h ---------------------------------------------------------------------- diff --git a/plugins/experimental/ats_speed/ats_base_fetch.h b/plugins/experimental/ats_speed/ats_base_fetch.h index 8e4d93d..b034f64 100644 --- a/plugins/experimental/ats_speed/ats_base_fetch.h +++ b/plugins/experimental/ats_speed/ats_base_fetch.h @@ -56,7 +56,11 @@ public: virtual ~AtsBaseFetch(); void Release(); -private: + void set_handle_error(bool x) { handle_error_ = x; } + void set_is_ipro(bool x) { is_ipro_ = x; } + void set_ctx(TransformCtx* x) { ctx_ = x; } + void set_ipro_callback(void* fp) { ipro_callback_ = fp; } + private: virtual bool HandleWrite(const StringPiece& sp, net_instaweb::MessageHandler* handler); virtual bool HandleFlush( net_instaweb::MessageHandler* handler); virtual void HandleHeadersComplete(); @@ -80,6 +84,12 @@ private: // We don't own this mutex TSMutex txn_mutex_; + bool handle_error_; + bool is_ipro_; + // will be used by ipro to reenable the transaction on lookup completion + TransformCtx* ctx_; + // function pointer to ipro transform callback + void* ipro_callback_; }; } /* ats_pagespeed */ http://git-wip-us.apache.org/repos/asf/trafficserver/blob/006c91e9/plugins/experimental/ats_speed/ats_resource_intercept.cc 
---------------------------------------------------------------------- diff --git a/plugins/experimental/ats_speed/ats_resource_intercept.cc b/plugins/experimental/ats_speed/ats_resource_intercept.cc index 0afeae5..505f2cc 100644 --- a/plugins/experimental/ats_speed/ats_resource_intercept.cc +++ b/plugins/experimental/ats_speed/ats_resource_intercept.cc @@ -36,6 +36,7 @@ #include "net/instaweb/http/public/request_context.h" #include "net/instaweb/rewriter/public/resource_fetch.h" +#include "net/instaweb/rewriter/public/rewrite_driver.h" #include "net/instaweb/rewriter/public/static_asset_manager.h" #include "net/instaweb/system/public/system_request_context.h" @@ -44,30 +45,6 @@ using namespace net_instaweb; -struct InterceptCtx -{ - TSVConn vconn; - TSIOBuffer req_buffer; - TSIOBufferReader req_reader; - TSIOBuffer resp_buffer; - TSIOBufferReader resp_reader; - GoogleString* response; - TransformCtx* request_ctx; - RequestHeaders* request_headers; - - InterceptCtx() - : vconn(NULL) - , req_buffer(NULL) - , req_reader(NULL) - , resp_buffer(NULL) - , resp_reader(NULL) - , response( new GoogleString() ) - , request_ctx(NULL) - , request_headers(NULL) - { - }; -}; - static void shutdown (TSCont cont, InterceptCtx * intercept_ctx) { if (intercept_ctx->req_reader != NULL) { @@ -111,6 +88,7 @@ shutdown (TSCont cont, InterceptCtx * intercept_ctx) { static int resource_intercept(TSCont cont, TSEvent event, void *edata) { + TSDebug("ats-speed", "resource_intercept event: %d", (int)event); InterceptCtx *intercept_ctx = static_cast<InterceptCtx *>(TSContDataGet(cont)); bool shutDown = false; @@ -156,7 +134,7 @@ resource_intercept(TSCont cont, TSEvent event, void *edata) intercept_ctx->request_headers); RewriteOptions* options = NULL; - + //const char* host = intercept_ctx->request_headers->Lookup1(HttpAttributes::kHost); const char* host = intercept_ctx->request_ctx->gurl->HostAndPort().as_string().c_str(); if (host != NULL && strlen(host) > 0) { @@ -192,6 +170,7 @@ 
resource_intercept(TSCont cont, TSEvent event, void *edata) } else { int64_t numBytesToWrite, numBytesWritten; numBytesToWrite = intercept_ctx->response->size(); + TSDebug("ats-speed", "resource intercept writing out a %d bytes response", (int)numBytesToWrite); numBytesWritten = TSIOBufferWrite(intercept_ctx->resp_buffer, intercept_ctx->response->c_str(), numBytesToWrite); @@ -248,8 +227,17 @@ read_cache_header_callback(TSCont cont, TSEvent event, void *edata) if (ctx == NULL) { TSHttpTxnReenable(txn, TS_EVENT_HTTP_CONTINUE); return 0; - } - if (!ctx->resource_request) { + } else if (ctx->in_place) { + TSHttpTxnRespCacheableSet(txn, 0); + TSHttpTxnReqCacheableSet(txn, 0); + ctx->base_fetch->set_ctx(ctx); + ctx->base_fetch->set_ipro_callback((void*)resource_intercept); + ctx->driver->FetchInPlaceResource( + *ctx->gurl, false /* proxy_mode */, ctx->base_fetch); + // wait for the lookup to complete. we'll know what to do + // when the lookup completes. + return 0; + } else if (!ctx->resource_request) { TSHttpTxnReenable(txn, TS_EVENT_HTTP_CONTINUE); return 0; } http://git-wip-us.apache.org/repos/asf/trafficserver/blob/006c91e9/plugins/experimental/ats_speed/ats_resource_intercept.h ---------------------------------------------------------------------- diff --git a/plugins/experimental/ats_speed/ats_resource_intercept.h b/plugins/experimental/ats_speed/ats_resource_intercept.h index 933f20f..1c3ef2f 100644 --- a/plugins/experimental/ats_speed/ats_resource_intercept.h +++ b/plugins/experimental/ats_speed/ats_resource_intercept.h @@ -24,6 +24,46 @@ #ifndef ATS_RESOURCE_INTERCEPT_H #define ATS_RESOURCE_INTERCEPT_H +#include <string> + +#include <ts/ts.h> + + +#include "net/instaweb/http/public/async_fetch.h" +#include "net/instaweb/http/public/headers.h" +#include "net/instaweb/util/public/string_util.h" +#include "net/instaweb/util/public/string.h" + +#include "ats_speed.h" + +using namespace net_instaweb; + +struct InterceptCtx +{ + TSVConn vconn; + TSIOBuffer 
req_buffer; + TSIOBufferReader req_reader; + TSIOBuffer resp_buffer; + TSIOBufferReader resp_reader; + GoogleString* response; + TransformCtx* request_ctx; + RequestHeaders* request_headers; + + InterceptCtx() + : vconn(NULL) + , req_buffer(NULL) + , req_reader(NULL) + , resp_buffer(NULL) + , resp_reader(NULL) + , response( new GoogleString() ) + , request_ctx(NULL) + , request_headers(NULL) + { + }; +}; + void setup_resource_intercept(); +static int +resource_intercept(TSCont cont, TSEvent event, void *edata); #endif // ATS_INTERCEPT_H http://git-wip-us.apache.org/repos/asf/trafficserver/blob/006c91e9/plugins/experimental/ats_speed/ats_speed.cc ---------------------------------------------------------------------- diff --git a/plugins/experimental/ats_speed/ats_speed.cc b/plugins/experimental/ats_speed/ats_speed.cc index 8fcef55..2386022 100644 --- a/plugins/experimental/ats_speed/ats_speed.cc +++ b/plugins/experimental/ats_speed/ats_speed.cc @@ -132,6 +132,12 @@ ats_ctx_alloc() ctx->alive = 0xaaaa; ctx->options = NULL; ctx->to_host = NULL; + ctx->in_place = false; + ctx->driver = NULL; + ctx->record_in_place = false; + ctx->recorder = NULL; + ctx->ipro_response_headers = NULL; + ctx->serve_in_place = false; return ctx; } @@ -183,6 +189,20 @@ ats_ctx_destroy(TransformCtx * ctx) delete ctx->to_host; ctx->to_host = NULL; } + if (ctx->driver != NULL) { + ctx->driver->Cleanup(); + ctx->driver = NULL; + } + if (ctx->recorder != NULL) { + ctx->recorder->Fail(); + ctx->recorder->DoneAndSetHeaders(NULL); // Deletes recorder. 
+ ctx->recorder = NULL; + } + if (ctx->ipro_response_headers != NULL) { + delete ctx->ipro_response_headers; + ctx->ipro_response_headers = NULL; + } + TSfree(ctx); } @@ -439,7 +459,7 @@ ats_transform_init(TSCont contp, TransformCtx * ctx) TSError("Error TSHttpTxnClientReqGet"); return; } - + AtsServerContext* server_context = ats_process_context->server_context(); if (server_context->IsPagespeedResource(*ctx->gurl)) { CHECK(false) << "PageSpeed resource should not get here!"; @@ -448,6 +468,11 @@ ats_transform_init(TSCont contp, TransformCtx * ctx) downstream_conn = TSTransformOutputVConnGet(contp); ctx->downstream_buffer = TSIOBufferCreate(); ctx->downstream_vio = TSVConnWrite(downstream_conn, contp, TSIOBufferReaderAlloc(ctx->downstream_buffer), INT64_MAX); + if (ctx->recorder != NULL) { + TSHandleMLocRelease(reqp, TS_NULL_MLOC, req_hdr_loc); + TSHandleMLocRelease(bufp, TS_NULL_MLOC, hdr_loc); + return; + } // TODO(oschaaf): fix host/ip(?) SystemRequestContext* system_request_context = @@ -530,6 +555,7 @@ ats_transform_init(TSCont contp, TransformCtx * ctx) static void ats_transform_one(TransformCtx * ctx, TSIOBufferReader upstream_reader, int amount) { + TSDebug("ats-speed", "transform_one()"); TSIOBufferBlock downstream_blkp; const char *upstream_buffer; int64_t upstream_length; @@ -554,7 +580,11 @@ ats_transform_one(TransformCtx * ctx, TSIOBufferReader upstream_reader, int amou TSDebug("ats-speed", "transform!"); // TODO(oschaaf): use at least the message handler from the server conrtext here? 
if (ctx->inflater == NULL) { - ctx->proxy_fetch->Write(StringPiece((char*)upstream_buffer, upstream_length), ats_process_context->message_handler()); + if (ctx->recorder != NULL) { + ctx->recorder->Write(StringPiece((char*)upstream_buffer, upstream_length), ats_process_context->message_handler()); + } else { + ctx->proxy_fetch->Write(StringPiece((char*)upstream_buffer, upstream_length), ats_process_context->message_handler()); + } } else { char buf[net_instaweb::kStackBufferSize]; @@ -566,8 +596,13 @@ ats_transform_one(TransformCtx * ctx, TSIOBufferReader upstream_reader, int amou if (num_inflated_bytes < 0) { TSError("Corrupted inflation"); } else if (num_inflated_bytes > 0) { - ctx->proxy_fetch->Write(StringPiece(buf, num_inflated_bytes), - ats_process_context->message_handler()); + if (ctx->recorder != NULL ) { + ctx->recorder->Write(StringPiece(buf, num_inflated_bytes), + ats_process_context->message_handler()); + } else { + ctx->proxy_fetch->Write(StringPiece(buf, num_inflated_bytes), + ats_process_context->message_handler()); + } } } } @@ -586,8 +621,15 @@ ats_transform_finish(TransformCtx * ctx) { if (ctx->state == transform_state_output) { ctx->state = transform_state_finished; - ctx->proxy_fetch->Done(true); - ctx->proxy_fetch = NULL; + if (ctx->recorder != NULL ) { + TSDebug("ats-speed", "ipro recording finished"); + ctx->recorder->DoneAndSetHeaders(ctx->ipro_response_headers); + ctx->recorder = NULL; + } else { + TSDebug("ats-speed", "proxy fetch finished"); + ctx->proxy_fetch->Done(true); + ctx->proxy_fetch = NULL; + } } } @@ -624,6 +666,10 @@ ats_transform_do(TSCont contp) } if (upstream_todo > 0) { + if (ctx->recorder != NULL) { + ctx->downstream_length += upstream_todo; + TSIOBufferCopy(TSVIOBufferGet(ctx->downstream_vio), TSVIOReaderGet(upstream_vio), upstream_todo, 0); + } ats_transform_one(ctx, TSVIOReaderGet(upstream_vio), upstream_todo); TSVIONDoneSet(upstream_vio, TSVIONDoneGet(upstream_vio) + upstream_todo); } @@ -637,6 +683,11 @@ 
ats_transform_do(TSCont contp) TSContCall(TSVIOContGet(upstream_vio), TS_EVENT_VCONN_WRITE_READY, upstream_vio); } } else { + // When not recording, the base fetch will re-enable from the PSOL callback. + if (ctx->recorder != NULL) { + TSVIONBytesSet(ctx->downstream_vio, ctx->downstream_length); + TSVIOReenable(ctx->downstream_vio); + } ats_transform_finish(ctx); TSContCall(TSVIOContGet(upstream_vio), TS_EVENT_VCONN_WRITE_COMPLETE, upstream_vio); } @@ -646,6 +697,7 @@ ats_transform_do(TSCont contp) static int ats_speed_transform(TSCont contp, TSEvent event, void * /* edata ATS_UNUSED */) { + TSDebug("ats-speed", "ats_speed_transform()"); if (TSVConnClosedGet(contp)) { //ats_ctx_destroy((TransformCtx*)TSContDataGet(contp)); TSContDestroy(contp); @@ -683,12 +735,14 @@ ats_speed_transform_add(TSHttpTxn txnp) TransformCtx* ctx = get_transaction_context(txnp); CHECK(ctx); if (ctx->transform_added) { // Happens with a stale cache hit + TSDebug("ats-speed", "transform not added due to already being added"); return; } else { + TSDebug("ats-speed", "transform added"); ctx->transform_added = true; } - - TSHttpTxnUntransformedRespCache(txnp, 1); + + TSHttpTxnUntransformedRespCache(txnp, ctx->recorder == NULL ? 
1 : 0); TSHttpTxnTransformedRespCache(txnp, 0); TSVConn connp; @@ -698,8 +752,6 @@ ats_speed_transform_add(TSHttpTxn txnp) TSHttpTxnHookAdd(txnp, TS_HTTP_RESPONSE_TRANSFORM_HOOK, connp); } -// Returns true if a server intercept was set up -// Which means we should not attempt any further transformation void handle_read_request_header(TSHttpTxn txnp) { TSMBuffer reqp = NULL; @@ -741,13 +793,7 @@ handle_read_request_header(TSHttpTxn txnp) { ctx->resource_request = true; TSHttpTxnArgSet(txnp, TXN_INDEX_OWNED_ARG, &TXN_INDEX_OWNED_ARG_UNSET); } - } else if (ctx->gurl->PathSansQuery() == "/pagespeed_message" - || ctx->gurl->PathSansQuery() == "/pagespeed_statistics" - || ctx->gurl->PathSansQuery() == "/pagespeed_global_statistics" - || ctx->gurl->PathSansQuery() == "/pagespeed_console" - || ctx->gurl->PathSansLeaf() == "/ats_speed_static/" - || ctx->gurl->PathSansQuery() == "/robots.txt" - ) { + } else if ( ctx->gurl->PathSansLeaf() == "/ats_speed_static/") { ctx->resource_request = true; TSHttpTxnArgSet(txnp, TXN_INDEX_OWNED_ARG, &TXN_INDEX_OWNED_ARG_UNSET); } @@ -755,7 +801,80 @@ handle_read_request_header(TSHttpTxn txnp) { ctx->beacon_request = true; TSHttpTxnArgSet(txnp, TXN_INDEX_OWNED_ARG, &TXN_INDEX_OWNED_ARG_UNSET); hook_beacon_intercept(txnp); - } + } else { + AtsServerContext* server_context = ctx->server_context; + // TODO(oschaaf): fix host/ip(?) 
+ SystemRequestContext* system_request_context = + new SystemRequestContext(server_context->thread_system()->NewMutex(), + server_context->timer(), + "www.foo.com", + 80, + "127.0.0.1"); + + ctx->base_fetch = new AtsBaseFetch(server_context, RequestContextPtr(system_request_context), + ctx->downstream_vio, ctx->downstream_buffer, false); + + + RewriteOptions* options = NULL; + RequestHeaders* request_headers = new RequestHeaders(); + ctx->base_fetch->SetRequestHeadersTakingOwnership(request_headers); + copy_request_headers_to_psol(reqp, hdr_loc, request_headers); + + //TSHttpStatus status = TSHttpHdrStatusGet(bufp, hdr_loc); + // TODO(oschaaf): http version + //ctx->base_fetch->response_headers()->set_status_code(status); + //copy_response_headers_to_psol(bufp, hdr_loc, ctx->base_fetch->response_headers()); + //ctx->base_fetch->response_headers()->ComputeCaching(); + const char* host = ctx->gurl->HostAndPort().as_string().c_str(); + //request_headers->Lookup1(HttpAttributes::kHost); + if (host != NULL && strlen(host) > 0) { + ctx->options = get_host_options(host); + } + bool ok = ps_determine_options(server_context, + ctx->options, + request_headers, + ctx->base_fetch->response_headers(), + &options, + ctx->gurl); + + // Take ownership of custom_options. + scoped_ptr<RewriteOptions> custom_options(options); + + if (!ok) { + TSError("Failure while determining request options for psol"); + options = server_context->global_options(); + } else { + // ps_determine_options modified url, removing any ModPagespeedFoo=Bar query + // parameters. Keep url_string in sync with url. 
+ ctx->gurl->Spec().CopyToString(ctx->url_string); + } + + if (options->in_place_rewriting_enabled() && + options->enabled() && + options->IsAllowed(ctx->gurl->Spec())) { + RewriteDriver* driver; + if (custom_options.get() == NULL) { + driver = server_context->NewRewriteDriver(ctx->base_fetch->request_context()); + } else { + driver = server_context->NewCustomRewriteDriver(custom_options.release(), ctx->base_fetch->request_context()); + } + + if (!user_agent.empty()) { + driver->SetUserAgent(ctx->user_agent->c_str()); + } + driver->SetRequestHeaders(*ctx->base_fetch->request_headers()); + ctx->driver = driver; + ctx->server_context->message_handler()->Message( + kInfo, "Trying to serve rewritten resource in-place: %s", + ctx->url_string->c_str()); + + ctx->in_place = true; + ctx->base_fetch->set_handle_error(false); + ctx->base_fetch->set_is_ipro(true); + //ctx->driver->FetchInPlaceResource( + // *ctx->gurl, false /* proxy_mode */, ctx->base_fetch); + } + } } TSfree((void*)url); } // gurl->IsWebValid() == true @@ -763,7 +882,6 @@ handle_read_request_header(TSHttpTxn txnp) { } else { DCHECK(false) << "Could not get client request header\n"; } - TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE); } @@ -807,7 +925,9 @@ transform_plugin(TSCont contp, TSEvent event, void *edata) // ctx here - the interceptor does. if (ctx != NULL) { - bool is_owned = TSHttpTxnArgGet(txn, TXN_INDEX_OWNED_ARG) == &TXN_INDEX_OWNED_ARG_SET; + bool is_owned = TSHttpTxnArgGet(txn, TXN_INDEX_OWNED_ARG) == &TXN_INDEX_OWNED_ARG_SET + //TODO(oschaaf): rewrite this. 
+ && !ctx->serve_in_place; if (is_owned) { ats_ctx_destroy(ctx); } @@ -851,6 +971,11 @@ transform_plugin(TSCont contp, TSEvent event, void *edata) TSHttpTxnReenable(txn, TS_EVENT_HTTP_CONTINUE); return 0; } + if (ctx->serve_in_place) { + TSHttpTxnArgSet(txn, TXN_INDEX_OWNED_ARG, &TXN_INDEX_OWNED_ARG_UNSET); + TSHttpTxnReenable(txn, TS_EVENT_HTTP_CONTINUE); + return 0; + } std::string* to_host = new std::string(); to_host->append(get_remapped_host(ctx->txn)); ctx->to_host = to_host; @@ -920,16 +1045,61 @@ transform_plugin(TSCont contp, TSEvent event, void *edata) return 0; } } + if (ok) { StringPiece s_content_type = get_header(response_header_buf, response_header_loc, "Content-Type"); const net_instaweb::ContentType* content_type = net_instaweb::MimeTypeToContentType(s_content_type); - if ((content_type == NULL || !content_type->IsHtmlLike())) { + if (ctx->record_in_place && content_type != NULL) { + GoogleString cache_url = *ctx->url_string; + ctx->server_context->rewrite_stats()->ipro_not_in_cache()->Add(1); + ctx->server_context->message_handler()->Message( + kInfo, + "Could not rewrite resource in-place " + "because URL is not in cache: %s", + cache_url.c_str()); + const SystemRewriteOptions* options = SystemRewriteOptions::DynamicCast( + ctx->driver->options()); + RequestHeaders request_headers; + //copy_request_headers_from_ngx(r, &request_headers); + // This URL was not found in cache (neither the input resource nor + // a ResourceNotCacheable entry) so we need to get it into cache + // (or at least a note that it cannot be cached stored there). + // We do that using an Apache output filter. + // TODO(oschaaf): fix host/ip(?) 
+ SystemRequestContext* system_request_context = + new SystemRequestContext(ctx->server_context->thread_system()->NewMutex(), + ctx->server_context->timer(), + "www.foo.com", + 80, + "127.0.0.1"); + + ctx->recorder = new InPlaceResourceRecorder( + RequestContextPtr(system_request_context), + cache_url, + ctx->driver->CacheFragment(), + request_headers.GetProperties(), + options->respect_vary(), + options->ipro_max_response_bytes(), + options->ipro_max_concurrent_recordings(), + options->implicit_cache_ttl_ms(), + ctx->server_context->http_cache(), + ctx->server_context->statistics(), + ctx->server_context->message_handler()); + // TODO(oschaaf): does this make sense for ats? perhaps we don't need it. + ctx->ipro_response_headers = new ResponseHeaders(); + ctx->ipro_response_headers->set_status_code(status); + copy_response_headers_to_psol(response_header_buf, response_header_loc, ctx->ipro_response_headers); + ctx->ipro_response_headers->ComputeCaching(); + + ctx->recorder->ConsiderResponseHeaders( + InPlaceResourceRecorder::kPreliminaryHeaders, ctx->ipro_response_headers); + } else if ((content_type == NULL || !content_type->IsHtmlLike())) { ok = false; TSHttpTxnReenable(txn, TS_EVENT_HTTP_CONTINUE); return 0; - } + } } if (ok) { @@ -949,8 +1119,15 @@ transform_plugin(TSCont contp, TSEvent event, void *edata) ctx->inflater = new GzipInflater(inflate_type); ctx->inflater->Init(); } - TSDebug(DEBUG_TAG, "Will optimize [%s]", ctx->url_string->c_str()); - ctx->html_rewrite = true; + ctx->html_rewrite = ctx->recorder == NULL; + if (ctx->html_rewrite) { + TSDebug(DEBUG_TAG, "Will optimize [%s]", ctx->url_string->c_str()); + } else if (ctx->recorder != NULL) { + TSDebug(DEBUG_TAG, "Will record in place: [%s]", ctx->url_string->c_str()); + } else { + CHECK(false) << "At this point, adding a transform makes no sense"; + } + set_header(response_header_buf,response_header_loc,"@gzip_nocache","0"); ats_speed_transform_add(txn); } 
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/006c91e9/plugins/experimental/ats_speed/ats_speed.h ---------------------------------------------------------------------- diff --git a/plugins/experimental/ats_speed/ats_speed.h b/plugins/experimental/ats_speed/ats_speed.h index 5f46992..f23b492 100644 --- a/plugins/experimental/ats_speed/ats_speed.h +++ b/plugins/experimental/ats_speed/ats_speed.h @@ -38,7 +38,9 @@ class AtsBaseFetch; class AtsRewriteOptions; class AtsServerContext; class GzipInflater; +class InPlaceResourceRecorder; class ProxyFetch; +class RewriteDriver; class RewriteOptions; class RequestHeaders; class ResponseHeaders; @@ -63,6 +65,8 @@ typedef struct net_instaweb::AtsBaseFetch* base_fetch; net_instaweb::ProxyFetch* proxy_fetch; net_instaweb::GzipInflater* inflater; + //driver is used for IPRO flow only + net_instaweb::RewriteDriver* driver; bool write_pending; bool fetch_done; @@ -80,6 +84,11 @@ typedef struct net_instaweb::AtsRewriteOptions* options; // TODO: Use GoogleString* std::string* to_host; + bool in_place; + bool record_in_place; + net_instaweb::InPlaceResourceRecorder* recorder; + net_instaweb::ResponseHeaders* ipro_response_headers; + bool serve_in_place; } TransformCtx; TransformCtx* get_transaction_context(TSHttpTxn txnp);
