This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-pekko-connectors-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new c457eda9 support scala 3 compilation (#58)
c457eda9 is described below
commit c457eda9e928da4949dbea6ca4196108c952727d
Author: PJ Fanning <[email protected]>
AuthorDate: Tue Apr 25 21:43:19 2023 +0200
support scala 3 compilation (#58)
* upgrade scalatest
partial scala 3 support
Update check-build-test.yml
Update check-build-test.yml
Update check-build-test.yml
Update check-build-test.yml
more changes to support scala 3 compile
scalafmt
another scala 3 compile issue
another scala 3 compile issue
more scala 3 compile issues
compile issue
some scala 3 test compile issues
Update ConsumerMock.scala
more scala3 compile issues
compile issues
upgrade scalapb
Update check-build-test.yml
* try to run scala3 compile
Update check-build-test.yml
* scala 3 compile issue
* scala 3 build failing over Wconf setting
* Update ProjectSettings.scala
* Update ProjectSettings.scala
* Update ProjectSettings.scala
---
.github/workflows/check-build-test.yml | 5 ++--
.../org/apache/pekko/kafka/benchmarks/Timed.scala | 2 +-
build.sbt | 7 +++---
.../org/apache/pekko/kafka/ConsumerMessage.scala | 9 ++++++-
.../org/apache/pekko/kafka/ProducerMessage.scala | 8 +++---
.../internal/CommittingProducerSinkStage.scala | 8 +++---
.../kafka/internal/DefaultProducerStage.scala | 5 +++-
.../pekko/kafka/internal/DeferredProducer.scala | 13 ++++++++--
.../pekko/kafka/internal/KafkaConsumerActor.scala | 4 ---
.../pekko/kafka/internal/MessageBuilder.scala | 2 +-
.../pekko/kafka/scaladsl/DiscoverySupport.scala | 8 +++---
project/Dependencies.scala | 16 ++++++------
project/ParadoxSettings.scala | 2 +-
project/ProjectSettings.scala | 8 +++---
project/Versions.scala | 10 +++++++-
.../docs/scaladsl/ClusterShardingExample.scala | 6 ++---
.../test/scala/docs/scaladsl/FetchMetadata.scala | 7 +++---
.../src/test/scala/docs/scaladsl/proto/Order.scala | 6 ++++-
.../internal/CommittingProducerSinkSpec.scala | 4 +--
.../kafka/internal/CommittingWithMockSpec.scala | 5 ++--
.../apache/pekko/kafka/internal/ConsumerMock.scala | 4 +--
.../internal/ConsumerProgressTrackingSpec.scala | 4 +--
.../apache/pekko/kafka/internal/ConsumerSpec.scala | 4 +--
.../kafka/internal/PartitionedSourceSpec.scala | 29 +++++++++++-----------
.../apache/pekko/kafka/internal/ProducerSpec.scala | 2 +-
.../apache/pekko/kafka/javadsl/ControlSpec.scala | 16 +++++++++---
.../pekko/kafka/scaladsl/IntegrationSpec.scala | 2 +-
.../pekko/kafka/scaladsl/MetadataClientSpec.scala | 6 ++---
.../pekko/kafka/scaladsl/RebalanceExtSpec.scala | 2 +-
29 files changed, 121 insertions(+), 83 deletions(-)
diff --git a/.github/workflows/check-build-test.yml
b/.github/workflows/check-build-test.yml
index affcdfcd..c63572df 100644
--- a/.github/workflows/check-build-test.yml
+++ b/.github/workflows/check-build-test.yml
@@ -72,7 +72,7 @@ jobs:
- name: Cache Coursier cache
uses: coursier/cache-action@v6
- - name: Compile all code with fatal warnings for Java 11 and Scala
2.12/2.13
+ - name: Compile all code with fatal warnings for Java 11 and Scala
2.12/2.13/3
# Run locally with: env CI=true sbt 'clean ; Test/compile ; It/compile'
run: sbt "; +Test/compile; +It/compile"
@@ -101,8 +101,7 @@ jobs:
uses: coursier/cache-action@v6
- name: Create all API docs for artifacts/website and all reference docs
- # Run locally with: sbt verifyDocs
- run: sbt verifyDocs
+ run: sbt "doc ; unidoc ; docs/paradoxBrowse"
test:
name: Build and Test
diff --git
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/Timed.scala
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/Timed.scala
index ce8cc1f0..1a37ff12 100644
--- a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/Timed.scala
+++ b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/Timed.scala
@@ -34,7 +34,7 @@ import scala.concurrent.{ Await, ExecutionContext, Future }
object Timed extends LazyLogging {
private val benchmarkReportBasePath = Paths.get("benchmarks", "target")
- implicit val ec = ExecutionContext.fromExecutor(new ForkJoinPool)
+ implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(new
ForkJoinPool)
def reporter(metricRegistry: MetricRegistry): ScheduledReporter =
Slf4jReporter
diff --git a/build.sbt b/build.sbt
index 0f937ce3..665409e4 100644
--- a/build.sbt
+++ b/build.sbt
@@ -45,7 +45,7 @@ lazy val testkit = project
name := "pekko-connectors-kafka-testkit",
AutomaticModuleName.settings("org.apache.pekko.kafka.testkit"),
JupiterKeys.junitJupiterVersion := "5.8.2",
- libraryDependencies ++= Dependencies.testKitDependencies,
+ libraryDependencies ++= Dependencies.testKitDependencies.value,
libraryDependencies ++= Seq(
"org.junit.jupiter" % "junit-jupiter-api" %
JupiterKeys.junitJupiterVersion.value % Provided),
mimaPreviousArtifacts := Set.empty, // temporarily disable mima checks
@@ -73,13 +73,14 @@ lazy val tests = project
.settings(
name := "pekko-connectors-kafka-tests",
resolvers ++= ResolverSettings.testSpecificResolvers,
- libraryDependencies ++= Dependencies.testDependencies,
+ libraryDependencies ++= Dependencies.testDependencies.value,
libraryDependencies ++= Seq(
"org.junit.vintage" % "junit-vintage-engine" %
JupiterKeys.junitVintageVersion.value % Test,
"net.aichler" % "jupiter-interface" % JupiterKeys.jupiterVersion.value %
Test),
publish / skip := true,
Test / fork := true,
Test / parallelExecution := false,
+ Test / compileOrder := CompileOrder.ScalaThenJava,
IntegrationTest / parallelExecution := false)
lazy val docs = project
@@ -118,4 +119,4 @@ lazy val benchmarks = project
name := "pekko-connectors-kafka-benchmarks",
publish / skip := true,
IntegrationTest / parallelExecution := false,
- libraryDependencies ++= Dependencies.benchmarkDependencies)
+ libraryDependencies ++= Dependencies.benchmarkDependencies.value)
diff --git a/core/src/main/scala/org/apache/pekko/kafka/ConsumerMessage.scala
b/core/src/main/scala/org/apache/pekko/kafka/ConsumerMessage.scala
index 656c1714..ae617d5a 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ConsumerMessage.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ConsumerMessage.scala
@@ -140,7 +140,14 @@ object ConsumerMessage {
/**
* Internal Api
*/
- @InternalApi private[kafka] final case class PartitionOffsetCommittedMarker(
+
+ private[kafka] object PartitionOffsetCommittedMarker {
+ def apply(key: GroupTopicPartition, offset: Long,
+ committedMarker: CommittedMarker, fromPartitionedSource: Boolean):
PartitionOffsetCommittedMarker =
+ new PartitionOffsetCommittedMarker(key, offset, committedMarker,
fromPartitionedSource)
+ }
+
+ @InternalApi private[kafka] final class PartitionOffsetCommittedMarker(
override val key: GroupTopicPartition,
override val offset: Long,
private[kafka] val committedMarker: CommittedMarker,
diff --git a/core/src/main/scala/org/apache/pekko/kafka/ProducerMessage.scala
b/core/src/main/scala/org/apache/pekko/kafka/ProducerMessage.scala
index 37785610..c7c02bc8 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ProducerMessage.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ProducerMessage.scala
@@ -206,14 +206,14 @@ object ProducerMessage {
* Includes the original message, metadata returned from `KafkaProducer` and
the
* `offset` of the produced message.
*/
- final case class Result[K, V, PassThrough] private (
+ final case class Result[K, V, PassThrough] private[kafka] (
metadata: RecordMetadata,
message: Message[K, V, PassThrough]) extends Results[K, V, PassThrough] {
def offset: Long = metadata.offset()
def passThrough: PassThrough = message.passThrough
}
- final case class MultiResultPart[K, V] private (
+ final case class MultiResultPart[K, V] private[kafka] (
metadata: RecordMetadata,
record: ProducerRecord[K, V])
@@ -221,7 +221,7 @@ object ProducerMessage {
* [[Results]] implementation emitted when all messages in a
[[MultiMessage]] have been
* successfully published.
*/
- final case class MultiResult[K, V, PassThrough] private (
+ final case class MultiResult[K, V, PassThrough] private[kafka] (
parts: immutable.Seq[MultiResultPart[K, V]],
passThrough: PassThrough) extends Results[K, V, PassThrough] {
@@ -236,7 +236,7 @@ object ProducerMessage {
* [[Results]] implementation emitted when a [[PassThroughMessage]] has
passed
* through the flow.
*/
- final case class PassThroughResult[K, V, PassThrough] private (passThrough:
PassThrough)
+ final case class PassThroughResult[K, V, PassThrough] private[kafka]
(passThrough: PassThrough)
extends Results[K, V, PassThrough]
}
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkStage.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkStage.scala
index 799eb2b2..80510139 100644
---
a/core/src/main/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkStage.scala
+++
b/core/src/main/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkStage.scala
@@ -15,7 +15,6 @@
package org.apache.pekko.kafka.internal
import java.util.concurrent.atomic.AtomicInteger
-
import org.apache.pekko
import pekko.Done
import pekko.annotation.InternalApi
@@ -28,7 +27,7 @@ import pekko.stream.stage._
import pekko.stream.{ Attributes, Inlet, SinkShape, Supervision }
import org.apache.kafka.clients.producer.{ Callback, RecordMetadata }
-import scala.concurrent.{ Future, Promise }
+import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.util.{ Failure, Success, Try }
/**
@@ -57,7 +56,8 @@ private final class CommittingProducerSinkStageLogic[K, V, IN
<: Envelope[K, V,
inheritedAttributes: Attributes) extends TimerGraphStageLogic(stage.shape)
with CommitObservationLogic
with StageIdLogging
- with DeferredProducer[K, V] {
+ with DeferredProducer[K, V]
+ with ExecutionContextProvider {
import CommitTrigger._
@@ -69,6 +69,8 @@ private final class CommittingProducerSinkStageLogic[K, V, IN
<: Envelope[K, V,
private lazy val decider: Decider =
inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
+ override protected def getExecutionContext(): ExecutionContext =
materializer.executionContext
+
override protected def logSource: Class[_] =
classOf[CommittingProducerSinkStage[_, _, _]]
override protected val producerSettings: ProducerSettings[K, V] =
stage.producerSettings
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/DefaultProducerStage.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/DefaultProducerStage.scala
index b298a4fd..e0b3afe8 100644
---
a/core/src/main/scala/org/apache/pekko/kafka/internal/DefaultProducerStage.scala
+++
b/core/src/main/scala/org/apache/pekko/kafka/internal/DefaultProducerStage.scala
@@ -51,13 +51,16 @@ private class DefaultProducerStageLogic[K, V, P, IN <:
Envelope[K, V, P], OUT <:
inheritedAttributes: Attributes) extends TimerGraphStageLogic(stage.shape)
with StageIdLogging
with DeferredProducer[K, V]
- with ProducerCompletionState {
+ with ProducerCompletionState
+ with ExecutionContextProvider {
private lazy val decider: Decider =
inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
private var awaitingConfirmation = 0
private var completionState: Option[Try[Done]] = None
+ override protected def getExecutionContext(): ExecutionContext =
materializer.executionContext
+
override protected def logSource: Class[_] = classOf[DefaultProducerStage[_,
_, _, _, _]]
final override val producerSettings: ProducerSettings[K, V] = stage.settings
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala
index 3c1d084a..58084b85 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala
@@ -22,6 +22,7 @@ import pekko.stream.stage._
import pekko.util.JavaDurationConverters._
import org.apache.kafka.clients.producer.Producer
+import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal
import scala.util.{ Failure, Success }
@@ -43,12 +44,20 @@ private[kafka] object DeferredProducer {
case object Assigned extends ProducerAssignmentLifecycle
}
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[kafka] trait ExecutionContextProvider {
+ protected def getExecutionContext(): ExecutionContext
+}
+
/**
* INTERNAL API
*/
@InternalApi
private[kafka] trait DeferredProducer[K, V] {
- self: GraphStageLogic with StageIdLogging =>
+ self: GraphStageLogic with StageIdLogging with ExecutionContextProvider =>
import DeferredProducer._
@@ -67,7 +76,7 @@ private[kafka] trait DeferredProducer[K, V] {
}
final protected def resolveProducer(settings: ProducerSettings[K, V]): Unit
= {
- val producerFuture =
settings.createKafkaProducerAsync()(materializer.executionContext)
+ val producerFuture =
settings.createKafkaProducerAsync()(getExecutionContext())
producerFuture.value match {
case Some(Success(p)) => assignProducer(p)
case Some(Failure(e)) => failStage(e)
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
index d2d32fb1..f728f361 100644
---
a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
+++
b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
@@ -747,10 +747,6 @@ import scala.util.control.NonFatal
private[KafkaConsumerActor] sealed trait RebalanceListener
extends ConsumerRebalanceListener
with NoSerializationVerificationNeeded {
- override def onPartitionsAssigned(partitions:
java.util.Collection[TopicPartition]): Unit
- override def onPartitionsRevoked(partitions:
java.util.Collection[TopicPartition]): Unit
- override def onPartitionsLost(partitions:
java.util.Collection[TopicPartition]): Unit
-
def postStop(): Unit = ()
}
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala
index 47b1921e..af5b7748 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala
@@ -186,7 +186,7 @@ private[kafka] final class CommittableOffsetBatchImpl(
val metadata = newOffset match {
case offset: CommittableOffsetMetadata =>
offset.metadata
- case _ =>
+ case null =>
OffsetFetchResponse.NO_METADATA
}
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/DiscoverySupport.scala
b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/DiscoverySupport.scala
index 698f70e3..09eaa624 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/DiscoverySupport.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/DiscoverySupport.scala
@@ -124,11 +124,9 @@ object DiscoverySupport {
}
private def checkClassOrThrow(system: ActorSystemImpl): Unit =
- system.dynamicAccess.getClassFor("org.apache.pekko.discovery.Discovery$")
match {
- case Failure(_: ClassNotFoundException | _: NoClassDefFoundError) =>
- throw new IllegalStateException(
- s"Apache Pekko Discovery is being used but the `pekko-discovery`
library is not on the classpath, it must be added explicitly. See
https://pekko.apache.org/docs/pekko/current/discovery/index.html")
- case _ =>
+ if
(!system.dynamicAccess.classIsOnClasspath("org.apache.pekko.discovery.Discovery$"))
{
+ throw new IllegalStateException(
+ s"Apache Pekko Discovery is being used but the `pekko-discovery`
library is not on the classpath, it must be added explicitly. See
https://pekko.apache.org/docs/pekko/current/discovery/index.html")
}
}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index af97494e..e10f2da7 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -11,7 +11,7 @@ import Versions._
import sbt._
object Dependencies {
- lazy val benchmarkDependencies = Seq(
+ lazy val benchmarkDependencies = Def.setting(Seq(
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.5",
"io.dropwizard.metrics" % "metrics-core" % "4.2.11",
"ch.qos.logback" % "logback-classic" % "1.2.11",
@@ -19,7 +19,7 @@ object Dependencies {
"org.testcontainers" % "kafka" % testcontainersVersion % IntegrationTest,
"org.apache.pekko" %% "pekko-slf4j" % pekkoVersion % IntegrationTest,
"org.apache.pekko" %% "pekko-stream-testkit" % pekkoVersion %
IntegrationTest,
- "org.scalatest" %% "scalatest" % scalaTestVersion % IntegrationTest)
+ "org.scalatest" %% "scalatest" % scalaTestVersion.value % IntegrationTest))
lazy val clusterShardingDependencies = Seq("org.apache.pekko" %%
"pekko-cluster-sharding-typed" % pekkoVersion)
@@ -28,7 +28,7 @@ object Dependencies {
"org.apache.pekko" %% "pekko-discovery" % pekkoVersion % Provided,
"org.apache.kafka" % "kafka-clients" % kafkaVersion)
- lazy val testDependencies: Seq[ModuleID] = Seq(
+ lazy val testDependencies = Def.setting(Seq(
"org.apache.pekko" %% "pekko-discovery" % pekkoVersion,
"com.google.protobuf" % "protobuf-java" % "3.19.1", // use the same
version as in scalapb
("io.confluent" % "kafka-avro-serializer" % confluentAvroSerializerVersion
% Test).excludeAll(
@@ -36,7 +36,7 @@ object Dependencies {
// See https://github.com/sbt/sbt/issues/3618#issuecomment-448951808
("javax.ws.rs" % "javax.ws.rs-api" %
"2.1.1").artifacts(Artifact("javax.ws.rs-api", "jar", "jar")),
"org.testcontainers" % "kafka" % testcontainersVersion % Test,
- "org.scalatest" %% "scalatest" % scalaTestVersion % Test,
+ "org.scalatest" %% "scalatest" % scalaTestVersion.value % Test,
"io.spray" %% "spray-json" % "1.3.6" % Test,
"com.fasterxml.jackson.core" % "jackson-databind" % "2.14.2" % Test, //
ApacheV2
// See
http://hamcrest.org/JavaHamcrest/distributables#upgrading-from-hamcrest-1x
@@ -48,12 +48,12 @@ object Dependencies {
// Schema registry uses Glassfish which uses java.util.logging
"org.slf4j" % "jul-to-slf4j" % slf4jVersion % Test,
"org.mockito" % "mockito-core" % "4.6.1" % Test,
- "com.thesamet.scalapb" %% "scalapb-runtime" % "0.10.11" % Test)
+ "com.thesamet.scalapb" %% "scalapb-runtime" % scalaPBVersion % Test))
- lazy val testKitDependencies = Seq(
+ lazy val testKitDependencies = Def.setting(Seq(
"org.apache.pekko" %% "pekko-stream-testkit" % pekkoVersion,
"org.testcontainers" % "kafka" % testcontainersVersion % Provided,
- "org.scalatest" %% "scalatest" % scalaTestVersion % Provided,
- "junit" % "junit" % "4.13.2" % Provided)
+ "org.scalatest" %% "scalatest" % scalaTestVersion.value % Provided,
+ "junit" % "junit" % "4.13.2" % Provided))
}
diff --git a/project/ParadoxSettings.scala b/project/ParadoxSettings.scala
index aca572f2..cacf88c3 100644
--- a/project/ParadoxSettings.scala
+++ b/project/ParadoxSettings.scala
@@ -28,7 +28,7 @@ object ParadoxSettings {
Compile / paradoxProperties ++= Map(
"image.base_url" -> "images/",
"confluent.version" -> confluentAvroSerializerVersion,
- "scalatest.version" -> scalaTestVersion,
+ "scalatest.version" -> scalaTestVersion.value,
"pekko.version" -> pekkoVersion,
"extref.pekko.base_url" -> s"$pekkoDocs/pekko/$pekkoVersionForDocs/%s",
"scaladoc.org.apache.pekko.base_url" ->
s"$pekkoAPI/pekko/$pekkoVersionForDocs/",
diff --git a/project/ProjectSettings.scala b/project/ProjectSettings.scala
index 4afa34b4..6867ae99 100644
--- a/project/ProjectSettings.scala
+++ b/project/ProjectSettings.scala
@@ -77,7 +77,7 @@ object ProjectSettings extends AutoPlugin {
url("https://github.com/apache/incubator-pekko-connectors-kafka/graphs/contributors")),
startYear := Some(2022),
description := "Apache Pekko Kafka Connector is a Reactive Enterprise
Integration library for Java and Scala, based on Reactive Streams and Apache
Pekko.",
- crossScalaVersions := Seq(Scala212, Scala213),
+ crossScalaVersions := Seq(Scala212, Scala213, Scala3),
scalaVersion := Scala213,
crossVersion := CrossVersion.binary,
javacOptions ++= Seq(
@@ -86,8 +86,10 @@ object ProjectSettings extends AutoPlugin {
scalacOptions ++= Seq(
"-encoding",
"UTF-8", // yes, this is 2 args
-
"-Wconf:cat=feature:w,cat=deprecation:w,cat=unchecked:w,cat=lint:w,cat=unused:w,cat=w-flag:w")
++ {
- if (insideCI.value && !Nightly) Seq("-Werror")
+ "-Wconf:cat=feature:w",
+ "-Wconf:cat=deprecation:w",
+ "-Wconf:cat=unchecked:w") ++ {
+ if (insideCI.value && !Nightly && scalaVersion.value.startsWith("2."))
Seq("-Werror")
else Seq.empty
},
Compile / doc / scalacOptions := scalacOptions.value ++ Seq(
diff --git a/project/Versions.scala b/project/Versions.scala
index 45d423ea..3b349025 100644
--- a/project/Versions.scala
+++ b/project/Versions.scala
@@ -18,6 +18,7 @@ object Versions {
// align ignore-prefixes in scripts/link-validator.conf
val Scala213 = "2.13.10" // update even in link-validator.conf
val Scala212 = "2.12.17"
+ val Scala3 = "3.2.2"
val pekkoVersionForDocs = "current"
val pekkoVersion = "0.0.0+26630-2c4d0ee0-SNAPSHOT"
@@ -27,7 +28,14 @@ object Versions {
val KafkaVersionForDocs = "30"
// This should align with the ScalaTest version used in the Apache Pekko
1.0.x testkit
//
https://github.com/apache/incubator-pekko/blob/main/project/Dependencies.scala#L70
- val scalaTestVersion = "3.1.4"
+ val scalaTestVersion = Def.setting {
+ if (scalaVersion.value.startsWith("3.")) {
+ "3.2.9"
+ } else {
+ "3.1.4"
+ }
+ }
+ val scalaPBVersion = "0.11.13"
val testcontainersVersion = "1.16.3"
val slf4jVersion = "1.7.36"
// this depends on Kafka, and should be upgraded to such latest version
diff --git a/tests/src/test/scala/docs/scaladsl/ClusterShardingExample.scala
b/tests/src/test/scala/docs/scaladsl/ClusterShardingExample.scala
index c12ad5cd..b41dfa0e 100644
--- a/tests/src/test/scala/docs/scaladsl/ClusterShardingExample.scala
+++ b/tests/src/test/scala/docs/scaladsl/ClusterShardingExample.scala
@@ -28,7 +28,7 @@ import pekko.kafka.{ ConsumerRebalanceEvent,
ConsumerSettings, Subscriptions }
import pekko.stream.scaladsl.{ Flow, Sink }
import org.apache.kafka.common.serialization.{ ByteArrayDeserializer,
StringDeserializer }
-import scala.concurrent.Future
+import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration._
import scala.util.{ Failure, Success }
@@ -39,10 +39,10 @@ import scala.util.{ Failure, Success }
*
https://github.com/akka/akka-samples/tree/2.6/akka-sample-kafka-to-sharding-scala
*/
object ClusterShardingExample {
- implicit val system = ActorSystem(Behaviors.empty, "ClusterShardingExample")
+ implicit val system: ActorSystem[_] = ActorSystem(Behaviors.empty,
"ClusterShardingExample")
val kafkaBootstrapServers = "localhost:9092"
- implicit val ec = system.executionContext
+ implicit val ec: ExecutionContext = system.executionContext
def userBehaviour(): Behavior[User] = Behaviors.empty[User]
def userBusiness[T](): Flow[T, T, NotUsed] = Flow[T]
diff --git a/tests/src/test/scala/docs/scaladsl/FetchMetadata.scala
b/tests/src/test/scala/docs/scaladsl/FetchMetadata.scala
index 1b447b26..274eda60 100644
--- a/tests/src/test/scala/docs/scaladsl/FetchMetadata.scala
+++ b/tests/src/test/scala/docs/scaladsl/FetchMetadata.scala
@@ -15,6 +15,7 @@
package docs.scaladsl
import org.apache.pekko
+import org.apache.pekko.kafka.ConsumerSettings
import pekko.kafka.scaladsl.MetadataClient
import pekko.kafka.testkit.scaladsl.TestcontainersKafkaLike
import org.scalatest.TryValues
@@ -44,7 +45,7 @@ class FetchMetadata extends DocsSpecBase with
TestcontainersKafkaLike with TryVa
val consumerSettings = consumerDefaults.withGroupId(createGroupId())
val topic = createTopic()
// #metadata
- val timeout = 5.seconds
+ val timeout: FiniteDuration = 5.seconds
val settings = consumerSettings.withMetadataRequestTimeout(timeout)
implicit val askTimeout = Timeout(timeout)
@@ -90,7 +91,7 @@ class FetchMetadata extends DocsSpecBase with
TestcontainersKafkaLike with TryVa
}
"Get offsets" should "timeout fast" in {
- val consumerSettings = consumerDefaults
+ val consumerSettings: ConsumerSettings[String, String] = consumerDefaults
.withGroupId(createGroupId())
.withMetadataRequestTimeout(100.millis)
val topic = createTopic()
@@ -109,7 +110,7 @@ class FetchMetadata extends DocsSpecBase with
TestcontainersKafkaLike with TryVa
}
it should "return" in {
- val consumerSettings = consumerDefaults
+ val consumerSettings: ConsumerSettings[String, String] = consumerDefaults
.withGroupId(createGroupId())
.withMetadataRequestTimeout(5.seconds)
val topic = createTopic()
diff --git a/tests/src/test/scala/docs/scaladsl/proto/Order.scala
b/tests/src/test/scala/docs/scaladsl/proto/Order.scala
index 47d29b9a..dc37f8c7 100644
--- a/tests/src/test/scala/docs/scaladsl/proto/Order.scala
+++ b/tests/src/test/scala/docs/scaladsl/proto/Order.scala
@@ -19,6 +19,8 @@
package docs.scaladsl.proto
+import com.google.protobuf.CodedInputStream
+
@SerialVersionUID(0L)
final case class Order(
id: _root_.scala.Predef.String = "",
@@ -79,7 +81,7 @@ final case class Order(
object Order extends
scalapb.GeneratedMessageCompanion[docs.scaladsl.proto.Order] {
implicit def messageCompanion:
scalapb.GeneratedMessageCompanion[docs.scaladsl.proto.Order] = this
- def merge(`_message__`: docs.scaladsl.proto.Order,
+ override def merge(`_message__`: docs.scaladsl.proto.Order,
`_input__`: _root_.com.google.protobuf.CodedInputStream):
docs.scaladsl.proto.Order = {
var __id = `_message__`.id
var `_unknownFields__` : _root_.scalapb.UnknownFieldSet.Builder = null
@@ -101,6 +103,8 @@ object Order extends
scalapb.GeneratedMessageCompanion[docs.scaladsl.proto.Order
id = __id,
unknownFields = if (_unknownFields__ == null) _message__.unknownFields
else _unknownFields__.result())
}
+ override def parseFrom(input: CodedInputStream): docs.scaladsl.proto.Order =
merge(defaultInstance, input)
+
implicit def messageReads:
_root_.scalapb.descriptors.Reads[docs.scaladsl.proto.Order] =
_root_.scalapb.descriptors.Reads {
case _root_.scalapb.descriptors.PMessage(__fieldsMap) =>
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
index a536da34..b00801f5 100644
---
a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
+++
b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
@@ -15,7 +15,6 @@
package org.apache.pekko.kafka.internal
import java.util.concurrent.atomic.AtomicLong
-
import org.apache.pekko
import pekko.Done
import pekko.actor.ActorSystem
@@ -43,6 +42,7 @@ import org.scalatest.matchers.should.Matchers
import org.slf4j.{ Logger, LoggerFactory }
import scala.collection.immutable
+import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
class CommittingProducerSinkSpec(_system: ActorSystem)
@@ -67,7 +67,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
// used by the .log(...) stream operator
implicit val adapter: LoggingAdapter = new Slf4jToAkkaLoggingAdapter(log)
- implicit val ec = _system.dispatcher
+ implicit val ec: ExecutionContext = _system.dispatcher
val groupId = "group1"
val topic = "topic1"
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
index 9ede79de..76794d49 100644
---
a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
+++
b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
@@ -15,7 +15,6 @@
package org.apache.pekko.kafka.internal
import java.util.concurrent.atomic.AtomicInteger
-
import org.apache.pekko
import pekko.Done
import pekko.actor.ActorSystem
@@ -40,7 +39,7 @@ import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers
import scala.concurrent.duration._
-import scala.concurrent.{ Await, Future }
+import scala.concurrent.{ Await, ExecutionContext, Future }
object CommittingWithMockSpec {
type K = String
@@ -84,7 +83,7 @@ class CommittingWithMockSpec(_system: ActorSystem)
override def afterAll(): Unit =
shutdown(system)
- implicit val ec = _system.dispatcher
+ implicit val ec: ExecutionContext = _system.dispatcher
val messages = (1 to 1000).map(createMessage)
val failure = new CommitFailedException()
val onCompleteFailure: ConsumerMock.OnCompleteHandler = _ => (null, failure)
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
index 98b99edb..127cddb0 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
@@ -177,13 +177,13 @@ class ConsumerMock[K, V](handler:
ConsumerMock.CommitHandler = new ConsumerMock.
def assignPartitions(tps: Set[TopicPartition]) =
tps.groupBy(_.topic()).foreach {
case (topic, localTps) =>
- pendingSubscriptions.find(_._1 ==
topic).get._2.onPartitionsAssigned(localTps.asJavaCollection)
+ pendingSubscriptions.find(_._1 ==
List(topic)).get._2.onPartitionsAssigned(localTps.asJavaCollection)
}
def revokePartitions(tps: Set[TopicPartition]) =
tps.groupBy(_.topic()).foreach {
case (topic, localTps) =>
- pendingSubscriptions.find(_._1 ==
topic).get._2.onPartitionsRevoked(localTps.asJavaCollection)
+ pendingSubscriptions.find(_._1 ==
List(topic)).get._2.onPartitionsRevoked(localTps.asJavaCollection)
}
def releaseAndAwaitCommitCallbacks(testkit: TestKit, minOffset: Long): Unit
= {
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
index 9fbe61c4..f341f386 100644
---
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
+++
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
@@ -170,8 +170,8 @@ class ConsumerProgressTrackingSpec extends AnyFlatSpecLike
with Matchers with Lo
it should "pass through requests to listeners" in {
val tracker = new ConsumerProgressTrackerImpl()
+ var state = Map[TopicPartition, Long]()
val listener = new ConsumerAssignmentTrackingListener {
- var state = Map[TopicPartition, Long]()
override def revoke(revokedTps: Set[TopicPartition]): Unit = {
state = state.filter { case (tp, _) => !revokedTps.contains(tp) }
}
@@ -183,7 +183,7 @@ class ConsumerProgressTrackingSpec extends AnyFlatSpecLike
with Matchers with Lo
tracker.addProgressTrackingCallback(listener)
def verifyOffsets(offsets: Map[TopicPartition, Long]): Unit = {
- listener.state should be(offsets)
+ state should be(offsets)
}
// assign
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
index d5f1b610..1c9bb21e 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
@@ -38,7 +38,7 @@ import org.scalatest.matchers.should.Matchers
import scala.collection.immutable.Seq
import scala.concurrent.duration._
-import scala.concurrent.{ Await, Future }
+import scala.concurrent.{ Await, ExecutionContext, Future }
object ConsumerSpec {
type K = String
@@ -80,7 +80,7 @@ class ConsumerSpec(_system: ActorSystem)
override def afterAll(): Unit =
shutdown(system)
- implicit val ec = _system.dispatcher
+ implicit val ec: ExecutionContext = _system.dispatcher
val messages = (1 to 1000).map(createMessage)
def checkMessagesReceiving(msgss: Seq[Seq[CommittableMessage[K, V]]]): Unit
= {
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
index fb70f488..8a693413 100644
---
a/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
+++
b/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
@@ -17,7 +17,6 @@ package org.apache.pekko.kafka.internal
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import java.util.function.UnaryOperator
-
import org.apache.pekko
import pekko.Done
import pekko.actor.ActorSystem
@@ -40,7 +39,7 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.{ BeforeAndAfterAll, OptionValues }
import org.slf4j.{ Logger, LoggerFactory }
-import scala.concurrent.Future
+import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration._
class PartitionedSourceSpec(_system: ActorSystem)
@@ -66,7 +65,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
override def afterAll(): Unit =
shutdown(system)
- implicit val ec = _system.dispatcher
+ implicit val ec: ExecutionContext = _system.dispatcher
def consumerSettings(dummy: Consumer[K, V]): ConsumerSettings[K, V] =
ConsumerSettings
@@ -264,7 +263,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
}
def getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition,
Long]] = { tps =>
- log.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
+ logger.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
assertGetOffsetsOnAssign(tps)
Future.successful(tps.map(tp => (tp, 300L)).toMap)
}
@@ -296,7 +295,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
}
def getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition,
Long]] = { tps =>
- log.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
+ logger.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
assertGetOffsetsOnAssign(tps)
Future.successful(tps.map(tp => (tp, 300L)).toMap)
}
@@ -335,7 +334,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
}
def getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition,
Long]] = { tps =>
- log.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
+ logger.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
assertGetOffsetsOnAssign(tps)
Future.successful(tps.map(tp => (tp, 300L)).toMap)
}
@@ -367,7 +366,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
val dummy = new Dummy()
def getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition,
Long]] = { tps =>
- log.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
+ logger.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
Future.successful(tps.map(tp => (tp, 300L)).toMap)
}
@@ -407,7 +406,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
def getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition,
Long]] = { tps =>
Future {
latch.await(10, TimeUnit.SECONDS)
- log.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
+ logger.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
tps.map(tp => (tp, 300L)).toMap
}
}
@@ -436,7 +435,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
def getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition,
Long]] = { tps =>
// This will be called twice, but we ensure that the second returned
Future completes
// before the first returned Future
- log.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
+ logger.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
val offsets = tps.map(tp => (tp, 300L)).toMap
if (tps.size == 2) {
Future {
@@ -475,7 +474,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
}
def getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition,
Long]] = { tps =>
- log.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
+ logger.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
assertGetOffsetsOnAssign(tps)
Future.successful(tps.map(tp => (tp, 300L)).toMap)
}
@@ -509,7 +508,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
}
def getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition,
Long]] = { tps =>
- log.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
+ logger.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
assertGetOffsetsOnAssign(tps)
Future.successful(tps.map(tp => (tp, 300L)).toMap)
}
@@ -550,7 +549,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
}
def getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition,
Long]] = { tps =>
- log.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
+ logger.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
assertGetOffsetsOnAssign(tps)
Future.successful(tps.map(tp => (tp, 300L)).toMap)
}
@@ -584,7 +583,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
val dummy = new Dummy()
def getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition,
Long]] = { tps =>
- log.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
+ logger.debug(s"getOffsetsOnAssign (${tps.mkString(",")})")
Future.successful(tps.map(tp => (tp, 300L)).toMap)
}
@@ -747,7 +746,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
}
object PartitionedSourceSpec {
- def log: Logger = LoggerFactory.getLogger(getClass)
+ def logger: Logger = LoggerFactory.getLogger(getClass)
type K = String
type V = String
@@ -759,7 +758,7 @@ object PartitionedSourceSpec {
val tp1 = new TopicPartition(topic, 1)
def sleep(time: FiniteDuration): Unit = {
- log.debug(s"sleeping $time")
+ logger.debug(s"sleeping $time")
Thread.sleep(time.toMillis)
}
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
index 1e7633f5..158f8e53 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
@@ -64,7 +64,7 @@ class ProducerSpec(_system: ActorSystem)
override def afterAll(): Unit = shutdown(system)
- implicit val ec = _system.dispatcher
+ implicit val ec: ExecutionContext = _system.dispatcher
private val group = "group"
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/javadsl/ControlSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/javadsl/ControlSpec.scala
index 961820a6..a074a833 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/javadsl/ControlSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/javadsl/ControlSpec.scala
@@ -32,17 +32,27 @@ import scala.concurrent.Future
import scala.language.reflectiveCalls
object ControlSpec {
+
+ trait WithShutdownCalled {
+ def shutdownCalled: AtomicBoolean
+ }
+
def createControl(stopFuture: Future[Done] = Future.successful(Done),
- shutdownFuture: Future[Done] = Future.successful(Done)) = {
+ shutdownFuture: Future[Done] = Future.successful(Done)):
Consumer.Control with WithShutdownCalled = {
val control = new pekko.kafka.scaladsl.ControlSpec.ControlImpl(stopFuture,
shutdownFuture)
val wrapped = new ConsumerControlAsJava(control)
- new Consumer.Control {
- def shutdownCalled: AtomicBoolean = control.shutdownCalled
+ new Consumer.Control with WithShutdownCalled {
+ override def shutdownCalled: AtomicBoolean = control.shutdownCalled
+
override def stop(): CompletionStage[Done] = wrapped.stop()
+
override def shutdown(): CompletionStage[Done] = wrapped.shutdown()
+
override def drainAndShutdown[T](streamCompletion: CompletionStage[T],
ec: Executor): CompletionStage[T] =
wrapped.drainAndShutdown(streamCompletion, ec)
+
override def isShutdown: CompletionStage[Done] = wrapped.isShutdown
+
override def getMetrics: CompletionStage[util.Map[MetricName, Metric]] =
wrapped.getMetrics
}
}
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/IntegrationSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/IntegrationSpec.scala
index a945e11d..1caa499e 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/IntegrationSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/IntegrationSpec.scala
@@ -40,7 +40,7 @@ import scala.util.Success
class IntegrationSpec extends SpecBase with TestcontainersKafkaLike with
Inside {
- implicit val patience = PatienceConfig(30.seconds, 500.millis)
+ implicit val patience: PatienceConfig = PatienceConfig(30.seconds,
500.millis)
"Kafka connector" must {
"produce to plainSink and consume from plainSource" in {
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/MetadataClientSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/MetadataClientSpec.scala
index 2e7bcfc8..0d6e306e 100644
---
a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/MetadataClientSpec.scala
+++
b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/MetadataClientSpec.scala
@@ -138,8 +138,8 @@ class MetadataClientSpec extends SpecBase with
TestcontainersKafkaLike {
val expectedPartitionsForTopic1 = (topic1, 0) :: (topic1, 1) :: Nil
val expectedPartitionsForTopic2 = (topic2, 0) :: Nil
- topics(topic1).leftSideValue.map(mapToTopicPartition) shouldBe
expectedPartitionsForTopic1
- topics(topic2).leftSideValue.map(mapToTopicPartition) shouldBe
expectedPartitionsForTopic2
+ topics(topic1).map(mapToTopicPartition) shouldBe
expectedPartitionsForTopic1
+ topics(topic2).map(mapToTopicPartition) shouldBe
expectedPartitionsForTopic2
metadataClient.close()
}
@@ -155,7 +155,7 @@ class MetadataClientSpec extends SpecBase with
TestcontainersKafkaLike {
val partitionsInfo = metadataClient.getPartitionsFor(topic).futureValue
- partitionsInfo.leftSideValue.map(_.partition()) shouldBe List(0, 1)
+ partitionsInfo.map(_.partition()) shouldBe List(0, 1)
metadataClient.close()
}
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceExtSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceExtSpec.scala
index 97b1ce89..dbbba2fa 100644
---
a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceExtSpec.scala
+++
b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceExtSpec.scala
@@ -138,7 +138,7 @@ class RebalanceExtSpec extends SpecBase with
TestcontainersKafkaLike with Inside
val producerTpsAck: Seq[Future[Done]] = topics.flatMap { topic1 =>
val topicIdx = topics.indexOf(topic1)
val topicOffset = topicIdx * partitionCount * perPartitionMessageCount
- (0 until partitionCount).map { partitionIdx: Int =>
+ (0 until partitionCount).map { (partitionIdx: Int) =>
val startMessageIdx = partitionIdx * perPartitionMessageCount + 1 +
topicOffset
val endMessageIdx = startMessageIdx + perPartitionMessageCount - 1
val messageRange = startMessageIdx to endMessageIdx
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]