Hi Samza devs and users,

I wrote a custom Samza S3 consumer that downloads files from S3 and puts the messages into a BlockingEnvelopeMap. It was straightforward because there's a nice example, filereader. I tried a small optimization in the newBlockingQueue() method: since a Samza container is single-threaded, I guessed that sharing a single queue would be fine. I added the following code:

    public S3Consumer(String systemName, Config config, MetricsRegistry registry) {
        // Per-system settings from the job config
        queueSize = config.getInt("systems." + systemName + ".queue.size", 10000);
        bucket = config.get("systems." + systemName + ".bucket");
        prefix = config.get("systems." + systemName + ".prefix");
        // Created once here and handed out by every newBlockingQueue() call
        queue = new LinkedBlockingQueue<>(queueSize);
        recordCounter = registry.newCounter(this.getClass().getName(), "processed_records");
    }

    @Override
    protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
        return queue; // single queue
    }

Unfortunately, I observed significant message loss with this implementation. I suspected the queue might have been dropping messages, so I changed newBlockingQueue() to match the filereader implementation:

    @Override
    protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
        return new LinkedBlockingQueue<>(queueSize); // new queue on every call
    }

After that change, the message loss no longer occurred. Do you have any idea why the shared-queue version went wrong?

Thank you,

Best,
Jae
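P.S. To make the difference between the two newBlockingQueue() variants concrete, here is a minimal, hypothetical toy model. It is my own simplification, not Samza's actual BlockingEnvelopeMap code, and the names SharedQueueDemo, ToyEnvelopeMap, Message, and misfiledCount are made up. It assumes (my understanding, not verified against the Samza source) that newBlockingQueue() is called once per registered partition and that poll() drains each partition's queue and returns the drained messages under that partition's key. Under those assumptions, a shared queue means the first partition polled receives every partition's messages.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SharedQueueDemo {
    // Hypothetical stand-in for IncomingMessageEnvelope: each message knows its own partition.
    static final class Message {
        final String partition;
        final String payload;

        Message(String partition, String payload) {
            this.partition = partition;
            this.payload = payload;
        }
    }

    // Hypothetical toy model, NOT Samza's BlockingEnvelopeMap: one queue per registered
    // partition, obtained from newBlockingQueue(), drained partition by partition in poll().
    static final class ToyEnvelopeMap {
        private final Map<String, BlockingQueue<Message>> queues = new LinkedHashMap<>();
        private final BlockingQueue<Message> shared; // null means "new queue per partition"

        ToyEnvelopeMap(boolean shareQueue) {
            this.shared = shareQueue ? new LinkedBlockingQueue<>() : null;
        }

        // The two newBlockingQueue() variants from the post.
        BlockingQueue<Message> newBlockingQueue() {
            return shared != null ? shared : new LinkedBlockingQueue<>();
        }

        void register(String partition) {
            queues.putIfAbsent(partition, newBlockingQueue());
        }

        void put(Message m) {
            queues.get(m.partition).add(m); // unbounded queue, so add() never fails
        }

        // Drains each partition's queue and files everything drained under that partition's key.
        Map<String, List<Message>> poll() {
            Map<String, List<Message>> result = new LinkedHashMap<>();
            for (Map.Entry<String, BlockingQueue<Message>> e : queues.entrySet()) {
                List<Message> drained = new ArrayList<>();
                e.getValue().drainTo(drained);
                if (!drained.isEmpty()) {
                    result.put(e.getKey(), drained);
                }
            }
            return result;
        }
    }

    // Counts messages returned under a partition key other than their own.
    static int misfiledCount(boolean shareQueue) {
        ToyEnvelopeMap map = new ToyEnvelopeMap(shareQueue);
        map.register("p0");
        map.register("p1");
        map.put(new Message("p0", "a"));
        map.put(new Message("p1", "b"));
        map.put(new Message("p1", "c"));
        int misfiled = 0;
        for (Map.Entry<String, List<Message>> e : map.poll().entrySet()) {
            for (Message m : e.getValue()) {
                if (!m.partition.equals(e.getKey())) {
                    misfiled++;
                }
            }
        }
        return misfiled;
    }

    public static void main(String[] args) {
        System.out.println("shared queue, misfiled: " + misfiledCount(true));
        System.out.println("per-partition queues, misfiled: " + misfiledCount(false));
    }
}
```

In this toy, main() reports 2 misfiled messages for the shared queue (both p1 messages come back under p0) and 0 for per-partition queues, where every message comes back under its own partition key.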