Sergey,

>If I understand correctly when there is a continuous flow of updates to the
page already transferred to receiver checkpointer will write this page to
the log file over and over again. Do you see here any risks of exhausting
disk space on sender's side?

We could track the set of page which are in log file to avoid this issue
(any concurrent hash set would work fine);

> What if some updates come after checkpointer stopped updating log file?
How
these updates will be transferred to the receiver and applied there?

Temporary wal file on the receiver from the original approach should cover
this case.

On Tue, Nov 27, 2018 at 7:19 PM Sergey Chugunov <sergey.chugu...@gmail.com>
wrote:

> Eduard,
>
> This algorithm looks much easier but could you clarify some edge cased
> please?
>
> If I understand correctly when there is a continuous flow of updates to the
> page already transferred to receiver checkpointer will write this page to
> the log file over and over again. Do you see here any risks of exhausting
> disk space on sender's side?
>
> What if some updates come after checkpointer stopped updating log file? How
> these updates will be transferred to the receiver and applied there?
>
> On Tue, Nov 27, 2018 at 7:52 PM Eduard Shangareev <
> eduard.shangar...@gmail.com> wrote:
>
> > So, after some discussion, I could describe another approach on how to
> > build consistent partition on the fly.
> >
> > 1. We make a checkpoint, fix the size of the partition in OffheapManager.
> > 2. After checkpoint finish, we start sending partition file (without any
> > lock) to the receiver from 0 to fixed size.
> > 3. Next checkpoints if they detect that they would override some pages of
> > transferring file should write the previous state of a page to a
> dedicated
> > file.
> > So, we would have a list of pages written 1 by 1, page id is written in
> the
> > page itself so we could determine page index. Let's name it log.
> > 4. When transfer finished checkpointer would stop updating log-file. Now
> we
> > are ready to send it to the receiver.
> > 5. On receiver side we start merging the dirty partition file with log
> > (updating it with pages from log-file).
> >
> > So, an advantage of this method:
> > - checkpoint-thread work couldn't  increase more than twice;
> > - checkpoint-threads shouldn't wait for anything;
> > - in best case, we receive partition without any extra effort.
> >
> >
> > On Mon, Nov 26, 2018 at 8:54 PM Eduard Shangareev <
> > eduard.shangar...@gmail.com> wrote:
> >
> > > Maxim,
> > >
> > > I have looked through your algorithm of reading partition consistently.
> > > And I have some questions/comments.
> > >
> > > 1. The algorithm requires heavy synchronization between
> checkpoint-thread
> > > and new-approach-rebalance-threads,
> > > because you need strong guarantees to not start writing or reading to
> > > chunk which was updated or started reading by the counterpart.
> > >
> > > 2. Also, if we have started transferring this chunk in original
> partition
> > > couldn't be updated by checkpoint-threads. They should wait for
> transfer
> > > finishing.
> > >
> > > 3. If sending is slow and partition is updated then in worst case
> > > checkpoint-threads would create the whole copy of the partition.
> > >
> > > So, what we have:
> > > -on every page write checkpoint-thread should synchronize with
> > > new-approach-rebalance-threads;
> > > -checkpoint-thread should do extra-work, sometimes this could be as
> huge
> > > as copying the whole partition.
> > >
> > >
> > > On Fri, Nov 23, 2018 at 2:55 PM Ilya Kasnacheev <
> > ilya.kasnach...@gmail.com>
> > > wrote:
> > >
> > >> Hello!
> > >>
> > >> This proposal will also happily break my compression-with-dictionary
> > patch
> > >> since it relies currently on only having local dictionaries.
> > >>
> > >> However, when you have compressed data, maybe speed boost is even
> > greater
> > >> with your approach.
> > >>
> > >> Regards,
> > >> --
> > >> Ilya Kasnacheev
> > >>
> > >>
> > >> пт, 23 нояб. 2018 г. в 13:08, Maxim Muzafarov <maxmu...@gmail.com>:
> > >>
> > >> > Igniters,
> > >> >
> > >> >
> > >> > I'd like to take the next step of increasing the Apache Ignite with
> > >> > enabled persistence rebalance speed. Currently, the rebalancing
> > >> > procedure doesn't utilize the network and storage device throughout
> to
> > >> > its full extent even with enough meaningful values of
> > >> > rebalanceThreadPoolSize property. As part of the previous discussion
> > >> > `How to make rebalance faster` [1] and IEP-16 [2] Ilya proposed an
> > >> > idea [3] of transferring cache partition files over the network.
> > >> > From my point, the case to which this type of rebalancing procedure
> > >> > can bring the most benefit – is adding a completely new node or set
> of
> > >> > new nodes to the cluster. Such a scenario implies fully relocation
> of
> > >> > cache partition files to the new node. To roughly estimate the
> > >> > superiority of partition file transmitting over the network the
> native
> > >> > Linux scp\rsync commands can be used. My test environment showed the
> > >> > result of the new approach as 270 MB/s vs the current 40 MB/s
> > >> > single-threaded rebalance speed.
> > >> >
> > >> >
> > >> > I've prepared the design document IEP-28 [4] and accumulated all the
> > >> > process details of a new rebalance approach on that page. Below you
> > >> > can find the most significant details of the new rebalance procedure
> > >> > and components of the Apache Ignite which are proposed to change.
> > >> >
> > >> > Any feedback is very appreciated.
> > >> >
> > >> >
> > >> > *PROCESS OVERVIEW*
> > >> >
> > >> > The whole process is described in terms of rebalancing single cache
> > >> > group and partition files would be rebalanced one-by-one:
> > >> >
> > >> > 1. The demander node sends the GridDhtPartitionDemandMessage to the
> > >> > supplier node;
> > >> > 2. When the supplier node receives GridDhtPartitionDemandMessage and
> > >> > starts the new checkpoint process;
> > >> > 3. The supplier node creates empty the temporary cache partition
> file
> > >> > with .tmp postfix in the same cache persistence directory;
> > >> > 4. The supplier node splits the whole cache partition file into
> > >> > virtual chunks of predefined size (multiply to the PageMemory size);
> > >> > 4.1. If the concurrent checkpoint thread determines the appropriate
> > >> > cache partition file chunk and tries to flush dirty page to the
> cache
> > >> > partition file
> > >> > 4.1.1. If rebalance chunk already transferred
> > >> > 4.1.1.1. Flush the dirty page to the file;
> > >> > 4.1.2. If rebalance chunk not transferred
> > >> > 4.1.2.1. Write this chunk to the temporary cache partition file;
> > >> > 4.1.2.2. Flush the dirty page to the file;
> > >> > 4.2. The node starts sending to the demander node each cache
> partition
> > >> > file chunk one by one using FileChannel#transferTo
> > >> > 4.2.1. If the current chunk was modified by checkpoint thread – read
> > >> > it from the temporary cache partition file;
> > >> > 4.2.2. If the current chunk is not touched – read it from the
> original
> > >> > cache partition file;
> > >> > 5. The demander node starts to listen to new pipe incoming
> connections
> > >> > from the supplier node on TcpCommunicationSpi;
> > >> > 6. The demander node creates the temporary cache partition file with
> > >> > .tmp postfix in the same cache persistence directory;
> > >> > 7. The demander node receives each cache partition file chunk one by
> > one
> > >> > 7.1. The node checks CRC for each PageMemory in the downloaded
> chunk;
> > >> > 7.2. The node flushes the downloaded chunk at the appropriate cache
> > >> > partition file position;
> > >> > 8. When the demander node receives the whole cache partition file
> > >> > 8.1. The node initializes received .tmp file as its appropriate
> cache
> > >> > partition file;
> > >> > 8.2. Thread-per-partition begins to apply for data entries from the
> > >> > beginning of WAL-temporary storage;
> > >> > 8.3. All async operations corresponding to this partition file still
> > >> > write to the end of temporary WAL;
> > >> > 8.4. At the moment of WAL-temporary storage is ready to be empty
> > >> > 8.4.1. Start the first checkpoint;
> > >> > 8.4.2. Wait for the first checkpoint ends and own the cache
> partition;
> > >> > 8.4.3. All operations now are switched to the partition file instead
> > >> > of writing to the temporary WAL;
> > >> > 8.4.4. Schedule the temporary WAL storage deletion;
> > >> > 9. The supplier node deletes the temporary cache partition file;
> > >> >
> > >> >
> > >> > *COMPONENTS TO CHANGE*
> > >> >
> > >> > CommunicationSpi
> > >> >
> > >> > To benefit from zero copy we must delegate the file transferring to
> > >> > FileChannel#transferTo(long, long,
> > >> > java.nio.channels.WritableByteChannel) because the fast path of
> > >> > transferTo method is only executed if the destination buffer
> inherits
> > >> > from an internal JDK class.
> > >> >
> > >> > Preloader
> > >> >
> > >> > A new implementation of cache entries preloader assume to be done.
> The
> > >> > new implementation must send and receive cache partition files over
> > >> > the CommunicationSpi channels by chunks of data with validation
> > >> > received items. The new layer over the cache partition file must
> > >> > support direct using of FileChannel#transferTo method over the
> > >> > CommunicationSpi pipe connection. The connection bandwidth of the
> > >> > cache partition file transfer must have the ability to be limited at
> > >> > runtime.
> > >> >
> > >> > Checkpointer
> > >> >
> > >> > When the supplier node receives the cache partition file demand
> > >> > request it will send the file over the CommunicationSpi. The cache
> > >> > partition file can be concurrently updated by checkpoint thread
> during
> > >> > its transmission. To guarantee the file consistency Сheckpointer
> must
> > >> > use copy-on-write technique and save a copy of updated chunk into
> the
> > >> > temporary file.
> > >> >
> > >> > (new) Catch-up temporary WAL
> > >> >
> > >> > While the demander node is in the partition file transmission state
> it
> > >> > must save all cache entries corresponding to the moving partition
> into
> > >> > a new temporary WAL storage. These entries will be applied later one
> > >> > by one on the received cache partition file. All asynchronous
> > >> > operations will be enrolled to the end of temporary WAL storage
> during
> > >> > storage reads until it becomes fully read. The file-based FIFO
> > >> > approach assumes to be used by this process.
> > >> >
> > >> >
> > >> > *RECOVERY*
> > >> >
> > >> > In case of crash recovery, there is no additional actions need to be
> > >> > applied to keep the cache partition file consistency. We are not
> > >> > recovering partition with the moving state, thus the single
> partition
> > >> > file will be lost and only it. The uniqueness of it is guaranteed by
> > >> > the single-file-transmission process. The cache partition file will
> be
> > >> > fully loaded on the next rebalance procedure.
> > >> >
> > >> > To provide default cluster recovery guarantee we must to:
> > >> > 1. Start the checkpoint process when the temporary WAL storage
> becomes
> > >> > empty;
> > >> > 2. Wait for the first checkpoint ends and set owning status to the
> > >> > cache partition;
> > >> >
> > >> >
> > >> >
> > >> >
> > >> > [1]
> > >> >
> > >>
> >
> http://apache-ignite-developers.2346864.n4.nabble.com/Rebalancing-how-to-make-it-faster-td28457.html
> > >> > [2]
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/IGNITE/IEP-16%3A+Optimization+of+rebalancing
> > >> > [3] https://issues.apache.org/jira/browse/IGNITE-8020
> > >> > [4]
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/IGNITE/IEP-28%3A+Cluster+peer-2-peer+balancing
> > >> >
> > >>
> > >
> >
>

Reply via email to