Hi Samza devs and users,

I wrote a custom Samza S3 consumer that downloads files from S3 and puts
messages into a BlockingEnvelopeMap. It was straightforward because there's a
nice example, filereader. I tried a small optimization by overriding the
newBlockingQueue() method to return one shared queue: since a Samza container
is single-threaded, I guessed that a single shared queue would be fine. I
added the following code:


public S3Consumer(String systemName, Config config, MetricsRegistry registry) {
    queueSize = config.getInt("systems." + systemName + ".queue.size", 10000);
    bucket = config.get("systems." + systemName + ".bucket");
    prefix = config.get("systems." + systemName + ".prefix");

    queue = new LinkedBlockingQueue<>(queueSize);

    recordCounter = registry.newCounter(this.getClass().getName(), "processed_records");
}

@Override
protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
    return queue; // single queue
}

Unfortunately, I observed significant message loss with this
implementation. I suspected the shared queue might have been dropping
messages, so I changed newBlockingQueue() to match the filereader
implementation:

@Override
protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
    return new LinkedBlockingQueue<>(queueSize); // new queue per call
}

After that change, the message loss no longer occurred.
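
To frame the question, here is a minimal standalone model of the two variants (plain Java, not Samza code). It assumes, without having checked the Samza source, that BlockingEnvelopeMap keeps one queue per registered partition, obtained from newBlockingQueue(), and drains each requested partition's queue separately in poll(). SharedQueueModel, its Msg record and its methods are hypothetical names for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

// Hypothetical model, NOT Samza's BlockingEnvelopeMap: a buffer map that keeps
// one queue per partition key, where each queue comes from a factory.
public class SharedQueueModel {

    // Stand-in for an envelope: a message tagged with the partition it belongs to.
    public record Msg(String partition, int offset) {}

    private final Map<String, BlockingQueue<Msg>> buffers = new HashMap<>();
    private final Supplier<BlockingQueue<Msg>> queueFactory;

    SharedQueueModel(Supplier<BlockingQueue<Msg>> queueFactory) {
        this.queueFactory = queueFactory;
    }

    // Loosely modeled on register(): each partition gets whatever queue the factory returns.
    void register(String partition) {
        buffers.putIfAbsent(partition, queueFactory.get());
    }

    // add() throws IllegalStateException if the bounded queue is full.
    void put(String partition, Msg msg) {
        buffers.get(partition).add(msg);
    }

    // Loosely modeled on poll(): drain each requested partition's queue and
    // file everything drained under that partition's key.
    Map<String, List<Msg>> poll(List<String> partitions) {
        Map<String, List<Msg>> out = new HashMap<>();
        for (String p : partitions) {
            List<Msg> drained = new ArrayList<>();
            buffers.get(p).drainTo(drained);
            out.put(p, drained);
        }
        return out;
    }

    // Counts messages filed under a key other than their own partition.
    public static int misfiled(Map<String, List<Msg>> polled) {
        int n = 0;
        for (Map.Entry<String, List<Msg>> e : polled.entrySet()) {
            for (Msg m : e.getValue()) {
                if (!m.partition().equals(e.getKey())) {
                    n++;
                }
            }
        }
        return n;
    }

    // Registers two partitions, buffers three messages, then polls both partitions.
    public static Map<String, List<Msg>> run(Supplier<BlockingQueue<Msg>> factory) {
        SharedQueueModel map = new SharedQueueModel(factory);
        map.register("p0");
        map.register("p1");
        map.put("p0", new Msg("p0", 0));
        map.put("p1", new Msg("p1", 0));
        map.put("p0", new Msg("p0", 1));
        return map.poll(List.of("p0", "p1"));
    }

    public static void main(String[] args) {
        BlockingQueue<Msg> shared = new LinkedBlockingQueue<>(10000);
        System.out.println("shared: " + run(() -> shared));
        System.out.println("fresh:  " + run(() -> new LinkedBlockingQueue<>(10000)));
    }
}
```

In the shared case, polling p0 drains all three messages, including p1's, and files them under p0, leaving p1's list empty; with a fresh queue per partition every message stays under its own key. Whether this misattribution is what leads to the loss inside a real Samza container is only a hypothesis.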

Do you have any idea why the shared queue caused messages to be lost?

Thank you
Best, Jae
