[twitter-demo] use AUTO_FLUSH_BACKGROUND session

Changed the twitter demo application to use the AUTO_FLUSH_BACKGROUND flush mode instead of the MANUAL_FLUSH mode.
Change-Id: I497c1265df132fc8ea4e635475d0d669eca21646 Reviewed-on: http://gerrit.cloudera.org:8080/4477 Tested-by: Kudu Jenkins Reviewed-by: David Ribeiro Alves <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fd3a05ca Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fd3a05ca Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fd3a05ca Branch: refs/heads/master Commit: fd3a05ca12fec0b803cedbf15c03fd498724ee42 Parents: 376f95b Author: Alexey Serbin <[email protected]> Authored: Tue Sep 20 08:55:04 2016 -0700 Committer: Alexey Serbin <[email protected]> Committed: Fri Sep 23 19:08:12 2016 +0000 ---------------------------------------------------------------------- src/kudu/twitter-demo/CMakeLists.txt | 89 +++++++++++++++------------ src/kudu/twitter-demo/insert_consumer.cc | 61 +++--------------- src/kudu/twitter-demo/insert_consumer.h | 26 +------- 3 files changed, 61 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/fd3a05ca/src/kudu/twitter-demo/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/kudu/twitter-demo/CMakeLists.txt b/src/kudu/twitter-demo/CMakeLists.txt index 5c261e2..484f2d1 100644 --- a/src/kudu/twitter-demo/CMakeLists.txt +++ b/src/kudu/twitter-demo/CMakeLists.txt @@ -15,50 +15,59 @@ # specific language governing permissions and limitations # under the License. -find_library(LIBOAUTH_LIBRARY NAMES oauth) -if(NOT LIBOAUTH_LIBRARY) - message(WARNING "liboauth not found on system. Skipping twitter demo") +# Use pkgconfig to configure the build regarding liboauth. This allows +# to extract info on include and library paths, etc. The liboauth library +# is installed at alternative location on MacOS X. +find_package(PkgConfig) +if (NOT PKG_CONFIG_FOUND) + message(WARNING "pkgconfig not found. 
Skipping twitter demo.") else() + pkg_search_module(LIBOAUTH oauth) + if(NOT LIBOAUTH_FOUND) + message(WARNING "liboauth not found. Skipping twitter demo.") + else() + include_directories(SYSTEM ${LIBOAUTH_INCLUDE_DIRS}) + link_directories(${LIBOAUTH_LIBRARY_DIRS}) + add_library(twitter_demo + oauth.cc + parser.cc + insert_consumer.cc + twitter_streamer.cc) - add_library(twitter_demo - oauth.cc - parser.cc - insert_consumer.cc - twitter_streamer.cc) + target_link_libraries(twitter_demo + gutil + kudu_util + kudu_test_util) - target_link_libraries(twitter_demo - gutil - kudu_util - kudu_test_util) + target_link_libraries(twitter_demo + kudu_client + ${LIBOAUTH_LIBRARIES} + ${CURL_LIBRARIES} + ${KUDU_BASE_LIBS}) - target_link_libraries(twitter_demo - kudu_client - ${LIBOAUTH_LIBRARY} - ${CURL_LIBRARIES} - ${KUDU_BASE_LIBS}) + # Require that the tserver protobuf code is generated first + add_dependencies(twitter_demo + tserver_proto) - # Require that the tserver protobuf code is generated first - add_dependencies(twitter_demo - tserver_proto) - - add_executable(ingest_firehose ingest_firehose.cc) - target_link_libraries(ingest_firehose - twitter_demo) - - # Tests - ADD_KUDU_TEST(oauth-test) - # parser-test relies on symlinked data files which we can't currently copy correctly - # to the cluster. - ADD_KUDU_TEST(parser-test LABELS no_dist_test) - if(NOT "${NO_TESTS}") - target_link_libraries(oauth-test + add_executable(ingest_firehose ingest_firehose.cc) + target_link_libraries(ingest_firehose twitter_demo) - target_link_libraries(parser-test - twitter_demo) - execute_process(COMMAND ln -sf ${CMAKE_CURRENT_SOURCE_DIR}/example-tweets.txt - ${EXECUTABLE_OUTPUT_PATH}) - execute_process(COMMAND ln -sf ${CMAKE_CURRENT_SOURCE_DIR}/example-deletes.txt - ${EXECUTABLE_OUTPUT_PATH}) - endif() -endif() # library checks + # Tests + ADD_KUDU_TEST(oauth-test) + # parser-test relies on symlinked data files which we can't currently copy correctly + # to the cluster. 
+ ADD_KUDU_TEST(parser-test LABELS no_dist_test) + if(NOT "${NO_TESTS}") + target_link_libraries(oauth-test + twitter_demo) + target_link_libraries(parser-test + twitter_demo) + execute_process(COMMAND ln -sf ${CMAKE_CURRENT_SOURCE_DIR}/example-tweets.txt + ${EXECUTABLE_OUTPUT_PATH}) + execute_process(COMMAND ln -sf ${CMAKE_CURRENT_SOURCE_DIR}/example-deletes.txt + ${EXECUTABLE_OUTPUT_PATH}) + endif() + + endif() # if(NOT LIBOAUTH_LIBRARY) ... else ... +endif() # if (NOT PKG_CONFIG_FOUND) ... else ... http://git-wip-us.apache.org/repos/asf/kudu/blob/fd3a05ca/src/kudu/twitter-demo/insert_consumer.cc ---------------------------------------------------------------------- diff --git a/src/kudu/twitter-demo/insert_consumer.cc b/src/kudu/twitter-demo/insert_consumer.cc index 2371f7a..361b8bd 100644 --- a/src/kudu/twitter-demo/insert_consumer.cc +++ b/src/kudu/twitter-demo/insert_consumer.cc @@ -17,20 +17,21 @@ #include "kudu/twitter-demo/insert_consumer.h" -#include <glog/logging.h> +#include <ctime> #include <mutex> #include <string> -#include <time.h> #include <vector> +#include <glog/logging.h> + +#include "kudu/client/client.h" #include "kudu/common/wire_protocol.h" #include "kudu/common/row.h" #include "kudu/common/schema.h" -#include "kudu/client/client.h" #include "kudu/gutil/bind.h" #include "kudu/gutil/stl_util.h" -#include "kudu/tserver/tserver_service.proxy.h" #include "kudu/tserver/tserver.pb.h" +#include "kudu/tserver/tserver_service.proxy.h" #include "kudu/twitter-demo/parser.h" #include "kudu/twitter-demo/twitter-schema.h" #include "kudu/util/status.h" @@ -38,38 +39,19 @@ namespace kudu { namespace twitter_demo { -using tserver::TabletServerServiceProxy; -using tserver::WriteRequestPB; -using tserver::WriteResponsePB; -using rpc::RpcController; using kudu::client::KuduInsert; using kudu::client::KuduClient; using kudu::client::KuduSession; -using kudu::client::KuduStatusCallback; -using kudu::client::KuduTable; using kudu::client::KuduTableCreator; 
-FlushCB::FlushCB(InsertConsumer* consumer) - : consumer_(consumer) { -} - -FlushCB::~FlushCB() { -} - -void FlushCB::Run(const Status& status) { - consumer_->BatchFinished(status); -} - -InsertConsumer::InsertConsumer(const client::sp::shared_ptr<KuduClient> &client) +InsertConsumer::InsertConsumer(client::sp::shared_ptr<KuduClient> client) : initted_(false), schema_(CreateTwitterSchema()), - flush_cb_(this), - client_(client), - request_pending_(false) { + client_(std::move(client)) { } Status InsertConsumer::Init() { - const char *kTableName = "twitter"; + static const string kTableName = "twitter"; Status s = client_->OpenTable(kTableName, &table_); if (s.IsNotFound()) { gscoped_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); @@ -83,21 +65,13 @@ Status InsertConsumer::Init() { session_ = client_->NewSession(); session_->SetTimeoutMillis(1000); - CHECK_OK(session_->SetFlushMode(KuduSession::MANUAL_FLUSH)); + RETURN_NOT_OK(session_->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND)); initted_ = true; return Status::OK(); } InsertConsumer::~InsertConsumer() { - // TODO: to be safe, we probably need to cancel any current RPC, - // or else the callback will get called on the destroyed object. - // Given this is just demo code, cutting this corner. 
- CHECK(!request_pending_); -} - -void InsertConsumer::BatchFinished(const Status& s) { - std::lock_guard<simple_spinlock> l(lock_); - request_pending_ = false; + Status s(session_->Flush()); if (!s.ok()) { bool overflow; vector<client::KuduError*> errors; @@ -140,21 +114,6 @@ void InsertConsumer::ConsumeJSON(const Slice& json_slice) { CHECK_OK(r->SetInt32("user_friends_count", event_.tweet_event.user_friends_count)); CHECK_OK(r->SetStringCopy("user_image_url", event_.tweet_event.user_image_url)); CHECK_OK(session_->Apply(ins.release())); - - // TODO: once the auto-flush mode is implemented, switch to using that - // instead of the manual batching here - bool do_flush = false; - { - std::lock_guard<simple_spinlock> l(lock_); - if (!request_pending_) { - request_pending_ = true; - do_flush = true; - } - } - if (do_flush) { - VLOG(1) << "Sending batch of " << session_->CountBufferedOperations(); - session_->FlushAsync(&flush_cb_); - } } } // namespace twitter_demo http://git-wip-us.apache.org/repos/asf/kudu/blob/fd3a05ca/src/kudu/twitter-demo/insert_consumer.h ---------------------------------------------------------------------- diff --git a/src/kudu/twitter-demo/insert_consumer.h b/src/kudu/twitter-demo/insert_consumer.h index 826ca11..00cdce4 100644 --- a/src/kudu/twitter-demo/insert_consumer.h +++ b/src/kudu/twitter-demo/insert_consumer.h @@ -40,40 +40,21 @@ class KuduStatusCallback; namespace twitter_demo { -class InsertConsumer; - -class FlushCB : public client::KuduStatusCallback { - public: - explicit FlushCB(InsertConsumer* consumer); - - virtual ~FlushCB(); - - virtual void Run(const Status& status) OVERRIDE; - private: - InsertConsumer* consumer_; -}; - // Consumer of tweet data which parses the JSON and inserts // into a remote tablet via RPC. 
class InsertConsumer : public TwitterConsumer { public: - explicit InsertConsumer( - const client::sp::shared_ptr<client::KuduClient> &client); + explicit InsertConsumer(client::sp::shared_ptr<client::KuduClient> client); ~InsertConsumer(); Status Init(); - virtual void ConsumeJSON(const Slice& json) OVERRIDE; + virtual void ConsumeJSON(const Slice& json_slice) OVERRIDE; private: - friend class FlushCB; - - void BatchFinished(const Status& s); - bool initted_; client::KuduSchema schema_; - FlushCB flush_cb_; TwitterEventParser parser_; // Reusable object for latest event. @@ -82,9 +63,6 @@ class InsertConsumer : public TwitterConsumer { client::sp::shared_ptr<client::KuduClient> client_; client::sp::shared_ptr<client::KuduSession> session_; client::sp::shared_ptr<client::KuduTable> table_; - - simple_spinlock lock_; - bool request_pending_; }; } // namespace twitter_demo
