Repository: trafficserver
Updated Branches:
  refs/heads/master c316ded8b -> 644f58913


TS-3150 support for streaming HTTP fetch in atscppapi


Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/cec1d99c
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/cec1d99c
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/cec1d99c

Branch: refs/heads/master
Commit: cec1d99c262debbe29693d4248f9c57a6fcc4229
Parents: 9d98c57
Author: Manjesh Nilange <[email protected]>
Authored: Mon Oct 20 14:44:36 2014 -0700
Committer: Manjesh Nilange <[email protected]>
Committed: Mon Oct 20 14:44:36 2014 -0700

----------------------------------------------------------------------
 configure.ac                                    |   1 +
 lib/atscppapi/examples/Makefile.am              |   3 +-
 .../examples/async_http_fetch/AsyncHttpFetch.cc |   3 +-
 .../AsyncHttpFetchStreaming.cc                  | 155 ++++++++++++++++
 .../async_http_fetch_streaming/Makefile.am      |  30 +++
 lib/atscppapi/src/AsyncHttpFetch.cc             | 186 +++++++++++++------
 .../src/include/atscppapi/AsyncHttpFetch.h      |  38 +++-
 7 files changed, 346 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafficserver/blob/cec1d99c/configure.ac
----------------------------------------------------------------------
diff --git a/configure.ac b/configure.ac
index c5f31b7..7bf4b6f 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1973,6 +1973,7 @@ AS_IF([test "x$enable_cppapi" = xyes], [
   lib/atscppapi/examples/stat_example/Makefile
   lib/atscppapi/examples/timeout_example/Makefile
   lib/atscppapi/examples/transactionhook/Makefile
+  lib/atscppapi/examples/async_http_fetch_streaming/Makefile
   lib/atscppapi/src/Makefile
 ])])
 

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/cec1d99c/lib/atscppapi/examples/Makefile.am
----------------------------------------------------------------------
diff --git a/lib/atscppapi/examples/Makefile.am 
b/lib/atscppapi/examples/Makefile.am
index 08cfd48..9f56838 100644
--- a/lib/atscppapi/examples/Makefile.am
+++ b/lib/atscppapi/examples/Makefile.am
@@ -35,4 +35,5 @@ SUBDIRS = \
        timeout_example \
        internal_transaction_handling \
        async_timer \
-       intercept
+       intercept \
+        async_http_fetch_streaming

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/cec1d99c/lib/atscppapi/examples/async_http_fetch/AsyncHttpFetch.cc
----------------------------------------------------------------------
diff --git a/lib/atscppapi/examples/async_http_fetch/AsyncHttpFetch.cc 
b/lib/atscppapi/examples/async_http_fetch/AsyncHttpFetch.cc
index 14ff9cc..fe98748 100644
--- a/lib/atscppapi/examples/async_http_fetch/AsyncHttpFetch.cc
+++ b/lib/atscppapi/examples/async_http_fetch/AsyncHttpFetch.cc
@@ -149,7 +149,8 @@ private:
       const void *body;
       size_t body_size;
       async_http_fetch.getResponseBody(body, body_size);
-      TS_DEBUG(TAG, "Response body is [%.*s]", static_cast<int>(body_size), 
static_cast<const char*>(body));
+      TS_DEBUG(TAG, "Response body is %zu bytes long and is [%.*s]", 
body_size, static_cast<int>(body_size),
+               static_cast<const char*>(body));
     } else {
       TS_ERROR(TAG, "Fetch did not complete successfully; Result %d",
                static_cast<int>(async_http_fetch.getResult()));

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/cec1d99c/lib/atscppapi/examples/async_http_fetch_streaming/AsyncHttpFetchStreaming.cc
----------------------------------------------------------------------
diff --git 
a/lib/atscppapi/examples/async_http_fetch_streaming/AsyncHttpFetchStreaming.cc 
b/lib/atscppapi/examples/async_http_fetch_streaming/AsyncHttpFetchStreaming.cc
new file mode 100644
index 0000000..f570f6f
--- /dev/null
+++ 
b/lib/atscppapi/examples/async_http_fetch_streaming/AsyncHttpFetchStreaming.cc
@@ -0,0 +1,155 @@
+/**
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+
+#include <atscppapi/GlobalPlugin.h>
+#include <atscppapi/InterceptPlugin.h>
+#include <atscppapi/Logger.h>
+#include <atscppapi/Async.h>
+#include <atscppapi/AsyncHttpFetch.h>
+#include <atscppapi/AsyncTimer.h>
+#include <atscppapi/PluginInit.h>
+#include <cstring>
+#include <cassert>
+#include <sstream>
+
+using namespace atscppapi;
+using std::string;
+
+// This is for the -T tag debugging
+// To view the debug messages ./traffic_server -T "async_http_fetch_example.*"
+#define TAG "async_http_fetch_example"
+
+class Intercept : public InterceptPlugin, public AsyncReceiver<AsyncHttpFetch> 
{
+public:
+  Intercept(Transaction &transaction) : InterceptPlugin(transaction, 
InterceptPlugin::SERVER_INTERCEPT),
+                                        transaction_(transaction), 
num_fetches_(0) {
+    main_url_ = transaction.getClientRequest().getUrl().getUrlString();
+  }
+  void consume(const string &data, InterceptPlugin::RequestDataType type);
+  void handleInputComplete();
+  void handleAsyncComplete(AsyncHttpFetch &async_http_fetch);
+  ~Intercept();
+private:
+  Transaction &transaction_;
+  string request_body_;
+  string main_url_;
+  string dependent_url_;
+  int num_fetches_;
+};
+
+class InterceptInstaller : public GlobalPlugin {
+public:
+  InterceptInstaller() : GlobalPlugin(true /* ignore internal transactions */) 
{
+    GlobalPlugin::registerHook(Plugin::HOOK_READ_REQUEST_HEADERS_PRE_REMAP);
+  }
+  void handleReadRequestHeadersPreRemap(Transaction &transaction) {
+    transaction.addPlugin(new Intercept(transaction));
+    TS_DEBUG(TAG, "Added intercept");
+    transaction.resume();
+  }
+};
+
+void TSPluginInit(int /* argc ATS_UNUSED */, const char * /* argv ATS_UNUSED 
*/ []) {
+  new InterceptInstaller();
+}
+
+void Intercept::consume(const string &data, InterceptPlugin::RequestDataType 
type) {
+  if (type == InterceptPlugin::REQUEST_BODY) {
+    request_body_ += data;
+  }
+}
+
+void Intercept::handleInputComplete() {
+  TS_DEBUG(TAG, "Request data complete");
+  AsyncHttpFetch *async_http_fetch = request_body_.empty() ?
+    new AsyncHttpFetch(main_url_, AsyncHttpFetch::STREAMING_ENABLED, 
transaction_.getClientRequest().getMethod()) :
+    new AsyncHttpFetch(main_url_, AsyncHttpFetch::STREAMING_ENABLED, 
request_body_);
+  Async::execute<AsyncHttpFetch>(this, async_http_fetch, getMutex());
+  ++num_fetches_;
+  size_t dependent_url_param_pos = main_url_.find("dependent_url=");
+  if (dependent_url_param_pos != string::npos) {
+    dependent_url_ = main_url_.substr(dependent_url_param_pos + 14);
+    Async::execute<AsyncHttpFetch>(this, new AsyncHttpFetch(dependent_url_,
+                                                            
AsyncHttpFetch::STREAMING_ENABLED),
+                                   getMutex());
+    ++num_fetches_;
+    TS_DEBUG(TAG, "Started fetch for dependent URL [%s]", 
dependent_url_.c_str());
+  }
+}
+
+void Intercept::handleAsyncComplete(AsyncHttpFetch &async_http_fetch) {
+  AsyncHttpFetch::Result result = async_http_fetch.getResult();
+  string url = async_http_fetch.getRequestUrl().getUrlString();
+  if (result == AsyncHttpFetch::RESULT_HEADER_COMPLETE) {
+    TS_DEBUG(TAG, "Header completed for URL [%s]", url.c_str());
+    const Response &response = async_http_fetch.getResponse();
+    std::ostringstream oss;
+    oss << HTTP_VERSION_STRINGS[response.getVersion()] << ' ' << 
response.getStatusCode() << ' '
+        << response.getReasonPhrase() << "\r\n";
+    Headers &response_headers = response.getHeaders();
+    for (Headers::iterator iter = response_headers.begin(), end = 
response_headers.end(); iter != end; ++iter) {
+      HeaderFieldName header_name = (*iter).name();
+      if (header_name != "Transfer-Encoding") {
+        oss << header_name.str() << ": " << (*iter).values() << "\r\n";
+      }
+    }
+    oss << "\r\n";
+    if (url == main_url_) {
+      Intercept::produce(oss.str());
+    }
+    else {
+      TS_DEBUG(TAG, "Response header for dependent URL\n%s", 
oss.str().c_str());
+    }
+  }
+  else if (result == AsyncHttpFetch::RESULT_PARTIAL_BODY || result == 
AsyncHttpFetch::RESULT_BODY_COMPLETE) {
+    const void *body;
+    size_t body_size;
+    async_http_fetch.getResponseBody(body, body_size);
+    if (url == main_url_) {
+      Intercept::produce(string(static_cast<const char *>(body), body_size));
+    }
+    else {
+      TS_DEBUG(TAG, "Got dependent body bit; has %zu bytes and is [%.*s]", 
body_size, static_cast<int>(body_size),
+               static_cast<const char *>(body));
+    }
+    if (result == AsyncHttpFetch::RESULT_BODY_COMPLETE) {
+      TS_DEBUG(TAG, "response body complete");
+    }
+  }
+  else {
+    TS_ERROR(TAG, "Fetch did not complete successfully; Result %d", 
static_cast<int>(result));
+    if (url == main_url_) {
+      InterceptPlugin::produce("HTTP/1.1 500 Internal Server Error\r\n\r\n");
+    }
+  }
+  if (result == AsyncHttpFetch::RESULT_TIMEOUT || result == 
AsyncHttpFetch::RESULT_FAILURE ||
+      result == AsyncHttpFetch::RESULT_BODY_COMPLETE) {
+    if (--num_fetches_ == 0) {
+      TS_DEBUG(TAG, "Marking output as complete");
+      InterceptPlugin::setOutputComplete();
+    }
+  }
+}
+
+Intercept::~Intercept() {
+  if (num_fetches_) {
+    TS_DEBUG(TAG, "Fetch still pending, but transaction closing");
+  }
+  TS_DEBUG(TAG, "Shutting down");
+}

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/cec1d99c/lib/atscppapi/examples/async_http_fetch_streaming/Makefile.am
----------------------------------------------------------------------
diff --git a/lib/atscppapi/examples/async_http_fetch_streaming/Makefile.am 
b/lib/atscppapi/examples/async_http_fetch_streaming/Makefile.am
new file mode 100644
index 0000000..ecc204a
--- /dev/null
+++ b/lib/atscppapi/examples/async_http_fetch_streaming/Makefile.am
@@ -0,0 +1,30 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+AM_CPPFLAGS = -I$(top_srcdir)/lib/atscppapi/src/include -Wno-unused-variable
+
+target=AsyncHttpFetchStreaming.so
+pkglibdir = ${pkglibexecdir}
+pkglib_LTLIBRARIES = AsyncHttpFetchStreaming.la
+AsyncHttpFetchStreaming_la_SOURCES = AsyncHttpFetchStreaming.cc
+AsyncHttpFetchStreaming_la_LDFLAGS = -module -avoid-version -shared 
-L$(top_srcdir)/lib/atscppapi/src/ -latscppapi
+
+all:
+       ln -sf .libs/$(target)
+
+clean-local:
+       rm -f $(target)

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/cec1d99c/lib/atscppapi/src/AsyncHttpFetch.cc
----------------------------------------------------------------------
diff --git a/lib/atscppapi/src/AsyncHttpFetch.cc 
b/lib/atscppapi/src/AsyncHttpFetch.cc
index 042e457..af920b0 100644
--- a/lib/atscppapi/src/AsyncHttpFetch.cc
+++ b/lib/atscppapi/src/AsyncHttpFetch.cc
@@ -21,12 +21,15 @@
  */
 
 #include "atscppapi/AsyncHttpFetch.h"
+#include "atscppapi/shared_ptr.h"
 #include <ts/ts.h>
+#include <ts/experimental.h>
 #include <arpa/inet.h>
 #include "logging_internal.h"
 #include "utils_internal.h"
 
 #include <cstdio>
+#include <cstring>
 
 using namespace atscppapi;
 using std::string;
@@ -35,7 +38,7 @@ using std::string;
  * @private
  */
 struct atscppapi::AsyncHttpFetchState : noncopyable {
-  Request request_;
+  shared_ptr<Request> request_;
   Response response_;
   string request_body_;
   AsyncHttpFetch::Result result_;
@@ -44,11 +47,22 @@ struct atscppapi::AsyncHttpFetchState : noncopyable {
   TSMBuffer hdr_buf_;
   TSMLoc hdr_loc_;
   shared_ptr<AsyncDispatchControllerBase> dispatch_controller_;
+  AsyncHttpFetch::StreamingFlag streaming_flag_;
+  TSFetchSM fetch_sm_;
+  static const size_t BODY_BUFFER_SIZE = 32 * 1024;
+  char body_buffer_[BODY_BUFFER_SIZE];
+
+  AsyncHttpFetchState(const string &url_str, HttpMethod http_method, string 
request_body,
+                      AsyncHttpFetch::StreamingFlag streaming_flag)
+    : request_body_(request_body), result_(AsyncHttpFetch::RESULT_FAILURE), 
body_(NULL), body_size_(0),
+      hdr_buf_(NULL), hdr_loc_(NULL), streaming_flag_(streaming_flag), 
fetch_sm_(NULL) {
+    request_.reset(new Request(url_str, http_method, (streaming_flag_ == 
AsyncHttpFetch::STREAMING_DISABLED) ?
+                               HTTP_VERSION_1_0 : HTTP_VERSION_1_1));
+    if (streaming_flag_ == AsyncHttpFetch::STREAMING_ENABLED) {
+      body_ = body_buffer_;
+    }
+  }
 
-  AsyncHttpFetchState(const string &url_str, HttpMethod http_method, string 
request_body)
-    : request_(url_str, http_method, HTTP_VERSION_1_0), 
request_body_(request_body),
-      result_(AsyncHttpFetch::RESULT_FAILURE), body_(NULL), body_size_(0), 
hdr_buf_(NULL), hdr_loc_(NULL) { }
-  
   ~AsyncHttpFetchState() {
     if (hdr_loc_) {
       TSMLoc null_parent_loc = NULL;
@@ -57,6 +71,9 @@ struct atscppapi::AsyncHttpFetchState : noncopyable {
     if (hdr_buf_) {
       TSMBufferDestroy(hdr_buf_);
     }
+    if (fetch_sm_) {
+      TSFetchDestroy(fetch_sm_);
+    }
   }
 };
 
@@ -66,63 +83,91 @@ const unsigned int LOCAL_IP_ADDRESS = 0x0100007f;
 const int LOCAL_PORT = 8080;
 
 static int handleFetchEvents(TSCont cont, TSEvent event, void *edata) {
-  LOG_DEBUG("Fetch result returned event = %d, edata = %p", event, edata);
+  LOG_DEBUG("Received fetch event = %d, edata = %p", event, edata);
   AsyncHttpFetch *fetch_provider = static_cast<AsyncHttpFetch 
*>(TSContDataGet(cont));
   AsyncHttpFetchState *state = 
utils::internal::getAsyncHttpFetchState(*fetch_provider);
-  
-  if (event == static_cast<int>(AsyncHttpFetch::RESULT_SUCCESS)) {
-    TSHttpTxn txn = static_cast<TSHttpTxn>(edata);
-    int data_len;
-    const char *data_start = TSFetchRespGet(txn, &data_len);
-
-    if (data_start && (data_len > 0)) {
-      const char *data_end = data_start + data_len;
-      TSHttpParser parser = TSHttpParserCreate();
-
-      state->hdr_buf_ = TSMBufferCreate();
-      state->hdr_loc_ = TSHttpHdrCreate(state->hdr_buf_);
-      TSHttpHdrTypeSet(state->hdr_buf_, state->hdr_loc_, 
TS_HTTP_TYPE_RESPONSE);
-      if (TSHttpHdrParseResp(parser, state->hdr_buf_, state->hdr_loc_, 
&data_start, data_end) == TS_PARSE_DONE) {
-        TSHttpStatus status = TSHttpHdrStatusGet(state->hdr_buf_, 
state->hdr_loc_);
-        state->body_ = data_start; // data_start will now be pointing to body
-        state->body_size_ = data_end - data_start;
-        utils::internal::initResponse(state->response_, state->hdr_buf_, 
state->hdr_loc_);
-        LOG_DEBUG("Fetch result had a status code of %d with a body length of 
%ld", status, state->body_size_);
-      } else {
-        LOG_ERROR("Unable to parse response; Request URL [%s]; transaction %p",
-                  state->request_.getUrl().getUrlString().c_str(), txn);
+
+  if (state->streaming_flag_ == AsyncHttpFetch::STREAMING_DISABLED) {
+    if (event == static_cast<int>(AsyncHttpFetch::RESULT_SUCCESS)) {
+      TSHttpTxn txn = static_cast<TSHttpTxn>(edata);
+      int data_len;
+      const char *data_start = TSFetchRespGet(txn, &data_len);
+      if (data_start && (data_len > 0)) {
+        const char *data_end = data_start + data_len;
+        TSHttpParser parser = TSHttpParserCreate();
+        state->hdr_buf_ = TSMBufferCreate();
+        state->hdr_loc_ = TSHttpHdrCreate(state->hdr_buf_);
+        TSHttpHdrTypeSet(state->hdr_buf_, state->hdr_loc_, 
TS_HTTP_TYPE_RESPONSE);
+        if (TSHttpHdrParseResp(parser, state->hdr_buf_, state->hdr_loc_, 
&data_start, data_end) == TS_PARSE_DONE) {
+          TSHttpStatus status = TSHttpHdrStatusGet(state->hdr_buf_, 
state->hdr_loc_);
+          state->body_ = data_start; // data_start will now be pointing to body
+          state->body_size_ = data_end - data_start;
+          utils::internal::initResponse(state->response_, state->hdr_buf_, 
state->hdr_loc_);
+          LOG_DEBUG("Fetch result had a status code of %d with a body length 
of %ld", status, state->body_size_);
+        } else {
+          LOG_ERROR("Unable to parse response; Request URL [%s]; transaction 
%p",
+                    state->request_->getUrl().getUrlString().c_str(), txn);
+          event = static_cast<TSEvent>(AsyncHttpFetch::RESULT_FAILURE);
+        }
+        TSHttpParserDestroy(parser);
+      }
+      else {
+        LOG_ERROR("Successful fetch did not result in any content. Assuming 
failure");
         event = static_cast<TSEvent>(AsyncHttpFetch::RESULT_FAILURE);
       }
-      TSHttpParserDestroy(parser);
+      state->result_ = static_cast<AsyncHttpFetch::Result>(event);
+    }
+  }
+  else {
+    LOG_DEBUG("Handling streaming event %d", event);
+    if (event == static_cast<TSEvent>(TS_FETCH_EVENT_EXT_HEAD_DONE)) {
+      utils::internal::initResponse(state->response_, 
TSFetchRespHdrMBufGet(state->fetch_sm_),
+                                    TSFetchRespHdrMLocGet(state->fetch_sm_));
+      LOG_DEBUG("Response header initialized");
+      state->result_ = AsyncHttpFetch::RESULT_HEADER_COMPLETE;
     }
     else {
-      LOG_ERROR("Successful fetch did not result in any content. Assuming 
failure");
-      event = static_cast<TSEvent>(AsyncHttpFetch::RESULT_FAILURE);
+      state->body_size_ = TSFetchReadData(state->fetch_sm_, 
state->body_buffer_, sizeof(state->body_buffer_));
+      LOG_DEBUG("Read %zu bytes", state->body_size_);
+      state->result_ = (event == 
static_cast<TSEvent>(TS_FETCH_EVENT_EXT_BODY_READY)) ?
+        AsyncHttpFetch::RESULT_PARTIAL_BODY : 
AsyncHttpFetch::RESULT_BODY_COMPLETE;
     }
   }
-  state->result_ = static_cast<AsyncHttpFetch::Result>(event);
   if (!state->dispatch_controller_->dispatch()) {
     LOG_DEBUG("Unable to dispatch result from AsyncFetch because promise has 
died.");
   }
 
-  utils::internal::deleteAsyncHttpFetch(fetch_provider); // we must always 
cleans up when we're done.
-  TSContDestroy(cont);
+  if ((state->streaming_flag_ == AsyncHttpFetch::STREAMING_DISABLED) ||
+      (state->result_ == AsyncHttpFetch::RESULT_BODY_COMPLETE)) {
+    LOG_DEBUG("Shutting down");
+    utils::internal::deleteAsyncHttpFetch(fetch_provider); // we must always 
cleans up when we're done.
+    TSContDestroy(cont);
+  }
   return 0;
 }
 
 }
 
 AsyncHttpFetch::AsyncHttpFetch(const string &url_str, const string 
&request_body) {
-  init(url_str, HTTP_METHOD_POST, request_body);
+  init(url_str, HTTP_METHOD_POST, request_body, STREAMING_DISABLED);
 }
 
 AsyncHttpFetch::AsyncHttpFetch(const string &url_str, HttpMethod http_method) {
-  init(url_str, http_method, "");
+  init(url_str, http_method, "", STREAMING_DISABLED);
+}
+
+AsyncHttpFetch::AsyncHttpFetch(const string &url_str, StreamingFlag 
streaming_flag, const string &request_body) {
+  init(url_str, HTTP_METHOD_POST, request_body, streaming_flag);
+}
+
+AsyncHttpFetch::AsyncHttpFetch(const string &url_str, StreamingFlag 
streaming_flag, HttpMethod http_method) {
+  init(url_str, http_method, "", streaming_flag);
 }
 
-void AsyncHttpFetch::init(const string &url_str, HttpMethod http_method, const 
string &request_body) {
+void AsyncHttpFetch::init(const string &url_str, HttpMethod http_method, const 
string &request_body,
+                          StreamingFlag streaming_flag) {
   LOG_DEBUG("Created new AsyncHttpFetch object %p", this);
-  state_ = new AsyncHttpFetchState(url_str, http_method, request_body);
+  state_ = new AsyncHttpFetchState(url_str, http_method, request_body, 
streaming_flag);
 }
 
 void AsyncHttpFetch::run() {
@@ -131,23 +176,12 @@ void AsyncHttpFetch::run() {
   TSCont fetchCont = TSContCreate(handleFetchEvents, TSMutexCreate());
   TSContDataSet(fetchCont, static_cast<void *>(this)); // Providers have to 
clean themselves up when they are done.
 
-  TSFetchEvent event_ids;
-  event_ids.success_event_id = RESULT_SUCCESS;
-  event_ids.failure_event_id = RESULT_FAILURE;
-  event_ids.timeout_event_id = RESULT_TIMEOUT;
-
   struct sockaddr_in addr;
   addr.sin_family = AF_INET;
   addr.sin_addr.s_addr = LOCAL_IP_ADDRESS;
   addr.sin_port = LOCAL_PORT;
 
-  string request_str(HTTP_METHOD_STRINGS[state_->request_.getMethod()]);
-  request_str += ' ';
-  request_str += state_->request_.getUrl().getUrlString();
-  request_str += ' ';
-  request_str += HTTP_VERSION_STRINGS[state_->request_.getVersion()];
-  request_str += "\r\n";
-  Headers &headers = state_->request_.getHeaders();
+  Headers &headers = state_->request_->getHeaders();
   if (headers.size()) {
     // remove the possibility of keep-alive
     headers.erase("Connection");
@@ -156,19 +190,53 @@ void AsyncHttpFetch::run() {
   if (!state_->request_body_.empty()) {
     char size_buf[128];
     snprintf(size_buf, sizeof(size_buf), "%zu", state_->request_body_.size());
-    state_->request_.getHeaders().set("Content-Length", size_buf);
+    headers.set("Content-Length", size_buf);
   }
-  request_str += headers.wireStr();
-  request_str += "\r\n";
-  request_str += state_->request_body_;
 
-  LOG_DEBUG("Issing TSFetchUrl with request\n[%s]", request_str.c_str());
-  TSFetchUrl(request_str.c_str(), request_str.size(), reinterpret_cast<struct 
sockaddr const *>(&addr), fetchCont,
-             AFTER_BODY, event_ids);
+  if (state_->streaming_flag_ == STREAMING_DISABLED) {
+    TSFetchEvent event_ids;
+    event_ids.success_event_id = RESULT_SUCCESS;
+    event_ids.failure_event_id = RESULT_FAILURE;
+    event_ids.timeout_event_id = RESULT_TIMEOUT;
+
+    string request_str(HTTP_METHOD_STRINGS[state_->request_->getMethod()]);
+    request_str += ' ';
+    request_str += state_->request_->getUrl().getUrlString();
+    request_str += ' ';
+    request_str += HTTP_VERSION_STRINGS[state_->request_->getVersion()];
+    request_str += "\r\n";
+    request_str += headers.wireStr();
+    request_str += "\r\n";
+    request_str += state_->request_body_;
+
+    LOG_DEBUG("Issing (non-streaming) TSFetchUrl with request\n[%s]", 
request_str.c_str());
+    TSFetchUrl(request_str.c_str(), request_str.size(), 
reinterpret_cast<struct sockaddr const *>(&addr), fetchCont,
+               AFTER_BODY, event_ids);
+  }
+  else {
+    state_->fetch_sm_ = TSFetchCreate(fetchCont, 
HTTP_METHOD_STRINGS[state_->request_->getMethod()].c_str(),
+                                      
state_->request_->getUrl().getUrlString().c_str(),
+                                      
HTTP_VERSION_STRINGS[state_->request_->getVersion()].c_str(),
+                                      reinterpret_cast<struct sockaddr const 
*>(&addr),
+                                      TS_FETCH_FLAGS_STREAM | 
TS_FETCH_FLAGS_DECHUNK);
+    string header_value;
+    for (Headers::iterator iter = headers.begin(), end = headers.end(); iter 
!= end; ++iter) {
+      HeaderFieldName header_name = (*iter).name();
+      header_value = (*iter).values();
+      TSFetchHeaderAdd(state_->fetch_sm_, header_name.c_str(), 
header_name.length(), header_value.data(),
+                       header_value.size());
+    }
+    LOG_DEBUG("Launching streaming fetch");
+    TSFetchLaunch(state_->fetch_sm_);
+    if (state_->request_body_.size()) {
+      TSFetchWriteData(state_->fetch_sm_, state_->request_body_.data(), 
state_->request_body_.size());
+      LOG_DEBUG("Wrote %zu bytes of data to fetch", 
state_->request_body_.size());
+    }
+  }
 }
 
 Headers &AsyncHttpFetch::getRequestHeaders() {
-  return state_->request_.getHeaders();
+  return state_->request_->getHeaders();
 }
 
 AsyncHttpFetch::Result AsyncHttpFetch::getResult() const {
@@ -176,7 +244,7 @@ AsyncHttpFetch::Result AsyncHttpFetch::getResult() const {
 }
 
 const Url &AsyncHttpFetch::getRequestUrl() const {
-  return state_->request_.getUrl();
+  return state_->request_->getUrl();
 }
 
 const string &AsyncHttpFetch::getRequestBody() const {

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/cec1d99c/lib/atscppapi/src/include/atscppapi/AsyncHttpFetch.h
----------------------------------------------------------------------
diff --git a/lib/atscppapi/src/include/atscppapi/AsyncHttpFetch.h 
b/lib/atscppapi/src/include/atscppapi/AsyncHttpFetch.h
index 161b957..35f5091 100644
--- a/lib/atscppapi/src/include/atscppapi/AsyncHttpFetch.h
+++ b/lib/atscppapi/src/include/atscppapi/AsyncHttpFetch.h
@@ -41,13 +41,25 @@ namespace utils { class internal; }
  * makes HTTP requests asynchronously. This provider automatically
  * self-destructs after the completion of the request.
  *
- * See example async_http_fetch for sample usage.
+ * See example async_http_fetch{,_streaming} for sample usage.
  */
 class AsyncHttpFetch : public AsyncProvider {
 public:
+  /** Deprecated. Use variant with streaming flag argument */
   AsyncHttpFetch(const std::string &url_str, HttpMethod http_method = 
HTTP_METHOD_GET);
 
-  AsyncHttpFetch(const std::string &url_str,  const std::string &request_body);
+  /** Deprecated. Use variant with streaming flag argument */
+  AsyncHttpFetch(const std::string &url_str, const std::string &request_body);
+
+  enum StreamingFlag {
+    STREAMING_DISABLED = 0,
+    STREAMING_ENABLED = 0x1
+  };
+
+  AsyncHttpFetch(const std::string &url_str, StreamingFlag streaming_flag,
+                 HttpMethod http_method = HTTP_METHOD_GET);
+
+  AsyncHttpFetch(const std::string &url_str, StreamingFlag streaming_flag, 
const std::string &request_body);
 
   /**
    * Used to manipulate the headers of the request to be made.
@@ -56,10 +68,14 @@ public:
    */
   Headers &getRequestHeaders();
 
-  enum Result { RESULT_SUCCESS = 10000, RESULT_TIMEOUT, RESULT_FAILURE };
+  enum Result { RESULT_SUCCESS = 10000, RESULT_TIMEOUT, RESULT_FAILURE, 
RESULT_HEADER_COMPLETE,
+                RESULT_PARTIAL_BODY, RESULT_BODY_COMPLETE };
 
   /**
-   * Used to extract the response after request completion. 
+   * Used to extract the response after request completion. Without
+   * streaming, this can result success, failure or timeout. With
+   * streaming, this can result failure, timeout, header complete,
+   * partial body or body complete.
    *
    * @return Result of the operation
    */
@@ -76,7 +92,8 @@ public:
   const std::string &getRequestBody() const;
 
   /**
-   * Used to extract the response after request completion. 
+   * Used to extract the response after request completion (after
+   * RESULT_HEADER_COMPLETE in case of streaming).
    *
    * @return Non-mutable reference to the response.
    */
@@ -86,22 +103,25 @@ public:
    * Used to extract the body of the response after request completion. On
    * unsuccessful completion, values (NULL, 0) are set.
    *
+   * When streaming is enabled, this can be called on either body result.
+   *
    * @param body Output argument; will point to the body
-   * @param body_size Output argument; will contain the size of the body 
-   * 
+   * @param body_size Output argument; will contain the size of the body
+   *
    */
   void getResponseBody(const void *&body, size_t &body_size) const;
 
   /**
    * Starts a HTTP fetch of the Request contained.
-   */  
+   */
   virtual void run();
 protected:
   virtual ~AsyncHttpFetch();
 
 private:
   AsyncHttpFetchState *state_;
-  void init(const std::string &url_str, HttpMethod http_method, const 
std::string &request_body);
+  void init(const std::string &url_str, HttpMethod http_method, const 
std::string &request_body,
+            StreamingFlag streaming_flag);
   friend class utils::internal;
 };
 

Reply via email to