This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git

commit f6aff22f025ba7779c93785bfdea850cd3940db3
Author: Scala Steward <[email protected]>
AuthorDate: Sat Jan 17 16:23:13 2026 +0000

    Reformat with scalafmt 3.10.4
    
    Executed command: scalafmt --non-interactive
---
 .../benchmarks/KafkaTransactionFixtureGen.scala    |  5 +---
 .../PekkoConnectorsCommittableSinkFixtures.scala   |  5 +---
 .../ReactiveKafkaTransactionFixtures.scala         |  5 +---
 .../kafka/internal/CommitObservationLogic.scala    |  3 +-
 .../kafka/internal/ConsumerProgressTracking.scala  | 33 +++++++++++-----------
 .../pekko/kafka/internal/KafkaConsumerActor.scala  |  9 +-----
 .../pekko/kafka/internal/MessageBuilder.scala      |  6 +---
 .../pekko/kafka/scaladsl/MetadataClient.scala      | 32 ++++++++++-----------
 .../pekko/kafka/scaladsl/Transactional.scala       |  5 +---
 .../kafka/TransactionsPartitionedSourceSpec.scala  | 10 +++----
 .../pekko/kafka/TransactionsSourceSpec.scala       | 13 ++++-----
 project/ParadoxSettings.scala                      | 15 +++++-----
 project/ProjectSettings.scala                      |  3 +-
 .../testkit/internal/KafkaTestKitChecks.scala      |  5 +---
 .../test/scala/docs/scaladsl/FetchMetadata.scala   |  4 +--
 .../org/apache/pekko/kafka/TransactionsOps.scala   |  6 ++--
 .../kafka/internal/CommitCollectorStageSpec.scala  | 15 ++++++----
 .../kafka/scaladsl/ConnectionCheckerSpec.scala     |  3 +-
 .../pekko/kafka/scaladsl/RebalanceExtSpec.scala    |  3 +-
 .../pekko/kafka/scaladsl/TransactionsSpec.scala    | 21 +++++++-------
 20 files changed, 91 insertions(+), 110 deletions(-)
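
Most of the hunks below collapse vertically expanded import selectors into the bin-packed style that scalafmt 3.10.4 produces for this configuration; the remaining hunks rewrap long expressions. No behavior changes. As a minimal before/after sketch of the import rewrite (a hypothetical import, not a file from this commit):

    // before: one selector per line
    import org.apache.kafka.common.serialization.{
      ByteArrayDeserializer,
      StringDeserializer
    }

    // after: selectors packed onto as few lines as fit the column limit
    import org.apache.kafka.common.serialization.{
      ByteArrayDeserializer, StringDeserializer
    }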

diff --git a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionFixtureGen.scala b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionFixtureGen.scala
index 52451061..51d439bf 100644
--- a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionFixtureGen.scala
+++ b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionFixtureGen.scala
@@ -22,10 +22,7 @@ import org.apache.kafka.clients.consumer.{ ConsumerConfig, KafkaConsumer }
 import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig }
 import org.apache.kafka.common.IsolationLevel
 import org.apache.kafka.common.serialization.{
-  ByteArrayDeserializer,
-  ByteArraySerializer,
-  StringDeserializer,
-  StringSerializer
+  ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer
 }
 
 import scala.jdk.CollectionConverters._
diff --git a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/PekkoConnectorsCommittableSinkFixtures.scala b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/PekkoConnectorsCommittableSinkFixtures.scala
index 46074109..35dfb8bc 100644
--- a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/PekkoConnectorsCommittableSinkFixtures.scala
+++ b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/PekkoConnectorsCommittableSinkFixtures.scala
@@ -30,10 +30,7 @@ import com.typesafe.scalalogging.LazyLogging
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.serialization.{
-  ByteArrayDeserializer,
-  ByteArraySerializer,
-  StringDeserializer,
-  StringSerializer
+  ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer
 }
 
 import scala.concurrent.duration._
diff --git a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/ReactiveKafkaTransactionFixtures.scala b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/ReactiveKafkaTransactionFixtures.scala
index 4b5be6b6..3cdf2301 100644
--- a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/ReactiveKafkaTransactionFixtures.scala
+++ b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/ReactiveKafkaTransactionFixtures.scala
@@ -26,10 +26,7 @@ import pekko.kafka.{ ConsumerMessage, ConsumerSettings, ProducerSettings, Subscr
 import pekko.stream.scaladsl.{ Flow, Source }
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.serialization.{
-  ByteArrayDeserializer,
-  ByteArraySerializer,
-  StringDeserializer,
-  StringSerializer
+  ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer
 }
 
 import scala.concurrent.duration.FiniteDuration
diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/CommitObservationLogic.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/CommitObservationLogic.scala
index e0293427..a1e2a175 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/CommitObservationLogic.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/CommitObservationLogic.scala
@@ -69,7 +69,8 @@ private[internal] trait CommitObservationLogic { self: GraphStageLogic =>
         deferredOffsets = deferredOffsets + (gtp -> committable)
         offsetBatch = offsetBatch.updated(dOffset)
       case Some(dOffsetBatch: CommittableOffsetBatch)
-          if dOffsetBatch.offsets.contains(gtp) && dOffsetBatch.offsets
+          if dOffsetBatch.offsets.contains(gtp) &&
+          dOffsetBatch.offsets
             .get(gtp)
             .head < offset =>
         deferredOffsets = deferredOffsets + (gtp -> committable)
diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerProgressTracking.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerProgressTracking.scala
index 7ca6a03e..591ef3fc 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerProgressTracking.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerProgressTracking.scala
@@ -88,22 +88,23 @@ final class ConsumerProgressTrackerImpl extends ConsumerProgressTracking {
   }
 
   override def received[K, V](received: ConsumerRecords[K, V]): Unit = {
-    receivedMessagesImpl = receivedMessagesImpl ++ received
-      .partitions()
-      .asScala
-      // only tracks the partitions that are currently assigned, as assignment is a synchronous interaction and polls
-      // for an old consumer group epoch will not return (we get to make polls for the current generation). Supposing a
-      // revoke completes and then the poll() is received for a previous epoch, we drop the records here (partitions
-      // are no longer assigned to the consumer). If instead we get a poll() and then a revoke, we only track the
-      // offsets for that short period of time and then they are revoked, so that is also safe.
-      .intersect(assignedPartitions)
-      .map(tp => (tp, received.records(tp)))
-      // get the last record, its the largest offset/most recent timestamp
-      .map { case (partition, records) => (partition, records.get(records.size() - 1)) }
-      .map {
-        case (partition, record) =>
-          partition -> SafeOffsetAndTimestamp(record.offset(), record.timestamp())
-      }
+    receivedMessagesImpl = receivedMessagesImpl ++
+      received
+        .partitions()
+        .asScala
+        // only tracks the partitions that are currently assigned, as assignment is a synchronous interaction and polls
+        // for an old consumer group epoch will not return (we get to make polls for the current generation). Supposing a
+        // revoke completes and then the poll() is received for a previous epoch, we drop the records here (partitions
+        // are no longer assigned to the consumer). If instead we get a poll() and then a revoke, we only track the
+        // offsets for that short period of time and then they are revoked, so that is also safe.
+        .intersect(assignedPartitions)
+        .map(tp => (tp, received.records(tp)))
+        // get the last record, its the largest offset/most recent timestamp
+        .map { case (partition, records) => (partition, records.get(records.size() - 1)) }
+        .map {
+          case (partition, record) =>
+            partition -> SafeOffsetAndTimestamp(record.offset(), record.timestamp())
+        }
   }
 
   override def commitRequested(offsets: Map[TopicPartition, OffsetAndMetadata]): Unit = {
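
The comment kept above states the invariant of received(): only partitions still in assignedPartitions are tracked, and per partition only the last record of the poll matters, since it carries the largest offset and the most recent timestamp. A condensed sketch of that extraction with simplified result types (lastRecordPerPartition is a hypothetical helper, not part of the connector):

    import org.apache.kafka.clients.consumer.ConsumerRecords
    import org.apache.kafka.common.TopicPartition
    import scala.jdk.CollectionConverters._

    // For each still-assigned partition, keep the (offset, timestamp) of the
    // last record in the poll; records of revoked partitions are dropped.
    def lastRecordPerPartition[K, V](
        records: ConsumerRecords[K, V],
        assigned: Set[TopicPartition]): Map[TopicPartition, (Long, Long)] =
      records
        .partitions()
        .asScala
        .toSet
        .intersect(assigned)
        .map { tp =>
          val rs = records.records(tp) // ordered by offset within a partition
          val last = rs.get(rs.size() - 1)
          (tp, (last.offset(), last.timestamp()))
        }
        .toMap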
diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
index fa1356a5..b15f6b3d 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
@@ -21,14 +21,7 @@ import org.apache.pekko
 import pekko.Done
 import pekko.actor.Status.Failure
 import pekko.actor.{
-  Actor,
-  ActorRef,
-  DeadLetterSuppression,
-  NoSerializationVerificationNeeded,
-  Stash,
-  Status,
-  Terminated,
-  Timers
+  Actor, ActorRef, DeadLetterSuppression, NoSerializationVerificationNeeded, Stash, Status, Terminated, Timers
 }
 import pekko.annotation.InternalApi
 import pekko.event.LoggingReceive
diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala
index f985faac..89273b89 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala
@@ -21,11 +21,7 @@ import pekko.Done
 import pekko.annotation.InternalApi
 import pekko.kafka.ConsumerMessage
 import pekko.kafka.ConsumerMessage.{
-  CommittableMessage,
-  CommittableOffsetMetadata,
-  GroupTopicPartition,
-  TransactionalMessage,
-  _
+  CommittableMessage, CommittableOffsetMetadata, GroupTopicPartition, TransactionalMessage, _
 }
 import org.apache.kafka.clients.consumer.{ ConsumerRecord, OffsetAndMetadata }
 import org.apache.kafka.common.TopicPartition
diff --git a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/MetadataClient.scala b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/MetadataClient.scala
index bb3c049c..a3e18b96 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/MetadataClient.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/MetadataClient.scala
@@ -57,23 +57,21 @@ class MetadataClient private (consumerActor: ActorRef, timeout: Timeout, managed
     getEndOffsets(Set(partition))
       .map(endOffsets => endOffsets(partition))
 
-  def listTopics(): Future[Map[String, List[PartitionInfo]]] =
-    (consumerActor ? ListTopics)(timeout)
-      .mapTo[Topics]
-      .map(_.response)
-      .flatMap {
-        case Success(res) => Future.successful(res)
-        case Failure(e)   => Future.failed(e)
-      }(ExecutionContext.parasitic)
-
-  def getPartitionsFor(topic: String): Future[List[PartitionInfo]] =
-    (consumerActor ? GetPartitionsFor(topic))(timeout)
-      .mapTo[PartitionsFor]
-      .map(_.response)
-      .flatMap {
-        case Success(res) => Future.successful(res)
-        case Failure(e)   => Future.failed(e)
-      }(ExecutionContext.parasitic)
+  def listTopics(): Future[Map[String, List[PartitionInfo]]] = (consumerActor ? ListTopics)(timeout)
+    .mapTo[Topics]
+    .map(_.response)
+    .flatMap {
+      case Success(res) => Future.successful(res)
+      case Failure(e)   => Future.failed(e)
+    }(ExecutionContext.parasitic)
+
+  def getPartitionsFor(topic: String): Future[List[PartitionInfo]] = (consumerActor ? GetPartitionsFor(topic))(timeout)
+    .mapTo[PartitionsFor]
+    .map(_.response)
+    .flatMap {
+      case Success(res) => Future.successful(res)
+      case Failure(e)   => Future.failed(e)
+    }(ExecutionContext.parasitic)
 
   def getCommittedOffsets(partitions: Set[TopicPartition]): Future[Map[TopicPartition, OffsetAndMetadata]] =
     (consumerActor ? GetCommittedOffsets(partitions))(timeout)
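
Each accessor in this class has the same shape: ask the consumer actor, narrow the reply type, and flatten the Try payload into a completed or failed Future, running the trivial continuations on ExecutionContext.parasitic to avoid a dispatcher hop. A generic sketch of the pattern (askAndUnwrap is a hypothetical helper, not part of the connector's API):

    import org.apache.pekko.actor.ActorRef
    import org.apache.pekko.pattern.ask
    import org.apache.pekko.util.Timeout
    import scala.concurrent.{ ExecutionContext, Future }
    import scala.reflect.ClassTag
    import scala.util.{ Failure, Success, Try }

    // Ask `actor`, expect a reply of type R, and flatten the Try[A] it
    // carries into the returned Future.
    def askAndUnwrap[R: ClassTag, A](actor: ActorRef, msg: Any)(
        payload: R => Try[A])(implicit timeout: Timeout): Future[A] =
      (actor ? msg)
        .mapTo[R]
        .map(payload)(ExecutionContext.parasitic)
        .flatMap {
          case Success(a) => Future.successful(a)
          case Failure(e) => Future.failed(e)
        }(ExecutionContext.parasitic)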
diff --git a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Transactional.scala b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Transactional.scala
index 22039eee..752ee6a2 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Transactional.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Transactional.scala
@@ -19,10 +19,7 @@ import pekko.annotation.{ ApiMayChange, InternalApi }
 import pekko.kafka.ConsumerMessage.{ PartitionOffset, TransactionalMessage }
 import pekko.kafka.ProducerMessage._
 import pekko.kafka.internal.{
-  TransactionalProducerStage,
-  TransactionalSource,
-  TransactionalSourceWithOffsetContext,
-  TransactionalSubSource
+  TransactionalProducerStage, TransactionalSource, TransactionalSourceWithOffsetContext, TransactionalSubSource
 }
 import pekko.kafka.scaladsl.Consumer.Control
 import pekko.kafka.{ AutoSubscription, ConsumerMessage, ConsumerSettings, ProducerSettings, Subscription }
diff --git a/int-tests/src/test/scala/org/apache/pekko/kafka/TransactionsPartitionedSourceSpec.scala b/int-tests/src/test/scala/org/apache/pekko/kafka/TransactionsPartitionedSourceSpec.scala
index e6081dce..22d6d35b 100644
--- a/int-tests/src/test/scala/org/apache/pekko/kafka/TransactionsPartitionedSourceSpec.scala
+++ b/int-tests/src/test/scala/org/apache/pekko/kafka/TransactionsPartitionedSourceSpec.scala
@@ -54,7 +54,8 @@ class TransactionsPartitionedSourceSpec
     .withInternalTopicsReplicationFactor(replicationFactor)

   "A multi-broker consume-transform-produce cycle" must {
-    "provide consistency when multiple partitioned transactional streams are being restarted" in assertAllStagesStopped {
+    "provide consistency when multiple partitioned transactional streams are being restarted" in
+    assertAllStagesStopped {
       // It's possible to get into a livelock situation where the `restartAfter` interval causes transactions to abort
       // over and over.  This can happen when there are a few partitions left to process and they can never be fully
       // processed because we always restart the stream before the transaction can be completed successfully.
@@ -74,10 +75,9 @@ class TransactionsPartitionedSourceSpec
       val elements = 100 * 1000 // 100 * 1,000 = 100,000
       val restartAfter = (10 * 1000) / sourcePartitions // (10 * 1,000) / 10 = 1,000
 
-      val producers: immutable.Seq[Future[Done]] =
-        (0 until sourcePartitions).map { part =>
-          produce(sourceTopic, range = 1 to elements, partition = part)
-        }
+      val producers: immutable.Seq[Future[Done]] = (0 until sourcePartitions).map { part =>
+        produce(sourceTopic, range = 1 to elements, partition = part)
+      }
 
       Await.result(Future.sequence(producers), 4.minute)
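
The livelock guard described in the comment at the top of this test is plain arithmetic: assuming sourcePartitions = 10, as the inline comment indicates, each partition receives 100,000 messages while each partitioned stream is restarted every 1,000 messages, so a transaction must complete well within a 1,000-message window or it is aborted and retried indefinitely. As a worked restatement of the quantities:

    val sourcePartitions = 10
    val elements = 100 * 1000                         // 100,000 per partition
    val restartAfter = (10 * 1000) / sourcePartitions // 10,000 / 10 = 1,000
    // A transaction that regularly needs more than `restartAfter` messages to
    // commit is aborted by the next restart: the livelock described above.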
 
diff --git a/int-tests/src/test/scala/org/apache/pekko/kafka/TransactionsSourceSpec.scala b/int-tests/src/test/scala/org/apache/pekko/kafka/TransactionsSourceSpec.scala
index 552ace46..d8c02fdf 100644
--- a/int-tests/src/test/scala/org/apache/pekko/kafka/TransactionsSourceSpec.scala
+++ b/int-tests/src/test/scala/org/apache/pekko/kafka/TransactionsSourceSpec.scala
@@ -72,13 +72,12 @@ class TransactionsSourceSpec
       val restartAfter = 10 * 1000
 
       val partitionSize = elements / sourcePartitions
-      val producers: immutable.Seq[Future[Done]] =
-        (0 until sourcePartitions).map { part =>
-          val rangeStart = (part * partitionSize) + 1
-          val rangeEnd = partitionSize * (part + 1)
-          log.info(s"Producing [$rangeStart to $rangeEnd] to partition $part")
-          produce(sourceTopic, rangeStart to rangeEnd, part)
-        }
+      val producers: immutable.Seq[Future[Done]] = (0 until sourcePartitions).map { part =>
+        val rangeStart = (part * partitionSize) + 1
+        val rangeEnd = partitionSize * (part + 1)
+        log.info(s"Producing [$rangeStart to $rangeEnd] to partition $part")
+        produce(sourceTopic, rangeStart to rangeEnd, part)
+      }
 
       Await.result(Future.sequence(producers), 1.minute)
 
diff --git a/project/ParadoxSettings.scala b/project/ParadoxSettings.scala
index 08ed511b..ee75aa33 100644
--- a/project/ParadoxSettings.scala
+++ b/project/ParadoxSettings.scala
@@ -10,11 +10,7 @@
 import Versions._
 import com.lightbend.paradox.apidoc.ApidocPlugin.autoImport.apidocRootPackage
 import com.lightbend.paradox.sbt.ParadoxPlugin.autoImport.{
-  paradox,
-  paradoxGroups,
-  paradoxMarkdownToHtml,
-  paradoxProperties,
-  paradoxRoots
+  paradox, paradoxGroups, paradoxMarkdownToHtml, paradoxProperties, paradoxRoots
 }
 import org.apache.pekko.PekkoParadoxPlugin.autoImport._
 import sbt._
@@ -38,8 +34,10 @@ object ParadoxSettings {
       "extref.pekko.base_url" -> s"$pekkoDocs/pekko/$pekkoVersionForDocs/%s",
       "scaladoc.org.apache.pekko.base_url" -> s"$pekkoAPI/pekko/$pekkoVersionForDocs/",
       "javadoc.org.apache.pekko.base_url" -> s"$pekkoAPI/pekko/$pekkoVersionForDocs/",
-      "scaladoc.org.apache.pekko.kafka.base_url" -> s"$pekkoAPI/pekko-connectors-kafka/$pekkoConnectorsKafkaVersionForDocs/",
-      "javadoc.org.apache.pekko.kafka.base_url" -> s"$pekkoAPI/pekko-connectors-kafka/$pekkoConnectorsKafkaVersionForDocs/",
+      "scaladoc.org.apache.pekko.kafka.base_url" ->
+      s"$pekkoAPI/pekko-connectors-kafka/$pekkoConnectorsKafkaVersionForDocs/",
+      "javadoc.org.apache.pekko.kafka.base_url" ->
+      s"$pekkoAPI/pekko-connectors-kafka/$pekkoConnectorsKafkaVersionForDocs/",
       "javadoc.org.apache.pekko.link_style" -> "direct",
       "extref.pekko-management.base_url" -> s"$pekkoDocs/pekko-management/$pekkoManagementVersionForDocs/%s",
       // Kafka
@@ -56,7 +54,8 @@ object ParadoxSettings {
       "scaladoc.com.typesafe.config.base_url" -> s"https://lightbend.github.io/config/latest/api/",
       // Testcontainers
       "testcontainers.version" -> testcontainersVersion,
-      "javadoc.org.testcontainers.containers.base_url" -> s"https://www.javadoc.io/doc/org.testcontainers/testcontainers/$testcontainersVersion/",
+      "javadoc.org.testcontainers.containers.base_url" ->
+      s"https://www.javadoc.io/doc/org.testcontainers/testcontainers/$testcontainersVersion/",
       "javadoc.org.testcontainers.containers.link_style" -> "direct"))
 
   val sourceGeneratorSettings = Seq(
diff --git a/project/ProjectSettings.scala b/project/ProjectSettings.scala
index ff0fe5ff..ec98d672 100644
--- a/project/ProjectSettings.scala
+++ b/project/ProjectSettings.scala
@@ -76,7 +76,8 @@ object ProjectSettings extends AutoPlugin {
       "[email protected]",
       url("https://github.com/apache/pekko-connectors-kafka/graphs/contributors")),
     startYear := Some(2022),
-    description := "Apache Pekko Kafka Connector is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Apache Pekko.",
+    description :=
+      "Apache Pekko Kafka Connector is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Apache Pekko.",
     crossScalaVersions := Seq(Scala213, Scala3),
     scalaVersion := Scala213,
     crossVersion := CrossVersion.binary,
diff --git a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/KafkaTestKitChecks.scala b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/KafkaTestKitChecks.scala
index 93478018..f34e9aaf 100644
--- a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/KafkaTestKitChecks.scala
+++ b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/KafkaTestKitChecks.scala
@@ -18,10 +18,7 @@ import java.util.Collections
 import java.util.concurrent.TimeUnit
 
 import org.apache.kafka.clients.admin.{
-  Admin,
-  ConsumerGroupDescription,
-  DescribeClusterResult,
-  DescribeConsumerGroupsOptions
+  Admin, ConsumerGroupDescription, DescribeClusterResult, DescribeConsumerGroupsOptions
 }
 import org.slf4j.Logger
 
diff --git a/tests/src/test/scala/docs/scaladsl/FetchMetadata.scala b/tests/src/test/scala/docs/scaladsl/FetchMetadata.scala
index 55e9f8b7..436a3cce 100644
--- a/tests/src/test/scala/docs/scaladsl/FetchMetadata.scala
+++ b/tests/src/test/scala/docs/scaladsl/FetchMetadata.scala
@@ -120,8 +120,8 @@ class FetchMetadata extends DocsSpecBase with TestcontainersKafkaLike with TryVa
 
     val partition = 0
     val tp = new TopicPartition(topic, partition)
-    val topicsFuture: Future[Metadata.EndOffsets] =
-      (consumer ? Metadata.GetEndOffsets(Set(tp))).mapTo[Metadata.EndOffsets]
+    val topicsFuture
+        : Future[Metadata.EndOffsets] = (consumer ? Metadata.GetEndOffsets(Set(tp))).mapTo[Metadata.EndOffsets]
 
     val response = topicsFuture.futureValue.response
     (response should be).a(Symbol("success"))
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/TransactionsOps.scala b/tests/src/test/scala/org/apache/pekko/kafka/TransactionsOps.scala
index d2c35fd4..8d4ec3c6 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/TransactionsOps.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/TransactionsOps.scala
@@ -93,9 +93,9 @@ trait TransactionsOps extends TestSuite with Matchers {
               .idleTimeout(idleTimeout)
               .map { msg =>
                 ProducerMessage.single(new ProducerRecord[String, String](sinkTopic,
-                    msg.record.partition(),
-                    msg.record.key(),
-                    msg.record.value),
+                  msg.record.partition(),
+                  msg.record.key(),
+                  msg.record.value),
                   msg.partitionOffset)
               }
               .via(Transactional.flow(producerSettings, transactionalId))
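
This helper wires the standard consume-transform-produce loop: every consumed record is wrapped in a ProducerMessage.single whose pass-through is the message's partitionOffset, which lets Transactional.flow commit the source offset inside the same Kafka transaction as the produced record. A trimmed sketch of the pattern (settings, topics and the copyTransactionally name are placeholders):

    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.pekko.kafka.{ ConsumerSettings, ProducerSettings, Subscriptions }
    import org.apache.pekko.kafka.ProducerMessage
    import org.apache.pekko.kafka.scaladsl.Transactional

    // Copy sourceTopic to sinkTopic; each source offset is committed
    // atomically with the corresponding write, under one transactional.id.
    def copyTransactionally(
        consumerSettings: ConsumerSettings[String, String],
        producerSettings: ProducerSettings[String, String],
        sourceTopic: String,
        sinkTopic: String,
        transactionalId: String) =
      Transactional
        .source(consumerSettings, Subscriptions.topics(sourceTopic))
        .map { msg =>
          ProducerMessage.single(
            new ProducerRecord[String, String](sinkTopic, msg.record.key, msg.record.value),
            msg.partitionOffset)
        }
        .via(Transactional.flow(producerSettings, transactionalId))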
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommitCollectorStageSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommitCollectorStageSpec.scala
index ec7fc6fd..bca4f25a 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommitCollectorStageSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommitCollectorStageSpec.scala
@@ -190,7 +190,8 @@ class CommitCollectorStageSpec(_system: ActorSystem)
         control.shutdown().futureValue shouldBe Done
       }

-      "batch commit all buffered elements if upstream has suddenly completed with delayed commits" in assertAllStagesStopped {
+      "batch commit all buffered elements if upstream has suddenly completed with delayed commits" in
+      assertAllStagesStopped {
         val (sourceProbe, control, sinkProbe) = streamProbes(settings)
         val committer = new TestBatchCommitter(settings, () => 50.millis)
 
@@ -285,7 +286,8 @@ class CommitCollectorStageSpec(_system: ActorSystem)
         // downstream out of order
         val lastBatch = batches.maxBy(_.offsets.values.last)
 
-        (lastBatch.offsets.values.last shouldBe batch2
+        (lastBatch.offsets.values.last shouldBe
+        batch2
           .asInstanceOf[CommittableOffsetBatch]
           .offsets
           .head
@@ -294,7 +296,8 @@ class CommitCollectorStageSpec(_system: ActorSystem)
 
         control.shutdown().futureValue shouldBe Done
       }
-      "only commit when the next offset is observed in a CommittableOffset preceded by a CommittableOffsetBatch" in assertAllStagesStopped {
+      "only commit when the next offset is observed in a CommittableOffset preceded by a CommittableOffsetBatch" in
+      assertAllStagesStopped {
         val (sourceProbe, control, sinkProbe, offsetFactory) = streamProbesWithOffsetFactory(settings)
         // create a mix of single offsets and batches of 1
         val (batch1, msg2, batch3) =
@@ -320,7 +323,8 @@ class CommitCollectorStageSpec(_system: ActorSystem)
 
         control.shutdown().futureValue shouldBe Done
       }
-      "only commit when the next offset is observed in a CommittableOffsetBatch preceded by a CommittableOffset" in assertAllStagesStopped {
+      "only commit when the next offset is observed in a CommittableOffsetBatch preceded by a CommittableOffset" in
+      assertAllStagesStopped {
         val (sourceProbe, control, sinkProbe, offsetFactory) = streamProbesWithOffsetFactory(settings)
         // create a mix of single offsets and batches of 1
         val (msg1, batch2, msg3) =
@@ -340,7 +344,8 @@ class CommitCollectorStageSpec(_system: ActorSystem)
         // downstream out of order
         val lastBatch = batches.maxBy(_.offsets.values.last)
 
-        (lastBatch.offsets.values.last shouldBe batch2
+        (lastBatch.offsets.values.last shouldBe
+        batch2
           .asInstanceOf[CommittableOffsetBatch]
           .offsets
           .head
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/ConnectionCheckerSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/ConnectionCheckerSpec.scala
index 4725d418..217a7e21 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/ConnectionCheckerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/ConnectionCheckerSpec.scala
@@ -74,7 +74,8 @@ class ConnectionCheckerSpec extends SpecBase with TestcontainersKafkaPerClassLik
       Await.ready(control.isShutdown.zip(futDone), failingDetectionTime)
     }

-    "fail stream and control.isShutdown when kafka down and not recover during max retries exceeded" in assertAllStagesStopped {
+    "fail stream and control.isShutdown when kafka down and not recover during max retries exceeded" in
+    assertAllStagesStopped {
       startCluster()
 
       val msg = "hello"
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceExtSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceExtSpec.scala
index 231aed24..8c4f6fbd 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceExtSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceExtSpec.scala
@@ -198,7 +198,8 @@ class RebalanceExtSpec extends SpecBase with TestcontainersKafkaLike with Inside

   "Fetched records" must {

-    "no messages should be lost when two consumers consume from one topic and two partitions and one consumer aborts mid-stream" in assertAllStagesStopped {
+    "no messages should be lost when two consumers consume from one topic and two partitions and one consumer aborts mid-stream" in
+    assertAllStagesStopped {
       val topicCount = 1
       val partitionCount = 2
       val perPartitionMessageCount = 9
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/TransactionsSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/TransactionsSpec.scala
index a1d52fec..8a2b11eb 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/TransactionsSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/TransactionsSpec.scala
@@ -150,7 +150,8 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa
       }
     }

-    "complete with messages filtered out and transient failure causing an abort with restartable source" in assertAllStagesStopped {
+    "complete with messages filtered out and transient failure causing an abort with restartable source" in
+    assertAllStagesStopped {
       val sourceTopic = createTopic(1)
       val sinkTopic = createTopic(2)
       val group = createGroupId(1)
@@ -314,9 +315,9 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa
                 source
                   .map { msg =>
                     ProducerMessage.single(new ProducerRecord[String, String](sinkTopic,
-                        msg.record.partition(),
-                        msg.record.key(),
-                        msg.record.value),
+                      msg.record.partition(),
+                      msg.record.key(),
+                      msg.record.value),
                       msg.partitionOffset)
                   }
                   .runWith(Transactional.sink(producerDefaults, transactionalId))
@@ -380,9 +381,9 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa
                       throw new Exception("sub source failure")
                     }
                     ProducerMessage.single(new ProducerRecord[String, String](sinkTopic,
-                        msg.record.partition(),
-                        msg.record.key(),
-                        msg.record.value),
+                      msg.record.partition(),
+                      msg.record.key(),
+                      msg.record.value),
                       msg.partitionOffset)
                   }
                   .runWith(Transactional.sink(producerDefaults, transactionalId))
@@ -453,9 +454,9 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa
               source
                 .map { msg =>
                 ProducerMessage.single(new ProducerRecord[String, String](outTopic,
-                      msg.record.partition(),
-                      msg.record.key(),
-                      msg.record.value() + "-out"),
+                    msg.record.partition(),
+                    msg.record.key(),
+                    msg.record.value() + "-out"),
                     msg.partitionOffset)
                 }
                 .to(Transactional.sink(producerDefaults, transactionalId))

