This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git
commit f6aff22f025ba7779c93785bfdea850cd3940db3 Author: Scala Steward <[email protected]> AuthorDate: Sat Jan 17 16:23:13 2026 +0000 Reformat with scalafmt 3.10.4 Executed command: scalafmt --non-interactive --- .../benchmarks/KafkaTransactionFixtureGen.scala | 5 +--- .../PekkoConnectorsCommittableSinkFixtures.scala | 5 +--- .../ReactiveKafkaTransactionFixtures.scala | 5 +--- .../kafka/internal/CommitObservationLogic.scala | 3 +- .../kafka/internal/ConsumerProgressTracking.scala | 33 +++++++++++----------- .../pekko/kafka/internal/KafkaConsumerActor.scala | 9 +----- .../pekko/kafka/internal/MessageBuilder.scala | 6 +--- .../pekko/kafka/scaladsl/MetadataClient.scala | 32 ++++++++++----------- .../pekko/kafka/scaladsl/Transactional.scala | 5 +--- .../kafka/TransactionsPartitionedSourceSpec.scala | 10 +++---- .../pekko/kafka/TransactionsSourceSpec.scala | 13 ++++----- project/ParadoxSettings.scala | 15 +++++----- project/ProjectSettings.scala | 3 +- .../testkit/internal/KafkaTestKitChecks.scala | 5 +--- .../test/scala/docs/scaladsl/FetchMetadata.scala | 4 +-- .../org/apache/pekko/kafka/TransactionsOps.scala | 6 ++-- .../kafka/internal/CommitCollectorStageSpec.scala | 15 ++++++---- .../kafka/scaladsl/ConnectionCheckerSpec.scala | 3 +- .../pekko/kafka/scaladsl/RebalanceExtSpec.scala | 3 +- .../pekko/kafka/scaladsl/TransactionsSpec.scala | 21 +++++++------- 20 files changed, 91 insertions(+), 110 deletions(-) diff --git a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionFixtureGen.scala b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionFixtureGen.scala index 52451061..51d439bf 100644 --- a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionFixtureGen.scala +++ b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionFixtureGen.scala @@ -22,10 +22,7 @@ import org.apache.kafka.clients.consumer.{ ConsumerConfig, KafkaConsumer } import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig } import org.apache.kafka.common.IsolationLevel import org.apache.kafka.common.serialization.{ - ByteArrayDeserializer, - ByteArraySerializer, - StringDeserializer, - StringSerializer + ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer } import scala.jdk.CollectionConverters._ diff --git a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/PekkoConnectorsCommittableSinkFixtures.scala b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/PekkoConnectorsCommittableSinkFixtures.scala index 46074109..35dfb8bc 100644 --- a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/PekkoConnectorsCommittableSinkFixtures.scala +++ b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/PekkoConnectorsCommittableSinkFixtures.scala @@ -30,10 +30,7 @@ import com.typesafe.scalalogging.LazyLogging import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.serialization.{ - ByteArrayDeserializer, - ByteArraySerializer, - StringDeserializer, - StringSerializer + ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer } import scala.concurrent.duration._ diff --git a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/ReactiveKafkaTransactionFixtures.scala b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/ReactiveKafkaTransactionFixtures.scala index 4b5be6b6..3cdf2301 100644 --- a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/ReactiveKafkaTransactionFixtures.scala +++ b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/ReactiveKafkaTransactionFixtures.scala @@ -26,10 +26,7 @@ import pekko.kafka.{ ConsumerMessage, ConsumerSettings, ProducerSettings, Subscr import pekko.stream.scaladsl.{ Flow, Source } import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.{ - ByteArrayDeserializer, - ByteArraySerializer, - StringDeserializer, - StringSerializer + ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer } import scala.concurrent.duration.FiniteDuration diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/CommitObservationLogic.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/CommitObservationLogic.scala index e0293427..a1e2a175 100644 --- a/core/src/main/scala/org/apache/pekko/kafka/internal/CommitObservationLogic.scala +++ b/core/src/main/scala/org/apache/pekko/kafka/internal/CommitObservationLogic.scala @@ -69,7 +69,8 @@ private[internal] trait CommitObservationLogic { self: GraphStageLogic => deferredOffsets = deferredOffsets + (gtp -> committable) offsetBatch = offsetBatch.updated(dOffset) case Some(dOffsetBatch: CommittableOffsetBatch) - if dOffsetBatch.offsets.contains(gtp) && dOffsetBatch.offsets + if dOffsetBatch.offsets.contains(gtp) && + dOffsetBatch.offsets .get(gtp) .head < offset => deferredOffsets = deferredOffsets + (gtp -> committable) diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerProgressTracking.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerProgressTracking.scala index 7ca6a03e..591ef3fc 100644 --- a/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerProgressTracking.scala +++ b/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerProgressTracking.scala @@ -88,22 +88,23 @@ final class ConsumerProgressTrackerImpl extends ConsumerProgressTracking { } override def received[K, V](received: ConsumerRecords[K, V]): Unit = { - receivedMessagesImpl = receivedMessagesImpl ++ received - .partitions() - .asScala - // only tracks the partitions that are currently assigned, as assignment is a synchronous interaction and polls - // for an old consumer group epoch will not return (we get to make polls for the current generation). Supposing a - // revoke completes and then the poll() is received for a previous epoch, we drop the records here (partitions - // are no longer assigned to the consumer). If instead we get a poll() and then a revoke, we only track the - // offsets for that short period of time and then they are revoked, so that is also safe. - .intersect(assignedPartitions) - .map(tp => (tp, received.records(tp))) - // get the last record, its the largest offset/most recent timestamp - .map { case (partition, records) => (partition, records.get(records.size() - 1)) } - .map { - case (partition, record) => - partition -> SafeOffsetAndTimestamp(record.offset(), record.timestamp()) - } + receivedMessagesImpl = receivedMessagesImpl ++ + received + .partitions() + .asScala + // only tracks the partitions that are currently assigned, as assignment is a synchronous interaction and polls + // for an old consumer group epoch will not return (we get to make polls for the current generation). Supposing a + // revoke completes and then the poll() is received for a previous epoch, we drop the records here (partitions + // are no longer assigned to the consumer). If instead we get a poll() and then a revoke, we only track the + // offsets for that short period of time and then they are revoked, so that is also safe. + .intersect(assignedPartitions) + .map(tp => (tp, received.records(tp))) + // get the last record, its the largest offset/most recent timestamp + .map { case (partition, records) => (partition, records.get(records.size() - 1)) } + .map { + case (partition, record) => + partition -> SafeOffsetAndTimestamp(record.offset(), record.timestamp()) + } } override def commitRequested(offsets: Map[TopicPartition, OffsetAndMetadata]): Unit = { diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala index fa1356a5..b15f6b3d 100644 --- a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala +++ b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala @@ -21,14 +21,7 @@ import org.apache.pekko import pekko.Done import pekko.actor.Status.Failure import pekko.actor.{ - Actor, - ActorRef, - DeadLetterSuppression, - NoSerializationVerificationNeeded, - Stash, - Status, - Terminated, - Timers + Actor, ActorRef, DeadLetterSuppression, NoSerializationVerificationNeeded, Stash, Status, Terminated, Timers } import pekko.annotation.InternalApi import pekko.event.LoggingReceive diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala index f985faac..89273b89 100644 --- a/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala +++ b/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala @@ -21,11 +21,7 @@ import pekko.Done import pekko.annotation.InternalApi import pekko.kafka.ConsumerMessage import pekko.kafka.ConsumerMessage.{ - CommittableMessage, - CommittableOffsetMetadata, - GroupTopicPartition, - TransactionalMessage, - _ + CommittableMessage, CommittableOffsetMetadata, GroupTopicPartition, TransactionalMessage, _ } import org.apache.kafka.clients.consumer.{ ConsumerRecord, OffsetAndMetadata } import org.apache.kafka.common.TopicPartition diff --git a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/MetadataClient.scala b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/MetadataClient.scala index bb3c049c..a3e18b96 100644 --- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/MetadataClient.scala +++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/MetadataClient.scala @@ -57,23 +57,21 @@ class MetadataClient private (consumerActor: ActorRef, timeout: Timeout, managed getEndOffsets(Set(partition)) .map(endOffsets => endOffsets(partition)) - def listTopics(): Future[Map[String, List[PartitionInfo]]] = - (consumerActor ? ListTopics)(timeout) - .mapTo[Topics] - .map(_.response) - .flatMap { - case Success(res) => Future.successful(res) - case Failure(e) => Future.failed(e) - }(ExecutionContext.parasitic) - - def getPartitionsFor(topic: String): Future[List[PartitionInfo]] = - (consumerActor ? GetPartitionsFor(topic))(timeout) - .mapTo[PartitionsFor] - .map(_.response) - .flatMap { - case Success(res) => Future.successful(res) - case Failure(e) => Future.failed(e) - }(ExecutionContext.parasitic) + def listTopics(): Future[Map[String, List[PartitionInfo]]] = (consumerActor ? ListTopics)(timeout) + .mapTo[Topics] + .map(_.response) + .flatMap { + case Success(res) => Future.successful(res) + case Failure(e) => Future.failed(e) + }(ExecutionContext.parasitic) + + def getPartitionsFor(topic: String): Future[List[PartitionInfo]] = (consumerActor ? GetPartitionsFor(topic))(timeout) + .mapTo[PartitionsFor] + .map(_.response) + .flatMap { + case Success(res) => Future.successful(res) + case Failure(e) => Future.failed(e) + }(ExecutionContext.parasitic) def getCommittedOffsets(partitions: Set[TopicPartition]): Future[Map[TopicPartition, OffsetAndMetadata]] = (consumerActor ? GetCommittedOffsets(partitions))(timeout) diff --git a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Transactional.scala b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Transactional.scala index 22039eee..752ee6a2 100644 --- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Transactional.scala +++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Transactional.scala @@ -19,10 +19,7 @@ import pekko.annotation.{ ApiMayChange, InternalApi } import pekko.kafka.ConsumerMessage.{ PartitionOffset, TransactionalMessage } import pekko.kafka.ProducerMessage._ import pekko.kafka.internal.{ - TransactionalProducerStage, - TransactionalSource, - TransactionalSourceWithOffsetContext, - TransactionalSubSource + TransactionalProducerStage, TransactionalSource, TransactionalSourceWithOffsetContext, TransactionalSubSource } import pekko.kafka.scaladsl.Consumer.Control import pekko.kafka.{ AutoSubscription, ConsumerMessage, ConsumerSettings, ProducerSettings, Subscription } diff --git a/int-tests/src/test/scala/org/apache/pekko/kafka/TransactionsPartitionedSourceSpec.scala b/int-tests/src/test/scala/org/apache/pekko/kafka/TransactionsPartitionedSourceSpec.scala index e6081dce..22d6d35b 100644 --- a/int-tests/src/test/scala/org/apache/pekko/kafka/TransactionsPartitionedSourceSpec.scala +++ b/int-tests/src/test/scala/org/apache/pekko/kafka/TransactionsPartitionedSourceSpec.scala @@ -54,7 +54,8 @@ class TransactionsPartitionedSourceSpec .withInternalTopicsReplicationFactor(replicationFactor) "A multi-broker consume-transform-produce cycle" must { - "provide consistency when multiple partitioned transactional streams are being restarted" in assertAllStagesStopped { + "provide consistency when multiple partitioned transactional streams are being restarted" in + assertAllStagesStopped { // It's possible to get into a livelock situation where the `restartAfter` interval causes transactions to abort // over and over. This can happen when there are a few partitions left to process and they can never be fully // processed because we always restart the stream before the transaction can be completed successfully. @@ -74,10 +75,9 @@ class TransactionsPartitionedSourceSpec val elements = 100 * 1000 // 100 * 1,000 = 100,000 val restartAfter = (10 * 1000) / sourcePartitions // (10 * 1,000) / 10 = 100 - val producers: immutable.Seq[Future[Done]] = - (0 until sourcePartitions).map { part => - produce(sourceTopic, range = 1 to elements, partition = part) - } + val producers: immutable.Seq[Future[Done]] = (0 until sourcePartitions).map { part => + produce(sourceTopic, range = 1 to elements, partition = part) + } Await.result(Future.sequence(producers), 4.minute) diff --git a/int-tests/src/test/scala/org/apache/pekko/kafka/TransactionsSourceSpec.scala b/int-tests/src/test/scala/org/apache/pekko/kafka/TransactionsSourceSpec.scala index 552ace46..d8c02fdf 100644 --- a/int-tests/src/test/scala/org/apache/pekko/kafka/TransactionsSourceSpec.scala +++ b/int-tests/src/test/scala/org/apache/pekko/kafka/TransactionsSourceSpec.scala @@ -72,13 +72,12 @@ class TransactionsSourceSpec val restartAfter = 10 * 1000 val partitionSize = elements / sourcePartitions - val producers: immutable.Seq[Future[Done]] = - (0 until sourcePartitions).map { part => - val rangeStart = (part * partitionSize) + 1 - val rangeEnd = partitionSize * (part + 1) - log.info(s"Producing [$rangeStart to $rangeEnd] to partition $part") - produce(sourceTopic, rangeStart to rangeEnd, part) - } + val producers: immutable.Seq[Future[Done]] = (0 until sourcePartitions).map { part => + val rangeStart = (part * partitionSize) + 1 + val rangeEnd = partitionSize * (part + 1) + log.info(s"Producing [$rangeStart to $rangeEnd] to partition $part") + produce(sourceTopic, rangeStart to rangeEnd, part) + } Await.result(Future.sequence(producers), 1.minute) diff --git a/project/ParadoxSettings.scala b/project/ParadoxSettings.scala index 08ed511b..ee75aa33 100644 --- a/project/ParadoxSettings.scala +++ b/project/ParadoxSettings.scala @@ -10,11 +10,7 @@ import Versions._ import com.lightbend.paradox.apidoc.ApidocPlugin.autoImport.apidocRootPackage import com.lightbend.paradox.sbt.ParadoxPlugin.autoImport.{ - paradox, - paradoxGroups, - paradoxMarkdownToHtml, - paradoxProperties, - paradoxRoots + paradox, paradoxGroups, paradoxMarkdownToHtml, paradoxProperties, paradoxRoots } import org.apache.pekko.PekkoParadoxPlugin.autoImport._ import sbt._ @@ -38,8 +34,10 @@ object ParadoxSettings { "extref.pekko.base_url" -> s"$pekkoDocs/pekko/$pekkoVersionForDocs/%s", "scaladoc.org.apache.pekko.base_url" -> s"$pekkoAPI/pekko/$pekkoVersionForDocs/", "javadoc.org.apache.pekko.base_url" -> s"$pekkoAPI/pekko/$pekkoVersionForDocs/", - "scaladoc.org.apache.pekko.kafka.base_url" -> s"$pekkoAPI/pekko-connectors-kafka/$pekkoConnectorsKafkaVersionForDocs/", - "javadoc.org.apache.pekko.kafka.base_url" -> s"$pekkoAPI/pekko-connectors-kafka/$pekkoConnectorsKafkaVersionForDocs/", + "scaladoc.org.apache.pekko.kafka.base_url" -> + s"$pekkoAPI/pekko-connectors-kafka/$pekkoConnectorsKafkaVersionForDocs/", + "javadoc.org.apache.pekko.kafka.base_url" -> + s"$pekkoAPI/pekko-connectors-kafka/$pekkoConnectorsKafkaVersionForDocs/", "javadoc.org.apache.pekko.link_style" -> "direct", "extref.pekko-management.base_url" -> s"$pekkoDocs/pekko-management/$pekkoManagementVersionForDocs/%s", // Kafka @@ -56,7 +54,8 @@ object ParadoxSettings { "scaladoc.com.typesafe.config.base_url" -> s"https://lightbend.github.io/config/latest/api/", // Testcontainers "testcontainers.version" -> testcontainersVersion, - "javadoc.org.testcontainers.containers.base_url" -> s"https://www.javadoc.io/doc/org.testcontainers/testcontainers/$testcontainersVersion/", + "javadoc.org.testcontainers.containers.base_url" -> + s"https://www.javadoc.io/doc/org.testcontainers/testcontainers/$testcontainersVersion/", "javadoc.org.testcontainers.containers.link_style" -> "direct")) val sourceGeneratorSettings = Seq( diff --git a/project/ProjectSettings.scala b/project/ProjectSettings.scala index ff0fe5ff..ec98d672 100644 --- a/project/ProjectSettings.scala +++ b/project/ProjectSettings.scala @@ -76,7 +76,8 @@ object ProjectSettings extends AutoPlugin { "[email protected]", url("https://github.com/apache/pekko-connectors-kafka/graphs/contributors")), startYear := Some(2022), - description := "Apache Pekko Kafka Connector is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Apache Pekko.", + description := + "Apache Pekko Kafka Connector is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Apache Pekko.", crossScalaVersions := Seq(Scala213, Scala3), scalaVersion := Scala213, crossVersion := CrossVersion.binary, diff --git a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/KafkaTestKitChecks.scala b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/KafkaTestKitChecks.scala index 93478018..f34e9aaf 100644 --- a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/KafkaTestKitChecks.scala +++ b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/KafkaTestKitChecks.scala @@ -18,10 +18,7 @@ import java.util.Collections import java.util.concurrent.TimeUnit import org.apache.kafka.clients.admin.{ - Admin, - ConsumerGroupDescription, - DescribeClusterResult, - DescribeConsumerGroupsOptions + Admin, ConsumerGroupDescription, DescribeClusterResult, DescribeConsumerGroupsOptions } import org.slf4j.Logger diff --git a/tests/src/test/scala/docs/scaladsl/FetchMetadata.scala b/tests/src/test/scala/docs/scaladsl/FetchMetadata.scala index 55e9f8b7..436a3cce 100644 --- a/tests/src/test/scala/docs/scaladsl/FetchMetadata.scala +++ b/tests/src/test/scala/docs/scaladsl/FetchMetadata.scala @@ -120,8 +120,8 @@ class FetchMetadata extends DocsSpecBase with TestcontainersKafkaLike with TryVa val partition = 0 val tp = new TopicPartition(topic, partition) - val topicsFuture: Future[Metadata.EndOffsets] = - (consumer ? Metadata.GetEndOffsets(Set(tp))).mapTo[Metadata.EndOffsets] + val topicsFuture + : Future[Metadata.EndOffsets] = (consumer ? Metadata.GetEndOffsets(Set(tp))).mapTo[Metadata.EndOffsets] val response = topicsFuture.futureValue.response (response should be).a(Symbol("success")) diff --git a/tests/src/test/scala/org/apache/pekko/kafka/TransactionsOps.scala b/tests/src/test/scala/org/apache/pekko/kafka/TransactionsOps.scala index d2c35fd4..8d4ec3c6 100644 --- a/tests/src/test/scala/org/apache/pekko/kafka/TransactionsOps.scala +++ b/tests/src/test/scala/org/apache/pekko/kafka/TransactionsOps.scala @@ -93,9 +93,9 @@ trait TransactionsOps extends TestSuite with Matchers { .idleTimeout(idleTimeout) .map { msg => ProducerMessage.single(new ProducerRecord[String, String](sinkTopic, - msg.record.partition(), - msg.record.key(), - msg.record.value), + msg.record.partition(), + msg.record.key(), + msg.record.value), msg.partitionOffset) } .via(Transactional.flow(producerSettings, transactionalId)) diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommitCollectorStageSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommitCollectorStageSpec.scala index ec7fc6fd..bca4f25a 100644 --- a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommitCollectorStageSpec.scala +++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommitCollectorStageSpec.scala @@ -190,7 +190,8 @@ class CommitCollectorStageSpec(_system: ActorSystem) control.shutdown().futureValue shouldBe Done } - "batch commit all buffered elements if upstream has suddenly completed with delayed commits" in assertAllStagesStopped { + "batch commit all buffered elements if upstream has suddenly completed with delayed commits" in + assertAllStagesStopped { val (sourceProbe, control, sinkProbe) = streamProbes(settings) val committer = new TestBatchCommitter(settings, () => 50.millis) @@ -285,7 +286,8 @@ class CommitCollectorStageSpec(_system: ActorSystem) // downstream out of order val lastBatch = batches.maxBy(_.offsets.values.last) - (lastBatch.offsets.values.last shouldBe batch2 + (lastBatch.offsets.values.last shouldBe + batch2 .asInstanceOf[CommittableOffsetBatch] .offsets .head @@ -294,7 +296,8 @@ class CommitCollectorStageSpec(_system: ActorSystem) control.shutdown().futureValue shouldBe Done } - "only commit when the next offset is observed in a CommittableOffset preceded by a CommittableOffsetBatch" in assertAllStagesStopped { + "only commit when the next offset is observed in a CommittableOffset preceded by a CommittableOffsetBatch" in + assertAllStagesStopped { val (sourceProbe, control, sinkProbe, offsetFactory) = streamProbesWithOffsetFactory(settings) // create a mix of single offsets and batches of 1 val (batch1, msg2, batch3) = @@ -320,7 +323,8 @@ class CommitCollectorStageSpec(_system: ActorSystem) control.shutdown().futureValue shouldBe Done } - "only commit when the next offset is observed in a CommittableOffsetBatch preceded by a CommittableOffset" in assertAllStagesStopped { + "only commit when the next offset is observed in a CommittableOffsetBatch preceded by a CommittableOffset" in + assertAllStagesStopped { val (sourceProbe, control, sinkProbe, offsetFactory) = streamProbesWithOffsetFactory(settings) // create a mix of single offsets and batches of 1 val (msg1, batch2, msg3) = @@ -340,7 +344,8 @@ class CommitCollectorStageSpec(_system: ActorSystem) // downstream out of order val lastBatch = batches.maxBy(_.offsets.values.last) - (lastBatch.offsets.values.last shouldBe batch2 + (lastBatch.offsets.values.last shouldBe + batch2 .asInstanceOf[CommittableOffsetBatch] .offsets .head diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/ConnectionCheckerSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/ConnectionCheckerSpec.scala index 4725d418..217a7e21 100644 --- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/ConnectionCheckerSpec.scala +++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/ConnectionCheckerSpec.scala @@ -74,7 +74,8 @@ class ConnectionCheckerSpec extends SpecBase with TestcontainersKafkaPerClassLik Await.ready(control.isShutdown.zip(futDone), failingDetectionTime) } - "fail stream and control.isShutdown when kafka down and not recover during max retries exceeded" in assertAllStagesStopped { + "fail stream and control.isShutdown when kafka down and not recover during max retries exceeded" in + assertAllStagesStopped { startCluster() val msg = "hello" diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceExtSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceExtSpec.scala index 231aed24..8c4f6fbd 100644 --- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceExtSpec.scala +++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceExtSpec.scala @@ -198,7 +198,8 @@ class RebalanceExtSpec extends SpecBase with TestcontainersKafkaLike with Inside "Fetched records" must { - "no messages should be lost when two consumers consume from one topic and two partitions and one consumer aborts mid-stream" in assertAllStagesStopped { + "no messages should be lost when two consumers consume from one topic and two partitions and one consumer aborts mid-stream" in + assertAllStagesStopped { val topicCount = 1 val partitionCount = 2 val perPartitionMessageCount = 9 diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/TransactionsSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/TransactionsSpec.scala index a1d52fec..8a2b11eb 100644 --- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/TransactionsSpec.scala +++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/TransactionsSpec.scala @@ -150,7 +150,8 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa } } - "complete with messages filtered out and transient failure causing an abort with restartable source" in assertAllStagesStopped { + "complete with messages filtered out and transient failure causing an abort with restartable source" in + assertAllStagesStopped { val sourceTopic = createTopic(1) val sinkTopic = createTopic(2) val group = createGroupId(1) @@ -314,9 +315,9 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa source .map { msg => ProducerMessage.single(new ProducerRecord[String, String](sinkTopic, - msg.record.partition(), - msg.record.key(), - msg.record.value), + msg.record.partition(), + msg.record.key(), + msg.record.value), msg.partitionOffset) } .runWith(Transactional.sink(producerDefaults, transactionalId)) @@ -380,9 +381,9 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa throw new Exception("sub source failure") } ProducerMessage.single(new ProducerRecord[String, String](sinkTopic, - msg.record.partition(), - msg.record.key(), - msg.record.value), + msg.record.partition(), + msg.record.key(), + msg.record.value), msg.partitionOffset) } .runWith(Transactional.sink(producerDefaults, transactionalId)) @@ -453,9 +454,9 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa source .map { msg => ProducerMessage.single(new ProducerRecord[String, String](outTopic, - msg.record.partition(), - msg.record.key(), - msg.record.value() + "-out"), + msg.record.partition(), + msg.record.key(), + msg.record.value() + "-out"), msg.partitionOffset) } .to(Transactional.sink(producerDefaults, transactionalId)) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
