jamesge commented on a change in pull request #537: customise span exporter
URL: https://github.com/apache/incubator-brpc/pull/537#discussion_r362696392
 
 

 ##########
 File path: src/brpc/span.cpp
 ##########
 @@ -422,378 +314,12 @@ void Span::Submit(Span* span, int64_t cpuwide_time_us) {
     }
 }
 
-static void Span2Proto(const Span* span, RpczSpan* out) {
-    out->set_trace_id(span->trace_id());
-    out->set_span_id(span->span_id());
-    out->set_parent_span_id(span->parent_span_id());
-    out->set_log_id(span->log_id());
-    out->set_base_cid(span->base_cid().value);
-    out->set_ending_cid(span->ending_cid().value);
-    out->set_remote_ip(butil::ip2int(span->remote_side().ip));
-    out->set_remote_port(span->remote_side().port);
-    out->set_type(span->type());
-    out->set_async(span->async());
-    out->set_protocol(span->protocol());
-    out->set_request_size(span->request_size());
-    out->set_response_size(span->response_size());
-    out->set_received_real_us(span->received_real_us());
-    out->set_start_parse_real_us(span->start_parse_real_us());
-    out->set_start_callback_real_us(span->start_callback_real_us());
-    out->set_start_send_real_us(span->start_send_real_us());
-    out->set_sent_real_us(span->sent_real_us());
-    out->set_full_method_name(span->full_method_name());
-    out->set_info(span->info());
-    out->set_error_code(span->error_code());
-}
-
-inline void ToBigEndian(uint64_t n, uint32_t* buf) {
-    buf[0] = htonl(n >> 32);
-    buf[1] = htonl(n & 0xFFFFFFFFUL);
-}
-
-inline uint64_t ToLittleEndian(const uint32_t* buf) {
-    return (((uint64_t)ntohl(buf[0])) << 32) | ntohl(buf[1]);
-}
-
-SpanDB* SpanDB::Open() {
-    SpanDB local;
-    leveldb::Status st;
-    char prefix[64];
-    time_t rawtime;
-    time(&rawtime);
-    struct tm lt_buf;
-    struct tm* timeinfo = localtime_r(&rawtime, &lt_buf);
-    const size_t nw = strftime(prefix, sizeof(prefix),
-                               "/%Y%m%d.%H%M%S", timeinfo);
-    const int nw2 = snprintf(prefix + nw, sizeof(prefix) - nw, ".%d",
-                             getpid());
-    leveldb::Options options;
-    options.create_if_missing = true;
-    options.error_if_exists = true;
-
-    local.id_db_name.append(FLAGS_rpcz_database_dir);
-    local.id_db_name.append(prefix, nw + nw2);
-    // Create the dir first otherwise leveldb fails.
-    butil::File::Error error;
-    const butil::FilePath dir(local.id_db_name);
-    if (!butil::CreateDirectoryAndGetError(dir, &error)) {
-        LOG(ERROR) << "Fail to create directory=`" << dir.value() << ", "
-                   << error;
-        return NULL;
-    }
-
-    local.id_db_name.append("/id.db");
-    st = leveldb::DB::Open(options, local.id_db_name.c_str(), &local.id_db);
-    if (!st.ok()) {
-        LOG(ERROR) << "Fail to open id_db: " << st.ToString();
-        return NULL;
-    }
-
-    local.time_db_name.append(FLAGS_rpcz_database_dir);
-    local.time_db_name.append(prefix, nw + nw2);
-    local.time_db_name.append("/time.db");
-    st = leveldb::DB::Open(options, local.time_db_name.c_str(), &local.time_db);
-    if (!st.ok()) {
-        LOG(ERROR) << "Fail to open time_db: " << st.ToString();
-        return NULL;
-    }
-    SpanDB* db = new (std::nothrow) SpanDB;
-    if (NULL == db) {
-        return NULL;
-    }
-    LOG(INFO) << "Opened " << local.id_db_name << " and "
-               << local.time_db_name;
-    Swap(local, *db);
-    return db;
-}
-
-leveldb::Status SpanDB::Index(const Span* span, std::string* value_buf) {
-    leveldb::WriteOptions options;
-    options.sync = false;
-
-    leveldb::Status st;
-    
-    // NOTE: Writing into time_db before id_db so that if the second write
-    // fails, the entry in time_db will be finally removed when it's out
-    // of time window.
-
-    const int64_t start_time = span->GetStartRealTimeUs();
-    BriefSpan brief;
-    brief.set_trace_id(span->trace_id());
-    brief.set_span_id(span->span_id());
-    brief.set_log_id(span->log_id());
-    brief.set_type(span->type());
-    brief.set_error_code(span->error_code());
-    brief.set_request_size(span->request_size());
-    brief.set_response_size(span->response_size());
-    brief.set_start_real_us(start_time);
-    brief.set_latency_us(span->GetEndRealTimeUs() - start_time);
-    brief.set_full_method_name(span->full_method_name());
-    if (!brief.SerializeToString(value_buf)) {
-        return leveldb::Status::InvalidArgument(
-            leveldb::Slice("Fail to serialize BriefSpan"));
-    }
-    // We need to make the time monotonic otherwise if older entries are
-    // overwritten by newer ones, entries in id_db associated with the older
-    // entries are not evicted. Surely we can call DB::Get() before Put(), but
-    // that would be too slow due to the storage model of leveldb. One feasible
-    // method is to maintain recent window of keys to time_db, when there's a
-    // conflict before Put(), try key+1us until an unused time is found. The
-    // window could be 5~10s. However this method needs a std::map(slow) or
-    // hashmap+queue(more memory: remember that we're just a framework), and
-    // this method can't guarantee no duplication when real time goes back
-    // significantly.
-    // Since the time to this method is ALMOST in ascending order, we use a
-    // very simple strategy: if the time is not greater than last-time, set
-    // it to be last-time + 1us. This works when time goes back because the
-    // real time is at least 1000000 / FLAGS_rpcz_max_span_per_second times faster
-    // and it will finally catch up with our time key. (provided the flag
-    // is less than 1000000).
-    int64_t time_key = start_time;
-    if (time_key <= g_last_time_key) {
-        time_key = g_last_time_key + 1;
-    }
-    g_last_time_key = time_key;
-    uint32_t time_data[2];
-    ToBigEndian(time_key, time_data);
-    st = time_db->Put(options,
-                      leveldb::Slice((char*)time_data, sizeof(time_data)),
-                      leveldb::Slice(value_buf->data(), value_buf->size()));
-    if (!st.ok()) {
-        return st;
-    }
-    
-    uint32_t key_data[4];
-    ToBigEndian(span->trace_id(), key_data);
-    ToBigEndian(span->span_id(), key_data + 2);
-    leveldb::Slice key((char*)key_data, sizeof(key_data));
-    RpczSpan value_proto;
-    Span2Proto(span, &value_proto);
-    // client spans should be reversed.
-    size_t client_span_count = span->CountClientSpans();
-    for (size_t i = 0; i < client_span_count; ++i) {
-        value_proto.add_client_spans();
-    }
-    size_t i = 0;
-    for (const Span* p = span->_next_client; p; p = p->_next_client, ++i) {
-        Span2Proto(p, value_proto.mutable_client_spans(client_span_count - i - 1));
-    }
-    if (!value_proto.SerializeToString(value_buf)) {
-        return leveldb::Status::InvalidArgument(
-            leveldb::Slice("Fail to serialize RpczSpan"));
-    }
-    leveldb::Slice value(value_buf->data(), value_buf->size());
-    st = id_db->Put(options, key, value);
-    return st;
-}
-
-// NOTE: may take more than 100ms
-leveldb::Status SpanDB::RemoveSpansBefore(int64_t tm) {
-    if (id_db == NULL || time_db == NULL) {
-        return leveldb::Status::InvalidArgument(leveldb::Slice("NULL param"));
-    }
-    leveldb::Status rc;
-    leveldb::WriteOptions options;
-    options.sync = false;
-    leveldb::Iterator* it = time_db->NewIterator(leveldb::ReadOptions());
-    for (it->SeekToFirst(); it->Valid(); it->Next()) {
-        if (it->key().size() != 8) {
-            LOG(ERROR) << "Invalid key size: " << it->key().size();
-            continue;
-        }
-        const int64_t realtime = 
-            ToLittleEndian((const uint32_t*)it->key().data());
-        if (realtime >= tm) {  // removal is done.
-            break;
-        }
-        BriefSpan brief;
-        if (brief.ParseFromArray(it->value().data(), it->value().size())) {
-            uint32_t key_data[4];
-            ToBigEndian(brief.trace_id(), key_data);
-            ToBigEndian(brief.span_id(), key_data + 2);
-            leveldb::Slice key((char*)key_data, sizeof(key_data));
-            rc = id_db->Delete(options, key);
-            if (!rc.ok()) {
-                LOG(ERROR) << "Fail to delete from id_db";
-                break;
-            }
-        } else {
-            LOG(ERROR) << "Fail to parse from value";
-        }
-        rc = time_db->Delete(options, it->key());
-        if (!rc.ok()) {
-            LOG(ERROR) << "Fail to delete from time_db";
-            break;
-        }
-    }
-    delete it;
-    return rc;
-}
-
-// Write span into leveldb.
 void Span::dump_and_destroy(size_t /*round*/) {
-    StartIndexingIfNeeded();
-    
-    std::string value_buf;
-
-    butil::intrusive_ptr<SpanDB> db;
-    if (GetSpanDB(&db) != 0) {
-        if (g_span_ending) {
-            destroy();
-            return;
-        }
-        SpanDB* db2 = SpanDB::Open();
-        if (db2 == NULL) {
-            LOG(WARNING) << "Fail to open SpanDB";
-            destroy();
-            return;
-        }
-        ResetSpanDB(db2);
-        db.reset(db2);
-    }
-
-    leveldb::Status st = db->Index(this, &value_buf);
+    TracingSpan tracing_span;
+    Copy2TracingSpan(&tracing_span);
 
 Review comment:
   一开始把Span和RpczSpan(对应TracingSpan)独立开来有两个原因:1. 内部用原生struct,可以做一些深度优化,减少开销;2. 用户拿到的是pb,方便用户传输和存储。但现在看起来,1是nice to have:span中主要是primitive types,原生struct相比pb差异不大;而2是must(否则所有用户都要写一个类似Copy2TracingSpan的函数,太啰嗦)。我觉得可以考虑让Span直接包含一个TracingSpan,替换掉Span内部的众多字段,这样也就不用再做转换了。

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to