Hello everyone,
For an interview task I've been asked to make the CommitLog multithreaded. I'm
new to the Cassandra project, so before I start modifying code I want to
make sure I correctly understand what is going on there.
Feel free to correct anything I got wrong or partially wrong.
1. The CommitLog singleton object is responsible for receiving
RowMutation objects through its add method. The add method is thread-safe
and is meant to be called by many threads adding their RowMutations
independently.
2. Each invocation of CommitLog#add puts a new task onto the queue.
This task is represented by a LogRecordAdder callable object, which is
responsible for actually calling the CommitLogSegment#write method to do
all the "hard work": serializing the RowMutation object, calculating the
CRC, and writing the result into the memory-mapped CommitLogSegment
file buffer. The add method immediately returns a Future object, which
can be waited on (if needed) - it will block until the row mutation has
been saved to the log file and (optionally) synced.
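If I read this right, the pattern is roughly the following. A minimal sketch
with a plain single-threaded executor standing in for
ICommitLogExecutorService; the class and method names here (CommitLogSketch,
write, contents) are my placeholders, not Cassandra's:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch only: a single-threaded executor serializes all log writes,
// while add() stays thread-safe and returns a Future to wait on.
public class CommitLogSketch {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final StringBuilder segment = new StringBuilder(); // stand-in for the mapped buffer

    // roughly what CommitLog#add does: enqueue the write, return a Future
    public Future<?> add(String mutation) {
        return executor.submit(() -> write(mutation));
    }

    // stand-in for CommitLogSegment#write (serialize + CRC + append)
    private void write(String mutation) {
        segment.append(mutation).append('\n');
    }

    public String contents() { return segment.toString(); }

    public void shutdown() { executor.shutdown(); }

    public static void main(String[] args) throws Exception {
        CommitLogSketch log = new CommitLogSketch();
        Future<?> f = log.add("row1");
        f.get(); // the caller may block here until the write completes
        System.out.println(log.contents());
        log.shutdown();
    }
}
```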
3. The queued tasks are processed one by one, sequentially, by the
appropriate ICommitLogExecutorService. This service also controls
syncing of the active memory-mapped segments. There are two sync
strategies available: periodic and batched. The periodic strategy simply
calls sync periodically, by asynchronously putting an appropriate sync
task into the queue in between the LogRecordAdder tasks. The
LogRecordAdder tasks are marked "done" as soon as they are written to
the log, so the caller *won't wait* for the sync. The batched strategy
(BatchCommitLogExecutorService), on the other hand, performs the tasks
in batches, each batch finished with a sync operation. The tasks are
marked as done *after* the sync operation has finished. This deferred
task completion is achieved thanks to the CheaterFutureTask class, which
allows running the task without immediately marking the FutureTask as
done. Nice. :)
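My reading of the CheaterFutureTask trick, as a sketch (this is not the real
class; runWithoutCompleting and complete are names I made up): do the work
right away, but only complete the Future - and unblock any waiters - after
an explicit signal, e.g. once the batch sync is done.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

// Sketch of deferred completion: FutureTask#set is protected, so a
// subclass can delay marking the future done until after the sync.
public class DeferredFutureTask<T> extends FutureTask<T> {
    private final Callable<T> work;
    private T result;

    public DeferredFutureTask(Callable<T> work) {
        super(work);
        this.work = work;
    }

    // perform the work now, without marking the future as done
    public void runWithoutCompleting() throws Exception {
        result = work.call();
    }

    // called after the batch sync: now mark the future done,
    // which wakes up anyone blocked in get()
    public void complete() {
        set(result);
    }
}
```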
4. The serialized size of the RowMutation object is calculated twice:
once before submitting to the ExecutorService - to check that it is not
larger than the segment size - and again after the task is taken from
the queue for execution - to check whether it fits into the active
CommitLogSegment and, if it doesn't, to activate a new
CommitLogSegment. This looks to me like a point needing optimisation:
I couldn't find any code for caching the serialized size to avoid
computing it twice.
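A trivial sketch of the caching I have in mind: compute the size once in
add() and carry it along with the task (SizedWrite and its members are my
invention, not existing code):

```java
// Hypothetical task payload that caches the serialized size, so the
// executor side does not have to measure the mutation a second time.
final class SizedWrite {
    final byte[] serialized;   // stand-in for the serialized RowMutation
    final int serializedSize;  // computed exactly once, before enqueueing

    SizedWrite(byte[] serialized) {
        this.serialized = serialized;
        this.serializedSize = serialized.length;
    }

    // the executor-side fits-in-segment check can reuse the cached size
    boolean fitsIn(int remainingSegmentSpace) {
        return serializedSize <= remainingSegmentSpace;
    }
}
```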
5. The serialization, CRC calculation and actual commit log writes are
happening sequentially. The aim of this ticket is to make it parallel.
Questions:
1. What happens during recovery if the power goes off before the log
has been synced and it has been written only partially (e.g. it is
truncated in the middle of the RowMutation data)? Are incomplete
RowMutation writes detected only by means of the CRC (CommitLog, around
lines 237-240), or is there some other mechanism for it?
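For the record, this is the kind of check I assume recovery performs (the
class and method name are mine): recompute the checksum over the record
bytes read from the log and compare it with the stored value; a truncated
record should fail it.

```java
import java.util.zip.CRC32;

// Sketch of CRC-based detection of incomplete records during replay.
public class CrcCheck {
    static boolean recordLooksComplete(byte[] record, long storedCrc) {
        CRC32 crc = new CRC32();
        crc.update(record, 0, record.length);
        return crc.getValue() == storedCrc;
    }
}
```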
2. Is the CommitLog#add method allowed to do some heavier computation?
What is its contract? Does it have to return immediately, or can I
move some code into it?
Solutions I consider (please comment):
1. Moving the serialized size calculation, serialization and CRC
calculation entirely before the executor service queue, so that these
operations would run in parallel and be performed once per RowMutation
object. The calculated size / data array / CRC value would be attached
to the task and put into the queue. Copying that into the commit log
would then proceed sequentially - the task would contain only the code
for the log write. This is the safest and easiest solution, but also
the least performant, because the copying is still sequential and might
remain a bottleneck. The logic for allocating new commit log segments
and syncing remains unchanged.
2. Moving the serialized size calculation, serialization, CRC
calculation *and the commit log write* before the executor service
queue. This immediately raises some problems / questions:
a) The code for segment allocation needs to be changed, as it becomes
multithreaded. It can be done with AtomicInteger.compareAndSet, so that
each RowMutation gets its own non-overlapping piece of the commit log
to write into.
b) What happens if there is not enough free space in the current active
segment? Do we allow more than one active segment at a time? Or do we
restrict the parallelism to writing into a single active segment (which
I don't like, as it would certainly be less performant - we would have
to wait for the current active segment to be finished before we could
start a new one)?
c) Is the recovery method prepared to read a partially written
(invalid) RowMutation that is not the last mutation in the commit log?
If we allow writing several row mutations in parallel, it has to be.
d) The tasks would be sent to the queue only for the wait-for-sync
functionality - they would not contain any code to execute, because
everything would already have been done.
3. Everything just as in 2., but with the addition that the
serialization code writes directly into the target memory-mapped buffer
and not into a temporary byte array. This would save us the copying and
also put less strain on the GC.
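What I mean by "writing directly", sketched with a heap ByteBuffer standing
in for the memory-mapped segment (regionFor is my helper, not an existing
API): each writer gets an independent slice of the shared buffer, so
serialization and the CRC both operate on the target region, with no
temporary arrays.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Sketch: carve a private window out of the shared segment buffer.
// duplicate() gives an independent position/limit, so concurrent writers
// don't disturb each other; slice() narrows the view to the reserved region.
public class DirectWrite {
    static ByteBuffer regionFor(ByteBuffer segmentBuffer, int offset, int size) {
        ByteBuffer dup = segmentBuffer.duplicate();
        dup.position(offset);
        dup.limit(offset + size);
        return dup.slice();
    }

    public static void main(String[] args) {
        ByteBuffer segment = ByteBuffer.allocate(1024); // stand-in for the mapped file
        ByteBuffer region = regionFor(segment, 100, 8);
        region.putLong(0x1122334455667788L); // "serialize" straight into place

        CRC32 crc = new CRC32();
        region.flip();
        crc.update(region); // CRC over the same region, no temporary copy
        System.out.println(Long.toHexString(crc.getValue()));
    }
}
```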
Sorry for such a long e-mail, and best regards,
Piotr Kolaczkowski
--
Piotr Kołaczkowski
Instytut Informatyki, Politechnika Warszawska
Nowowiejska 15/19, 00-665 Warszawa
e-mail: pkola...@ii.pw.edu.pl
www: http://home.elka.pw.edu.pl/~pkolaczk/