Here is the code I am using currently to start this:
private Control startMatrixConsumer() {
final ConsumerSettings<Long, byte[]> matrixConsumerSettings =
ConsumerSettings
.create(services.actorSystem(), new LongDeserializer(), new
ByteArrayDeserializer())
.withBootstrapServers(services.config().getString("kafka.bootstrapServers"))
.withGroupId("group1") // todo put in the conf ??
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
final String topicName = Matrix.class.getSimpleName() + '-' + eventId;
final AutoSubscription subscription = Subscriptions.topics(topicName);
return Consumer.plainSource(MatrixConsumerSettings, subscription)
.named(Matrix.class.getSimpleName() + "-Kafka-Consumer-" + eventId)
.map(data -> {
final Matrix matrix = services.kryoDeserialize(data.value(),
Matrix.class);
log.debug(format("Received %s for event %d from Kafka",
Matrix.class.getSimpleName(), matrix.getEventId()));
return matrix;
})
.filter(Objects::nonNull)
.to(Sink.actorRef(getSelf(), NotUsed.getInstance()))
.run(ActorMaterializer.create(getContext()));
}
On Wednesday, December 6, 2017 at 11:06:26 AM UTC-6, kraythe wrote:
>
> We are using Kafka to store messages that are produced by a node in our
> cluster and to be distributed to all nodes in the cluster and I have it
> mostly working with akka-streams but there is a couple of questions I have
> to tie this up. There are some constraints to this.
>
> First of all the message has to be consumed by every node in the cluster
> but produced by only one node. I understand I can assign each node a group
> id that is probably its node ID which means each node will get the message.
> That sorted. But here are the questions.
>
> The data is extremely transient and fairly large (just under a meg) and
> cannot be compressed further or broken up. If there is a new message on the
> topic the old one is pretty much trash. How can I limit the topic to
> basically just one message currently maximum?
>
> Given that the data is necessary for the node to start, I need to consume
> the latest message on the topic no matter whether I have consumed it before
> and, hopefully without creating a unique group id every time I start the
> server. Is this possible and if so, how can it be done.
>
> Finally, the data is usually on the topic but on occasion it is not there
> and I, ideally, need to be able to check if there is a message there and if
> not ask the producer to create the message. Is this possible?
>
> Thanks a bunch.
>
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.