Hi, Anton,

It seems to me that the best option would be to use the row number as the IncomingMessageEnvelope's offset. Then, when Samza commits a checkpoint, it will commit the row number as the offset. When the Samza job restarts, the row number can be read back from the checkpoint topic and used to initialize your custom SystemConsumer, so that it resumes polling the database from that row. Thoughts?
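To make the idea concrete, here is a minimal, Samza-free sketch. It assumes the table has a monotonically increasing row number and that each poll returns rows in ascending row-number order. `RowNumberOffsetSketch`, `Envelope` and `Row` are hypothetical stand-ins. In a real job, this logic would sit in your `BlockingEnvelopeMap` subclass: it would emit `IncomingMessageEnvelope`s with `Long.toString(rowNumber)` as the offset and receive the checkpointed value through `register(SystemStreamPartition, String offset)`.

```java
import java.util.ArrayList;
import java.util.List;

public class RowNumberOffsetSketch {

    // Hypothetical stand-in for Samza's IncomingMessageEnvelope: only offset and payload are modeled.
    public static final class Envelope {
        public final String offset;
        public final String message;

        public Envelope(String offset, String message) {
            this.offset = offset;
            this.message = message;
        }
    }

    // Hypothetical stand-in for one row returned by the database poll query.
    public static final class Row {
        public final long rowNumber;
        public final String payload;

        public Row(long rowNumber, String payload) {
            this.rowNumber = rowNumber;
            this.payload = payload;
        }
    }

    // Row number of the last row already emitted; -1 means "nothing read yet".
    private long lastRowNumber = -1;

    // Called on (re)start with the checkpointed offset, or null if the job has never checkpointed.
    public void register(String checkpointedOffset) {
        lastRowNumber = (checkpointedOffset == null) ? -1 : Long.parseLong(checkpointedOffset);
    }

    // One poll cycle: emit every row newer than lastRowNumber, using its row number as the offset.
    // Assumes rows arrive in ascending row-number order.
    public List<Envelope> poll(List<Row> table) {
        List<Envelope> out = new ArrayList<>();
        for (Row row : table) {
            if (row.rowNumber > lastRowNumber) {
                out.add(new Envelope(Long.toString(row.rowNumber), row.payload));
                lastRowNumber = row.rowNumber;
            }
        }
        return out;
    }
}
```

To simulate a restart, create a fresh instance and register it with "1". It then skips row 1 and emits only row 2. One detail to check against the Samza docs: Samza may hand the consumer the offset *after* the last checkpointed one (computed by the system's SystemAdmin). In that case the registered row itself should be read, not skipped.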
-Yi

On Thu, Dec 10, 2015 at 4:45 AM, Anton Polyakov <polyakov.an...@gmail.com> wrote:
> Hi guys
>
> In my Samza setup I need to create a custom SystemConsumer. Basically, I
> need to source data from a database table, but due to Oracle constraints
> I can’t use DataBus.
>
> So I created a consumer that polls the database and pushes new changes into
> the stream by extending BlockingEnvelopeMap and calling its put() method:
>
>     public class MockTradeSource extends BlockingEnvelopeMap {
>         //…poll here, remember last read row number
>         msg = nextMsg(lastRowNumber);
>         MockTradeSource.this.put(ssp, new IncomingMessageEnvelope(ssp,
>             null, null, msg));
>     }
>
> Obviously this source now has state: it remembers which row it last
> published.
> The question is: how can I make it stateful for Samza? I would prefer to
> leverage the existing checkpointing or a similar framework-level mechanism
> and have this source managed by YARN.
>
> Of course, I could also take this code out of Samza and just push messages
> to Kafka by running the publisher as an external process, but this is not
> what I am looking for.