This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new c457eda9 support scala 3 compilation (#58)
c457eda9 is described below

commit c457eda9e928da4949dbea6ca4196108c952727d
Author: PJ Fanning <[email protected]>
AuthorDate: Tue Apr 25 21:43:19 2023 +0200

    support scala 3 compilation (#58)
    
    * upgrade scalatest
    
    partial scala 3 support
    
    Update check-build-test.yml
    
    Update check-build-test.yml
    
    Update check-build-test.yml
    
    Update check-build-test.yml
    
    more changes to support scala 3 compile
    
    scalafmt
    
    another scala 3 compile issue
    
    another scala 3 compile issue
    
    more scala 3 compile issues
    
    compile issue
    
    some scala 3 test compile issues
    
    Update ConsumerMock.scala
    
    more scala3 compile issues
    
    compile issues
    
    upgrade scalapb
    
    Update check-build-test.yml
    
    * try to run scala3 compile
    
    Update check-build-test.yml
    
    * scala 3 compile issue
    
    * scala 3 build failing over Wconf setting
    
    * Update ProjectSettings.scala
    
    * Update ProjectSettings.scala
    
    * Update ProjectSettings.scala
---
 .github/workflows/check-build-test.yml             |  5 ++--
 .../org/apache/pekko/kafka/benchmarks/Timed.scala  |  2 +-
 build.sbt                                          |  7 +++---
 .../org/apache/pekko/kafka/ConsumerMessage.scala   |  9 ++++++-
 .../org/apache/pekko/kafka/ProducerMessage.scala   |  8 +++---
 .../internal/CommittingProducerSinkStage.scala     |  8 +++---
 .../kafka/internal/DefaultProducerStage.scala      |  5 +++-
 .../pekko/kafka/internal/DeferredProducer.scala    | 13 ++++++++--
 .../pekko/kafka/internal/KafkaConsumerActor.scala  |  4 ---
 .../pekko/kafka/internal/MessageBuilder.scala      |  2 +-
 .../pekko/kafka/scaladsl/DiscoverySupport.scala    |  8 +++---
 project/Dependencies.scala                         | 16 ++++++------
 project/ParadoxSettings.scala                      |  2 +-
 project/ProjectSettings.scala                      |  8 +++---
 project/Versions.scala                             | 10 +++++++-
 .../docs/scaladsl/ClusterShardingExample.scala     |  6 ++---
 .../test/scala/docs/scaladsl/FetchMetadata.scala   |  7 +++---
 .../src/test/scala/docs/scaladsl/proto/Order.scala |  6 ++++-
 .../internal/CommittingProducerSinkSpec.scala      |  4 +--
 .../kafka/internal/CommittingWithMockSpec.scala    |  5 ++--
 .../apache/pekko/kafka/internal/ConsumerMock.scala |  4 +--
 .../internal/ConsumerProgressTrackingSpec.scala    |  4 +--
 .../apache/pekko/kafka/internal/ConsumerSpec.scala |  4 +--
 .../kafka/internal/PartitionedSourceSpec.scala     | 29 +++++++++++-----------
 .../apache/pekko/kafka/internal/ProducerSpec.scala |  2 +-
 .../apache/pekko/kafka/javadsl/ControlSpec.scala   | 16 +++++++++---
 .../pekko/kafka/scaladsl/IntegrationSpec.scala     |  2 +-
 .../pekko/kafka/scaladsl/MetadataClientSpec.scala  |  6 ++---
 .../pekko/kafka/scaladsl/RebalanceExtSpec.scala    |  2 +-
 29 files changed, 121 insertions(+), 83 deletions(-)

diff --git a/.github/workflows/check-build-test.yml b/.github/workflows/check-build-test.yml
index affcdfcd..c63572df 100644
--- a/.github/workflows/check-build-test.yml
+++ b/.github/workflows/check-build-test.yml
@@ -72,7 +72,7 @@ jobs:
       - name: Cache Coursier cache
         uses: coursier/cache-action@v6
 
-      - name: Compile all code with fatal warnings for Java 11 and Scala 2.12/2.13
+      - name: Compile all code with fatal warnings for Java 11 and Scala 2.12/2.13/3
         # Run locally with: env CI=true sbt 'clean ; Test/compile ; It/compile'
         run: sbt "; +Test/compile; +It/compile"
 
@@ -101,8 +101,7 @@ jobs:
         uses: coursier/cache-action@v6
 
       - name: Create all API docs for artifacts/website and all reference docs
-        # Run locally with: sbt verifyDocs
-        run: sbt verifyDocs
+        run: sbt "doc ; unidoc ; docs/paradoxBrowse"
 
   test:
     name: Build and Test
diff --git a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/Timed.scala b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/Timed.scala
index ce8cc1f0..1a37ff12 100644
--- a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/Timed.scala
+++ b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/Timed.scala
@@ -34,7 +34,7 @@ import scala.concurrent.{ Await, ExecutionContext, Future }
 object Timed extends LazyLogging {
   private val benchmarkReportBasePath = Paths.get("benchmarks", "target")
 
-  implicit val ec = ExecutionContext.fromExecutor(new ForkJoinPool)
+  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(new ForkJoinPool)
 
   def reporter(metricRegistry: MetricRegistry): ScheduledReporter =
     Slf4jReporter
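
Scala 3 migration tooling (and Scala 2.13 with -Xsource:3) pushes for explicit result types on implicit definitions, since the inferred type can differ between compilers; that is the pattern applied here and in several test files further down. A minimal sketch of the annotated form:

    import java.util.concurrent.ForkJoinPool
    import scala.concurrent.ExecutionContext

    object Pools {
      // Scala 2 accepted `implicit val ec = ...`; the explicit annotation
      // compiles cleanly on 2.12, 2.13 and 3
      implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(new ForkJoinPool)
    }
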
diff --git a/build.sbt b/build.sbt
index 0f937ce3..665409e4 100644
--- a/build.sbt
+++ b/build.sbt
@@ -45,7 +45,7 @@ lazy val testkit = project
     name := "pekko-connectors-kafka-testkit",
     AutomaticModuleName.settings("org.apache.pekko.kafka.testkit"),
     JupiterKeys.junitJupiterVersion := "5.8.2",
-    libraryDependencies ++= Dependencies.testKitDependencies,
+    libraryDependencies ++= Dependencies.testKitDependencies.value,
     libraryDependencies ++= Seq(
       "org.junit.jupiter" % "junit-jupiter-api" % 
JupiterKeys.junitJupiterVersion.value % Provided),
     mimaPreviousArtifacts := Set.empty, // temporarily disable mima checks
@@ -73,13 +73,14 @@ lazy val tests = project
   .settings(
     name := "pekko-connectors-kafka-tests",
     resolvers ++= ResolverSettings.testSpecificResolvers,
-    libraryDependencies ++= Dependencies.testDependencies,
+    libraryDependencies ++= Dependencies.testDependencies.value,
     libraryDependencies ++= Seq(
       "org.junit.vintage" % "junit-vintage-engine" % 
JupiterKeys.junitVintageVersion.value % Test,
       "net.aichler" % "jupiter-interface" % JupiterKeys.jupiterVersion.value % 
Test),
     publish / skip := true,
     Test / fork := true,
     Test / parallelExecution := false,
+    Test / compileOrder := CompileOrder.ScalaThenJava,
     IntegrationTest / parallelExecution := false)
 
 lazy val docs = project
@@ -118,4 +119,4 @@ lazy val benchmarks = project
     name := "pekko-connectors-kafka-benchmarks",
     publish / skip := true,
     IntegrationTest / parallelExecution := false,
-    libraryDependencies ++= Dependencies.benchmarkDependencies)
+    libraryDependencies ++= Dependencies.benchmarkDependencies.value)
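
The new `.value` calls follow from project/Dependencies.scala below, where the dependency lists become sbt settings. The compileOrder line is presumably needed because the default mixed compilation of the Scala/Java test sources tripped up the Scala 3 setup; ScalaThenJava runs scalac over the Scala sources before javac sees the Java ones:

    // sketch of the sbt fragment in isolation
    Test / compileOrder := CompileOrder.ScalaThenJava
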
diff --git a/core/src/main/scala/org/apache/pekko/kafka/ConsumerMessage.scala b/core/src/main/scala/org/apache/pekko/kafka/ConsumerMessage.scala
index 656c1714..ae617d5a 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ConsumerMessage.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ConsumerMessage.scala
@@ -140,7 +140,14 @@ object ConsumerMessage {
   /**
    * Internal Api
    */
-  @InternalApi private[kafka] final case class PartitionOffsetCommittedMarker(
+
+  private[kafka] object PartitionOffsetCommittedMarker {
+    def apply(key: GroupTopicPartition, offset: Long,
+        committedMarker: CommittedMarker, fromPartitionedSource: Boolean): PartitionOffsetCommittedMarker =
+      new PartitionOffsetCommittedMarker(key, offset, committedMarker, fromPartitionedSource)
+  }
+
+  @InternalApi private[kafka] final class PartitionOffsetCommittedMarker(
       override val key: GroupTopicPartition,
       override val offset: Long,
       private[kafka] val committedMarker: CommittedMarker,
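
PartitionOffsetCommittedMarker stops being a case class, presumably because Scala 3 is stricter about case classes whose fields override members of the parent PartitionOffset; the hand-written companion `apply` keeps existing call sites source-compatible. The general shape of the workaround, with hypothetical names:

    class Parent(val key: String, val offset: Long)

    final class Child(key: String, offset: Long, val marker: Boolean)
        extends Parent(key, offset)

    object Child {
      // preserves the `Child(...)` construction syntax the case class provided
      def apply(key: String, offset: Long, marker: Boolean): Child =
        new Child(key, offset, marker)
    }
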
diff --git a/core/src/main/scala/org/apache/pekko/kafka/ProducerMessage.scala b/core/src/main/scala/org/apache/pekko/kafka/ProducerMessage.scala
index 37785610..c7c02bc8 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ProducerMessage.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ProducerMessage.scala
@@ -206,14 +206,14 @@ object ProducerMessage {
   * Includes the original message, metadata returned from `KafkaProducer` and the
    * `offset` of the produced message.
    */
-  final case class Result[K, V, PassThrough] private (
+  final case class Result[K, V, PassThrough] private[kafka] (
       metadata: RecordMetadata,
       message: Message[K, V, PassThrough]) extends Results[K, V, PassThrough] {
     def offset: Long = metadata.offset()
     def passThrough: PassThrough = message.passThrough
   }
 
-  final case class MultiResultPart[K, V] private (
+  final case class MultiResultPart[K, V] private[kafka] (
       metadata: RecordMetadata,
       record: ProducerRecord[K, V])
 
@@ -221,7 +221,7 @@ object ProducerMessage {
   * [[Results]] implementation emitted when all messages in a [[MultiMessage]] have been
    * successfully published.
    */
-  final case class MultiResult[K, V, PassThrough] private (
+  final case class MultiResult[K, V, PassThrough] private[kafka] (
       parts: immutable.Seq[MultiResultPart[K, V]],
       passThrough: PassThrough) extends Results[K, V, PassThrough] {
 
@@ -236,7 +236,7 @@ object ProducerMessage {
   * [[Results]] implementation emitted when a [[PassThroughMessage]] has passed
    * through the flow.
    */
-  final case class PassThroughResult[K, V, PassThrough] private (passThrough: PassThrough)
+  final case class PassThroughResult[K, V, PassThrough] private[kafka] (passThrough: PassThrough)
       extends Results[K, V, PassThrough]
 
 }
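
The constructors widen from `private` to `private[kafka]` because Scala 2.13 and Scala 3 make a case class's synthetic `apply` and `copy` follow the constructor's access level (Scala 2.12 left `apply` public), which would have broken the factory methods elsewhere in the package. The rule in isolation:

    package example {
      final case class Result private[example] (value: Int)

      object Factories {
        // compiles: the synthetic apply is visible throughout `example`;
        // with a plain `private` constructor, on 2.13+/3 it would be
        // visible only to Result and its companion
        def ok(): Result = Result(1)
      }
    }
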
diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkStage.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkStage.scala
index 799eb2b2..80510139 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkStage.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkStage.scala
@@ -15,7 +15,6 @@
 package org.apache.pekko.kafka.internal
 
 import java.util.concurrent.atomic.AtomicInteger
-
 import org.apache.pekko
 import pekko.Done
 import pekko.annotation.InternalApi
@@ -28,7 +27,7 @@ import pekko.stream.stage._
 import pekko.stream.{ Attributes, Inlet, SinkShape, Supervision }
 import org.apache.kafka.clients.producer.{ Callback, RecordMetadata }
 
-import scala.concurrent.{ Future, Promise }
+import scala.concurrent.{ ExecutionContext, Future, Promise }
 import scala.util.{ Failure, Success, Try }
 
 /**
@@ -57,7 +56,8 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,
     inheritedAttributes: Attributes) extends TimerGraphStageLogic(stage.shape)
     with CommitObservationLogic
     with StageIdLogging
-    with DeferredProducer[K, V] {
+    with DeferredProducer[K, V]
+    with ExecutionContextProvider {
 
   import CommitTrigger._
 
@@ -69,6 +69,8 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,
   private lazy val decider: Decider =
     inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
 
+  override protected def getExecutionContext(): ExecutionContext = materializer.executionContext
+
   override protected def logSource: Class[_] = classOf[CommittingProducerSinkStage[_, _, _]]
 
   override protected val producerSettings: ProducerSettings[K, V] = stage.producerSettings
diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/DefaultProducerStage.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/DefaultProducerStage.scala
index b298a4fd..e0b3afe8 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/DefaultProducerStage.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/DefaultProducerStage.scala
@@ -51,13 +51,16 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
     inheritedAttributes: Attributes) extends TimerGraphStageLogic(stage.shape)
     with StageIdLogging
     with DeferredProducer[K, V]
-    with ProducerCompletionState {
+    with ProducerCompletionState
+    with ExecutionContextProvider {
 
   private lazy val decider: Decider =
     inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
   private var awaitingConfirmation = 0
   private var completionState: Option[Try[Done]] = None
 
+  override protected def getExecutionContext(): ExecutionContext = materializer.executionContext
+
   override protected def logSource: Class[_] = classOf[DefaultProducerStage[_, _, _, _, _]]
 
   final override val producerSettings: ProducerSettings[K, V] = stage.settings
diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala
index 3c1d084a..58084b85 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala
@@ -22,6 +22,7 @@ import pekko.stream.stage._
 import pekko.util.JavaDurationConverters._
 import org.apache.kafka.clients.producer.Producer
 
+import scala.concurrent.ExecutionContext
 import scala.util.control.NonFatal
 import scala.util.{ Failure, Success }
 
@@ -43,12 +44,20 @@ private[kafka] object DeferredProducer {
   case object Assigned extends ProducerAssignmentLifecycle
 }
 
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[kafka] trait ExecutionContextProvider {
+  protected def getExecutionContext(): ExecutionContext
+}
+
 /**
  * INTERNAL API
  */
 @InternalApi
 private[kafka] trait DeferredProducer[K, V] {
-  self: GraphStageLogic with StageIdLogging =>
+  self: GraphStageLogic with StageIdLogging with ExecutionContextProvider =>
 
   import DeferredProducer._
 
@@ -67,7 +76,7 @@ private[kafka] trait DeferredProducer[K, V] {
   }
 
   final protected def resolveProducer(settings: ProducerSettings[K, V]): Unit = {
-    val producerFuture = settings.createKafkaProducerAsync()(materializer.executionContext)
+    val producerFuture = settings.createKafkaProducerAsync()(getExecutionContext())
     producerFuture.value match {
       case Some(Success(p)) => assignProducer(p)
       case Some(Failure(e)) => failStage(e)
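
The new ExecutionContextProvider indirection lets DeferredProducer ask its host stage for an ExecutionContext instead of reaching through the self-type for `materializer.executionContext`, apparently something the Scala 3 compiler no longer accepted. The contract, reduced to a sketch:

    import scala.concurrent.{ ExecutionContext, Future }

    trait ExecutionContextProvider {
      protected def getExecutionContext(): ExecutionContext
    }

    trait NeedsEc { self: ExecutionContextProvider =>
      def schedule(work: () => Int): Future[Int] = {
        // the concrete stage decides which ExecutionContext is used
        implicit val ec: ExecutionContext = getExecutionContext()
        Future(work())
      }
    }
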
diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
index d2d32fb1..f728f361 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
@@ -747,10 +747,6 @@ import scala.util.control.NonFatal
   private[KafkaConsumerActor] sealed trait RebalanceListener
       extends ConsumerRebalanceListener
       with NoSerializationVerificationNeeded {
-    override def onPartitionsAssigned(partitions: java.util.Collection[TopicPartition]): Unit
-    override def onPartitionsRevoked(partitions: java.util.Collection[TopicPartition]): Unit
-    override def onPartitionsLost(partitions: java.util.Collection[TopicPartition]): Unit
-
     def postStop(): Unit = ()
   }
 
diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala
index 47b1921e..af5b7748 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala
@@ -186,7 +186,7 @@ private[kafka] final class CommittableOffsetBatchImpl(
     val metadata = newOffset match {
       case offset: CommittableOffsetMetadata =>
         offset.metadata
-      case _ =>
+      case null =>
         OffsetFetchResponse.NO_METADATA
     }
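
Scala 3 reports a trailing wildcard as "unreachable except for null" when a preceding typed pattern already covers the scrutinee's whole type, so the catch-all is narrowed to the one value a typed pattern never matches. A reduced illustration:

    final class Meta(val metadata: String)

    def metadataOf(newOffset: Meta): String =
      newOffset match {
        case offset: Meta => offset.metadata
        case null         => "" // `case _` here is flagged as unreachable by Scala 3
      }
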
 
diff --git a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/DiscoverySupport.scala b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/DiscoverySupport.scala
index 698f70e3..09eaa624 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/DiscoverySupport.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/DiscoverySupport.scala
@@ -124,11 +124,9 @@ object DiscoverySupport {
   }
 
   private def checkClassOrThrow(system: ActorSystemImpl): Unit =
-    system.dynamicAccess.getClassFor("org.apache.pekko.discovery.Discovery$") match {
-      case Failure(_: ClassNotFoundException | _: NoClassDefFoundError) =>
-        throw new IllegalStateException(
-          s"Apache Pekko Discovery is being used but the `pekko-discovery` 
library is not on the classpath, it must be added explicitly. See 
https://pekko.apache.org/docs/pekko/current/discovery/index.html";)
-      case _ =>
+    if (!system.dynamicAccess.classIsOnClasspath("org.apache.pekko.discovery.Discovery$")) {
+      throw new IllegalStateException(
+        s"Apache Pekko Discovery is being used but the `pekko-discovery` library is not on the classpath, it must be added explicitly. See https://pekko.apache.org/docs/pekko/current/discovery/index.html")
     }
 
 }
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index af97494e..e10f2da7 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -11,7 +11,7 @@ import Versions._
 import sbt._
 
 object Dependencies {
-  lazy val benchmarkDependencies = Seq(
+  lazy val benchmarkDependencies = Def.setting(Seq(
     "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5",
     "io.dropwizard.metrics" % "metrics-core" % "4.2.11",
     "ch.qos.logback" % "logback-classic" % "1.2.11",
@@ -19,7 +19,7 @@ object Dependencies {
     "org.testcontainers" % "kafka" % testcontainersVersion % IntegrationTest,
     "org.apache.pekko" %% "pekko-slf4j" % pekkoVersion % IntegrationTest,
     "org.apache.pekko" %% "pekko-stream-testkit" % pekkoVersion % 
IntegrationTest,
-    "org.scalatest" %% "scalatest" % scalaTestVersion % IntegrationTest)
+    "org.scalatest" %% "scalatest" % scalaTestVersion.value % IntegrationTest))
 
   lazy val clusterShardingDependencies = Seq("org.apache.pekko" %% "pekko-cluster-sharding-typed" % pekkoVersion)
 
@@ -28,7 +28,7 @@ object Dependencies {
     "org.apache.pekko" %% "pekko-discovery" % pekkoVersion % Provided,
     "org.apache.kafka" % "kafka-clients" % kafkaVersion)
 
-  lazy val testDependencies: Seq[ModuleID] = Seq(
+  lazy val testDependencies = Def.setting(Seq(
     "org.apache.pekko" %% "pekko-discovery" % pekkoVersion,
     "com.google.protobuf" % "protobuf-java" % "3.19.1", // use the same 
version as in scalapb
     ("io.confluent" % "kafka-avro-serializer" % confluentAvroSerializerVersion 
% Test).excludeAll(
@@ -36,7 +36,7 @@ object Dependencies {
     // See https://github.com/sbt/sbt/issues/3618#issuecomment-448951808
     ("javax.ws.rs" % "javax.ws.rs-api" % 
"2.1.1").artifacts(Artifact("javax.ws.rs-api", "jar", "jar")),
     "org.testcontainers" % "kafka" % testcontainersVersion % Test,
-    "org.scalatest" %% "scalatest" % scalaTestVersion % Test,
+    "org.scalatest" %% "scalatest" % scalaTestVersion.value % Test,
     "io.spray" %% "spray-json" % "1.3.6" % Test,
     "com.fasterxml.jackson.core" % "jackson-databind" % "2.14.2" % Test, // 
ApacheV2
     // See 
http://hamcrest.org/JavaHamcrest/distributables#upgrading-from-hamcrest-1x
@@ -48,12 +48,12 @@ object Dependencies {
     // Schema registry uses Glassfish which uses java.util.logging
     "org.slf4j" % "jul-to-slf4j" % slf4jVersion % Test,
     "org.mockito" % "mockito-core" % "4.6.1" % Test,
-    "com.thesamet.scalapb" %% "scalapb-runtime" % "0.10.11" % Test)
+    "com.thesamet.scalapb" %% "scalapb-runtime" % scalaPBVersion % Test))
 
-  lazy val testKitDependencies = Seq(
+  lazy val testKitDependencies = Def.setting(Seq(
     "org.apache.pekko" %% "pekko-stream-testkit" % pekkoVersion,
     "org.testcontainers" % "kafka" % testcontainersVersion % Provided,
-    "org.scalatest" %% "scalatest" % scalaTestVersion % Provided,
-    "junit" % "junit" % "4.13.2" % Provided)
+    "org.scalatest" %% "scalatest" % scalaTestVersion.value % Provided,
+    "junit" % "junit" % "4.13.2" % Provided))
 
 }
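
The dependency lists turn from plain `Seq[ModuleID]` values into `Def.setting`s so they can read the Scala version being built (through `scalaTestVersion.value`, defined in project/Versions.scala below), and build.sbt consumes them with `.value`. The sbt pattern in isolation, with a hypothetical object name:

    import sbt._
    import sbt.Keys._

    object DependenciesSketch {
      // a setting can depend on other settings; a plain val cannot
      val scalaTestVersion = Def.setting {
        if (scalaVersion.value.startsWith("3.")) "3.2.9" else "3.1.4"
      }

      val testKitDependencies = Def.setting(Seq(
        "org.scalatest" %% "scalatest" % scalaTestVersion.value % Provided))
    }

    // in build.sbt:
    //   libraryDependencies ++= DependenciesSketch.testKitDependencies.value
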
diff --git a/project/ParadoxSettings.scala b/project/ParadoxSettings.scala
index aca572f2..cacf88c3 100644
--- a/project/ParadoxSettings.scala
+++ b/project/ParadoxSettings.scala
@@ -28,7 +28,7 @@ object ParadoxSettings {
     Compile / paradoxProperties ++= Map(
       "image.base_url" -> "images/",
       "confluent.version" -> confluentAvroSerializerVersion,
-      "scalatest.version" -> scalaTestVersion,
+      "scalatest.version" -> scalaTestVersion.value,
       "pekko.version" -> pekkoVersion,
       "extref.pekko.base_url" -> s"$pekkoDocs/pekko/$pekkoVersionForDocs/%s",
       "scaladoc.org.apache.pekko.base_url" -> 
s"$pekkoAPI/pekko/$pekkoVersionForDocs/",
diff --git a/project/ProjectSettings.scala b/project/ProjectSettings.scala
index 4afa34b4..6867ae99 100644
--- a/project/ProjectSettings.scala
+++ b/project/ProjectSettings.scala
@@ -77,7 +77,7 @@ object ProjectSettings extends AutoPlugin {
      url("https://github.com/apache/incubator-pekko-connectors-kafka/graphs/contributors")),
     startYear := Some(2022),
    description := "Apache Pekko Kafka Connector is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Apache Pekko.",
-    crossScalaVersions := Seq(Scala212, Scala213),
+    crossScalaVersions := Seq(Scala212, Scala213, Scala3),
     scalaVersion := Scala213,
     crossVersion := CrossVersion.binary,
     javacOptions ++= Seq(
@@ -86,8 +86,10 @@ object ProjectSettings extends AutoPlugin {
     scalacOptions ++= Seq(
       "-encoding",
       "UTF-8", // yes, this is 2 args
-      "-Wconf:cat=feature:w,cat=deprecation:w,cat=unchecked:w,cat=lint:w,cat=unused:w,cat=w-flag:w") ++ {
-      if (insideCI.value && !Nightly) Seq("-Werror")
+      "-Wconf:cat=feature:w",
+      "-Wconf:cat=deprecation:w",
+      "-Wconf:cat=unchecked:w") ++ {
+      if (insideCI.value && !Nightly && scalaVersion.value.startsWith("2.")) Seq("-Werror")
       else Seq.empty
     },
     Compile / doc / scalacOptions := scalacOptions.value ++ Seq(
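
Per the commit message ("scala 3 build failing over Wconf setting"), the single comma-separated -Wconf filter is cut down to the three categories the Scala 3 compiler also accepts, and -Werror is limited to the Scala 2 crossbuilds, presumably while the Scala 3 build still emits warnings. The guard in isolation (insideCI and Nightly are this build's own helpers):

    scalacOptions ++= {
      // fail CI on warnings only for the Scala 2.x crossbuilds
      if (insideCI.value && !Nightly && scalaVersion.value.startsWith("2.")) Seq("-Werror")
      else Seq.empty
    }
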
diff --git a/project/Versions.scala b/project/Versions.scala
index 45d423ea..3b349025 100644
--- a/project/Versions.scala
+++ b/project/Versions.scala
@@ -18,6 +18,7 @@ object Versions {
   // align ignore-prefixes in scripts/link-validator.conf
   val Scala213 = "2.13.10" // update even in link-validator.conf
   val Scala212 = "2.12.17"
+  val Scala3 = "3.2.2"
 
   val pekkoVersionForDocs = "current"
   val pekkoVersion = "0.0.0+26630-2c4d0ee0-SNAPSHOT"
@@ -27,7 +28,14 @@ object Versions {
   val KafkaVersionForDocs = "30"
   // This should align with the ScalaTest version used in the Apache Pekko 1.0.x testkit
   // https://github.com/apache/incubator-pekko/blob/main/project/Dependencies.scala#L70
-  val scalaTestVersion = "3.1.4"
+  val scalaTestVersion = Def.setting {
+    if (scalaVersion.value.startsWith("3.")) {
+      "3.2.9"
+    } else {
+      "3.1.4"
+    }
+  }
+  val scalaPBVersion = "0.11.13"
   val testcontainersVersion = "1.16.3"
   val slf4jVersion = "1.7.36"
   // this depends on Kafka, and should be upgraded to such latest version
diff --git a/tests/src/test/scala/docs/scaladsl/ClusterShardingExample.scala b/tests/src/test/scala/docs/scaladsl/ClusterShardingExample.scala
index c12ad5cd..b41dfa0e 100644
--- a/tests/src/test/scala/docs/scaladsl/ClusterShardingExample.scala
+++ b/tests/src/test/scala/docs/scaladsl/ClusterShardingExample.scala
@@ -28,7 +28,7 @@ import pekko.kafka.{ ConsumerRebalanceEvent, ConsumerSettings, Subscriptions }
 import pekko.stream.scaladsl.{ Flow, Sink }
 import org.apache.kafka.common.serialization.{ ByteArrayDeserializer, StringDeserializer }
 
-import scala.concurrent.Future
+import scala.concurrent.{ ExecutionContext, Future }
 import scala.concurrent.duration._
 import scala.util.{ Failure, Success }
 
@@ -39,10 +39,10 @@ import scala.util.{ Failure, Success }
  * https://github.com/akka/akka-samples/tree/2.6/akka-sample-kafka-to-sharding-scala
  */
 object ClusterShardingExample {
-  implicit val system = ActorSystem(Behaviors.empty, "ClusterShardingExample")
+  implicit val system: ActorSystem[_] = ActorSystem(Behaviors.empty, "ClusterShardingExample")
   val kafkaBootstrapServers = "localhost:9092"
 
-  implicit val ec = system.executionContext
+  implicit val ec: ExecutionContext = system.executionContext
 
   def userBehaviour(): Behavior[User] = Behaviors.empty[User]
   def userBusiness[T](): Flow[T, T, NotUsed] = Flow[T]
diff --git a/tests/src/test/scala/docs/scaladsl/FetchMetadata.scala b/tests/src/test/scala/docs/scaladsl/FetchMetadata.scala
index 1b447b26..274eda60 100644
--- a/tests/src/test/scala/docs/scaladsl/FetchMetadata.scala
+++ b/tests/src/test/scala/docs/scaladsl/FetchMetadata.scala
@@ -15,6 +15,7 @@
 package docs.scaladsl
 
 import org.apache.pekko
+import org.apache.pekko.kafka.ConsumerSettings
 import pekko.kafka.scaladsl.MetadataClient
 import pekko.kafka.testkit.scaladsl.TestcontainersKafkaLike
 import org.scalatest.TryValues
@@ -44,7 +45,7 @@ class FetchMetadata extends DocsSpecBase with TestcontainersKafkaLike with TryVa
     val consumerSettings = consumerDefaults.withGroupId(createGroupId())
     val topic = createTopic()
     // #metadata
-    val timeout = 5.seconds
+    val timeout: FiniteDuration = 5.seconds
     val settings = consumerSettings.withMetadataRequestTimeout(timeout)
     implicit val askTimeout = Timeout(timeout)
 
@@ -90,7 +91,7 @@ class FetchMetadata extends DocsSpecBase with TestcontainersKafkaLike with TryVa
   }
 
   "Get offsets" should "timeout fast" in {
-    val consumerSettings = consumerDefaults
+    val consumerSettings: ConsumerSettings[String, String] = consumerDefaults
       .withGroupId(createGroupId())
       .withMetadataRequestTimeout(100.millis)
     val topic = createTopic()
@@ -109,7 +110,7 @@ class FetchMetadata extends DocsSpecBase with TestcontainersKafkaLike with TryVa
   }
 
   it should "return" in {
-    val consumerSettings = consumerDefaults
+    val consumerSettings: ConsumerSettings[String, String] = consumerDefaults
       .withGroupId(createGroupId())
       .withMetadataRequestTimeout(5.seconds)
     val topic = createTopic()
diff --git a/tests/src/test/scala/docs/scaladsl/proto/Order.scala b/tests/src/test/scala/docs/scaladsl/proto/Order.scala
index 47d29b9a..dc37f8c7 100644
--- a/tests/src/test/scala/docs/scaladsl/proto/Order.scala
+++ b/tests/src/test/scala/docs/scaladsl/proto/Order.scala
@@ -19,6 +19,8 @@
 
 package docs.scaladsl.proto
 
+import com.google.protobuf.CodedInputStream
+
 @SerialVersionUID(0L)
 final case class Order(
     id: _root_.scala.Predef.String = "",
@@ -79,7 +81,7 @@ final case class Order(
 
 object Order extends scalapb.GeneratedMessageCompanion[docs.scaladsl.proto.Order] {
   implicit def messageCompanion: scalapb.GeneratedMessageCompanion[docs.scaladsl.proto.Order] = this
-  def merge(`_message__`: docs.scaladsl.proto.Order,
+  override def merge(`_message__`: docs.scaladsl.proto.Order,
       `_input__`: _root_.com.google.protobuf.CodedInputStream): docs.scaladsl.proto.Order = {
     var __id = `_message__`.id
     var `_unknownFields__` : _root_.scalapb.UnknownFieldSet.Builder = null
@@ -101,6 +103,8 @@ object Order extends scalapb.GeneratedMessageCompanion[docs.scaladsl.proto.Order
       id = __id,
       unknownFields = if (_unknownFields__ == null) _message__.unknownFields else _unknownFields__.result())
   }
+  override def parseFrom(input: CodedInputStream): docs.scaladsl.proto.Order = merge(defaultInstance, input)
+
   implicit def messageReads: _root_.scalapb.descriptors.Reads[docs.scaladsl.proto.Order] =
     _root_.scalapb.descriptors.Reads {
       case _root_.scalapb.descriptors.PMessage(__fieldsMap) =>
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
index a536da34..b00801f5 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
@@ -15,7 +15,6 @@
 package org.apache.pekko.kafka.internal
 
 import java.util.concurrent.atomic.AtomicLong
-
 import org.apache.pekko
 import pekko.Done
 import pekko.actor.ActorSystem
@@ -43,6 +42,7 @@ import org.scalatest.matchers.should.Matchers
 import org.slf4j.{ Logger, LoggerFactory }
 
 import scala.collection.immutable
+import scala.concurrent.ExecutionContext
 import scala.concurrent.duration._
 
 class CommittingProducerSinkSpec(_system: ActorSystem)
@@ -67,7 +67,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
   // used by the .log(...) stream operator
   implicit val adapter: LoggingAdapter = new Slf4jToAkkaLoggingAdapter(log)
 
-  implicit val ec = _system.dispatcher
+  implicit val ec: ExecutionContext = _system.dispatcher
 
   val groupId = "group1"
   val topic = "topic1"
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
index 9ede79de..76794d49 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
@@ -15,7 +15,6 @@
 package org.apache.pekko.kafka.internal
 
 import java.util.concurrent.atomic.AtomicInteger
-
 import org.apache.pekko
 import pekko.Done
 import pekko.actor.ActorSystem
@@ -40,7 +39,7 @@ import org.scalatest.flatspec.AnyFlatSpecLike
 import org.scalatest.matchers.should.Matchers
 
 import scala.concurrent.duration._
-import scala.concurrent.{ Await, Future }
+import scala.concurrent.{ Await, ExecutionContext, Future }
 
 object CommittingWithMockSpec {
   type K = String
@@ -84,7 +83,7 @@ class CommittingWithMockSpec(_system: ActorSystem)
   override def afterAll(): Unit =
     shutdown(system)
 
-  implicit val ec = _system.dispatcher
+  implicit val ec: ExecutionContext = _system.dispatcher
   val messages = (1 to 1000).map(createMessage)
   val failure = new CommitFailedException()
   val onCompleteFailure: ConsumerMock.OnCompleteHandler = _ => (null, failure)
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
index 98b99edb..127cddb0 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
@@ -177,13 +177,13 @@ class ConsumerMock[K, V](handler: ConsumerMock.CommitHandler = new ConsumerMock.
   def assignPartitions(tps: Set[TopicPartition]) =
     tps.groupBy(_.topic()).foreach {
       case (topic, localTps) =>
-        pendingSubscriptions.find(_._1 == topic).get._2.onPartitionsAssigned(localTps.asJavaCollection)
+        pendingSubscriptions.find(_._1 == List(topic)).get._2.onPartitionsAssigned(localTps.asJavaCollection)
     }
 
   def revokePartitions(tps: Set[TopicPartition]) =
     tps.groupBy(_.topic()).foreach {
       case (topic, localTps) =>
-        pendingSubscriptions.find(_._1 == topic).get._2.onPartitionsRevoked(localTps.asJavaCollection)
+        pendingSubscriptions.find(_._1 == List(topic)).get._2.onPartitionsRevoked(localTps.asJavaCollection)
     }
 
   def releaseAndAwaitCommitCallbacks(testkit: TestKit, minOffset: Long): Unit = {
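
Judging by the fix, `pendingSubscriptions` keys each subscription with a List[String] of topics, so the old code compared a List against a bare String: a comparison that can never be true and that Scala 3 rejects outright ("values of types List[String] and String cannot be compared"). In isolation:

    val pending = List((List("topic1"), "listener1"))

    // pending.find(_._1 == "topic1")    // rejected by Scala 3; always None on Scala 2
    pending.find(_._1 == List("topic1")) // Some((List("topic1"), "listener1"))
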
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
index 9fbe61c4..f341f386 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
@@ -170,8 +170,8 @@ class ConsumerProgressTrackingSpec extends AnyFlatSpecLike with Matchers with Lo
 
   it should "pass through requests to listeners" in {
     val tracker = new ConsumerProgressTrackerImpl()
+    var state = Map[TopicPartition, Long]()
     val listener = new ConsumerAssignmentTrackingListener {
-      var state = Map[TopicPartition, Long]()
       override def revoke(revokedTps: Set[TopicPartition]): Unit = {
         state = state.filter { case (tp, _) => !revokedTps.contains(tp) }
       }
@@ -183,7 +183,7 @@ class ConsumerProgressTrackingSpec extends AnyFlatSpecLike with Matchers with Lo
     tracker.addProgressTrackingCallback(listener)
 
     def verifyOffsets(offsets: Map[TopicPartition, Long]): Unit = {
-      listener.state should be(offsets)
+      state should be(offsets)
     }
 
     // assign
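
Reading `listener.state` type-checked in Scala 2 only through reflective structural typing, since `state` existed on the anonymous subclass rather than on ConsumerAssignmentTrackingListener; Scala 3 handles structural member access differently (via scala.Selectable), so the test hoists the state into a var both sides can see. The shape of the fix, with simplified types:

    object StructuralFix {
      trait Listener { def revoke(tps: Set[Int]): Unit }

      var state = Set(1, 2, 3)
      val listener = new Listener {
        // writes to the enclosing var instead of exposing a member
        // that only the anonymous subclass would have
        override def revoke(tps: Set[Int]): Unit = state = state -- tps
      }
    }
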
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
index d5f1b610..1c9bb21e 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
@@ -38,7 +38,7 @@ import org.scalatest.matchers.should.Matchers
 
 import scala.collection.immutable.Seq
 import scala.concurrent.duration._
-import scala.concurrent.{ Await, Future }
+import scala.concurrent.{ Await, ExecutionContext, Future }
 
 object ConsumerSpec {
   type K = String
@@ -80,7 +80,7 @@ class ConsumerSpec(_system: ActorSystem)
   override def afterAll(): Unit =
     shutdown(system)
 
-  implicit val ec = _system.dispatcher
+  implicit val ec: ExecutionContext = _system.dispatcher
   val messages = (1 to 1000).map(createMessage)
 
   def checkMessagesReceiving(msgss: Seq[Seq[CommittableMessage[K, V]]]): Unit = {
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
index fb70f488..8a693413 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
@@ -17,7 +17,6 @@ package org.apache.pekko.kafka.internal
 import java.util.concurrent.atomic.AtomicReference
 import java.util.concurrent.{ CountDownLatch, TimeUnit }
 import java.util.function.UnaryOperator
-
 import org.apache.pekko
 import pekko.Done
 import pekko.actor.ActorSystem
@@ -40,7 +39,7 @@ import org.scalatest.matchers.should.Matchers
 import org.scalatest.{ BeforeAndAfterAll, OptionValues }
 import org.slf4j.{ Logger, LoggerFactory }
 
-import scala.concurrent.Future
+import scala.concurrent.{ ExecutionContext, Future }
 import scala.concurrent.duration._
 
 class PartitionedSourceSpec(_system: ActorSystem)
@@ -66,7 +65,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
   override def afterAll(): Unit =
     shutdown(system)
 
-  implicit val ec = _system.dispatcher
+  implicit val ec: ExecutionContext = _system.dispatcher
 
   def consumerSettings(dummy: Consumer[K, V]): ConsumerSettings[K, V] =
     ConsumerSettings
@@ -264,7 +263,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
     }
 
    def getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition, Long]] = { tps =>
-      log.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
+      logger.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
       assertGetOffsetsOnAssign(tps)
       Future.successful(tps.map(tp => (tp, 300L)).toMap)
     }
@@ -296,7 +295,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
     }
 
    def getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition, Long]] = { tps =>
-      log.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
+      logger.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
       assertGetOffsetsOnAssign(tps)
       Future.successful(tps.map(tp => (tp, 300L)).toMap)
     }
@@ -335,7 +334,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
     }
 
    def getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition, Long]] = { tps =>
-      log.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
+      logger.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
       assertGetOffsetsOnAssign(tps)
       Future.successful(tps.map(tp => (tp, 300L)).toMap)
     }
@@ -367,7 +366,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
     val dummy = new Dummy()
 
    def getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition, Long]] = { tps =>
-      log.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
+      logger.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
       Future.successful(tps.map(tp => (tp, 300L)).toMap)
     }
 
@@ -407,7 +406,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
    def getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition, Long]] = { tps =>
       Future {
         latch.await(10, TimeUnit.SECONDS)
-        log.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
+        logger.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
         tps.map(tp => (tp, 300L)).toMap
       }
     }
@@ -436,7 +435,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
    def getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition, Long]] = { tps =>
      // This will be called twice, but we ensure that the second returned Future completes
       // before the first returned Future
-      log.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
+      logger.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
       val offsets = tps.map(tp => (tp, 300L)).toMap
       if (tps.size == 2) {
         Future {
@@ -475,7 +474,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
     }
 
    def getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition, Long]] = { tps =>
-      log.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
+      logger.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
       assertGetOffsetsOnAssign(tps)
       Future.successful(tps.map(tp => (tp, 300L)).toMap)
     }
@@ -509,7 +508,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
     }
 
    def getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition, Long]] = { tps =>
-      log.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
+      logger.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
       assertGetOffsetsOnAssign(tps)
       Future.successful(tps.map(tp => (tp, 300L)).toMap)
     }
@@ -550,7 +549,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
     }
 
    def getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition, Long]] = { tps =>
-      log.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
+      logger.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
       assertGetOffsetsOnAssign(tps)
       Future.successful(tps.map(tp => (tp, 300L)).toMap)
     }
@@ -584,7 +583,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
     val dummy = new Dummy()
 
    def getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition, Long]] = { tps =>
-      log.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
+      logger.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
       Future.successful(tps.map(tp => (tp, 300L)).toMap)
     }
 
@@ -747,7 +746,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
 }
 
 object PartitionedSourceSpec {
-  def log: Logger = LoggerFactory.getLogger(getClass)
+  def logger: Logger = LoggerFactory.getLogger(getClass)
 
   type K = String
   type V = String
@@ -759,7 +758,7 @@ object PartitionedSourceSpec {
   val tp1 = new TopicPartition(topic, 1)
 
   def sleep(time: FiniteDuration): Unit = {
-    log.debug(s"sleeping $time")
+    logger.debug(s"sleeping $time")
     Thread.sleep(time.toMillis)
   }
 
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
index 1e7633f5..158f8e53 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
@@ -64,7 +64,7 @@ class ProducerSpec(_system: ActorSystem)
 
   override def afterAll(): Unit = shutdown(system)
 
-  implicit val ec = _system.dispatcher
+  implicit val ec: ExecutionContext = _system.dispatcher
 
   private val group = "group"
 
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/javadsl/ControlSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/javadsl/ControlSpec.scala
index 961820a6..a074a833 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/javadsl/ControlSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/javadsl/ControlSpec.scala
@@ -32,17 +32,27 @@ import scala.concurrent.Future
 import scala.language.reflectiveCalls
 
 object ControlSpec {
+
+  trait WithShutdownCalled {
+    def shutdownCalled: AtomicBoolean
+  }
+
   def createControl(stopFuture: Future[Done] = Future.successful(Done),
-      shutdownFuture: Future[Done] = Future.successful(Done)) = {
+      shutdownFuture: Future[Done] = Future.successful(Done)): Consumer.Control with WithShutdownCalled = {
     val control = new pekko.kafka.scaladsl.ControlSpec.ControlImpl(stopFuture, shutdownFuture)
     val wrapped = new ConsumerControlAsJava(control)
-    new Consumer.Control {
-      def shutdownCalled: AtomicBoolean = control.shutdownCalled
+    new Consumer.Control with WithShutdownCalled {
+      override def shutdownCalled: AtomicBoolean = control.shutdownCalled
+
       override def stop(): CompletionStage[Done] = wrapped.stop()
+
       override def shutdown(): CompletionStage[Done] = wrapped.shutdown()
+
       override def drainAndShutdown[T](streamCompletion: CompletionStage[T], ec: Executor): CompletionStage[T] =
         wrapped.drainAndShutdown(streamCompletion, ec)
+
       override def isShutdown: CompletionStage[Done] = wrapped.isShutdown
+
       override def getMetrics: CompletionStage[util.Map[MetricName, Metric]] = wrapped.getMetrics
     }
   }
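
Same structural-typing theme as in ConsumerProgressTrackingSpec, solved the other way around: the extra member is declared in a real trait (WithShutdownCalled) and surfaced through a compound result type, so neither Scala version needs reflective access. Reduced to its essentials:

    import java.util.concurrent.atomic.AtomicBoolean

    object ControlSketch {
      trait Control { def stop(): Unit } // stand-in for Consumer.Control
      trait WithShutdownCalled { def shutdownCalled: AtomicBoolean }

      def createControl(): Control with WithShutdownCalled =
        new Control with WithShutdownCalled {
          private val flag = new AtomicBoolean(false)
          override def stop(): Unit = flag.set(true)
          override def shutdownCalled: AtomicBoolean = flag
        }

      // no scala.language.reflectiveCalls needed at the call site:
      val shutdownFlag: AtomicBoolean = createControl().shutdownCalled
    }
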
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/IntegrationSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/IntegrationSpec.scala
index a945e11d..1caa499e 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/IntegrationSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/IntegrationSpec.scala
@@ -40,7 +40,7 @@ import scala.util.Success
 
 class IntegrationSpec extends SpecBase with TestcontainersKafkaLike with Inside {
 
-  implicit val patience = PatienceConfig(30.seconds, 500.millis)
+  implicit val patience: PatienceConfig = PatienceConfig(30.seconds, 500.millis)
 
   "Kafka connector" must {
     "produce to plainSink and consume from plainSource" in {
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/MetadataClientSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/MetadataClientSpec.scala
index 2e7bcfc8..0d6e306e 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/MetadataClientSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/MetadataClientSpec.scala
@@ -138,8 +138,8 @@ class MetadataClientSpec extends SpecBase with TestcontainersKafkaLike {
       val expectedPartitionsForTopic1 = (topic1, 0) :: (topic1, 1) :: Nil
       val expectedPartitionsForTopic2 = (topic2, 0) :: Nil
 
-      topics(topic1).leftSideValue.map(mapToTopicPartition) shouldBe expectedPartitionsForTopic1
-      topics(topic2).leftSideValue.map(mapToTopicPartition) shouldBe expectedPartitionsForTopic2
+      topics(topic1).map(mapToTopicPartition) shouldBe expectedPartitionsForTopic1
+      topics(topic2).map(mapToTopicPartition) shouldBe expectedPartitionsForTopic2
 
       metadataClient.close()
     }
@@ -155,7 +155,7 @@ class MetadataClientSpec extends SpecBase with TestcontainersKafkaLike {
 
       val partitionsInfo = metadataClient.getPartitionsFor(topic).futureValue
 
-      partitionsInfo.leftSideValue.map(_.partition()) shouldBe List(0, 1)
+      partitionsInfo.map(_.partition()) shouldBe List(0, 1)
 
       metadataClient.close()
     }
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceExtSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceExtSpec.scala
index 97b1ce89..dbbba2fa 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceExtSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceExtSpec.scala
@@ -138,7 +138,7 @@ class RebalanceExtSpec extends SpecBase with TestcontainersKafkaLike with Inside
     val producerTpsAck: Seq[Future[Done]] = topics.flatMap { topic1 =>
       val topicIdx = topics.indexOf(topic1)
       val topicOffset = topicIdx * partitionCount * perPartitionMessageCount
-      (0 until partitionCount).map { partitionIdx: Int =>
+      (0 until partitionCount).map { (partitionIdx: Int) =>
        val startMessageIdx = partitionIdx * perPartitionMessageCount + 1 + topicOffset
         val endMessageIdx = startMessageIdx + perPartitionMessageCount - 1
         val messageRange = startMessageIdx to endMessageIdx
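
The final hunk is pure syntax migration: Scala 3 requires parentheses around a lambda parameter that carries a type ascription, while Scala 2 accepted the bare form inside braces:

    object LambdaParens {
      val xs = 0 until 3

      // Scala 2 only:  xs.map { i: Int => i + 1 }
      val ys = xs.map { (i: Int) => i + 1 } // accepted by Scala 2.13 and Scala 3
    }
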


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

