Hi Tushar,
I thought the plan was to change WindowDataManager to use WAL once WAL is
added to Malhar. I don't know when this plan changed.

There is already a ticket created for it and I was going to work on it once
WAL is moved to Malhar.

https://datatorrent.atlassian.net/browse/SPOI-7116

Also a related discussion happened here:

http://mail-archives.apache.org/mod_mbox/apex-dev/201511.mbox/%3ccakabfbma3rtg-uv7h8ao_cfz3nr4u8qpkbjmueggmrtge2h...@mail.gmail.com%3E

I don't think we should add another utility that does what
WindowDataManager does, because:

1. We would end up with duplicate utilities that try to achieve the same thing.

2. It has a bigger impact on operators that already work with
WindowDataManager, and it will make them backward incompatible as well.

3. IMO creating an abstract RecoverableOperator is not a flexible design.
Idempotency entails a higher cost. When it is a pluggable component of the
operator, the user has the choice to turn it off by setting a NOOP data
manager when they don't care about idempotency.

4. IMO it is always better to enhance or change existing components than to
add new ones that serve the same purpose.
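To make point 3 concrete, here is a minimal Java sketch of the pluggable approach. `DataManager`, `NoopDataManager`, `InMemoryDataManager` and `ReplayingEmitter` are hypothetical names, not the actual Malhar WindowDataManager API. The operator holds the idempotency component as a field, so turning idempotency off is a configuration change rather than a change to the operator's class hierarchy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a pluggable window data manager (not the Malhar API).
interface DataManager<T>
{
  void save(long windowId, List<T> tuples);  // remember what a window emitted
  List<T> retrieve(long windowId);           // null if the window is unknown
}

// Idempotency turned off: nothing is stored, so nothing is ever replayed.
class NoopDataManager<T> implements DataManager<T>
{
  public void save(long windowId, List<T> tuples) { }
  public List<T> retrieve(long windowId) { return null; }
}

// Keeps per-window data in memory; a real implementation would persist it.
class InMemoryDataManager<T> implements DataManager<T>
{
  private final Map<Long, List<T>> store = new HashMap<>();
  public void save(long windowId, List<T> tuples) { store.put(windowId, new ArrayList<>(tuples)); }
  public List<T> retrieve(long windowId) { return store.get(windowId); }
}

// Input-like operator that composes a DataManager instead of extending a base class.
class ReplayingEmitter
{
  private final DataManager<String> dataManager;
  private final List<String> windowTuples = new ArrayList<>();
  private final List<String> output = new ArrayList<>();
  private boolean replaying;

  ReplayingEmitter(DataManager<String> dataManager) { this.dataManager = dataManager; }

  void beginWindow(long windowId)
  {
    windowTuples.clear();
    List<String> saved = dataManager.retrieve(windowId);
    replaying = saved != null;
    if (replaying) {
      output.addAll(saved);  // re-emit exactly what this window emitted before the failure
    }
  }

  void emit(String tuple)
  {
    if (!replaying) {        // while replaying, fresh source data is ignored in this sketch
      windowTuples.add(tuple);
      output.add(tuple);
    }
  }

  void endWindow(long windowId)
  {
    if (!replaying) {
      dataManager.save(windowId, windowTuples);
    }
  }

  List<String> output() { return output; }
}
```

Passing `new NoopDataManager<>()` instead of `new InMemoryDataManager<>()` disables replay without touching `ReplayingEmitter`.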

Thanks,

Chandni
On Jan 21, 2016 3:06 PM, "Tushar Gosavi" <[email protected]> wrote:

> Hi All,
> We are planning to add utility classes to Malhar that provide Write Ahead
> Log (WAL) capability to operators.
>
> Motivation:
> Reconstructing operator state - Some operators keep a huge state in
> memory, which causes checkpointing overhead and slows down processing when
> checkpoints happen frequently. Such operators can benefit from keeping the
> in-memory state transient and reconstructing it from the stored tuples. The
> operator writes tuples to the WAL as they arrive and then processes them.
> During recovery, the tuples are read back from the WAL to reconstruct the
> in-memory state. (The processing of tuples needs to be idempotent.)
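>
> A minimal sketch of this pattern, assuming a plain local file as the log
> (one tuple per line) and a word-count map as the transient state.
> `WordCounter`, `process` and `recover` are hypothetical names, not part of
> the proposed API. Each tuple is logged before it is applied, and after a
> restart the map is rebuilt by replaying the log onto empty state.
>
> ```java
> import java.io.BufferedReader;
> import java.io.BufferedWriter;
> import java.io.IOException;
> import java.nio.charset.StandardCharsets;
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.nio.file.StandardOpenOption;
> import java.util.HashMap;
> import java.util.Map;
>
> // Hypothetical operator whose in-memory state is never checkpointed;
> // it is rebuilt from a write-ahead log instead.
> class WordCounter
> {
>   private final Path wal;
>   private final Map<String, Integer> counts = new HashMap<>(); // "transient" state
>
>   WordCounter(Path wal) { this.wal = wal; }
>
>   // Normal path: log the tuple first, then apply it to the in-memory state.
>   // Opening the file per tuple keeps the sketch short; a real writer would
>   // keep the stream open and flush at window boundaries.
>   void process(String word) throws IOException
>   {
>     try (BufferedWriter out = Files.newBufferedWriter(wal, StandardCharsets.UTF_8,
>         StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
>       out.write(word);
>       out.newLine();
>     }
>     apply(word);
>   }
>
>   // Recovery path: replay every logged tuple without logging it again.
>   void recover() throws IOException
>   {
>     if (!Files.exists(wal)) {
>       return;
>     }
>     try (BufferedReader in = Files.newBufferedReader(wal, StandardCharsets.UTF_8)) {
>       String line;
>       while ((line = in.readLine()) != null) {
>         apply(line);
>       }
>     }
>   }
>
>   private void apply(String word) { counts.merge(word, 1, Integer::sum); }
>
>   int count(String word) { return counts.getOrDefault(word, 0); }
> }
> ```
>
> Counting is not idempotent on its own, so the sketch relies on replaying
> into a fresh, empty instance.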
>
> Operators that maintain their state on the file system will also benefit,
> as they do not have to update the persisted state frequently. They can
> accumulate enough data in memory and then apply it to the persistent state
> in one update. On failure, the in-memory state will be reconstructed from
> the WAL.
>
> You can think of this functionality as similar to the buffer server, except
> that it is persisted on HDFS and the operator explicitly manages purging.
>
> WAL can also be used to implement WindowDataManager, which keeps the
> beginWindow and endWindow markers and information about the tuples between
> them; this information is used to replay the tuples in the same order.
> Using WAL will result in fewer files compared to FSWindowDataManager.
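>
> The idea of storing window markers next to the tuples can be sketched as
> follows, assuming an in-memory list in place of the file-backed WAL.
> `Entry`, `WindowLog` and `replay` are hypothetical names. Because markers
> and tuples share one sequential log, many windows end up in a single file
> rather than one file per window.
>
> ```java
> import java.util.ArrayList;
> import java.util.List;
>
> // Hypothetical log record: either a window marker or a tuple.
> final class Entry
> {
>   enum Kind { BEGIN, TUPLE, END }
>   final Kind kind;
>   final long windowId;
>   final String tuple;   // null for markers
>
>   Entry(Kind kind, long windowId, String tuple)
>   {
>     this.kind = kind;
>     this.windowId = windowId;
>     this.tuple = tuple;
>   }
> }
>
> // Sequential log that records window boundaries around the tuples they contain.
> class WindowLog
> {
>   private final List<Entry> log = new ArrayList<>();   // stands in for the WAL file
>   private long currentWindow;
>
>   void beginWindow(long windowId)
>   {
>     currentWindow = windowId;
>     log.add(new Entry(Entry.Kind.BEGIN, windowId, null));
>   }
>
>   void write(String tuple) { log.add(new Entry(Entry.Kind.TUPLE, currentWindow, tuple)); }
>
>   void endWindow(long windowId) { log.add(new Entry(Entry.Kind.END, windowId, null)); }
>
>   // Tuples of a completed window in their original order, or null if the
>   // window never reached endWindow (e.g. the operator failed mid-window).
>   // A repeated BEGIN discards a partial earlier attempt at the same window.
>   List<String> replay(long windowId)
>   {
>     List<String> tuples = null;
>     for (Entry e : log) {
>       if (e.windowId != windowId) {
>         continue;
>       }
>       switch (e.kind) {
>         case BEGIN: tuples = new ArrayList<>(); break;
>         case TUPLE: if (tuples != null) { tuples.add(e.tuple); } break;
>         case END: return tuples;
>       }
>     }
>     return null;
>   }
> }
> ```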
>
> General Design
> We will introduce two interfaces:
>
> WALWriter - this will have the following methods:
>  - append    : append the data at the end of the WAL.
>  - getOffset : return the offset that needs to be tracked for recovery.
>  - flush     : make sure that the data is persisted on the external storage.
>
> WALReader - this will provide an iterator-like interface for accessing
> the WAL.
>  - seek      : seek to a particular offset.
>  - advance   : read the next entry; returns true if an entry is available.
>  - get       : get the current entry read by advance.
>  - getOffset : return the current offset in the WAL.
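>
> In Java the two interfaces could look roughly like this. The signatures
> (byte[] entries, long offsets, IOException on I/O calls, getOffset on the
> reader meaning "offset of the next entry") are assumptions for
> illustration, and `InMemoryWAL` is a hypothetical backing store used only
> to show the iterator-style seek/advance/get contract.
>
> ```java
> import java.io.IOException;
> import java.util.ArrayList;
> import java.util.List;
>
> // Assumed signatures; the proposal only names the methods.
> interface WALWriter
> {
>   void append(byte[] entry) throws IOException;  // add an entry at the end of the log
>   long getOffset() throws IOException;           // position to remember for recovery
>   void flush() throws IOException;               // make appended entries durable
> }
>
> interface WALReader
> {
>   void seek(long offset) throws IOException;     // position the reader at an offset
>   boolean advance() throws IOException;          // read next entry; false at end of log
>   byte[] get();                                  // entry read by the last advance()
>   long getOffset();                              // offset of the next entry to read
> }
>
> // Hypothetical in-memory log where an offset is simply an entry index.
> class InMemoryWAL
> {
>   private final List<byte[]> entries = new ArrayList<>();
>
>   WALWriter writer()
>   {
>     return new WALWriter()
>     {
>       public void append(byte[] entry) { entries.add(entry.clone()); }
>       public long getOffset() { return entries.size(); }
>       public void flush() { }  // nothing to persist in memory
>     };
>   }
>
>   WALReader reader()
>   {
>     return new WALReader()
>     {
>       private int next;
>       private byte[] current;
>
>       public void seek(long offset) { next = (int)offset; current = null; }
>
>       public boolean advance()
>       {
>         if (next >= entries.size()) {
>           return false;
>         }
>         current = entries.get(next++);
>         return true;
>       }
>
>       public byte[] get() { return current; }
>
>       public long getOffset() { return next; }
>     };
>   }
> }
> ```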
>
> The following implementations, which work with DFS file systems, will be
> provided. These classes will take a serializer for converting data to a
> byte array before writing and converting the byte array back to an object
> while reading.
>
> - FSWALReader implements WALReader
> - FSWALWriter implements WALWriter
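>
> One common way to frame entries in such a file is a length prefix followed
> by the serialized bytes, with the byte position in the file serving as the
> offset. The sketch below assumes a local file accessed through java.io
> rather than a DFS client, and `Serializer`, `FileWALWriter` and
> `FileWALReader` are hypothetical names, not the proposed FSWAL classes.
>
> ```java
> import java.io.BufferedOutputStream;
> import java.io.DataOutputStream;
> import java.io.EOFException;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.RandomAccessFile;
> import java.nio.charset.StandardCharsets;
>
> // Hypothetical serializer contract: object <-> byte array.
> interface Serializer<T>
> {
>   byte[] toBytes(T value);
>   T fromBytes(byte[] bytes);
> }
>
> class StringSerializer implements Serializer<String>
> {
>   public byte[] toBytes(String value) { return value.getBytes(StandardCharsets.UTF_8); }
>   public String fromBytes(byte[] bytes) { return new String(bytes, StandardCharsets.UTF_8); }
> }
>
> // Appends [int length][payload] records; the offset is the byte position in the file.
> class FileWALWriter<T> implements AutoCloseable
> {
>   private final FileOutputStream file;
>   private final DataOutputStream out;
>   private final Serializer<T> serializer;
>   private long offset;
>
>   FileWALWriter(String path, Serializer<T> serializer) throws IOException
>   {
>     this.file = new FileOutputStream(path, true);            // append mode
>     this.out = new DataOutputStream(new BufferedOutputStream(file));
>     this.serializer = serializer;
>     this.offset = file.getChannel().size();                  // continue after existing records
>   }
>
>   void append(T value) throws IOException
>   {
>     byte[] bytes = serializer.toBytes(value);
>     out.writeInt(bytes.length);
>     out.write(bytes);
>     offset += 4 + bytes.length;
>   }
>
>   long getOffset() { return offset; }
>
>   void flush() throws IOException
>   {
>     out.flush();            // push buffered bytes to the file
>     file.getFD().sync();    // ask the OS to persist them to the device
>   }
>
>   public void close() throws IOException { out.close(); }
> }
>
> // Reads records starting at any offset previously returned by the writer.
> class FileWALReader<T> implements AutoCloseable
> {
>   private final RandomAccessFile file;
>   private final Serializer<T> serializer;
>   private T current;
>
>   FileWALReader(String path, Serializer<T> serializer) throws IOException
>   {
>     this.file = new RandomAccessFile(path, "r");
>     this.serializer = serializer;
>   }
>
>   void seek(long offset) throws IOException { file.seek(offset); current = null; }
>
>   boolean advance() throws IOException
>   {
>     try {
>       int length = file.readInt();
>       byte[] bytes = new byte[length];
>       file.readFully(bytes);
>       current = serializer.fromBytes(bytes);
>       return true;
>     } catch (EOFException e) {
>       return false;   // end of log; a torn final record is also treated as the end
>     }
>   }
>
>   T get() { return current; }
>
>   long getOffset() throws IOException { return file.getFilePointer(); }
>
>   public void close() throws IOException { file.close(); }
> }
> ```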
>
>
> RollingFSWALReader and RollingFSWALWriter will support rolling files based
> on the size of the log. They will internally use FSWALReader and FSWALWriter
> to read and write the log segments.
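>
> With rolling, an offset has to identify both the segment and the position
> inside it. A minimal sketch, assuming segments numbered sequentially and a
> pointer made of (segment, offset); this `WALPointer` shape and the
> `RollingPolicy` helper are illustrative assumptions, and file I/O is left
> out so that only the rolling and purge decisions are shown.
>
> ```java
> import java.util.ArrayList;
> import java.util.List;
>
> // Hypothetical pointer into a rolling log: which segment, and where in it.
> final class WALPointer
> {
>   final int segment;
>   final long offset;
>
>   WALPointer(int segment, long offset) { this.segment = segment; this.offset = offset; }
> }
>
> // Decides where each record goes when segments are capped at maxSegmentSize bytes.
> class RollingPolicy
> {
>   private final long maxSegmentSize;
>   private final List<Integer> closedSegments = new ArrayList<>();
>   private int segment;
>   private long offset;
>
>   RollingPolicy(long maxSegmentSize) { this.maxSegmentSize = maxSegmentSize; }
>
>   // Returns the pointer at which a record of recordSize bytes is written.
>   WALPointer place(long recordSize)
>   {
>     // Roll if the record would overflow a non-empty segment; an oversized
>     // record still gets written, alone, at the start of a fresh segment.
>     if (offset > 0 && offset + recordSize > maxSegmentSize) {
>       closedSegments.add(segment);
>       segment++;
>       offset = 0;
>     }
>     WALPointer p = new WALPointer(segment, offset);
>     offset += recordSize;
>     return p;
>   }
>
>   // Closed segments lying entirely before start, i.e. safe to delete once purged.
>   List<Integer> purgeable(WALPointer start)
>   {
>     List<Integer> result = new ArrayList<>();
>     for (int s : closedSegments) {
>       if (s < start.segment) {
>         result.add(s);
>       }
>     }
>     return result;
>   }
> }
> ```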
>
> WALManager - this interface will provide the following methods:
>  - setup() : set up the WAL implementation and the serializer to use.
>  - runRecovery(Recoverable obj) : Recoverable is also an interface, with a
> single method recover(tuple), which is called for each tuple read back
> from the WAL.
>  - setStart(WALPointer start) : set the start marker; during recovery, the
> WAL will start reading tuples from this offset. The operator calls this
> method to indicate that the data before the start pointer is no longer needed.
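>
> A simplified sketch of the manager contract, assuming an in-memory list in
> place of the persisted log, so that an offset is a plain entry index rather
> than a WALPointer and setup() has nothing to configure. `SimpleWALManager`
> is a hypothetical name, and `write` follows the call used in the
> RecoverableOperator example.
>
> ```java
> import java.util.ArrayList;
> import java.util.List;
>
> // Callback used during recovery (the proposal's Recoverable interface).
> interface Recoverable<T>
> {
>   void recover(T tuple);
> }
>
> // Hypothetical manager; offsets are absolute entry indexes for simplicity.
> class SimpleWALManager<T>
> {
>   private final List<T> log = new ArrayList<>();  // stands in for the persisted WAL
>   private long purged;                            // absolute index of log.get(0)
>
>   void write(T tuple) { log.add(tuple); }
>
>   long getOffset() { return purged + log.size(); }
>
>   // Data before start is no longer needed; drop it (a file-based WAL
>   // would delete whole segments instead).
>   void setStart(long start)
>   {
>     long drop = Math.min(start, getOffset()) - purged;
>     if (drop > 0) {
>       log.subList(0, (int)drop).clear();
>       purged += drop;
>     }
>   }
>
>   // Replays every retained tuple, oldest first, into the operator.
>   void runRecovery(Recoverable<T> target)
>   {
>     for (T tuple : log) {
>       target.recover(tuple);
>     }
>   }
> }
> ```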
>
>
> For example, an abstract implementation of RecoverableOperator could be:
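> ```java
> import java.io.IOException;
>
> import com.datatorrent.api.Context;
> import com.datatorrent.api.DefaultInputPort;
> import com.datatorrent.api.Operator;
> import com.datatorrent.common.util.BaseOperator;
>
> public abstract class RecoverableOperator<T> extends BaseOperator
>   implements WAL.Recoverable<T>, Operator.CheckpointListener
> {
>   // WalManager, WALPointer and Serializer are the proposed Malhar classes.
>   private WalManager<T> wm;
>   private Serializer<T> serializer;
>   // set by the subclass once data before this position is no longer needed,
>   // e.g. after the in-memory state has been applied to persistent storage.
>   protected transient WALPointer purgePointer;
>
>   public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
>   {
>     @Override
>     public void process(T t)
>     {
>       processTuple(t, false);
>     }
>   };
>
>   @Override
>   public void setup(Context.OperatorContext context)
>   {
>     wm.setup(context, serializer);
>     /* build any in-memory state */
>     wm.runRecovery(this);
>   }
>
>   void processTuple(T tuple, boolean recovery)
>   {
>     // if this is called as part of normal processing, write it to the WAL.
>     // in case of recovery, don't write tuple again to the WAL.
>     if (!recovery) {
>       try {
>         wm.write(tuple);
>       } catch (IOException e) {
>         throw new RuntimeException(e);
>       }
>     }
>     processTuple(tuple);
>   }
>
>   @Override
>   public void recover(T tuple)
>   {
>     processTuple(tuple, true);
>   }
>
>   @Override
>   public void checkpointed(long windowId)
>   {
>   }
>
>   @Override
>   public void committed(long windowId)
>   {
>     // move the start of the WAL past data that is no longer needed.
>     if (purgePointer != null) {
>       wm.setStart(purgePointer);
>     }
>   }
>
>   protected abstract void processTuple(T tuple);
> }
> ```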
>
> Let me know your thoughts.
>
>  -Tushar.
>
