Hi in my use case I would like to read messages from a stream, selectively
store certain structs from some messages in a dictionary for later use.
Unfortunately I cannot get it to work without leaking memory. The total
number of messages in the dictionary is bounded, as new messages with the
same key would replace the old message. (Actually valgrind doesn't think it
is leaking, but I can see my memory usage is increasing indefinitely).
Test code
messages are stored in a vector `data`
struct ProcessStream {
using CapnpMsg = capnp_utils::CapnpMessage<Bar>;
std::vector<std::shared_ptr<CapnpMsg>> data;
bool process_message(
kj::BufferedInputStreamWrapper& buffered_stream, kj::Array<capnp::
word>& scratch
) {
capnp::InputStreamMessageReader reader(buffered_stream, capnp::
ReaderOptions(), scratch.asPtr());
const auto ts_msg = reader.getRoot<Foo>();
auto mir_msg = ts_msg.getBar();
auto capnp_msg = std::make_shared<CapnpMsg>(mir_msg);
data.emplace_back(capnp_msg);
if (data.size() > 1000000) {
return false;
}
return true;
}
void start() {
// 1 MB of scratch.
kj::Array<capnp::word> scratch = kj::heapArray<capnp::word>(1024 *
1024 / sizeof(capnp::word));
kj::FdInputStream fd_stream(fileno(stdin));
kj::BufferedInputStreamWrapper buffered_stream(fd_stream);
while (buffered_stream.tryGetReadBuffer() != nullptr) {
if (!process_message(buffered_stream, scratch)) {
break;
}
}
}
};
void foo() {
ProcessStream stream;
stream.start();
std::cout << "A" << std::endl;
usleep(1000 * 1000 * 5);
stream.data.clear();
}
int main(int argc, char* argv[]) {
foo();
std::cout << "B" << std::endl;
usleep(1000 * 1000 * 10);
}
The following are the 3 ways to use a wrapper class to store the messages
namespace capnp_utils {
/// 1 & 2 takes 10x more memory.
template<typename R>
class CapnpMessage2 {
// TODO: Should be private, but leave it public for debugging.
public:
typename R::Reader msg_reader;
std::unique_ptr<capnp::MallocMessageBuilder> msg_builder;
public:
CapnpMessage2(const typename R::Reader& reader) {
msg_builder = std::make_unique<capnp::MallocMessageBuilder
>();
msg_builder->setRoot(reader);
msg_reader = msg_builder->getRoot<R>().asReader();
}
const typename R::Reader& get() const {
return msg_reader;
}
};
template<typename R>
class CapnpMessage1 {
public:
typename R::Reader struct_reader;
std::unique_ptr<capnp::MallocMessageBuilder> msg_builder;
public:
CapnpMessage1(const typename R::Reader& reader) {
capnp::MallocMessageBuilder tmp;
tmp.setRoot(reader);
const auto raw = capnp::messageToFlatArray(tmp);
msg_builder = std::make_unique<capnp::MallocMessageBuilder
>();
capnp::initMessageBuilderFromFlatArrayCopy(raw, *msg_builder
);
struct_reader = msg_builder->getRoot<R>().asReader();
}
const typename R::Reader& get() const {
return struct_reader;
}
};
template<typename R>
class CapnpMessage {
public:
typename R::Reader struct_reader;
// `struct_reader` is only valid when `msg_reader` is alive.
std::unique_ptr<capnp::FlatArrayMessageReader> msg_reader;
kj::Array<capnp::word> raw;
public:
CapnpMessage(const typename R::Reader& reader) {
capnp::MallocMessageBuilder msg_builder;
msg_builder.setRoot(reader);
raw = capnp::messageToFlatArray(msg_builder);
msg_reader = std::make_unique<capnp::FlatArrayMessageReader
>(raw);
struct_reader = msg_reader->getRoot<R>();
}
const typename R::Reader& get() const {
return struct_reader;
}
};
}
`CapnpMessage2` copies the message once, while `CapnpMessage1` and
`CapnpMessage` both copy the message twice.
In terms of memory usage:
A B
CapnpMessage2 8152MB 8136MB
CapnpMessage1 8166MB 8151MB
CapnpMessage 640MB 625MB
It looks that `CapnpMessage1` & `CapnpMessage2` use far more memory than
`CapnpMessage`. Is it because the builder has reserved too much space even
though I only need it to be as big as the reader?
The important part is that in all 3 cases, memory are still used at time B
although `data` is cleared. (Even if I reset the `unique_ptr` inside the
wrapper class).
Is it somehow related to memory fragmentation?
What is the best approach in this case? I would very much appreciate any
help to reduce the steady memory increase.
--
You received this message because you are subscribed to the Google Groups
"Cap'n Proto" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
Visit this group at https://groups.google.com/group/capnproto.