This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/incubator-pekko-connectors-samples.git


The following commit(s) were added to refs/heads/main by this push:
     new d6aa4d0  [mqtt-to-kafka] Migrate akka dependencies to pekko (#22)
d6aa4d0 is described below

commit d6aa4d01b17fcb7d40037d5b8d750a7f7d5f8f8d
Author: Laglangyue <[email protected]>
AuthorDate: Mon Dec 18 06:44:36 2023 +0800

    [mqtt-to-kafka] Migrate akka dependencies to pekko (#22)
    
    * [mqtt-to-kafka] Migrate akka dependencies to pekko
    
    * use DockerImage
    
    * version numbers
    
    * Update Dependencies.scala
    
    ---------
    
    Co-authored-by: laglang <[email protected]>
    Co-authored-by: PJ Fanning <[email protected]>
---
 pekko-connectors-sample-mqtt-to-kafka/build.sbt    |   2 +-
 .../project/Dependencies.scala                     |  27 +-
 .../src/main/java/samples/javadsl/Main.java        | 424 +++++++++++----------
 .../src/main/resources/application.conf            |   6 +-
 .../src/main/resources/logback.xml                 |   2 +-
 .../src/main/scala/samples/scaladsl/Main.scala     |  52 +--
 6 files changed, 257 insertions(+), 256 deletions(-)

diff --git a/pekko-connectors-sample-mqtt-to-kafka/build.sbt 
b/pekko-connectors-sample-mqtt-to-kafka/build.sbt
index d5eb377..49f241e 100644
--- a/pekko-connectors-sample-mqtt-to-kafka/build.sbt
+++ b/pekko-connectors-sample-mqtt-to-kafka/build.sbt
@@ -1,4 +1,4 @@
-organization := "com.lightbend.akka"
+organization := "org.apache.pekko"
 version := "1.0.0"
 scalaVersion := Dependencies.scalaVer
 libraryDependencies ++= Dependencies.dependencies
diff --git a/pekko-connectors-sample-mqtt-to-kafka/project/Dependencies.scala 
b/pekko-connectors-sample-mqtt-to-kafka/project/Dependencies.scala
index f25ba6c..fc8897d 100644
--- a/pekko-connectors-sample-mqtt-to-kafka/project/Dependencies.scala
+++ b/pekko-connectors-sample-mqtt-to-kafka/project/Dependencies.scala
@@ -3,29 +3,28 @@ import sbt._
 object Dependencies {
   val scalaVer = "2.13.12"
   // #deps
-  val PekkoVersion = "2.6.19"
-  val PekkoConnectorsVersion = "4.0.0"
-  val PekkoConnectorsKafkaVersion = "3.0.1"
-  val JacksonDatabindVersion = "2.11.4"
-
+  val PekkoVersion = "1.0.2"
+  val PekkoConnectorsVersion = "1.0.1"
+  val PekkoConnectorsKafkaVersion = "1.0.0"
+  val JacksonDatabindVersion = "2.14.3"
   // #deps
 
   val dependencies = List(
-  // #deps
-    "com.lightbend.akka" %% "akka-stream-alpakka-mqtt" % PekkoConnectorsVersion,
-    "com.typesafe.akka" %% "akka-stream-kafka" % PekkoConnectorsKafkaVersion,
-    "com.typesafe.akka" %% "akka-stream" % PekkoVersion,
-    "com.typesafe.akka" %% "akka-actor-typed" % PekkoVersion,
-    "com.typesafe.akka" %% "akka-actor" % PekkoVersion,
+    // #deps
+    "org.apache.pekko" %% "pekko-connectors-mqtt" % PekkoConnectorsVersion,
+    "org.apache.pekko" %% "pekko-connectors-kafka" % PekkoConnectorsKafkaVersion,
+    "org.apache.pekko" %% "pekko-stream" % PekkoVersion,
+    "org.apache.pekko" %% "pekko-actor-typed" % PekkoVersion,
+    "org.apache.pekko" %% "pekko-actor" % PekkoVersion,
     "org.scala-lang.modules" %% "scala-collection-compat" % "2.1.4",
     // JSON
     "com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % JacksonDatabindVersion,
     "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % JacksonDatabindVersion,
     "com.fasterxml.jackson.module" %% "jackson-module-scala" % JacksonDatabindVersion,
     // Logging
-    "com.typesafe.akka" %% "akka-slf4j" % PekkoVersion,
+    "org.apache.pekko" %% "pekko-slf4j" % PekkoVersion,
     "ch.qos.logback" % "logback-classic" % "1.2.13",
-  // #deps
-    "org.testcontainers" % "kafka" % "1.14.1"
+    "org.testcontainers" % "kafka" % "1.17.6"
+    // #deps
   )
 }
diff --git 
a/pekko-connectors-sample-mqtt-to-kafka/src/main/java/samples/javadsl/Main.java 
b/pekko-connectors-sample-mqtt-to-kafka/src/main/java/samples/javadsl/Main.java
index 35c4ef0..c7827f3 100644
--- 
a/pekko-connectors-sample-mqtt-to-kafka/src/main/java/samples/javadsl/Main.java
+++ 
b/pekko-connectors-sample-mqtt-to-kafka/src/main/java/samples/javadsl/Main.java
@@ -4,32 +4,8 @@
 
 package samples.javadsl;
 
-import akka.Done;
-import akka.actor.typed.ActorSystem;
-import akka.actor.typed.javadsl.Behaviors;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import akka.japi.Creator;
-import akka.japi.Pair;
-import akka.kafka.ConsumerSettings;
-import akka.kafka.ProducerSettings;
-import akka.kafka.Subscriptions;
-import akka.kafka.javadsl.Consumer;
-import akka.kafka.javadsl.Producer;
-import akka.stream.KillSwitch;
-import akka.stream.KillSwitches;
-import akka.stream.UniqueKillSwitch;
-import akka.stream.alpakka.mqtt.MqttConnectionSettings;
-import akka.stream.alpakka.mqtt.MqttMessage;
-import akka.stream.alpakka.mqtt.MqttQoS;
-import akka.stream.alpakka.mqtt.MqttSubscriptions;
-import akka.stream.alpakka.mqtt.javadsl.MqttSink;
-import akka.stream.alpakka.mqtt.javadsl.MqttSource;
-import akka.stream.javadsl.Keep;
-import akka.stream.javadsl.RestartSource;
-import akka.stream.javadsl.Sink;
-import akka.stream.javadsl.Source;
-import akka.util.ByteString;
+import static org.apache.pekko.actor.typed.javadsl.Adapter.toClassic;
+
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonFactory;
@@ -42,10 +18,37 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pekko.Done;
+import org.apache.pekko.actor.typed.ActorSystem;
+import org.apache.pekko.actor.typed.javadsl.Behaviors;
+import org.apache.pekko.event.Logging;
+import org.apache.pekko.event.LoggingAdapter;
+import org.apache.pekko.japi.Creator;
+import org.apache.pekko.japi.Pair;
+import org.apache.pekko.kafka.ConsumerSettings;
+import org.apache.pekko.kafka.ProducerSettings;
+import org.apache.pekko.kafka.Subscriptions;
+import org.apache.pekko.kafka.javadsl.Consumer;
+import org.apache.pekko.kafka.javadsl.Producer;
+import org.apache.pekko.stream.KillSwitch;
+import org.apache.pekko.stream.KillSwitches;
+import org.apache.pekko.stream.UniqueKillSwitch;
+import org.apache.pekko.stream.connectors.mqtt.MqttConnectionSettings;
+import org.apache.pekko.stream.connectors.mqtt.MqttMessage;
+import org.apache.pekko.stream.connectors.mqtt.MqttQoS;
+import org.apache.pekko.stream.connectors.mqtt.MqttSubscriptions;
+import org.apache.pekko.stream.connectors.mqtt.javadsl.MqttSink;
+import org.apache.pekko.stream.connectors.mqtt.javadsl.MqttSource;
+import org.apache.pekko.stream.javadsl.Keep;
+import org.apache.pekko.stream.javadsl.RestartSource;
+import org.apache.pekko.stream.javadsl.Sink;
+import org.apache.pekko.stream.javadsl.Source;
+import org.apache.pekko.util.ByteString;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
 
 import java.io.IOException;
 import java.io.StringWriter;
@@ -56,198 +59,197 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 
-import static akka.actor.typed.javadsl.Adapter.toClassic;
-
 public class Main {
 
-    public static void main(String[] args) throws Exception {
-        KafkaContainer kafkaContainer = new KafkaContainer("5.4.1");
-        kafkaContainer.start();
-        try {
-            Main me = new Main();
-            me.run(kafkaContainer.getBootstrapServers());
-        } finally {
-            kafkaContainer.stop();
-        }
+  public static void main(String[] args) throws Exception {
+    KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.1.2"));
+    kafkaContainer.start();
+    try {
+      Main me = new Main();
+      me.run(kafkaContainer.getBootstrapServers());
+    } finally {
+      kafkaContainer.stop();
     }
+  }
 
-    final ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), 
"MqttToKafka");
-    final Logger log = LoggerFactory.getLogger(Main.class);
-
-    // #json-mechanics
+  final ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), 
"MqttToKafka");
+  final Logger log = LoggerFactory.getLogger(Main.class);
 
-    /**
-     * Data elements sent via MQTT broker.
-     */
-    public static final class Measurement {
-        public final Instant timestamp;
-        public final long level;
-
-        @JsonCreator
-        public Measurement(
-                @JsonProperty("timestamp") Instant timestamp, 
@JsonProperty("level") long level) {
-            this.timestamp = timestamp;
-            this.level = level;
-        }
-    }
+  // #json-mechanics
 
-    private final JsonFactory jsonFactory = new JsonFactory();
+  /**
+   * Data elements sent via MQTT broker.
+   */
+  public static final class Measurement {
 
-    final ObjectMapper mapper = new ObjectMapper().registerModule(new 
JavaTimeModule());
-    final ObjectReader measurementReader = mapper.readerFor(Measurement.class);
-    final ObjectWriter measurementWriter = mapper.writerFor(Measurement.class);
-
-    private String asJsonArray(String fieldName, List<Object> list) throws 
IOException {
-        StringWriter sw = new StringWriter();
-        JsonGenerator generator = jsonFactory.createGenerator(sw);
-        generator.writeStartObject();
-        generator.writeFieldName(fieldName);
-        measurementWriter.writeValues(generator).init(true).writeAll(list);
-        generator.close();
-        return sw.toString();
-    }
-    // #json-mechanics
-
-    // #restarting
-
-    /**
-     * Wrap a source with restart logic and exposes an equivalent materialized 
value.
-     */
-    <M> Source<M, CompletionStage<Done>> wrapWithAsRestartSource(
-            Creator<Source<M, CompletionStage<Done>>> source) {
-        // makes use of the fact that these sources materialize a 
CompletionStage<Done>
-        CompletableFuture<Done> fut = new CompletableFuture<>();
-        return RestartSource.withBackoff(
-                Duration.ofMillis(100),
-                Duration.ofSeconds(3),
-                0.2d, // randomFactor
-                5, // maxRestarts,
-                () ->
-                        source
-                                .create()
-                                .mapMaterializedValue(
-                                        mat ->
-                                                mat.handle(
-                                                        (done, exception) -> {
-                                                            if (done != null) {
-                                                                
fut.complete(done);
-                                                            } else {
-                                                                
fut.completeExceptionally(exception);
-                                                            }
-                                                            return 
fut.toCompletableFuture();
-                                                        })))
-                .mapMaterializedValue(ignore -> fut.toCompletableFuture());
-    }
-    // #restarting
-
-    void run(String kafkaServer) throws Exception {
-        final LoggingAdapter logAdapter = 
Logging.getLogger(system.classicSystem(), getClass().getName());
-        // #flow
-        final MqttConnectionSettings connectionSettings =
-                MqttConnectionSettings.create(
-                        "tcp://localhost:1883", // (1)
-                        "coffee-client",
-                        new MemoryPersistence());
-
-        final String topic = "coffee/level";
-
-        MqttSubscriptions subscriptions = MqttSubscriptions.create(topic, 
MqttQoS.atLeastOnce()); // (2)
-
-        Source<MqttMessage, CompletionStage<Done>> restartingMqttSource =
-                wrapWithAsRestartSource( // (3)
-                        () ->
-                                MqttSource.atMostOnce(
-                                        
connectionSettings.withClientId("coffee-control"), subscriptions, 8));
-
-        // Set up Kafka producer sink
-        ProducerSettings<String, String> producerSettings =
-                ProducerSettings
-                        .create(toClassic(system), new StringSerializer(), new 
StringSerializer())
-                        .withBootstrapServers(kafkaServer);
-        Sink<ProducerRecord<String, String>, CompletionStage<Done>> 
kafkaProducer =
-                Producer.plainSink(producerSettings);
-        String kafkaTopic = "measurements";
-
-        Pair<Pair<CompletionStage<Done>, UniqueKillSwitch>, 
CompletionStage<Done>> completions =
-                restartingMqttSource
-                        .viaMat(KillSwitches.single(), Keep.both()) // (4)
-                        .map(m -> m.payload().utf8String()) // (5)
-                        .map(measurementReader::readValue) // (6)
-                        .groupedWithin(50, Duration.ofSeconds(5)) // (7)
-                        .map(list -> asJsonArray("measurements", list)) // (8)
-                        .log("producing to Kafka", logAdapter)
-                        .map(json -> new ProducerRecord<>(kafkaTopic, "", 
json)) // (9)
-                        .toMat(kafkaProducer, Keep.both()) // (10)
-                        .run(system);
-        // #flow
-
-        // start producing messages to MQTT
-        CompletionStage<Done> subscriptionInitialized = 
completions.first().first();
-        CompletionStage<UniqueKillSwitch> producer =
-                subscriptionInitialized.thenApply(
-                        d -> produceMessages(measurementWriter, 
connectionSettings, topic));
-
-        KillSwitch listener = completions.first().second();
-
-        CompletionStage<Done> streamCompletion = completions.second();
-        streamCompletion
-                .handle(
-                        (done, exception) -> {
-                            if (exception != null) {
-                                exception.printStackTrace();
-                                return null;
-                            } else {
-                                return done;
-                            }
-                        })
-                .thenRun(system::terminate);
-
-        // read the messages from the Kafka topic
-        Consumer.Control consumerControl =
-                Consumer
-                        .plainSource(
-                                ConsumerSettings.create(toClassic(system), new 
StringDeserializer(), new StringDeserializer())
-                                        
.withBootstrapServers(kafkaServer).withGroupId("sample"),
-                                Subscriptions.topics(kafkaTopic)
-                        )
-                        .map(ConsumerRecord::value)
-                        .log("read from Kafka", logAdapter)
-                        .toMat(Sink.ignore(), Keep.left())
-                        .run(system);
-
-        log.info("Letting things run for a while");
-        Thread.sleep(20 * 1000);
-
-        producer.thenAccept(UniqueKillSwitch::shutdown);
-        consumerControl.shutdown();
-        listener.shutdown();
-    }
+    public final Instant timestamp;
+    public final long level;
 
-    /**
-     * Simulate messages from MQTT by writing to topic registered in MQTT 
broker.
-     */
-    private UniqueKillSwitch produceMessages(
-            ObjectWriter measurementWriter, MqttConnectionSettings 
connectionSettings, String topic) {
-        List<Measurement> input =
-                Arrays.asList(
-                        new Measurement(Instant.now(), 40),
-                        new Measurement(Instant.now(), 60),
-                        new Measurement(Instant.now(), 80),
-                        new Measurement(Instant.now(), 100),
-                        new Measurement(Instant.now(), 120));
-
-        MqttConnectionSettings sinkSettings = 
connectionSettings.withClientId("coffee-supervisor");
-
-        final Sink<MqttMessage, CompletionStage<Done>> mqttSink =
-                MqttSink.create(sinkSettings, MqttQoS.atLeastOnce());
-        UniqueKillSwitch killSwitch =
-                Source.cycle(() -> input.iterator())
-                        .throttle(4, Duration.ofSeconds(1))
-                        .map(measurementWriter::writeValueAsString)
-                        .map(s -> MqttMessage.create(topic, 
ByteString.fromString(s)))
-                        .viaMat(KillSwitches.single(), Keep.right())
-                        .toMat(mqttSink, Keep.left())
-                        .run(system);
-        return killSwitch;
+    @JsonCreator
+    public Measurement(
+        @JsonProperty("timestamp") Instant timestamp, @JsonProperty("level") 
long level) {
+      this.timestamp = timestamp;
+      this.level = level;
     }
+  }
+
+  private final JsonFactory jsonFactory = new JsonFactory();
+
+  final ObjectMapper mapper = new ObjectMapper().registerModule(new 
JavaTimeModule());
+  final ObjectReader measurementReader = mapper.readerFor(Measurement.class);
+  final ObjectWriter measurementWriter = mapper.writerFor(Measurement.class);
+
+  private String asJsonArray(String fieldName, List<Object> list) throws 
IOException {
+    StringWriter sw = new StringWriter();
+    JsonGenerator generator = jsonFactory.createGenerator(sw);
+    generator.writeStartObject();
+    generator.writeFieldName(fieldName);
+    measurementWriter.writeValues(generator).init(true).writeAll(list);
+    generator.close();
+    return sw.toString();
+  }
+  // #json-mechanics
+
+  // #restarting
+
+  /**
+   * Wrap a source with restart logic and exposes an equivalent materialized 
value.
+   */
+  <M> Source<M, CompletionStage<Done>> wrapWithAsRestartSource(
+      Creator<Source<M, CompletionStage<Done>>> source) {
+    // makes use of the fact that these sources materialize a 
CompletionStage<Done>
+    CompletableFuture<Done> fut = new CompletableFuture<>();
+    return RestartSource.withBackoff(
+            Duration.ofMillis(100),
+            Duration.ofSeconds(3),
+            0.2d, // randomFactor
+            5, // maxRestarts,
+            () ->
+                source
+                    .create()
+                    .mapMaterializedValue(
+                        mat ->
+                            mat.handle(
+                                (done, exception) -> {
+                                  if (done != null) {
+                                    fut.complete(done);
+                                  } else {
+                                    fut.completeExceptionally(exception);
+                                  }
+                                  return fut.toCompletableFuture();
+                                })))
+        .mapMaterializedValue(ignore -> fut.toCompletableFuture());
+  }
+  // #restarting
+
+  void run(String kafkaServer) throws Exception {
+    final LoggingAdapter logAdapter = 
Logging.getLogger(system.classicSystem(), getClass().getName());
+    // #flow
+    final MqttConnectionSettings connectionSettings =
+        MqttConnectionSettings.create(
+            "tcp://localhost:1883", // (1)
+            "coffee-client",
+            new MemoryPersistence());
+
+    final String topic = "coffee/level";
+
+    MqttSubscriptions subscriptions = MqttSubscriptions.create(topic, 
MqttQoS.atLeastOnce()); // (2)
+
+    Source<MqttMessage, CompletionStage<Done>> restartingMqttSource =
+        wrapWithAsRestartSource( // (3)
+            () ->
+                MqttSource.atMostOnce(
+                    connectionSettings.withClientId("coffee-control"), 
subscriptions, 8));
+
+    // Set up Kafka producer sink
+    ProducerSettings<String, String> producerSettings =
+        ProducerSettings
+            .create(toClassic(system), new StringSerializer(), new 
StringSerializer())
+            .withBootstrapServers(kafkaServer);
+    Sink<ProducerRecord<String, String>, CompletionStage<Done>> kafkaProducer =
+        Producer.plainSink(producerSettings);
+    String kafkaTopic = "measurements";
+
+    Pair<Pair<CompletionStage<Done>, UniqueKillSwitch>, CompletionStage<Done>> 
completions =
+        restartingMqttSource
+            .viaMat(KillSwitches.single(), Keep.both()) // (4)
+            .map(m -> m.payload().utf8String()) // (5)
+            .map(measurementReader::readValue) // (6)
+            .groupedWithin(50, Duration.ofSeconds(5)) // (7)
+            .map(list -> asJsonArray("measurements", list)) // (8)
+            .log("producing to Kafka", logAdapter)
+            .map(json -> new ProducerRecord<>(kafkaTopic, "", json)) // (9)
+            .toMat(kafkaProducer, Keep.both()) // (10)
+            .run(system);
+    // #flow
+
+    // start producing messages to MQTT
+    CompletionStage<Done> subscriptionInitialized = 
completions.first().first();
+    CompletionStage<UniqueKillSwitch> producer =
+        subscriptionInitialized.thenApply(
+            d -> produceMessages(measurementWriter, connectionSettings, 
topic));
+
+    KillSwitch listener = completions.first().second();
+
+    CompletionStage<Done> streamCompletion = completions.second();
+    streamCompletion
+        .handle(
+            (done, exception) -> {
+              if (exception != null) {
+                exception.printStackTrace();
+                return null;
+              } else {
+                return done;
+              }
+            })
+        .thenRun(system::terminate);
+
+    // read the messages from the Kafka topic
+    Consumer.Control consumerControl =
+        Consumer
+            .plainSource(
+                ConsumerSettings.create(toClassic(system), new 
StringDeserializer(), new StringDeserializer())
+                    .withBootstrapServers(kafkaServer).withGroupId("sample"),
+                Subscriptions.topics(kafkaTopic)
+            )
+            .map(ConsumerRecord::value)
+            .log("read from Kafka", logAdapter)
+            .toMat(Sink.ignore(), Keep.left())
+            .run(system);
+
+    log.info("Letting things run for a while");
+    Thread.sleep(20 * 1000);
+
+    producer.thenAccept(UniqueKillSwitch::shutdown);
+    consumerControl.shutdown();
+    listener.shutdown();
+  }
+
+  /**
+   * Simulate messages from MQTT by writing to topic registered in MQTT broker.
+   */
+  private UniqueKillSwitch produceMessages(
+      ObjectWriter measurementWriter, MqttConnectionSettings 
connectionSettings, String topic) {
+    List<Measurement> input =
+        Arrays.asList(
+            new Measurement(Instant.now(), 40),
+            new Measurement(Instant.now(), 60),
+            new Measurement(Instant.now(), 80),
+            new Measurement(Instant.now(), 100),
+            new Measurement(Instant.now(), 120));
+
+    MqttConnectionSettings sinkSettings = 
connectionSettings.withClientId("coffee-supervisor");
+
+    final Sink<MqttMessage, CompletionStage<Done>> mqttSink =
+        MqttSink.create(sinkSettings, MqttQoS.atLeastOnce());
+    UniqueKillSwitch killSwitch =
+        Source.cycle(() -> input.iterator())
+            .throttle(4, Duration.ofSeconds(1))
+            .map(measurementWriter::writeValueAsString)
+            .map(s -> MqttMessage.create(topic, ByteString.fromString(s)))
+            .viaMat(KillSwitches.single(), Keep.right())
+            .toMat(mqttSink, Keep.left())
+            .run(system);
+    return killSwitch;
+  }
 }
diff --git 
a/pekko-connectors-sample-mqtt-to-kafka/src/main/resources/application.conf 
b/pekko-connectors-sample-mqtt-to-kafka/src/main/resources/application.conf
index 7bdde8d..77117b1 100644
--- a/pekko-connectors-sample-mqtt-to-kafka/src/main/resources/application.conf
+++ b/pekko-connectors-sample-mqtt-to-kafka/src/main/resources/application.conf
@@ -1,5 +1,5 @@
-akka {
-  loggers = ["akka.event.slf4j.Slf4jLogger"]
-  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+pekko {
+  loggers = ["org.apache.pekko.event.slf4j.Slf4jLogger"]
+  logging-filter = "org.apache.pekko.event.slf4j.Slf4jLoggingFilter"
   loglevel = "DEBUG"
 }
diff --git 
a/pekko-connectors-sample-mqtt-to-kafka/src/main/resources/logback.xml 
b/pekko-connectors-sample-mqtt-to-kafka/src/main/resources/logback.xml
index ee6913c..42e49aa 100644
--- a/pekko-connectors-sample-mqtt-to-kafka/src/main/resources/logback.xml
+++ b/pekko-connectors-sample-mqtt-to-kafka/src/main/resources/logback.xml
@@ -10,7 +10,7 @@
     <logger name="samples" level="DEBUG"/>
     <logger name="org.apache" level="WARN"/>
     <logger name="kafka" level="WARN"/>
-    <logger name="akka" level="WARN"/>
+    <logger name="org.apache.pekko" level="WARN"/>
     <logger name="org.I0Itec.zkclient" level="WARN"/>
 
     <logger name="com.github.dockerjava" level="INFO"/>
diff --git 
a/pekko-connectors-sample-mqtt-to-kafka/src/main/scala/samples/scaladsl/Main.scala
 
b/pekko-connectors-sample-mqtt-to-kafka/src/main/scala/samples/scaladsl/Main.scala
index 80a0e2f..0b5d678 100644
--- 
a/pekko-connectors-sample-mqtt-to-kafka/src/main/scala/samples/scaladsl/Main.scala
+++ 
b/pekko-connectors-sample-mqtt-to-kafka/src/main/scala/samples/scaladsl/Main.scala
@@ -3,37 +3,37 @@
  */
 package samples.scaladsl
 
-import java.io.StringWriter
-import java.time.Instant
-
-import akka.Done
-import akka.actor.typed.ActorSystem
-import akka.actor.typed.scaladsl.Behaviors
-import akka.actor.typed.scaladsl.adapter._
-import akka.event.{Logging, LoggingAdapter}
-import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
-import akka.kafka.scaladsl.{Consumer, Producer}
-import akka.stream.alpakka.mqtt.{MqttConnectionSettings, MqttMessage, MqttQoS, 
MqttSubscriptions}
-import akka.stream.alpakka.mqtt.scaladsl.{MqttSink, MqttSource}
-import akka.stream.scaladsl.{Keep, RestartSource, Sink, Source}
-import akka.stream.{KillSwitches, UniqueKillSwitch}
-import akka.util.ByteString
-import com.fasterxml.jackson.annotation.{JsonCreator, JsonProperty}
 import com.fasterxml.jackson.core.JsonFactory
-import com.fasterxml.jackson.databind.{ObjectMapper, ObjectWriter}
+import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
 import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.serialization.{StringDeserializer, 
StringSerializer}
+import org.apache.kafka.common.serialization.{ StringDeserializer, 
StringSerializer }
+import org.apache.pekko.Done
+import org.apache.pekko.actor.typed.ActorSystem
+import org.apache.pekko.actor.typed.scaladsl.Behaviors
+import org.apache.pekko.actor.typed.scaladsl.adapter._
+import org.apache.pekko.event.{ Logging, LoggingAdapter }
+import org.apache.pekko.kafka.{ ConsumerSettings, ProducerSettings, 
Subscriptions }
+import org.apache.pekko.kafka.scaladsl.{ Consumer, Producer }
+import org.apache.pekko.stream.{ KillSwitches, UniqueKillSwitch }
+import org.apache.pekko.stream.connectors.mqtt.{ MqttConnectionSettings, 
MqttMessage, MqttQoS, MqttSubscriptions }
+import org.apache.pekko.stream.connectors.mqtt.scaladsl.{ MqttSink, MqttSource 
}
+import org.apache.pekko.stream.scaladsl.{ Keep, RestartSource, Sink, Source }
+import org.apache.pekko.util.ByteString
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
 import org.slf4j.LoggerFactory
 import org.testcontainers.containers.KafkaContainer
+import org.testcontainers.utility.DockerImageName
 
+import java.io.StringWriter
+import java.time.Instant
+import scala.concurrent.{ ExecutionContext, Future, Promise }
 import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContext, Future, Promise}
 import scala.jdk.CollectionConverters._
 
 object Main extends App {
-  val kafkaContainer = new KafkaContainer("5.4.1")
+
+  val kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.1.2"))
   kafkaContainer.start()
   try {
     val me = new Main
@@ -99,7 +99,8 @@ class Main {
       MqttSource.atMostOnce(connectionSettings.withClientId("coffee-control"), 
subscriptions, 8))
 
     // Set up Kafka producer sink
-    val producerSettings = ProducerSettings(system.toClassic, new 
StringSerializer, new StringSerializer).withBootstrapServers(kafkaServer)
+    val producerSettings =
+      ProducerSettings(system.toClassic, new StringSerializer, new 
StringSerializer).withBootstrapServers(kafkaServer)
     val kafkaProducer = Producer.plainSink(producerSettings)
     val kafkaTopic = "measurements"
 
@@ -128,9 +129,9 @@ class Main {
     // read the messages from the Kafka topic
     val consumerControl = Consumer
       .plainSource(
-        ConsumerSettings(system.toClassic, new StringDeserializer, new 
StringDeserializer).withBootstrapServers(kafkaServer).withGroupId("sample"),
-        Subscriptions.topics(kafkaTopic)
-      )
+        ConsumerSettings(system.toClassic, new StringDeserializer, new 
StringDeserializer).withBootstrapServers(
+          kafkaServer).withGroupId("sample"),
+        Subscriptions.topics(kafkaTopic))
       .map(_.value)
       .log("read from Kafka")
       .toMat(Sink.ignore)(Keep.left)
@@ -153,8 +154,7 @@ class Main {
       Measurement(Instant.now, 60),
       Measurement(Instant.now, 80),
       Measurement(Instant.now, 100),
-      Measurement(Instant.now, 120)
-    )
+      Measurement(Instant.now, 120))
 
     val sinkSettings = connectionSettings.withClientId("coffee-supervisor")
     val killSwitch = Source


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to