aromanenko-dev commented on code in PR #26051:
URL: https://github.com/apache/beam/pull/26051#discussion_r1172742005
##########
sdks/java/io/sparkreceiver/2/src/test/java/org/apache/beam/sdk/io/sparkreceiver/RabbitMqReceiverWithOffset.java:
##########
@@ -101,47 +102,69 @@ private void receive() {
channel.queueDeclare(
streamName, true, false, false,
Collections.singletonMap("x-queue-type", "stream"));
channel.basicQos(Math.min(MAX_PREFETCH_COUNT, (int)
totalMessagesNumber));
- testConsumer = new TestConsumer(this, channel, this::store);
+ final TestConsumer testConsumer = new TestConsumer(channel, this::store,
isStopped);
channel.basicConsume(
streamName,
false,
- Collections.singletonMap("x-stream-offset", currentOffset),
+ Collections.singletonMap("x-stream-offset", startOffset),
testConsumer);
} catch (Exception e) {
LOG.error("Can not basic consume", e);
throw new RuntimeException(e);
}
+ }
- while (!isStopped()) {
- try {
- TimeUnit.MILLISECONDS.sleep(READ_TIMEOUT_IN_MS);
- } catch (InterruptedException e) {
- LOG.error("Interrupted", e);
+ @Override
+ public void stop(String message) {
+ LOG.info(message);
+ isStopped.set(true);
+ super.stop(message);
+ try {
+ if (recordsProcessed != 0) {
+ LOG.info("Try to multiple ack on {}", recordsProcessed);
+ channel.basicAck(recordsProcessed, true);
}
+ channel.abort();
+ connection.close();
+ LOG.info("RabbitMQ channel and connection were closed");
+ } catch (Exception e) {
+ LOG.error("Exception during stopping of the RabbitMQ receiver", e);
}
+ }
+ @Override
+ public void stop(String message, Throwable error) {
Review Comment:
Seems like the implementation is pretty similar to `stop(String message)`
method above. Can it be optimised?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]