Hi Tushar,

I thought the plan was to change WindowDataManager to use WAL once WAL is added to Malhar. I don't know when this plan changed.
There is already a ticket created for it, and I was going to work on it once WAL is moved to Malhar:
https://datatorrent.atlassian.net/browse/SPOI-7116

A related discussion also happened here:
http://mail-archives.apache.org/mod_mbox/apex-dev/201511.mbox/%3ccakabfbma3rtg-uv7h8ao_cfz3nr4u8qpkbjmueggmrtge2h...@mail.gmail.com%3E

I don't think we should add another utility that does what WindowDataManager does, because:

1. We would end up with duplicate utilities that try to achieve the same thing.
2. It has a bigger impact on operators that already work with WindowDataManager. It will make them backward incompatible as well.
3. IMO creating an abstract RecoverableOperator is not a flexible design. Idempotency entails a higher cost. When it is a pluggable component of the operator, the user can turn it off by setting a NOOP data manager when they don't care about idempotency.
4. IMO it is always better to enhance or change existing components than to add new ones that serve the same purpose.

Thanks,
Chandni

On Jan 21, 2016 3:06 PM, "Tushar Gosavi" <[email protected]> wrote:
> Hi All,
>
> We are planning to add utility classes in Malhar that provide Write Ahead
> Log (WAL) capability to operators.
>
> Motivation:
> Reconstructing the state of an operator - Some operators keep a huge state
> in memory, which causes checkpointing overhead and slows down processing
> if checkpoints happen frequently. Such operators can benefit from keeping
> their in-memory state transient and reconstructing it from the stored
> tuples. The operator writes tuples to the WAL as they arrive and then
> processes them. During recovery, the tuples are read back from the WAL to
> reconstruct the in-memory state. (The processing of tuples needs to be
> idempotent.)
>
> Operators which maintain their state on the file system will also benefit
> from this, as they do not have to update the persisted state frequently.
> They can wait until enough data is available in memory and then apply
> that data to the persistent state. On failure, the in-memory state
> will be reconstructed from the WAL.
>
> You can think of this functionality as similar to the buffer server, only
> persisted on HDFS, with the operator explicitly managing purging.
>
> WAL can also be used to provide an implementation of WindowDataManager,
> which keeps information about beginWindow and endWindow markers and about
> the tuples between them; this information is used to replay the
> tuples in the same order. Using WAL will result in fewer files compared to
> FSWindowDataManager.
>
> General Design
> We will introduce two interfaces:
>
> WALWriter - this will have the following methods:
> - append : append the data at the end of the WAL.
> - getOffset : return the offset, which needs to be tracked for recovery.
> - flush : make sure that the data is persisted on the external storage.
>
> WALReader - this will provide an iterator-like interface for accessing
> the WAL:
> - seek : seek to a particular offset.
> - advance : read the next entry; reports whether an entry was available.
> - get : get the current entry read by advance.
> - getOffset : return the current offset in the WAL.
>
> The following implementations will be provided, which work with DFS
> file systems. These classes take a serializer for converting data to a
> byte array before writing and for converting the byte array back to an
> object while reading.
>
> - FSWALReader implements WALReader
> - FSWALWriter implements WALWriter
>
> RollingFSWALReader, RollingFSWALWriter - these implementations will support
> rolling files based on the size of the log. They will internally use
> FSWALReader and FSWALWriter for reading and writing log segments.
>
> WALManager - this interface will provide the following methods:
> - setup() : set up the WAL implementation and the serializer to use.
> - runRecovery(Recoverable obj) : where Recoverable is also an interface
> with just one method, recoveryTuple(tuple), which recovers a tuple read
> from the WAL.
> - setStart(WALPointer start) : set the marker; during recovery, the WAL
> will start reading tuples from this offset. The operator will call this
> method to indicate that the data before the start pointer is no longer
> needed.
>
> For example, an abstract implementation of RecoverableOperator could be:
> ```java
> import java.io.IOException;
>
> import com.datatorrent.api.Context;
> import com.datatorrent.api.DefaultInputPort;
> import com.datatorrent.api.Operator;
> import com.datatorrent.common.util.BaseOperator;
>
> // WAL.Recoverable, WALManager, WALPointer and Serializer are the proposed
> // (not yet existing) WAL utility types described above.
> public abstract class RecoverableOperator<T> extends BaseOperator
>     implements WAL.Recoverable<T>, Operator.CheckpointListener
> {
>   private WALManager<T> wm;
>   // Oldest WAL position still needed; how it is tracked is left open here.
>   private WALPointer pointer;
>
>   public transient DefaultInputPort<T> input = new DefaultInputPort<T>()
>   {
>     @Override
>     public void process(T t)
>     {
>       processTuple(t, false);
>     }
>   };
>
>   @Override
>   public void setup(Context.OperatorContext context)
>   {
>     wm.setup(context, new Serializer<T>());
>     /* build any in-memory state */
>     wm.runRecovery(this);
>   }
>
>   void processTuple(T tuple, boolean recovery)
>   {
>     // During normal processing, write the tuple to the WAL first.
>     // During recovery, don't write the tuple to the WAL again.
>     if (!recovery) {
>       try {
>         wm.write(tuple);
>       } catch (IOException e) {
>         throw new RuntimeException(e);
>       }
>     }
>     processTuple(tuple);
>   }
>
>   // Called by the WAL manager for each tuple replayed from the WAL.
>   @Override
>   public void recoveryTuple(T tuple)
>   {
>     processTuple(tuple, true);
>   }
>
>   @Override
>   public void committed(long windowId)
>   {
>     // Move the start pointer forward if the data before it is not needed.
>     wm.setStart(pointer);
>   }
>
>   protected abstract void processTuple(T tuple);
> }
> ```
>
> Let me know your thoughts.
>
> -Tushar.
>
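
For reference while discussing, here is a minimal in-memory sketch of the WALWriter/WALReader contract from the quoted proposal. It is only an illustration under assumptions, not the Malhar implementation. The method names (append, getOffset, flush, seek, advance, get) come from the proposal; the generic type parameter, the boolean return of advance, the use of entry indexes as offsets, and the names WalSketch, InMemoryWal and recoverSum are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class WalSketch {
  /** Writer side; method names follow the proposal, signatures are assumed. */
  interface WALWriter<T> {
    void append(T entry);
    long getOffset();
    void flush();
  }

  /** Reader side; an iterator-like view over flushed entries. */
  interface WALReader<T> {
    void seek(long offset);
    boolean advance();
    T get();
    long getOffset();
  }

  /** Hypothetical in-memory log; offsets are entry indexes, not byte positions. */
  static class InMemoryWal<T> implements WALWriter<T> {
    private final List<T> pending = new ArrayList<>(); // appended, not yet flushed
    private final List<T> durable = new ArrayList<>(); // flushed, visible to readers

    public void append(T entry) { pending.add(entry); }
    public long getOffset() { return durable.size() + pending.size(); }
    public void flush() { durable.addAll(pending); pending.clear(); }

    WALReader<T> newReader() { return new Reader(); }

    private class Reader implements WALReader<T> {
      private int next = 0;
      private T current;

      public void seek(long offset) { next = (int) offset; current = null; }
      public boolean advance() {
        if (next >= durable.size()) {
          return false;
        }
        current = durable.get(next++);
        return true;
      }
      public T get() { return current; }
      public long getOffset() { return next; }
    }
  }

  /** Replays every flushed entry from 'start' and returns the rebuilt sum. */
  static long recoverSum(InMemoryWal<Integer> wal, long start) {
    WALReader<Integer> reader = wal.newReader();
    reader.seek(start);
    long sum = 0;
    while (reader.advance()) {
      sum += reader.get();
    }
    return sum;
  }

  public static void main(String[] args) {
    InMemoryWal<Integer> wal = new InMemoryWal<>();
    wal.append(1);
    wal.append(2);
    wal.flush();
    long checkpoint = wal.getOffset(); // offset an operator would track
    wal.append(3);
    wal.append(4);
    wal.flush();
    wal.append(5); // never flushed, so it is not replayed
    System.out.println(recoverSum(wal, 0));          // prints 10
    System.out.println(recoverSum(wal, checkpoint)); // prints 7
  }
}
```

In this sketch, replaying from the tracked offset skips entries that are no longer needed, which is the role setStart plays in the proposal, and an entry that was appended but never flushed is not seen on recovery.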
