Hi guys
In my Samza setup I need to create a custom SystemConsumer. Basically, I need to
source data from a database table, but due to Oracle constraints I can’t use
DataBus.
So I created a consumer that polls the database and pushes new changes into a
stream by extending BlockingEnvelopeMap and calling its put() method:
public class MockTradeSource extends BlockingEnvelopeMap {
    // … poll here, remember last read row number
    msg = nextMsg(lastRowNumber);
    // offset and key are both passed as null here
    MockTradeSource.this.put(ssp, new IncomingMessageEnvelope(ssp, null, null, msg));
}
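To make the state explicit, here is a minimal sketch of just the polling part, with no Samza or JDBC dependency. Everything in it is a hypothetical stand-in for illustration (the class name PollingSourceSketch, the in-memory list playing the role of the table, the list position used as the row number); it is not my real code and not any Samza API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the polling logic only; no Samza or JDBC involved.
public class PollingSourceSketch {
    // Stand-in for the database table: list position + 1 plays the role of the row number.
    private final List<String> table;
    // The state in question: the number of the last row already published.
    private long lastRowNumber;

    public PollingSourceSketch(List<String> table, long startAfterRow) {
        this.table = table;
        this.lastRowNumber = startAfterRow;
    }

    // Returns every row after lastRowNumber and advances the remembered position.
    public List<String> poll() {
        List<String> fresh = new ArrayList<>();
        while (lastRowNumber < table.size()) {
            fresh.add(table.get((int) lastRowNumber));
            lastRowNumber++;
        }
        return fresh;
    }

    public long getLastRowNumber() {
        return lastRowNumber;
    }

    public static void main(String[] args) {
        List<String> table = new ArrayList<>(Arrays.asList("trade-1", "trade-2"));
        PollingSourceSketch source = new PollingSourceSketch(table, 0);
        System.out.println(source.poll());

        table.add("trade-3");
        System.out.println(source.poll());

        // A restarted source needs nothing but the saved row number to resume.
        PollingSourceSketch restarted =
                new PollingSourceSketch(table, source.getLastRowNumber());
        System.out.println(restarted.poll());
    }
}
```

The only thing that has to survive a restart is the last row number, which is exactly the piece I don't know how to hand over to the framework.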
Obviously this source now has state: it remembers the last row it
published.
My question is: how can I make it stateful for Samza? I would prefer to leverage
the existing checkpointing or a similar framework-level mechanism and have this
source managed by YARN.
Of course, I could also take this code out of Samza and just push messages to
Kafka, running the publisher as an external process, but this is not what I am
looking for.