This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-pekko-connectors-samples.git
The following commit(s) were added to refs/heads/main by this push:
new d6aa4d0 [mqtt-to-kafka] Migrate akka dependencies to pekko (#22)
d6aa4d0 is described below
commit d6aa4d01b17fcb7d40037d5b8d750a7f7d5f8f8d
Author: Laglangyue <[email protected]>
AuthorDate: Mon Dec 18 06:44:36 2023 +0800
[mqtt-to-kafka] Migrate akka dependencies to pekko (#22)
* [mqtt-to-kafka] Migrate akka dependencies to pekko
* use DockerImage
* version numbers
* Update Dependencies.scala
---------
Co-authored-by: laglang <[email protected]>
Co-authored-by: PJ Fanning <[email protected]>
---
pekko-connectors-sample-mqtt-to-kafka/build.sbt | 2 +-
.../project/Dependencies.scala | 27 +-
.../src/main/java/samples/javadsl/Main.java | 424 +++++++++++----------
.../src/main/resources/application.conf | 6 +-
.../src/main/resources/logback.xml | 2 +-
.../src/main/scala/samples/scaladsl/Main.scala | 52 +--
6 files changed, 257 insertions(+), 256 deletions(-)
diff --git a/pekko-connectors-sample-mqtt-to-kafka/build.sbt b/pekko-connectors-sample-mqtt-to-kafka/build.sbt
index d5eb377..49f241e 100644
--- a/pekko-connectors-sample-mqtt-to-kafka/build.sbt
+++ b/pekko-connectors-sample-mqtt-to-kafka/build.sbt
@@ -1,4 +1,4 @@
-organization := "com.lightbend.akka"
+organization := "org.apache.pekko"
version := "1.0.0"
scalaVersion := Dependencies.scalaVer
libraryDependencies ++= Dependencies.dependencies
diff --git a/pekko-connectors-sample-mqtt-to-kafka/project/Dependencies.scala b/pekko-connectors-sample-mqtt-to-kafka/project/Dependencies.scala
index f25ba6c..fc8897d 100644
--- a/pekko-connectors-sample-mqtt-to-kafka/project/Dependencies.scala
+++ b/pekko-connectors-sample-mqtt-to-kafka/project/Dependencies.scala
@@ -3,29 +3,28 @@ import sbt._
object Dependencies {
val scalaVer = "2.13.12"
// #deps
- val PekkoVersion = "2.6.19"
- val PekkoConnectorsVersion = "4.0.0"
- val PekkoConnectorsKafkaVersion = "3.0.1"
- val JacksonDatabindVersion = "2.11.4"
-
+ val PekkoVersion = "1.0.2"
+ val PekkoConnectorsVersion = "1.0.1"
+ val PekkoConnectorsKafkaVersion = "1.0.0"
+ val JacksonDatabindVersion = "2.14.3"
// #deps
val dependencies = List(
- // #deps
- "com.lightbend.akka" %% "akka-stream-alpakka-mqtt" % PekkoConnectorsVersion,
- "com.typesafe.akka" %% "akka-stream-kafka" % PekkoConnectorsKafkaVersion,
- "com.typesafe.akka" %% "akka-stream" % PekkoVersion,
- "com.typesafe.akka" %% "akka-actor-typed" % PekkoVersion,
- "com.typesafe.akka" %% "akka-actor" % PekkoVersion,
+ // #deps
+ "org.apache.pekko" %% "pekko-connectors-mqtt" % PekkoConnectorsVersion,
+ "org.apache.pekko" %% "pekko-connectors-kafka" % PekkoConnectorsKafkaVersion,
+ "org.apache.pekko" %% "pekko-stream" % PekkoVersion,
+ "org.apache.pekko" %% "pekko-actor-typed" % PekkoVersion,
+ "org.apache.pekko" %% "pekko-actor" % PekkoVersion,
"org.scala-lang.modules" %% "scala-collection-compat" % "2.1.4",
// JSON
"com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % JacksonDatabindVersion,
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % JacksonDatabindVersion,
"com.fasterxml.jackson.module" %% "jackson-module-scala" % JacksonDatabindVersion,
// Logging
- "com.typesafe.akka" %% "akka-slf4j" % PekkoVersion,
+ "org.apache.pekko" %% "pekko-slf4j" % PekkoVersion,
"ch.qos.logback" % "logback-classic" % "1.2.13",
- // #deps
- "org.testcontainers" % "kafka" % "1.14.1"
+ "org.testcontainers" % "kafka" % "1.17.6"
+ // #deps
)
}
diff --git a/pekko-connectors-sample-mqtt-to-kafka/src/main/java/samples/javadsl/Main.java b/pekko-connectors-sample-mqtt-to-kafka/src/main/java/samples/javadsl/Main.java
index 35c4ef0..c7827f3 100644
--- a/pekko-connectors-sample-mqtt-to-kafka/src/main/java/samples/javadsl/Main.java
+++ b/pekko-connectors-sample-mqtt-to-kafka/src/main/java/samples/javadsl/Main.java
@@ -4,32 +4,8 @@
package samples.javadsl;
-import akka.Done;
-import akka.actor.typed.ActorSystem;
-import akka.actor.typed.javadsl.Behaviors;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import akka.japi.Creator;
-import akka.japi.Pair;
-import akka.kafka.ConsumerSettings;
-import akka.kafka.ProducerSettings;
-import akka.kafka.Subscriptions;
-import akka.kafka.javadsl.Consumer;
-import akka.kafka.javadsl.Producer;
-import akka.stream.KillSwitch;
-import akka.stream.KillSwitches;
-import akka.stream.UniqueKillSwitch;
-import akka.stream.alpakka.mqtt.MqttConnectionSettings;
-import akka.stream.alpakka.mqtt.MqttMessage;
-import akka.stream.alpakka.mqtt.MqttQoS;
-import akka.stream.alpakka.mqtt.MqttSubscriptions;
-import akka.stream.alpakka.mqtt.javadsl.MqttSink;
-import akka.stream.alpakka.mqtt.javadsl.MqttSource;
-import akka.stream.javadsl.Keep;
-import akka.stream.javadsl.RestartSource;
-import akka.stream.javadsl.Sink;
-import akka.stream.javadsl.Source;
-import akka.util.ByteString;
+import static org.apache.pekko.actor.typed.javadsl.Adapter.toClassic;
+
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonFactory;
@@ -42,10 +18,37 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pekko.Done;
+import org.apache.pekko.actor.typed.ActorSystem;
+import org.apache.pekko.actor.typed.javadsl.Behaviors;
+import org.apache.pekko.event.Logging;
+import org.apache.pekko.event.LoggingAdapter;
+import org.apache.pekko.japi.Creator;
+import org.apache.pekko.japi.Pair;
+import org.apache.pekko.kafka.ConsumerSettings;
+import org.apache.pekko.kafka.ProducerSettings;
+import org.apache.pekko.kafka.Subscriptions;
+import org.apache.pekko.kafka.javadsl.Consumer;
+import org.apache.pekko.kafka.javadsl.Producer;
+import org.apache.pekko.stream.KillSwitch;
+import org.apache.pekko.stream.KillSwitches;
+import org.apache.pekko.stream.UniqueKillSwitch;
+import org.apache.pekko.stream.connectors.mqtt.MqttConnectionSettings;
+import org.apache.pekko.stream.connectors.mqtt.MqttMessage;
+import org.apache.pekko.stream.connectors.mqtt.MqttQoS;
+import org.apache.pekko.stream.connectors.mqtt.MqttSubscriptions;
+import org.apache.pekko.stream.connectors.mqtt.javadsl.MqttSink;
+import org.apache.pekko.stream.connectors.mqtt.javadsl.MqttSource;
+import org.apache.pekko.stream.javadsl.Keep;
+import org.apache.pekko.stream.javadsl.RestartSource;
+import org.apache.pekko.stream.javadsl.Sink;
+import org.apache.pekko.stream.javadsl.Source;
+import org.apache.pekko.util.ByteString;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
import java.io.IOException;
import java.io.StringWriter;
@@ -56,198 +59,197 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
-import static akka.actor.typed.javadsl.Adapter.toClassic;
-
public class Main {
- public static void main(String[] args) throws Exception {
- KafkaContainer kafkaContainer = new KafkaContainer("5.4.1");
- kafkaContainer.start();
- try {
- Main me = new Main();
- me.run(kafkaContainer.getBootstrapServers());
- } finally {
- kafkaContainer.stop();
- }
+ public static void main(String[] args) throws Exception {
+ KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.1.2"));
+ kafkaContainer.start();
+ try {
+ Main me = new Main();
+ me.run(kafkaContainer.getBootstrapServers());
+ } finally {
+ kafkaContainer.stop();
}
+ }
- final ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(),
"MqttToKafka");
- final Logger log = LoggerFactory.getLogger(Main.class);
-
- // #json-mechanics
+ final ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(),
"MqttToKafka");
+ final Logger log = LoggerFactory.getLogger(Main.class);
- /**
- * Data elements sent via MQTT broker.
- */
- public static final class Measurement {
- public final Instant timestamp;
- public final long level;
-
- @JsonCreator
- public Measurement(
- @JsonProperty("timestamp") Instant timestamp,
@JsonProperty("level") long level) {
- this.timestamp = timestamp;
- this.level = level;
- }
- }
+ // #json-mechanics
- private final JsonFactory jsonFactory = new JsonFactory();
+ /**
+ * Data elements sent via MQTT broker.
+ */
+ public static final class Measurement {
- final ObjectMapper mapper = new ObjectMapper().registerModule(new
JavaTimeModule());
- final ObjectReader measurementReader = mapper.readerFor(Measurement.class);
- final ObjectWriter measurementWriter = mapper.writerFor(Measurement.class);
-
- private String asJsonArray(String fieldName, List<Object> list) throws
IOException {
- StringWriter sw = new StringWriter();
- JsonGenerator generator = jsonFactory.createGenerator(sw);
- generator.writeStartObject();
- generator.writeFieldName(fieldName);
- measurementWriter.writeValues(generator).init(true).writeAll(list);
- generator.close();
- return sw.toString();
- }
- // #json-mechanics
-
- // #restarting
-
- /**
- * Wrap a source with restart logic and exposes an equivalent materialized
value.
- */
- <M> Source<M, CompletionStage<Done>> wrapWithAsRestartSource(
- Creator<Source<M, CompletionStage<Done>>> source) {
- // makes use of the fact that these sources materialize a
CompletionStage<Done>
- CompletableFuture<Done> fut = new CompletableFuture<>();
- return RestartSource.withBackoff(
- Duration.ofMillis(100),
- Duration.ofSeconds(3),
- 0.2d, // randomFactor
- 5, // maxRestarts,
- () ->
- source
- .create()
- .mapMaterializedValue(
- mat ->
- mat.handle(
- (done, exception) -> {
- if (done != null) {
-
fut.complete(done);
- } else {
-
fut.completeExceptionally(exception);
- }
- return
fut.toCompletableFuture();
- })))
- .mapMaterializedValue(ignore -> fut.toCompletableFuture());
- }
- // #restarting
-
- void run(String kafkaServer) throws Exception {
- final LoggingAdapter logAdapter =
Logging.getLogger(system.classicSystem(), getClass().getName());
- // #flow
- final MqttConnectionSettings connectionSettings =
- MqttConnectionSettings.create(
- "tcp://localhost:1883", // (1)
- "coffee-client",
- new MemoryPersistence());
-
- final String topic = "coffee/level";
-
- MqttSubscriptions subscriptions = MqttSubscriptions.create(topic,
MqttQoS.atLeastOnce()); // (2)
-
- Source<MqttMessage, CompletionStage<Done>> restartingMqttSource =
- wrapWithAsRestartSource( // (3)
- () ->
- MqttSource.atMostOnce(
-
connectionSettings.withClientId("coffee-control"), subscriptions, 8));
-
- // Set up Kafka producer sink
- ProducerSettings<String, String> producerSettings =
- ProducerSettings
- .create(toClassic(system), new StringSerializer(), new
StringSerializer())
- .withBootstrapServers(kafkaServer);
- Sink<ProducerRecord<String, String>, CompletionStage<Done>>
kafkaProducer =
- Producer.plainSink(producerSettings);
- String kafkaTopic = "measurements";
-
- Pair<Pair<CompletionStage<Done>, UniqueKillSwitch>,
CompletionStage<Done>> completions =
- restartingMqttSource
- .viaMat(KillSwitches.single(), Keep.both()) // (4)
- .map(m -> m.payload().utf8String()) // (5)
- .map(measurementReader::readValue) // (6)
- .groupedWithin(50, Duration.ofSeconds(5)) // (7)
- .map(list -> asJsonArray("measurements", list)) // (8)
- .log("producing to Kafka", logAdapter)
- .map(json -> new ProducerRecord<>(kafkaTopic, "",
json)) // (9)
- .toMat(kafkaProducer, Keep.both()) // (10)
- .run(system);
- // #flow
-
- // start producing messages to MQTT
- CompletionStage<Done> subscriptionInitialized =
completions.first().first();
- CompletionStage<UniqueKillSwitch> producer =
- subscriptionInitialized.thenApply(
- d -> produceMessages(measurementWriter,
connectionSettings, topic));
-
- KillSwitch listener = completions.first().second();
-
- CompletionStage<Done> streamCompletion = completions.second();
- streamCompletion
- .handle(
- (done, exception) -> {
- if (exception != null) {
- exception.printStackTrace();
- return null;
- } else {
- return done;
- }
- })
- .thenRun(system::terminate);
-
- // read the messages from the Kafka topic
- Consumer.Control consumerControl =
- Consumer
- .plainSource(
- ConsumerSettings.create(toClassic(system), new
StringDeserializer(), new StringDeserializer())
-
.withBootstrapServers(kafkaServer).withGroupId("sample"),
- Subscriptions.topics(kafkaTopic)
- )
- .map(ConsumerRecord::value)
- .log("read from Kafka", logAdapter)
- .toMat(Sink.ignore(), Keep.left())
- .run(system);
-
- log.info("Letting things run for a while");
- Thread.sleep(20 * 1000);
-
- producer.thenAccept(UniqueKillSwitch::shutdown);
- consumerControl.shutdown();
- listener.shutdown();
- }
+ public final Instant timestamp;
+ public final long level;
- /**
- * Simulate messages from MQTT by writing to topic registered in MQTT
broker.
- */
- private UniqueKillSwitch produceMessages(
- ObjectWriter measurementWriter, MqttConnectionSettings
connectionSettings, String topic) {
- List<Measurement> input =
- Arrays.asList(
- new Measurement(Instant.now(), 40),
- new Measurement(Instant.now(), 60),
- new Measurement(Instant.now(), 80),
- new Measurement(Instant.now(), 100),
- new Measurement(Instant.now(), 120));
-
- MqttConnectionSettings sinkSettings =
connectionSettings.withClientId("coffee-supervisor");
-
- final Sink<MqttMessage, CompletionStage<Done>> mqttSink =
- MqttSink.create(sinkSettings, MqttQoS.atLeastOnce());
- UniqueKillSwitch killSwitch =
- Source.cycle(() -> input.iterator())
- .throttle(4, Duration.ofSeconds(1))
- .map(measurementWriter::writeValueAsString)
- .map(s -> MqttMessage.create(topic,
ByteString.fromString(s)))
- .viaMat(KillSwitches.single(), Keep.right())
- .toMat(mqttSink, Keep.left())
- .run(system);
- return killSwitch;
+ @JsonCreator
+ public Measurement(
+ @JsonProperty("timestamp") Instant timestamp, @JsonProperty("level")
long level) {
+ this.timestamp = timestamp;
+ this.level = level;
}
+ }
+
+ private final JsonFactory jsonFactory = new JsonFactory();
+
+ final ObjectMapper mapper = new ObjectMapper().registerModule(new
JavaTimeModule());
+ final ObjectReader measurementReader = mapper.readerFor(Measurement.class);
+ final ObjectWriter measurementWriter = mapper.writerFor(Measurement.class);
+
+ private String asJsonArray(String fieldName, List<Object> list) throws
IOException {
+ StringWriter sw = new StringWriter();
+ JsonGenerator generator = jsonFactory.createGenerator(sw);
+ generator.writeStartObject();
+ generator.writeFieldName(fieldName);
+ measurementWriter.writeValues(generator).init(true).writeAll(list);
+ generator.close();
+ return sw.toString();
+ }
+ // #json-mechanics
+
+ // #restarting
+
+ /**
+ * Wrap a source with restart logic and exposes an equivalent materialized
value.
+ */
+ <M> Source<M, CompletionStage<Done>> wrapWithAsRestartSource(
+ Creator<Source<M, CompletionStage<Done>>> source) {
+ // makes use of the fact that these sources materialize a
CompletionStage<Done>
+ CompletableFuture<Done> fut = new CompletableFuture<>();
+ return RestartSource.withBackoff(
+ Duration.ofMillis(100),
+ Duration.ofSeconds(3),
+ 0.2d, // randomFactor
+ 5, // maxRestarts,
+ () ->
+ source
+ .create()
+ .mapMaterializedValue(
+ mat ->
+ mat.handle(
+ (done, exception) -> {
+ if (done != null) {
+ fut.complete(done);
+ } else {
+ fut.completeExceptionally(exception);
+ }
+ return fut.toCompletableFuture();
+ })))
+ .mapMaterializedValue(ignore -> fut.toCompletableFuture());
+ }
+ // #restarting
+
+ void run(String kafkaServer) throws Exception {
+ final LoggingAdapter logAdapter =
Logging.getLogger(system.classicSystem(), getClass().getName());
+ // #flow
+ final MqttConnectionSettings connectionSettings =
+ MqttConnectionSettings.create(
+ "tcp://localhost:1883", // (1)
+ "coffee-client",
+ new MemoryPersistence());
+
+ final String topic = "coffee/level";
+
+ MqttSubscriptions subscriptions = MqttSubscriptions.create(topic,
MqttQoS.atLeastOnce()); // (2)
+
+ Source<MqttMessage, CompletionStage<Done>> restartingMqttSource =
+ wrapWithAsRestartSource( // (3)
+ () ->
+ MqttSource.atMostOnce(
+ connectionSettings.withClientId("coffee-control"),
subscriptions, 8));
+
+ // Set up Kafka producer sink
+ ProducerSettings<String, String> producerSettings =
+ ProducerSettings
+ .create(toClassic(system), new StringSerializer(), new
StringSerializer())
+ .withBootstrapServers(kafkaServer);
+ Sink<ProducerRecord<String, String>, CompletionStage<Done>> kafkaProducer =
+ Producer.plainSink(producerSettings);
+ String kafkaTopic = "measurements";
+
+ Pair<Pair<CompletionStage<Done>, UniqueKillSwitch>, CompletionStage<Done>>
completions =
+ restartingMqttSource
+ .viaMat(KillSwitches.single(), Keep.both()) // (4)
+ .map(m -> m.payload().utf8String()) // (5)
+ .map(measurementReader::readValue) // (6)
+ .groupedWithin(50, Duration.ofSeconds(5)) // (7)
+ .map(list -> asJsonArray("measurements", list)) // (8)
+ .log("producing to Kafka", logAdapter)
+ .map(json -> new ProducerRecord<>(kafkaTopic, "", json)) // (9)
+ .toMat(kafkaProducer, Keep.both()) // (10)
+ .run(system);
+ // #flow
+
+ // start producing messages to MQTT
+ CompletionStage<Done> subscriptionInitialized =
completions.first().first();
+ CompletionStage<UniqueKillSwitch> producer =
+ subscriptionInitialized.thenApply(
+ d -> produceMessages(measurementWriter, connectionSettings,
topic));
+
+ KillSwitch listener = completions.first().second();
+
+ CompletionStage<Done> streamCompletion = completions.second();
+ streamCompletion
+ .handle(
+ (done, exception) -> {
+ if (exception != null) {
+ exception.printStackTrace();
+ return null;
+ } else {
+ return done;
+ }
+ })
+ .thenRun(system::terminate);
+
+ // read the messages from the Kafka topic
+ Consumer.Control consumerControl =
+ Consumer
+ .plainSource(
+ ConsumerSettings.create(toClassic(system), new
StringDeserializer(), new StringDeserializer())
+ .withBootstrapServers(kafkaServer).withGroupId("sample"),
+ Subscriptions.topics(kafkaTopic)
+ )
+ .map(ConsumerRecord::value)
+ .log("read from Kafka", logAdapter)
+ .toMat(Sink.ignore(), Keep.left())
+ .run(system);
+
+ log.info("Letting things run for a while");
+ Thread.sleep(20 * 1000);
+
+ producer.thenAccept(UniqueKillSwitch::shutdown);
+ consumerControl.shutdown();
+ listener.shutdown();
+ }
+
+ /**
+ * Simulate messages from MQTT by writing to topic registered in MQTT broker.
+ */
+ private UniqueKillSwitch produceMessages(
+ ObjectWriter measurementWriter, MqttConnectionSettings
connectionSettings, String topic) {
+ List<Measurement> input =
+ Arrays.asList(
+ new Measurement(Instant.now(), 40),
+ new Measurement(Instant.now(), 60),
+ new Measurement(Instant.now(), 80),
+ new Measurement(Instant.now(), 100),
+ new Measurement(Instant.now(), 120));
+
+ MqttConnectionSettings sinkSettings =
connectionSettings.withClientId("coffee-supervisor");
+
+ final Sink<MqttMessage, CompletionStage<Done>> mqttSink =
+ MqttSink.create(sinkSettings, MqttQoS.atLeastOnce());
+ UniqueKillSwitch killSwitch =
+ Source.cycle(() -> input.iterator())
+ .throttle(4, Duration.ofSeconds(1))
+ .map(measurementWriter::writeValueAsString)
+ .map(s -> MqttMessage.create(topic, ByteString.fromString(s)))
+ .viaMat(KillSwitches.single(), Keep.right())
+ .toMat(mqttSink, Keep.left())
+ .run(system);
+ return killSwitch;
+ }
}
diff --git a/pekko-connectors-sample-mqtt-to-kafka/src/main/resources/application.conf b/pekko-connectors-sample-mqtt-to-kafka/src/main/resources/application.conf
index 7bdde8d..77117b1 100644
--- a/pekko-connectors-sample-mqtt-to-kafka/src/main/resources/application.conf
+++ b/pekko-connectors-sample-mqtt-to-kafka/src/main/resources/application.conf
@@ -1,5 +1,5 @@
-akka {
- loggers = ["akka.event.slf4j.Slf4jLogger"]
- logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+pekko {
+ loggers = ["org.apache.pekko.event.slf4j.Slf4jLogger"]
+ logging-filter = "org.apache.pekko.event.slf4j.Slf4jLoggingFilter"
loglevel = "DEBUG"
}
diff --git a/pekko-connectors-sample-mqtt-to-kafka/src/main/resources/logback.xml b/pekko-connectors-sample-mqtt-to-kafka/src/main/resources/logback.xml
index ee6913c..42e49aa 100644
--- a/pekko-connectors-sample-mqtt-to-kafka/src/main/resources/logback.xml
+++ b/pekko-connectors-sample-mqtt-to-kafka/src/main/resources/logback.xml
@@ -10,7 +10,7 @@
<logger name="samples" level="DEBUG"/>
<logger name="org.apache" level="WARN"/>
<logger name="kafka" level="WARN"/>
- <logger name="akka" level="WARN"/>
+ <logger name="org.apache.pekko" level="WARN"/>
<logger name="org.I0Itec.zkclient" level="WARN"/>
<logger name="com.github.dockerjava" level="INFO"/>
diff --git a/pekko-connectors-sample-mqtt-to-kafka/src/main/scala/samples/scaladsl/Main.scala b/pekko-connectors-sample-mqtt-to-kafka/src/main/scala/samples/scaladsl/Main.scala
index 80a0e2f..0b5d678 100644
--- a/pekko-connectors-sample-mqtt-to-kafka/src/main/scala/samples/scaladsl/Main.scala
+++ b/pekko-connectors-sample-mqtt-to-kafka/src/main/scala/samples/scaladsl/Main.scala
@@ -3,37 +3,37 @@
*/
package samples.scaladsl
-import java.io.StringWriter
-import java.time.Instant
-
-import akka.Done
-import akka.actor.typed.ActorSystem
-import akka.actor.typed.scaladsl.Behaviors
-import akka.actor.typed.scaladsl.adapter._
-import akka.event.{Logging, LoggingAdapter}
-import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
-import akka.kafka.scaladsl.{Consumer, Producer}
-import akka.stream.alpakka.mqtt.{MqttConnectionSettings, MqttMessage, MqttQoS, MqttSubscriptions}
-import akka.stream.alpakka.mqtt.scaladsl.{MqttSink, MqttSource}
-import akka.stream.scaladsl.{Keep, RestartSource, Sink, Source}
-import akka.stream.{KillSwitches, UniqueKillSwitch}
-import akka.util.ByteString
-import com.fasterxml.jackson.annotation.{JsonCreator, JsonProperty}
import com.fasterxml.jackson.core.JsonFactory
-import com.fasterxml.jackson.databind.{ObjectMapper, ObjectWriter}
+import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.apache.kafka.common.serialization.{ StringDeserializer, StringSerializer }
+import org.apache.pekko.Done
+import org.apache.pekko.actor.typed.ActorSystem
+import org.apache.pekko.actor.typed.scaladsl.Behaviors
+import org.apache.pekko.actor.typed.scaladsl.adapter._
+import org.apache.pekko.event.{ Logging, LoggingAdapter }
+import org.apache.pekko.kafka.{ ConsumerSettings, ProducerSettings, Subscriptions }
+import org.apache.pekko.kafka.scaladsl.{ Consumer, Producer }
+import org.apache.pekko.stream.{ KillSwitches, UniqueKillSwitch }
+import org.apache.pekko.stream.connectors.mqtt.{ MqttConnectionSettings, MqttMessage, MqttQoS, MqttSubscriptions }
+import org.apache.pekko.stream.connectors.mqtt.scaladsl.{ MqttSink, MqttSource }
+import org.apache.pekko.stream.scaladsl.{ Keep, RestartSource, Sink, Source }
+import org.apache.pekko.util.ByteString
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.slf4j.LoggerFactory
import org.testcontainers.containers.KafkaContainer
+import org.testcontainers.utility.DockerImageName
+import java.io.StringWriter
+import java.time.Instant
+import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.jdk.CollectionConverters._
object Main extends App {
- val kafkaContainer = new KafkaContainer("5.4.1")
+
+ val kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.1.2"))
kafkaContainer.start()
try {
val me = new Main
@@ -99,7 +99,8 @@ class Main {
MqttSource.atMostOnce(connectionSettings.withClientId("coffee-control"),
subscriptions, 8))
// Set up Kafka producer sink
- val producerSettings = ProducerSettings(system.toClassic, new StringSerializer, new StringSerializer).withBootstrapServers(kafkaServer)
+ val producerSettings =
+ ProducerSettings(system.toClassic, new StringSerializer, new StringSerializer).withBootstrapServers(kafkaServer)
val kafkaProducer = Producer.plainSink(producerSettings)
val kafkaTopic = "measurements"
@@ -128,9 +129,9 @@ class Main {
// read the messages from the Kafka topic
val consumerControl = Consumer
.plainSource(
- ConsumerSettings(system.toClassic, new StringDeserializer, new StringDeserializer).withBootstrapServers(kafkaServer).withGroupId("sample"),
- Subscriptions.topics(kafkaTopic)
- )
+ ConsumerSettings(system.toClassic, new StringDeserializer, new StringDeserializer).withBootstrapServers(
+ kafkaServer).withGroupId("sample"),
+ Subscriptions.topics(kafkaTopic))
.map(_.value)
.log("read from Kafka")
.toMat(Sink.ignore)(Keep.left)
@@ -153,8 +154,7 @@ class Main {
Measurement(Instant.now, 60),
Measurement(Instant.now, 80),
Measurement(Instant.now, 100),
- Measurement(Instant.now, 120)
- )
+ Measurement(Instant.now, 120))
val sinkSettings = connectionSettings.withClientId("coffee-supervisor")
val killSwitch = Source
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]