This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 416d354f use scala converters (#367)
416d354f is described below

commit 416d354fa83162467527d4b96d9c1010762c2dd4
Author: PJ Fanning <[email protected]>
AuthorDate: Sun Sep 21 13:15:23 2025 +0100

    use scala converters (#367)
    
    * use scala converters
    
    * next batch
    
    * more
    
    * more updates
    
    * scalafmt
---
 .../pekko/kafka/benchmarks/InflightMetrics.scala   |  2 +-
 .../kafka/benchmarks/KafkaConsumerBenchmarks.scala |  2 +-
 .../kafka/benchmarks/KafkaConsumerFixtureGen.scala |  3 +-
 .../benchmarks/KafkaTransactionBenchmarks.scala    |  2 +-
 .../benchmarks/KafkaTransactionFixtureGen.scala    |  3 +-
 .../ReactiveKafkaConsumerBenchmarks.scala          |  3 +-
 .../cluster/sharding/KafkaClusterSharding.scala    | 10 ++--
 .../org/apache/pekko/kafka/CommitterSettings.scala |  5 +-
 .../pekko/kafka/ConnectionCheckerSettings.scala    | 10 ++--
 .../org/apache/pekko/kafka/ConsumerMessage.scala   |  2 +-
 .../org/apache/pekko/kafka/ConsumerSettings.scala  | 66 +++++++++++-----------
 .../scala/org/apache/pekko/kafka/Metadata.scala    |  3 +-
 .../kafka/OffsetResetProtectionSettings.scala      |  9 +--
 .../org/apache/pekko/kafka/ProducerMessage.scala   |  2 +-
 .../org/apache/pekko/kafka/ProducerSettings.scala  | 17 +++---
 .../org/apache/pekko/kafka/Subscriptions.scala     |  2 +-
 .../pekko/kafka/internal/CommittableSources.scala  |  5 +-
 .../pekko/kafka/internal/ConfigSettings.scala      |  8 +--
 .../kafka/internal/ConsumerProgressTracking.scala  |  2 +-
 .../kafka/internal/ConsumerResetProtection.scala   |  2 +-
 .../kafka/internal/ControlImplementations.scala    |  9 ++-
 .../pekko/kafka/internal/DeferredProducer.scala    |  7 +--
 .../pekko/kafka/internal/KafkaConsumerActor.scala  | 17 ++++--
 .../pekko/kafka/internal/MessageBuilder.scala      |  5 +-
 .../internal/PartitionAssignmentHelpers.scala      |  2 +-
 .../pekko/kafka/internal/SubSourceLogic.scala      |  1 -
 .../internal/TransactionalProducerStage.scala      |  2 +-
 .../kafka/internal/TransactionalSources.scala      |  1 -
 .../org/apache/pekko/kafka/javadsl/Committer.scala |  2 +-
 .../org/apache/pekko/kafka/javadsl/Consumer.scala  | 14 ++---
 .../pekko/kafka/javadsl/DiscoverySupport.scala     |  4 +-
 .../pekko/kafka/javadsl/MetadataClient.scala       | 24 ++++----
 .../org/apache/pekko/kafka/javadsl/Producer.scala  |  3 +-
 .../apache/pekko/kafka/javadsl/SendProducer.scala  |  3 +-
 .../apache/pekko/kafka/javadsl/Transactional.scala |  3 +-
 .../apache/pekko/kafka/scaladsl/Committer.scala    |  5 +-
 .../org/apache/pekko/kafka/scaladsl/Consumer.scala | 11 ++--
 .../pekko/kafka/scaladsl/DiscoverySupport.scala    |  4 +-
 .../pekko/kafka/scaladsl/MetadataClient.scala      | 13 ++---
 .../apache/pekko/kafka/scaladsl/SendProducer.scala |  4 +-
 .../org/apache/pekko/kafka/IntegrationTests.scala  |  3 +-
 .../KafkaTestkitTestcontainersSettings.scala       | 17 +++---
 .../kafka/testkit/ProducerResultFactory.scala      |  2 +-
 .../kafka/testkit/internal/KafkaTestKit.scala      |  3 +-
 .../testkit/internal/TestcontainersKafka.scala     | 11 ++--
 .../pekko/kafka/testkit/scaladsl/KafkaSpec.scala   |  2 +-
 .../scaladsl/SchemaRegistrySerializationSpec.scala |  2 +-
 .../org/apache/pekko/kafka/TransactionsOps.scala   |  1 -
 .../internal/CommittingProducerSinkSpec.scala      |  2 +-
 .../apache/pekko/kafka/internal/ConsumerMock.scala |  6 +-
 .../internal/ConsumerProgressTrackingSpec.scala    |  2 +-
 .../internal/ConsumerResetProtectionSpec.scala     |  2 +-
 .../apache/pekko/kafka/internal/ConsumerSpec.scala |  2 +-
 .../kafka/internal/PartitionedSourceSpec.scala     |  2 +-
 .../apache/pekko/kafka/internal/ProducerSpec.scala |  2 +-
 .../apache/pekko/kafka/javadsl/ControlSpec.scala   |  2 +-
 .../pekko/kafka/scaladsl/RebalanceSpec.scala       |  2 +-
 .../pekko/kafka/scaladsl/RetentionPeriodSpec.scala |  2 +-
 .../pekko/kafka/scaladsl/TimestampSpec.scala       |  2 +-
 .../pekko/kafka/tests/CapturingAppender.scala      |  2 +-
 60 files changed, 182 insertions(+), 179 deletions(-)

diff --git 
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/InflightMetrics.scala
 
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/InflightMetrics.scala
index 69a69462..3d5e5f98 100644
--- 
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/InflightMetrics.scala
+++ 
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/InflightMetrics.scala
@@ -22,13 +22,13 @@ import pekko.actor.Cancellable
 import pekko.kafka.scaladsl.Consumer.Control
 import pekko.stream.Materializer
 import pekko.stream.scaladsl.{ Keep, Sink, Source }
-import pekko.util.ccompat.JavaConverters._
 import com.codahale.metrics.{ Histogram, MetricRegistry }
 import javax.management.remote.{ JMXConnectorFactory, JMXServiceURL }
 import javax.management.{ Attribute, MBeanServerConnection, ObjectName }
 
 import scala.concurrent.duration.{ FiniteDuration, _ }
 import scala.concurrent.{ ExecutionContext, Future }
+import scala.jdk.CollectionConverters._
 
 private[benchmarks] trait InflightMetrics {
   import InflightMetrics._
diff --git 
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaConsumerBenchmarks.scala
 
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaConsumerBenchmarks.scala
index 3246c973..2dde81a4 100644
--- 
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaConsumerBenchmarks.scala
+++ 
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaConsumerBenchmarks.scala
@@ -21,9 +21,9 @@ import com.codahale.metrics.Meter
 import com.typesafe.scalalogging.LazyLogging
 import org.apache.kafka.clients.consumer.{ OffsetAndMetadata, 
OffsetCommitCallback }
 import org.apache.kafka.common.TopicPartition
-import org.apache.pekko.util.ccompat.JavaConverters._
 
 import scala.annotation.tailrec
+import scala.jdk.CollectionConverters._
 
 object KafkaConsumerBenchmarks extends LazyLogging {
   val pollTimeoutMs: Duration = Duration.ofMillis(50L)
diff --git 
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaConsumerFixtureGen.scala
 
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaConsumerFixtureGen.scala
index a5e7646d..45301dfc 100644
--- 
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaConsumerFixtureGen.scala
+++ 
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaConsumerFixtureGen.scala
@@ -16,10 +16,11 @@ package org.apache.pekko.kafka.benchmarks
 
 import org.apache.pekko
 import pekko.kafka.benchmarks.app.RunTestCommand
-import pekko.util.ccompat.JavaConverters._
 import org.apache.kafka.clients.consumer.{ ConsumerConfig, KafkaConsumer }
 import org.apache.kafka.common.serialization.{ ByteArrayDeserializer, 
StringDeserializer }
 
+import scala.jdk.CollectionConverters._
+
 case class KafkaConsumerTestFixture(topic: String, msgCount: Int, consumer: 
KafkaConsumer[Array[Byte], String]) {
   def close(): Unit = consumer.close()
 }
diff --git 
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala
 
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala
index f94da98f..78be5908 100644
--- 
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala
+++ 
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala
@@ -16,7 +16,6 @@ package org.apache.pekko.kafka.benchmarks
 
 import org.apache.pekko
 import pekko.kafka.benchmarks.KafkaConsumerBenchmarks.pollTimeoutMs
-import pekko.util.ccompat.JavaConverters._
 import com.codahale.metrics.Meter
 import com.typesafe.scalalogging.LazyLogging
 import org.apache.kafka.clients.consumer._
@@ -25,6 +24,7 @@ import org.apache.kafka.common.TopicPartition
 
 import scala.annotation.tailrec
 import scala.concurrent.duration.FiniteDuration
+import scala.jdk.CollectionConverters._
 
 object KafkaTransactionBenchmarks extends LazyLogging {
 
diff --git 
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionFixtureGen.scala
 
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionFixtureGen.scala
index ead493f0..52451061 100644
--- 
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionFixtureGen.scala
+++ 
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionFixtureGen.scala
@@ -18,7 +18,6 @@ import java.util.Locale
 
 import org.apache.pekko
 import pekko.kafka.benchmarks.app.RunTestCommand
-import pekko.util.ccompat.JavaConverters._
 import org.apache.kafka.clients.consumer.{ ConsumerConfig, KafkaConsumer }
 import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig }
 import org.apache.kafka.common.IsolationLevel
@@ -29,6 +28,8 @@ import org.apache.kafka.common.serialization.{
   StringSerializer
 }
 
+import scala.jdk.CollectionConverters._
+
 case class KafkaTransactionTestFixture(sourceTopic: String,
     sinkTopic: String,
     msgCount: Int,
diff --git 
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/ReactiveKafkaConsumerBenchmarks.scala
 
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/ReactiveKafkaConsumerBenchmarks.scala
index a912cbf5..afff0aa4 100644
--- 
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/ReactiveKafkaConsumerBenchmarks.scala
+++ 
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/ReactiveKafkaConsumerBenchmarks.scala
@@ -18,7 +18,6 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.pekko
 import pekko.actor.ActorSystem
-import pekko.dispatch.ExecutionContexts
 import pekko.kafka.ConsumerMessage.CommittableMessage
 import pekko.kafka.benchmarks.InflightMetrics.{ BrokerMetricRequest, 
ConsumerMetricRequest }
 import pekko.kafka.scaladsl.Committer
@@ -172,7 +171,7 @@ object ReactiveKafkaConsumerBenchmarks extends LazyLogging 
with InflightMetrics
     val control = fixture.source
       .mapAsync(1) { m =>
         meter.mark()
-        m.committableOffset.commitInternal().map(_ => 
m)(ExecutionContexts.parasitic)
+        m.committableOffset.commitInternal().map(_ => 
m)(ExecutionContext.parasitic)
       }
       .toMat(Sink.foreach { msg =>
         if (msg.committableOffset.partitionOffset.offset >= fixture.msgCount - 
1)
diff --git 
a/cluster-sharding/src/main/scala/org/apache/pekko/kafka/cluster/sharding/KafkaClusterSharding.scala
 
b/cluster-sharding/src/main/scala/org/apache/pekko/kafka/cluster/sharding/KafkaClusterSharding.scala
index 18a22dea..7dca330b 100644
--- 
a/cluster-sharding/src/main/scala/org/apache/pekko/kafka/cluster/sharding/KafkaClusterSharding.scala
+++ 
b/cluster-sharding/src/main/scala/org/apache/pekko/kafka/cluster/sharding/KafkaClusterSharding.scala
@@ -31,13 +31,13 @@ import pekko.kafka.scaladsl.MetadataClient
 import pekko.kafka._
 import pekko.util.Timeout._
 import org.apache.kafka.common.utils.Utils
+import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration._
 import scala.concurrent.{ ExecutionContextExecutor, Future }
+import scala.jdk.FutureConverters._
+import scala.jdk.DurationConverters._
 import scala.util.{ Failure, Success }
-import pekko.util.FutureConverters._
-import pekko.util.JavaDurationConverters._
-import org.slf4j.LoggerFactory
 
 /**
  * API MAY CHANGE
@@ -88,7 +88,7 @@ final class KafkaClusterSharding(system: ExtendedActorSystem) 
extends Extension
   def messageExtractor[M](topic: String,
       timeout: java.time.Duration,
       settings: ConsumerSettings[_, _]): 
CompletionStage[KafkaShardingMessageExtractor[M]] =
-    getPartitionCount(topic, timeout.asScala, settings)
+    getPartitionCount(topic, timeout.toScala, settings)
       .map(new KafkaShardingMessageExtractor[M](_))(system.dispatcher)
       .asJava
 
@@ -153,7 +153,7 @@ final class KafkaClusterSharding(system: 
ExtendedActorSystem) extends Extension
       timeout: java.time.Duration,
       entityIdExtractor: java.util.function.Function[M, String],
       settings: ConsumerSettings[_, _]): 
CompletionStage[KafkaShardingNoEnvelopeExtractor[M]] =
-    getPartitionCount(topic, timeout.asScala, settings)
+    getPartitionCount(topic, timeout.toScala, settings)
       .map(partitions => new KafkaShardingNoEnvelopeExtractor[M](partitions, e 
=> entityIdExtractor.apply(e)))(
         system.dispatcher)
       .asJava
diff --git a/core/src/main/scala/org/apache/pekko/kafka/CommitterSettings.scala 
b/core/src/main/scala/org/apache/pekko/kafka/CommitterSettings.scala
index f45a3268..08edde4d 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/CommitterSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/CommitterSettings.scala
@@ -13,14 +13,15 @@
  */
 
 package org.apache.pekko.kafka
+
 import java.util.concurrent.TimeUnit
 
 import org.apache.pekko
 import pekko.annotation.ApiMayChange
-import pekko.util.JavaDurationConverters._
 import com.typesafe.config.Config
 
 import scala.concurrent.duration._
+import scala.jdk.DurationConverters._
 
 @ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/882")
 sealed trait CommitDelivery
@@ -184,7 +185,7 @@ class CommitterSettings private (
     copy(maxInterval = maxInterval)
 
   def withMaxInterval(maxInterval: java.time.Duration): CommitterSettings =
-    copy(maxInterval = maxInterval.asScala)
+    copy(maxInterval = maxInterval.toScala)
 
   def withParallelism(parallelism: Int): CommitterSettings =
     copy(parallelism = parallelism)
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/ConnectionCheckerSettings.scala 
b/core/src/main/scala/org/apache/pekko/kafka/ConnectionCheckerSettings.scala
index acd93392..a3fdc3b1 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ConnectionCheckerSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ConnectionCheckerSettings.scala
@@ -14,12 +14,12 @@
 
 package org.apache.pekko.kafka
 
-import org.apache.pekko.util.JavaDurationConverters._
+import java.time.{ Duration => JDuration }
+
 import com.typesafe.config.Config
 
 import scala.concurrent.duration._
-
-import java.time.{ Duration => JDuration }
+import scala.jdk.DurationConverters._
 
 class ConnectionCheckerSettings private[kafka] (val enable: Boolean,
     val maxRetries: Int,
@@ -48,7 +48,7 @@ class ConnectionCheckerSettings private[kafka] (val enable: 
Boolean,
 
   /** Java API */
   def withCheckInterval(checkInterval: JDuration): ConnectionCheckerSettings =
-    copy(checkInterval = checkInterval.asScala)
+    copy(checkInterval = checkInterval.toScala)
 
   override def toString: String =
     s"org.apache.pekko.kafka.ConnectionCheckerSettings(" +
@@ -78,7 +78,7 @@ object ConnectionCheckerSettings {
     if (enable) {
       val retries = config.getInt("max-retries")
       val factor = config.getDouble("backoff-factor")
-      val checkInterval = config.getDuration("check-interval").asScala
+      val checkInterval = config.getDuration("check-interval").toScala
       apply(retries, checkInterval, factor)
     } else Disabled
   }
diff --git a/core/src/main/scala/org/apache/pekko/kafka/ConsumerMessage.scala 
b/core/src/main/scala/org/apache/pekko/kafka/ConsumerMessage.scala
index 4f9e0b1e..c6bedd92 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ConsumerMessage.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ConsumerMessage.scala
@@ -195,7 +195,7 @@ object ConsumerMessage {
    * Create an offset batch out of a list of offsets.
    */
   def createCommittableOffsetBatch[T <: Committable](offsets: 
java.util.List[T]): CommittableOffsetBatch = {
-    import pekko.util.ccompat.JavaConverters._
+    import scala.jdk.CollectionConverters._
     CommittableOffsetBatch(offsets.asScala.toList)
   }
 
diff --git a/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala 
b/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
index fab05f0f..99aaa4e0 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
@@ -20,16 +20,16 @@ import java.util.concurrent.{ CompletionStage, Executor }
 import org.apache.pekko
 import pekko.annotation.InternalApi
 import pekko.kafka.internal._
-import pekko.util.JavaDurationConverters._
-import pekko.util.ccompat.JavaConverters._
-import pekko.util.OptionConverters._
-import pekko.util.FutureConverters._
 import com.typesafe.config.Config
 import org.apache.kafka.clients.consumer.{ Consumer, ConsumerConfig, 
KafkaConsumer }
 import org.apache.kafka.common.serialization.Deserializer
 
 import scala.concurrent.{ ExecutionContext, Future }
 import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
+import scala.jdk.DurationConverters._
+import scala.jdk.FutureConverters._
+import scala.jdk.OptionConverters._
 
 object ConsumerSettings {
 
@@ -79,21 +79,21 @@ object ConsumerSettings {
       valueDeserializer != null &&
       (valueDeserializer.isDefined || 
properties.contains(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)),
       "Value deserializer should be defined or declared in configuration")
-    val pollInterval = config.getDuration("poll-interval").asScala
-    val pollTimeout = config.getDuration("poll-timeout").asScala
-    val stopTimeout = config.getDuration("stop-timeout").asScala
-    val closeTimeout = config.getDuration("close-timeout").asScala
-    val commitTimeout = config.getDuration("commit-timeout").asScala
-    val commitTimeWarning = config.getDuration("commit-time-warning").asScala
+    val pollInterval = config.getDuration("poll-interval").toScala
+    val pollTimeout = config.getDuration("poll-timeout").toScala
+    val stopTimeout = config.getDuration("stop-timeout").toScala
+    val closeTimeout = config.getDuration("close-timeout").toScala
+    val commitTimeout = config.getDuration("commit-timeout").toScala
+    val commitTimeWarning = config.getDuration("commit-time-warning").toScala
     val commitRefreshInterval = 
ConfigSettings.getPotentiallyInfiniteDuration(config, "commit-refresh-interval")
     val dispatcher = config.getString("use-dispatcher")
-    val waitClosePartition = config.getDuration("wait-close-partition").asScala
-    val positionTimeout = config.getDuration("position-timeout").asScala
-    val offsetForTimesTimeout = 
config.getDuration("offset-for-times-timeout").asScala
-    val metadataRequestTimeout = 
config.getDuration("metadata-request-timeout").asScala
-    val drainingCheckInterval = 
config.getDuration("eos-draining-check-interval").asScala
+    val waitClosePartition = config.getDuration("wait-close-partition").toScala
+    val positionTimeout = config.getDuration("position-timeout").toScala
+    val offsetForTimesTimeout = 
config.getDuration("offset-for-times-timeout").toScala
+    val metadataRequestTimeout = 
config.getDuration("metadata-request-timeout").toScala
+    val drainingCheckInterval = 
config.getDuration("eos-draining-check-interval").toScala
     val connectionCheckerSettings = 
ConnectionCheckerSettings(config.getConfig(ConnectionCheckerSettings.configPath))
-    val partitionHandlerWarning = 
config.getDuration("partition-handler-warning").asScala
+    val partitionHandlerWarning = 
config.getDuration("partition-handler-warning").toScala
     val resetProtectionThreshold = OffsetResetProtectionSettings(
       config.getConfig(OffsetResetProtectionSettings.configPath))
 
@@ -339,7 +339,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
    * Set the maximum duration a poll to the Kafka broker is allowed to take.
    */
   def withPollTimeout(pollTimeout: java.time.Duration): ConsumerSettings[K, V] 
=
-    copy(pollTimeout = pollTimeout.asScala)
+    copy(pollTimeout = pollTimeout.toScala)
 
   /**
    * Set the interval from one scheduled poll to the next.
@@ -352,7 +352,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
    * Set the interval from one scheduled poll to the next.
    */
   def withPollInterval(pollInterval: java.time.Duration): ConsumerSettings[K, 
V] =
-    copy(pollInterval = pollInterval.asScala)
+    copy(pollInterval = pollInterval.toScala)
 
   /**
    * The stage will await outstanding offset commit requests before
@@ -369,7 +369,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
    * stop forcefully.
    */
   def withStopTimeout(stopTimeout: java.time.Duration): ConsumerSettings[K, V] 
=
-    copy(stopTimeout = stopTimeout.asScala)
+    copy(stopTimeout = stopTimeout.toScala)
 
   /**
    * Set duration to wait for `KafkaConsumer.close` to finish.
@@ -382,7 +382,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
    * Set duration to wait for `KafkaConsumer.close` to finish.
    */
   def withCloseTimeout(closeTimeout: java.time.Duration): ConsumerSettings[K, 
V] =
-    copy(closeTimeout = closeTimeout.asScala)
+    copy(closeTimeout = closeTimeout.toScala)
 
   /**
    * If offset commit requests are not completed within this timeout
@@ -397,7 +397,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
    * the returned Future is completed with 
[[pekko.kafka.CommitTimeoutException]].
    */
   def withCommitTimeout(commitTimeout: java.time.Duration): 
ConsumerSettings[K, V] =
-    copy(commitTimeout = commitTimeout.asScala)
+    copy(commitTimeout = commitTimeout.toScala)
 
   /**
    * If commits take longer than this time a warning is logged
@@ -410,7 +410,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
    * If commits take longer than this time a warning is logged
    */
   def withCommitWarning(commitTimeWarning: java.time.Duration): 
ConsumerSettings[K, V] =
-    copy(commitTimeWarning = commitTimeWarning.asScala)
+    copy(commitTimeWarning = commitTimeWarning.toScala)
 
   /**
    * Fully qualified config path which holds the dispatcher configuration
@@ -438,7 +438,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
    */
   def withCommitRefreshInterval(commitRefreshInterval: java.time.Duration): 
ConsumerSettings[K, V] =
     if (commitRefreshInterval.isZero) copy(commitRefreshInterval = 
Duration.Inf)
-    else copy(commitRefreshInterval = commitRefreshInterval.asScala)
+    else copy(commitRefreshInterval = commitRefreshInterval.toScala)
 
   /**
    * Time to wait for pending requests when a partition is closed.
@@ -458,7 +458,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
    * Time to wait for pending requests when a partition is closed.
    */
   def withWaitClosePartition(waitClosePartition: java.time.Duration): 
ConsumerSettings[K, V] =
-    copy(waitClosePartition = waitClosePartition.asScala)
+    copy(waitClosePartition = waitClosePartition.toScala)
 
   /** Scala API: Limits the blocking on Kafka consumer position calls. */
   def withPositionTimeout(positionTimeout: FiniteDuration): 
ConsumerSettings[K, V] =
@@ -466,7 +466,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
 
   /** Java API: Limits the blocking on Kafka consumer position calls. */
   def withPositionTimeout(positionTimeout: java.time.Duration): 
ConsumerSettings[K, V] =
-    copy(positionTimeout = positionTimeout.asScala)
+    copy(positionTimeout = positionTimeout.toScala)
 
   /** Scala API: Limits the blocking on Kafka consumer offsetForTimes calls. */
   def withOffsetForTimesTimeout(offsetForTimesTimeout: FiniteDuration): 
ConsumerSettings[K, V] =
@@ -474,7 +474,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
 
   /** Java API: Limits the blocking on Kafka consumer offsetForTimes calls. */
   def withOffsetForTimesTimeout(offsetForTimesTimeout: java.time.Duration): 
ConsumerSettings[K, V] =
-    copy(offsetForTimesTimeout = offsetForTimesTimeout.asScala)
+    copy(offsetForTimesTimeout = offsetForTimesTimeout.toScala)
 
   /** Scala API */
   def withMetadataRequestTimeout(metadataRequestTimeout: FiniteDuration): 
ConsumerSettings[K, V] =
@@ -482,7 +482,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
 
   /** Java API */
   def withMetadataRequestTimeout(metadataRequestTimeout: java.time.Duration): 
ConsumerSettings[K, V] =
-    copy(metadataRequestTimeout = metadataRequestTimeout.asScala)
+    copy(metadataRequestTimeout = metadataRequestTimeout.toScala)
 
   /** Scala API: Check interval for TransactionalProducer when finishing 
transaction before shutting down consumer */
   def withDrainingCheckInterval(drainingCheckInterval: FiniteDuration): 
ConsumerSettings[K, V] =
@@ -490,7 +490,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
 
   /** Java API: Check interval for TransactionalProducer when finishing 
transaction before shutting down consumer */
   def withDrainingCheckInterval(drainingCheckInterval: java.time.Duration): 
ConsumerSettings[K, V] =
-    copy(drainingCheckInterval = drainingCheckInterval.asScala)
+    copy(drainingCheckInterval = drainingCheckInterval.toScala)
 
   /** Scala API */
   def withPartitionHandlerWarning(partitionHandlerWarning: FiniteDuration): 
ConsumerSettings[K, V] =
@@ -498,7 +498,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
 
   /** Java API */
   def withPartitionHandlerWarning(partitionHandlerWarning: 
java.time.Duration): ConsumerSettings[K, V] =
-    copy(partitionHandlerWarning = partitionHandlerWarning.asScala)
+    copy(partitionHandlerWarning = partitionHandlerWarning.toScala)
 
   /**
    * Scala API.
@@ -535,10 +535,10 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
    */
   def getProperties: java.util.Map[String, AnyRef] = 
properties.asInstanceOf[Map[String, AnyRef]].asJava
 
-  def getCloseTimeout: java.time.Duration = closeTimeout.asJava
-  def getPositionTimeout: java.time.Duration = positionTimeout.asJava
-  def getOffsetForTimesTimeout: java.time.Duration = 
offsetForTimesTimeout.asJava
-  def getMetadataRequestTimeout: java.time.Duration = 
metadataRequestTimeout.asJava
+  def getCloseTimeout: java.time.Duration = closeTimeout.toJava
+  def getPositionTimeout: java.time.Duration = positionTimeout.toJava
+  def getOffsetForTimesTimeout: java.time.Duration = 
offsetForTimesTimeout.toJava
+  def getMetadataRequestTimeout: java.time.Duration = 
metadataRequestTimeout.toJava
 
   private def copy(
       properties: Map[String, String] = properties,
diff --git a/core/src/main/scala/org/apache/pekko/kafka/Metadata.scala 
b/core/src/main/scala/org/apache/pekko/kafka/Metadata.scala
index 3fdd5cb9..453a6d03 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/Metadata.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/Metadata.scala
@@ -18,12 +18,11 @@ import java.util.Optional
 
 import org.apache.pekko
 import pekko.actor.NoSerializationVerificationNeeded
-import pekko.util.ccompat._
-import pekko.util.ccompat.JavaConverters._
 import org.apache.kafka.clients.consumer.{ OffsetAndMetadata, 
OffsetAndTimestamp }
 import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
 
 import scala.util.Try
+import scala.jdk.CollectionConverters._
 
 /**
  * Messages for Kafka metadata fetching via [[KafkaConsumerActor]].
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/OffsetResetProtectionSettings.scala
 
b/core/src/main/scala/org/apache/pekko/kafka/OffsetResetProtectionSettings.scala
index 1e935145..c3d0dc95 100644
--- 
a/core/src/main/scala/org/apache/pekko/kafka/OffsetResetProtectionSettings.scala
+++ 
b/core/src/main/scala/org/apache/pekko/kafka/OffsetResetProtectionSettings.scala
@@ -13,14 +13,15 @@
  */
 
 package org.apache.pekko.kafka
+
 import java.time.{ Duration => JDuration }
 
 import org.apache.pekko
 import pekko.annotation.InternalApi
-import pekko.util.JavaDurationConverters._
 import com.typesafe.config.Config
 
 import scala.concurrent.duration._
+import scala.jdk.DurationConverters._
 
 class OffsetResetProtectionSettings @InternalApi private[kafka] (val enable: 
Boolean,
     val offsetThreshold: Long,
@@ -60,7 +61,7 @@ class OffsetResetProtectionSettings @InternalApi 
private[kafka] (val enable: Boo
    * If the record is more than this duration earlier the last received 
record, it is considered a reset
    */
   def withTimeThreshold(timeThreshold: JDuration): 
OffsetResetProtectionSettings =
-    copy(timeThreshold = timeThreshold.asScala)
+    copy(timeThreshold = timeThreshold.toScala)
 
   override def toString: String =
     s"org.apache.pekko.kafka.OffsetResetProtectionSettings(" +
@@ -89,7 +90,7 @@ object OffsetResetProtectionSettings {
    * threshold are considered indicative of an offset reset.
    */
   def apply(offsetThreshold: Long, timeThreshold: java.time.Duration): 
OffsetResetProtectionSettings =
-    new OffsetResetProtectionSettings(true, offsetThreshold, 
timeThreshold.asScala)
+    new OffsetResetProtectionSettings(true, offsetThreshold, 
timeThreshold.toScala)
 
   /**
    * Create settings from a configuration with layout `connection-checker`.
@@ -98,7 +99,7 @@ object OffsetResetProtectionSettings {
     val enable = config.getBoolean("enable")
     if (enable) {
       val offsetThreshold = config.getLong("offset-threshold")
-      val timeThreshold = config.getDuration("time-threshold").asScala
+      val timeThreshold = config.getDuration("time-threshold").toScala
       apply(offsetThreshold, timeThreshold)
     } else Disabled
   }
diff --git a/core/src/main/scala/org/apache/pekko/kafka/ProducerMessage.scala 
b/core/src/main/scala/org/apache/pekko/kafka/ProducerMessage.scala
index 53a1f3f9..0690403f 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ProducerMessage.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ProducerMessage.scala
@@ -16,10 +16,10 @@ package org.apache.pekko.kafka
 
 import org.apache.pekko
 import pekko.NotUsed
-import pekko.util.ccompat.JavaConverters._
 import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
 
 import scala.collection.immutable
+import scala.jdk.CollectionConverters._
 
 /**
  * Classes that are used in both [[javadsl.Producer]] and
diff --git a/core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala 
b/core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala
index f8f5d855..eee0dfc9 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala
@@ -25,12 +25,11 @@ import org.apache.kafka.clients.producer.{ KafkaProducer, 
Producer, ProducerConf
 import org.apache.kafka.common.serialization.Serializer
 
 import scala.concurrent.duration._
-import pekko.util.ccompat.JavaConverters._
-import pekko.util.FutureConverters._
-import pekko.util.JavaDurationConverters._
-import pekko.util.OptionConverters._
-
 import scala.concurrent.{ ExecutionContext, Future }
+import scala.jdk.CollectionConverters._
+import scala.jdk.DurationConverters._
+import scala.jdk.FutureConverters._
+import scala.jdk.OptionConverters._
 
 object ProducerSettings {
 
@@ -78,11 +77,11 @@ object ProducerSettings {
       valueSerializer != null &&
       (valueSerializer.isDefined || 
properties.contains(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)),
       "Value serializer should be defined or declared in configuration")
-    val closeTimeout = config.getDuration("close-timeout").asScala
+    val closeTimeout = config.getDuration("close-timeout").toScala
     val closeOnProducerStop = config.getBoolean("close-on-producer-stop")
     val parallelism = config.getInt("parallelism")
     val dispatcher = config.getString("use-dispatcher")
-    val eosCommitInterval = config.getDuration("eos-commit-interval").asScala
+    val eosCommitInterval = config.getDuration("eos-commit-interval").toScala
     new ProducerSettings[K, V](
       properties,
       keySerializer,
@@ -295,7 +294,7 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
    * Duration to wait for `KafkaProducer.close` to finish.
    */
   def withCloseTimeout(closeTimeout: java.time.Duration): ProducerSettings[K, 
V] =
-    copy(closeTimeout = closeTimeout.asScala)
+    copy(closeTimeout = closeTimeout.toScala)
 
   /**
    * Call `KafkaProducer.close` on the 
[[org.apache.kafka.clients.producer.KafkaProducer]] when the producer stage
@@ -330,7 +329,7 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
    * The time interval to commit a transaction when using the 
`Transactional.sink` or `Transactional.flow`.
    */
   def withEosCommitInterval(eosCommitInterval: java.time.Duration): 
ProducerSettings[K, V] =
-    copy(eosCommitInterval = eosCommitInterval.asScala)
+    copy(eosCommitInterval = eosCommitInterval.toScala)
 
   /**
    * Scala API.
diff --git a/core/src/main/scala/org/apache/pekko/kafka/Subscriptions.scala 
b/core/src/main/scala/org/apache/pekko/kafka/Subscriptions.scala
index 15ce108a..07a913f1 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/Subscriptions.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/Subscriptions.scala
@@ -19,10 +19,10 @@ import pekko.actor.ActorRef
 import pekko.annotation.{ ApiMayChange, InternalApi }
 import pekko.kafka.internal.PartitionAssignmentHelpers
 import 
pekko.kafka.internal.PartitionAssignmentHelpers.EmptyPartitionAssignmentHandler
-import pekko.util.ccompat.JavaConverters._
 import org.apache.kafka.common.TopicPartition
 
 import scala.annotation.varargs
+import scala.jdk.CollectionConverters._
 
 sealed trait Subscription {
 
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/CommittableSources.scala 
b/core/src/main/scala/org/apache/pekko/kafka/internal/CommittableSources.scala
index 6d2a88cf..f5934482 100644
--- 
a/core/src/main/scala/org/apache/pekko/kafka/internal/CommittableSources.scala
+++ 
b/core/src/main/scala/org/apache/pekko/kafka/internal/CommittableSources.scala
@@ -17,7 +17,6 @@ package org.apache.pekko.kafka.internal
 import org.apache.pekko
 import pekko.actor.ActorRef
 import pekko.annotation.InternalApi
-import pekko.dispatch.ExecutionContexts
 import pekko.kafka.ConsumerMessage.{ CommittableMessage, CommittableOffset }
 import pekko.kafka._
 import pekko.kafka.internal.KafkaConsumerActor.Internal.{ Commit, 
CommitSingle, CommitWithoutReply }
@@ -153,7 +152,7 @@ private[kafka] object KafkaAsyncConsumerCommitterRef {
     }
     getFirstExecutionContext(batch)
       .map { implicit ec =>
-        Future.sequence(futures).map(_ => Done)(ExecutionContexts.parasitic)
+        Future.sequence(futures).map(_ => Done)(ExecutionContext.parasitic)
       }
       .getOrElse(Future.successful(Done))
   }
@@ -211,7 +210,7 @@ private[kafka] class KafkaAsyncConsumerCommitterRef(private 
val consumerActor: A
     import pekko.pattern.ask
     consumerActor
       .ask(msg)(Timeout(commitTimeout))
-      .map(_ => Done)(ExecutionContexts.parasitic)
+      .map(_ => Done)(ExecutionContext.parasitic)
       .recoverWith {
         case e: AskTimeoutException =>
           Future.failed(new CommitTimeoutException(s"Kafka commit took longer 
than: $commitTimeout (${e.getMessage})"))
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/ConfigSettings.scala 
b/core/src/main/scala/org/apache/pekko/kafka/internal/ConfigSettings.scala
index 6f2f8efc..70ecca8a 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/ConfigSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/ConfigSettings.scala
@@ -22,8 +22,8 @@ import com.typesafe.config.{ Config, ConfigObject }
 
 import scala.annotation.tailrec
 import scala.concurrent.duration.Duration
-import pekko.util.ccompat.JavaConverters._
-import pekko.util.JavaDurationConverters._
+import scala.jdk.CollectionConverters._
+import scala.jdk.DurationConverters._
 
 /**
  * INTERNAL API
@@ -52,7 +52,7 @@ import pekko.util.JavaDurationConverters._
   }
 
   import org.apache.kafka
-  import org.apache.pekko.util.ccompat.JavaConverters._
+  import scala.jdk.CollectionConverters._
 
   def serializeAndMaskKafkaProperties[A <: kafka.common.config.AbstractConfig](
       properties: Map[String, AnyRef], constructor: java.util.Map[String, 
AnyRef] => A): String = {
@@ -69,7 +69,7 @@ import pekko.util.JavaDurationConverters._
 
   def getPotentiallyInfiniteDuration(underlying: Config, path: String): 
Duration = underlying.getString(path) match {
     case "infinite" => Duration.Inf
-    case _          => underlying.getDuration(path).asScala
+    case _          => underlying.getDuration(path).toScala
   }
 
 }
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerProgressTracking.scala
 
b/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerProgressTracking.scala
index 4e693899..7ca6a03e 100644
--- 
a/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerProgressTracking.scala
+++ 
b/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerProgressTracking.scala
@@ -16,7 +16,7 @@ package org.apache.pekko.kafka.internal
 
 import org.apache.pekko
 import pekko.annotation.InternalApi
-import pekko.util.ccompat.JavaConverters._
+import scala.jdk.CollectionConverters._
 import org.apache.kafka.clients.consumer.{ Consumer, ConsumerRecords, 
OffsetAndMetadata }
 import org.apache.kafka.common.TopicPartition
 
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala
 
b/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala
index 504053db..55006009 100644
--- 
a/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala
+++ 
b/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala
@@ -22,7 +22,7 @@ import pekko.annotation.InternalApi
 import pekko.event.LoggingAdapter
 import pekko.kafka.OffsetResetProtectionSettings
 import pekko.kafka.internal.KafkaConsumerActor.Internal.Seek
-import pekko.util.ccompat.JavaConverters._
+import scala.jdk.CollectionConverters._
 import org.apache.kafka.clients.consumer.{ ConsumerRecord, ConsumerRecords, 
OffsetAndMetadata }
 import org.apache.kafka.common.TopicPartition
 
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/ControlImplementations.scala
 
b/core/src/main/scala/org/apache/pekko/kafka/internal/ControlImplementations.scala
index 624d3cff..2b192559 100644
--- 
a/core/src/main/scala/org/apache/pekko/kafka/internal/ControlImplementations.scala
+++ 
b/core/src/main/scala/org/apache/pekko/kafka/internal/ControlImplementations.scala
@@ -19,17 +19,16 @@ import org.apache.pekko
 import pekko.Done
 import pekko.actor.ActorRef
 import pekko.annotation.InternalApi
-import pekko.dispatch.ExecutionContexts
 import pekko.kafka.internal.KafkaConsumerActor.Internal.{ ConsumerMetrics, 
RequestMetrics }
 import pekko.kafka.{ javadsl, scaladsl }
 import pekko.stream.SourceShape
 import pekko.stream.stage.GraphStageLogic
-import pekko.util.ccompat.JavaConverters._
-import pekko.util.FutureConverters._
 import pekko.util.Timeout
 import org.apache.kafka.common.{ Metric, MetricName }
 
 import scala.concurrent.{ ExecutionContext, Future, Promise }
+import scala.jdk.CollectionConverters._
+import scala.jdk.FutureConverters._
 
 private object PromiseControl {
   sealed trait ControlOperation
@@ -96,7 +95,7 @@ private trait MetricsControl extends 
scaladsl.Consumer.Control {
         consumer
           .ask(RequestMetrics)(Timeout(1.minute))
           .mapTo[ConsumerMetrics]
-          .map(_.metrics)(ExecutionContexts.parasitic)
+          .map(_.metrics)(ExecutionContext.parasitic)
       }(executionContext)
   }
 }
@@ -115,7 +114,7 @@ final private[kafka] class 
ConsumerControlAsJava(underlying: scaladsl.Consumer.C
   override def isShutdown: CompletionStage[Done] = underlying.isShutdown.asJava
 
   override def getMetrics: CompletionStage[java.util.Map[MetricName, Metric]] =
-    underlying.metrics.map(_.asJava)(ExecutionContexts.parasitic).asJava
+    underlying.metrics.map(_.asJava)(ExecutionContext.parasitic).asJava
 }
 
 /** Internal API */
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala 
b/core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala
index 944f39e0..746690f8 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala
@@ -16,13 +16,12 @@ package org.apache.pekko.kafka.internal
 
 import org.apache.pekko
 import pekko.annotation.InternalApi
-import pekko.dispatch.ExecutionContexts
 import pekko.kafka.ProducerSettings
 import pekko.stream.stage._
-import pekko.util.JavaDurationConverters._
 import org.apache.kafka.clients.producer.Producer
 
 import scala.concurrent.ExecutionContext
+import scala.jdk.DurationConverters._
 import scala.util.control.NonFatal
 import scala.util.{ Failure, Success }
 
@@ -89,7 +88,7 @@ private[kafka] trait DeferredProducer[K, V] {
               log.error(e, "producer creation failed")
               closeAndFailStageCb.invoke(e)
               e
-            })(ExecutionContexts.parasitic)
+            })(ExecutionContext.parasitic)
         changeProducerAssignmentLifecycle(AsyncCreateRequestSent)
     }
   }
@@ -112,7 +111,7 @@ private[kafka] trait DeferredProducer[K, V] {
       try {
         // we do not have to check if producer was already closed in 
send-callback as `flush()` and `close()` are effectively no-ops in this case
         producer.flush()
-        producer.close(producerSettings.closeTimeout.asJava)
+        producer.close(producerSettings.closeTimeout.toJava)
         log.debug("Producer closed")
       } catch {
         case NonFatal(ex) => log.error(ex, "Problem occurred during producer 
close")
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala 
b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
index 87a56426..df6e6b61 100644
--- 
a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
+++ 
b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
@@ -31,13 +31,10 @@ import pekko.actor.{
   Timers
 }
 import pekko.annotation.InternalApi
-import pekko.util.JavaDurationConverters._
 import pekko.event.LoggingReceive
 import pekko.kafka.KafkaConsumerActor.{ StopLike, StoppingException }
 import pekko.kafka._
 import pekko.kafka.scaladsl.PartitionAssignmentHandler
-import pekko.util.ccompat._
-import pekko.util.ccompat.JavaConverters._
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.common.errors.RebalanceInProgressException
 import org.apache.kafka.common.{ Metric, MetricName, TopicPartition }
@@ -45,6 +42,8 @@ import org.apache.kafka.common.{ Metric, MetricName, 
TopicPartition }
 import scala.annotation.nowarn
 import scala.concurrent.{ ExecutionContext, Future }
 import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
+import scala.jdk.DurationConverters._
 import scala.util.{ Success, Try }
 import scala.util.control.NonFatal
 
@@ -420,7 +419,7 @@ import scala.util.control.NonFatal
     this.settings = updatedSettings
     if (settings.connectionCheckerSettings.enable)
       
context.actorOf(ConnectionChecker.props(settings.connectionCheckerSettings))
-    pollTimeout = settings.pollTimeout.asJava
+    pollTimeout = settings.pollTimeout.toJava
     offsetForTimesTimeout = settings.getOffsetForTimesTimeout
     positionTimeout = settings.getPositionTimeout
     val progressTrackingFactory: () => ConsumerProgressTracking = () => 
ensureProgressTracker()
@@ -763,7 +762,8 @@ import scala.util.control.NonFatal
   private[KafkaConsumerActor] final class RebalanceListenerImpl(
       partitionAssignmentHandler: PartitionAssignmentHandler) extends 
RebalanceListener {
 
-    private val restrictedConsumer = new RestrictedConsumer(consumer, 
settings.partitionHandlerWarning.*(0.95d).asJava)
+    private val restrictedConsumer =
+      new RestrictedConsumer(consumer, 
toJavaDuration(settings.partitionHandlerWarning.*(0.95d)))
     private val warningDuration = settings.partitionHandlerWarning.toNanos
 
     override def onPartitionsAssigned(partitions: 
java.util.Collection[TopicPartition]): Unit = {
@@ -807,6 +807,13 @@ import scala.util.control.NonFatal
           duration / 1000000L)
       }
     }
+
+    // scala.jdk.DurationConverters only works with FiniteDuration
+    // the value here should always be finite
+    private def toJavaDuration(d: Duration): java.time.Duration = d match {
+      case fd: FiniteDuration => fd.toJava
+      case _                  => throw new IllegalArgumentException(s"Expected 
FiniteDuration, got $d")
+    }
   }
 
 }
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala 
b/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala
index e83aa9b4..9c060377 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala
@@ -27,14 +27,13 @@ import pekko.kafka.ConsumerMessage.{
   TransactionalMessage,
   _
 }
-import pekko.util.ccompat._
-import pekko.util.ccompat.JavaConverters._
-import pekko.util.FutureConverters._
 import org.apache.kafka.clients.consumer.{ ConsumerRecord, OffsetAndMetadata }
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.requests.OffsetFetchResponse
 
 import scala.concurrent.Future
+import scala.jdk.CollectionConverters._
+import scala.jdk.FutureConverters._
 
 /** Internal API */
 @InternalApi
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/PartitionAssignmentHelpers.scala
 
b/core/src/main/scala/org/apache/pekko/kafka/internal/PartitionAssignmentHelpers.scala
index ccd2569e..f4aafd8a 100644
--- 
a/core/src/main/scala/org/apache/pekko/kafka/internal/PartitionAssignmentHelpers.scala
+++ 
b/core/src/main/scala/org/apache/pekko/kafka/internal/PartitionAssignmentHelpers.scala
@@ -21,7 +21,7 @@ import pekko.kafka.scaladsl.PartitionAssignmentHandler
 import pekko.kafka.javadsl
 import pekko.kafka.{ AutoSubscription, RestrictedConsumer, 
TopicPartitionsAssigned, TopicPartitionsRevoked }
 import pekko.stream.stage.AsyncCallback
-import pekko.util.ccompat.JavaConverters._
+import scala.jdk.CollectionConverters._
 import org.apache.kafka.common.TopicPartition
 
 /**
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala 
b/core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala
index dff06e84..4aada7fa 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala
@@ -29,7 +29,6 @@ import pekko.stream.scaladsl.Source
 import pekko.stream.stage.GraphStageLogic.StageActor
 import pekko.stream.stage._
 import pekko.stream.{ Attributes, Outlet, SourceShape }
-import pekko.util.ccompat._
 import pekko.util.Timeout
 import org.apache.kafka.common.TopicPartition
 
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala
 
b/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala
index 6050cd18..31d8464e 100644
--- 
a/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala
+++ 
b/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala
@@ -24,13 +24,13 @@ import 
pekko.kafka.internal.ProducerStage.ProducerCompletionState
 import pekko.kafka.{ ConsumerMessage, ProducerSettings }
 import pekko.stream.stage._
 import pekko.stream.{ Attributes, FlowShape }
-import pekko.util.ccompat.JavaConverters._
 import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, 
OffsetAndMetadata }
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.TopicPartition
 
 import scala.concurrent.Future
 import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
 
 /**
  * INTERNAL API
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalSources.scala
 
b/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalSources.scala
index df251cca..718fc8f7 100644
--- 
a/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalSources.scala
+++ 
b/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalSources.scala
@@ -31,7 +31,6 @@ import pekko.kafka.{ AutoSubscription, ConsumerFailed, 
ConsumerSettings, Restric
 import pekko.stream.SourceShape
 import pekko.stream.scaladsl.Source
 import pekko.stream.stage.{ AsyncCallback, GraphStageLogic }
-import pekko.util.ccompat._
 import pekko.util.Timeout
 import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, 
OffsetAndMetadata }
 import org.apache.kafka.common.{ IsolationLevel, TopicPartition }
diff --git a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Committer.scala 
b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Committer.scala
index a68dc9b2..d34b4022 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Committer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Committer.scala
@@ -23,7 +23,7 @@ import pekko.{ Done, NotUsed }
 import pekko.kafka.ConsumerMessage.{ Committable, CommittableOffsetBatch }
 import pekko.kafka.{ scaladsl, CommitterSettings }
 import pekko.stream.javadsl.{ Flow, FlowWithContext, Sink }
-import pekko.util.FutureConverters._
+import scala.jdk.FutureConverters._
 
 object Committer {
 
diff --git a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Consumer.scala 
b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Consumer.scala
index 65553714..e7359a3f 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Consumer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Consumer.scala
@@ -19,19 +19,19 @@ import java.util.concurrent.{ CompletionStage, Executor }
 import org.apache.pekko
 import pekko.actor.ActorRef
 import pekko.annotation.ApiMayChange
-import pekko.dispatch.ExecutionContexts
 import pekko.japi.Pair
 import pekko.kafka.ConsumerMessage.{ CommittableMessage, CommittableOffset }
 import pekko.kafka._
 import pekko.kafka.internal.{ ConsumerControlAsJava, SourceWithOffsetContext }
 import pekko.stream.javadsl.{ Source, SourceWithContext }
 import pekko.{ Done, NotUsed }
-import pekko.util.FutureConverters._
-import pekko.util.ccompat.JavaConverters._
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.{ Metric, MetricName, TopicPartition }
 
+import scala.concurrent.ExecutionContext
 import scala.concurrent.duration.FiniteDuration
+import scala.jdk.CollectionConverters._
+import scala.jdk.FutureConverters._
 
 /**
  * Apache Pekko Stream connector for subscribing to Kafka topics.
@@ -287,7 +287,7 @@ object Consumer {
         settings,
         subscription,
         (tps: Set[TopicPartition]) =>
-          
getOffsetsOnAssign(tps.asJava).asScala.map(_.asScala.toMap)(ExecutionContexts.parasitic),
+          
getOffsetsOnAssign(tps.asJava).asScala.map(_.asScala.toMap)(ExecutionContext.parasitic),
         _ => ())
       .map {
         case (tp, source) => Pair(tp, source.asJava)
@@ -317,7 +317,7 @@ object Consumer {
         settings,
         subscription,
         (tps: Set[TopicPartition]) =>
-          
getOffsetsOnAssign(tps.asJava).asScala.map(_.asScala.toMap)(ExecutionContexts.parasitic),
+          
getOffsetsOnAssign(tps.asJava).asScala.map(_.asScala.toMap)(ExecutionContext.parasitic),
         (tps: Set[TopicPartition]) => onRevoke.accept(tps.asJava))
       .map {
         case (tp, source) => Pair(tp, source.asJava)
@@ -355,7 +355,7 @@ object Consumer {
         settings,
         subscription,
         (tps: Set[TopicPartition]) =>
-          
getOffsetsOnAssign(tps.asJava).asScala.map(_.asScala.toMap)(ExecutionContexts.parasitic),
+          
getOffsetsOnAssign(tps.asJava).asScala.map(_.asScala.toMap)(ExecutionContext.parasitic),
         _ => ())
       .map {
         case (tp, source) => Pair(tp, source.asJava)
@@ -379,7 +379,7 @@ object Consumer {
         settings,
         subscription,
         (tps: Set[TopicPartition]) =>
-          
getOffsetsOnAssign(tps.asJava).asScala.map(_.asScala.toMap)(ExecutionContexts.parasitic),
+          
getOffsetsOnAssign(tps.asJava).asScala.map(_.asScala.toMap)(ExecutionContext.parasitic),
         (tps: Set[TopicPartition]) => onRevoke.accept(tps.asJava))
       .map {
         case (tp, source) => Pair(tp, source.asJava)
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/javadsl/DiscoverySupport.scala 
b/core/src/main/scala/org/apache/pekko/kafka/javadsl/DiscoverySupport.scala
index 7d6c422f..2b412b80 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/javadsl/DiscoverySupport.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/javadsl/DiscoverySupport.scala
@@ -19,11 +19,11 @@ import java.util.concurrent.CompletionStage
 import org.apache.pekko
 import pekko.actor.{ ActorSystem, ClassicActorSystemProvider }
 import pekko.kafka.{ scaladsl, ConsumerSettings, ProducerSettings }
-import pekko.util.FunctionConverters._
-import pekko.util.FutureConverters._
 import com.typesafe.config.Config
 
 import scala.concurrent.Future
+import scala.jdk.FunctionConverters._
+import scala.jdk.FutureConverters._
 
 /**
  * Scala API.
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/javadsl/MetadataClient.scala 
b/core/src/main/scala/org/apache/pekko/kafka/javadsl/MetadataClient.scala
index c474e03c..14f7bda7 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/javadsl/MetadataClient.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/javadsl/MetadataClient.scala
@@ -18,16 +18,14 @@ import java.util.concurrent.{ CompletionStage, Executor }
 
 import org.apache.pekko
 import pekko.actor.{ ActorRef, ActorSystem }
-import pekko.dispatch.ExecutionContexts
 import pekko.kafka.ConsumerSettings
-import pekko.util.ccompat._
-import pekko.util.ccompat.JavaConverters._
-import pekko.util.FutureConverters._
 import pekko.util.Timeout
 import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
 
 import scala.concurrent.ExecutionContext
+import scala.jdk.CollectionConverters._
+import scala.jdk.FutureConverters._
 
 class MetadataClient private (metadataClient: 
pekko.kafka.scaladsl.MetadataClient) {
 
@@ -37,13 +35,13 @@ class MetadataClient private (metadataClient: 
pekko.kafka.scaladsl.MetadataClien
       .getBeginningOffsets(partitions.asScala.toSet)
       .map { beginningOffsets =>
         beginningOffsets.view.mapValues(Long.box).toMap.asJava
-      }(ExecutionContexts.parasitic)
+      }(ExecutionContext.parasitic)
       .asJava
 
   def getBeginningOffsetForPartition[K, V](partition: TopicPartition): 
CompletionStage[java.lang.Long] =
     metadataClient
       .getBeginningOffsetForPartition(partition)
-      .map(Long.box)(ExecutionContexts.parasitic)
+      .map(Long.box)(ExecutionContext.parasitic)
       .asJava
 
   def getEndOffsets(
@@ -52,13 +50,13 @@ class MetadataClient private (metadataClient: 
pekko.kafka.scaladsl.MetadataClien
       .getEndOffsets(partitions.asScala.toSet)
       .map { endOffsets =>
         endOffsets.view.mapValues(Long.box).toMap.asJava
-      }(ExecutionContexts.parasitic)
+      }(ExecutionContext.parasitic)
       .asJava
 
   def getEndOffsetForPartition(partition: TopicPartition): 
CompletionStage[java.lang.Long] =
     metadataClient
       .getEndOffsetForPartition(partition)
-      .map(Long.box)(ExecutionContexts.parasitic)
+      .map(Long.box)(ExecutionContext.parasitic)
       .asJava
 
   def listTopics(): CompletionStage[java.util.Map[java.lang.String, 
java.util.List[PartitionInfo]]] =
@@ -66,7 +64,7 @@ class MetadataClient private (metadataClient: 
pekko.kafka.scaladsl.MetadataClien
       .listTopics()
       .map { topics =>
         topics.view.mapValues(partitionsInfo => 
partitionsInfo.asJava).toMap.asJava
-      }(ExecutionContexts.parasitic)
+      }(ExecutionContext.parasitic)
       .asJava
 
   def getPartitionsFor(topic: java.lang.String): 
CompletionStage[java.util.List[PartitionInfo]] =
@@ -74,7 +72,7 @@ class MetadataClient private (metadataClient: 
pekko.kafka.scaladsl.MetadataClien
       .getPartitionsFor(topic)
       .map { partitionsInfo =>
         partitionsInfo.asJava
-      }(ExecutionContexts.parasitic)
+      }(ExecutionContext.parasitic)
       .asJava
 
   @deprecated("use `getCommittedOffsets`", "Alpakka Kafka 2.0.3")
@@ -89,7 +87,7 @@ class MetadataClient private (metadataClient: 
pekko.kafka.scaladsl.MetadataClien
       .getCommittedOffsets(partitions.asScala.toSet)
       .map { committedOffsets =>
         committedOffsets.asJava
-      }(ExecutionContexts.parasitic)
+      }(ExecutionContext.parasitic)
       .asJava
 
   def close(): Unit =
@@ -99,7 +97,7 @@ class MetadataClient private (metadataClient: 
pekko.kafka.scaladsl.MetadataClien
 object MetadataClient {
 
   def create(consumerActor: ActorRef, timeout: Timeout, executor: Executor): 
MetadataClient = {
-    implicit val ec: ExecutionContext = 
ExecutionContexts.fromExecutor(executor)
+    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
     val metadataClient = 
pekko.kafka.scaladsl.MetadataClient.create(consumerActor, timeout)
     new MetadataClient(metadataClient)
   }
@@ -109,7 +107,7 @@ object MetadataClient {
       system: ActorSystem,
       executor: Executor): MetadataClient = {
     val metadataClient = pekko.kafka.scaladsl.MetadataClient
-      .create(consumerSettings, timeout)(system, 
ExecutionContexts.fromExecutor(executor))
+      .create(consumerSettings, timeout)(system, 
ExecutionContext.fromExecutor(executor))
     new MetadataClient(metadataClient)
   }
 }
diff --git a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Producer.scala 
b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Producer.scala
index f551a122..5fcdfdbd 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Producer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Producer.scala
@@ -15,6 +15,7 @@
 package org.apache.pekko.kafka.javadsl
 
 import java.util.concurrent.CompletionStage
+
 import org.apache.pekko
 import pekko.annotation.ApiMayChange
 import pekko.kafka.ConsumerMessage.Committable
@@ -22,10 +23,10 @@ import pekko.kafka.ProducerMessage._
 import pekko.kafka.{ scaladsl, CommitterSettings, ConsumerMessage, 
ProducerSettings }
 import pekko.stream.javadsl.{ Flow, FlowWithContext, Sink }
 import pekko.{ japi, Done, NotUsed }
-import pekko.util.FutureConverters._
 import org.apache.kafka.clients.producer.ProducerRecord
 
 import scala.annotation.nowarn
+import scala.jdk.FutureConverters._
 
 /**
  * Apache Pekko Stream connector for publishing messages to Kafka topics.
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/javadsl/SendProducer.scala 
b/core/src/main/scala/org/apache/pekko/kafka/javadsl/SendProducer.scala
index b266af7c..efd992be 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/javadsl/SendProducer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/javadsl/SendProducer.scala
@@ -21,9 +21,10 @@ import pekko.Done
 import pekko.actor.{ ActorSystem, ClassicActorSystemProvider }
 import pekko.kafka.ProducerMessage._
 import pekko.kafka.{ scaladsl, ProducerSettings }
-import pekko.util.FutureConverters._
 import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
 
+import scala.jdk.FutureConverters._
+
 /**
  * Utility class for producing to Kafka without using Apache Pekko Streams.
  */
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Transactional.scala 
b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Transactional.scala
index 1ef5afd9..f0445143 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Transactional.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Transactional.scala
@@ -26,9 +26,10 @@ import pekko.kafka.internal.{ ConsumerControlAsJava, 
TransactionalSourceWithOffs
 import pekko.kafka.javadsl.Consumer.Control
 import pekko.stream.javadsl._
 import pekko.{ Done, NotUsed }
-import pekko.util.FutureConverters._
 import org.apache.kafka.clients.consumer.ConsumerRecord
 
+import scala.jdk.FutureConverters._
+
 /**
  *  Apache Pekko Stream connector to support transactions between Kafka topics.
  */
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Committer.scala 
b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Committer.scala
index 2fc07110..43edd5b7 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Committer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Committer.scala
@@ -16,14 +16,13 @@ package org.apache.pekko.kafka.scaladsl
 
 import org.apache.pekko
 import pekko.annotation.ApiMayChange
-import pekko.dispatch.ExecutionContexts
 import pekko.kafka.CommitterSettings
 import pekko.kafka.ConsumerMessage.{ Committable, CommittableOffsetBatch }
 import pekko.kafka.internal.CommitCollectorStage
 import pekko.stream.scaladsl.{ Flow, FlowWithContext, Keep, Sink }
 import pekko.{ Done, NotUsed }
 
-import scala.concurrent.Future
+import scala.concurrent.{ ExecutionContext, Future }
 
 object Committer {
 
@@ -47,7 +46,7 @@ object Committer {
       case WaitForAck =>
         offsetBatches
           .mapAsyncUnordered(settings.parallelism) { batch =>
-            batch.commitInternal().map(_ => batch)(ExecutionContexts.parasitic)
+            batch.commitInternal().map(_ => batch)(ExecutionContext.parasitic)
           }
       case SendAndForget =>
         offsetBatches.map(_.tellCommit())
diff --git a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Consumer.scala 
b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Consumer.scala
index e98caf1e..95e5482e 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Consumer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Consumer.scala
@@ -17,7 +17,6 @@ package org.apache.pekko.kafka.scaladsl
 import org.apache.pekko
 import pekko.actor.ActorRef
 import pekko.annotation.ApiMayChange
-import pekko.dispatch.ExecutionContexts
 import pekko.kafka.ConsumerMessage.{ CommittableMessage, CommittableOffset }
 import pekko.kafka._
 import pekko.kafka.internal._
@@ -114,8 +113,8 @@ object Consumer {
     override def shutdown(): Future[Done] =
       control
         .shutdown()
-        .flatMap(_ => streamCompletion)(ExecutionContexts.parasitic)
-        .map(_ => Done)(ExecutionContexts.parasitic)
+        .flatMap(_ => streamCompletion)(ExecutionContext.parasitic)
+        .map(_ => Done)(ExecutionContext.parasitic)
 
     override def drainAndShutdown[S](streamCompletion: Future[S])(implicit ec: 
ExecutionContext): Future[S] =
       control.drainAndShutdown(streamCompletion)
@@ -129,8 +128,8 @@ object Consumer {
 
     override def isShutdown: Future[Done] =
       control.isShutdown
-        .flatMap(_ => streamCompletion)(ExecutionContexts.parasitic)
-        .map(_ => Done)(ExecutionContexts.parasitic)
+        .flatMap(_ => streamCompletion)(ExecutionContext.parasitic)
+        .map(_ => Done)(ExecutionContext.parasitic)
 
     override def metrics: Future[Map[MetricName, Metric]] = control.metrics
   }
@@ -265,7 +264,7 @@ object Consumer {
   def atMostOnceSource[K, V](settings: ConsumerSettings[K, V],
       subscription: Subscription): Source[ConsumerRecord[K, V], Control] =
     committableSource[K, V](settings, subscription).mapAsync(1) { m =>
-      m.committableOffset.commitInternal().map(_ => 
m.record)(ExecutionContexts.parasitic)
+      m.committableOffset.commitInternal().map(_ => 
m.record)(ExecutionContext.parasitic)
     }
 
   /**
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/DiscoverySupport.scala 
b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/DiscoverySupport.scala
index d39b5926..0d6647bf 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/DiscoverySupport.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/DiscoverySupport.scala
@@ -19,11 +19,11 @@ import pekko.actor.{ ActorSystem, ActorSystemImpl, 
ClassicActorSystemProvider }
 import pekko.annotation.InternalApi
 import pekko.discovery.{ Discovery, ServiceDiscovery }
 import pekko.kafka.{ ConsumerSettings, ProducerSettings }
-import pekko.util.JavaDurationConverters._
 import com.typesafe.config.Config
 
 import scala.concurrent.Future
 import scala.concurrent.duration.FiniteDuration
+import scala.jdk.DurationConverters._
 import scala.util.Failure
 
 /**
@@ -72,7 +72,7 @@ object DiscoverySupport {
     checkClassOrThrow(system.asInstanceOf[ActorSystemImpl])
     val serviceName = config.getString("service-name")
     if (serviceName.nonEmpty) {
-      val lookupTimeout = config.getDuration("resolve-timeout").asScala
+      val lookupTimeout = config.getDuration("resolve-timeout").toScala
       bootstrapServers(discovery(config, system), serviceName, lookupTimeout)
     } else throw new IllegalArgumentException(s"value for `service-name` in 
$config is empty")
   }
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/MetadataClient.scala 
b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/MetadataClient.scala
index 5796d4a5..62e5aecf 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/MetadataClient.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/MetadataClient.scala
@@ -18,7 +18,6 @@ import java.util.concurrent.atomic.AtomicLong
 
 import org.apache.pekko
 import pekko.actor.{ ActorRef, ActorSystem, ExtendedActorSystem }
-import pekko.dispatch.ExecutionContexts
 import pekko.kafka.Metadata._
 import pekko.kafka.{ ConsumerSettings, KafkaConsumerActor }
 import pekko.pattern.ask
@@ -39,7 +38,7 @@ class MetadataClient private (consumerActor: ActorRef, 
timeout: Timeout, managed
       .flatMap {
         case Success(res) => Future.successful(res)
         case Failure(e)   => Future.failed(e)
-      }(ExecutionContexts.parasitic)
+      }(ExecutionContext.parasitic)
 
   def getBeginningOffsetForPartition(partition: TopicPartition): Future[Long] =
     getBeginningOffsets(Set(partition))
@@ -52,7 +51,7 @@ class MetadataClient private (consumerActor: ActorRef, 
timeout: Timeout, managed
       .flatMap {
         case Success(res) => Future.successful(res)
         case Failure(e)   => Future.failed(e)
-      }(ExecutionContexts.parasitic)
+      }(ExecutionContext.parasitic)
 
   def getEndOffsetForPartition(partition: TopicPartition): Future[Long] =
     getEndOffsets(Set(partition))
@@ -65,7 +64,7 @@ class MetadataClient private (consumerActor: ActorRef, 
timeout: Timeout, managed
       .flatMap {
         case Success(res) => Future.successful(res)
         case Failure(e)   => Future.failed(e)
-      }(ExecutionContexts.parasitic)
+      }(ExecutionContext.parasitic)
 
   def getPartitionsFor(topic: String): Future[List[PartitionInfo]] =
     (consumerActor ? GetPartitionsFor(topic))(timeout)
@@ -74,7 +73,7 @@ class MetadataClient private (consumerActor: ActorRef, 
timeout: Timeout, managed
       .flatMap {
         case Success(res) => Future.successful(res)
         case Failure(e)   => Future.failed(e)
-      }(ExecutionContexts.parasitic)
+      }(ExecutionContext.parasitic)
 
   @deprecated("use `getCommittedOffsets`", "Alpakka Kafka 2.0.3")
   def getCommittedOffset(partition: TopicPartition): Future[OffsetAndMetadata] 
=
@@ -84,7 +83,7 @@ class MetadataClient private (consumerActor: ActorRef, 
timeout: Timeout, managed
       .flatMap {
         case Success(res) => Future.successful(res)
         case Failure(e)   => Future.failed(e)
-      }(ExecutionContexts.parasitic)
+      }(ExecutionContext.parasitic)
 
   def getCommittedOffsets(partitions: Set[TopicPartition]): 
Future[Map[TopicPartition, OffsetAndMetadata]] =
     (consumerActor ? GetCommittedOffsets(partitions))(timeout)
@@ -93,7 +92,7 @@ class MetadataClient private (consumerActor: ActorRef, 
timeout: Timeout, managed
       .flatMap {
         case Success(res) => Future.successful(res)
         case Failure(e)   => Future.failed(e)
-      }(ExecutionContexts.parasitic)
+      }(ExecutionContext.parasitic)
 
   def close(): Unit =
     if (managedActor) {
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/SendProducer.scala 
b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/SendProducer.scala
index d1e6d867..97fd4f6c 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/SendProducer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/SendProducer.scala
@@ -19,10 +19,10 @@ import pekko.Done
 import pekko.actor.{ ActorSystem, ClassicActorSystemProvider }
 import pekko.kafka.ProducerMessage._
 import pekko.kafka.ProducerSettings
-import pekko.util.JavaDurationConverters._
 import org.apache.kafka.clients.producer.{ Callback, ProducerRecord, 
RecordMetadata }
 
 import scala.concurrent.{ ExecutionContext, Future, Promise }
+import scala.jdk.DurationConverters._
 
 /**
  * Utility class for producing to Kafka without using Apache Pekko Streams.
@@ -96,7 +96,7 @@ final class SendProducer[K, V] private (val settings: 
ProducerSettings[K, V], sy
     if (settings.closeProducerOnStop) producerFuture.map { producer =>
       // we do not have to check if producer was already closed in 
send-callback as `flush()` and `close()` are effectively no-ops in this case
       producer.flush()
-      producer.close(settings.closeTimeout.asJava)
+      producer.close(settings.closeTimeout.toJava)
       Done
     }
     else Future.successful(Done)
diff --git 
a/int-tests/src/test/scala/org/apache/pekko/kafka/IntegrationTests.scala 
b/int-tests/src/test/scala/org/apache/pekko/kafka/IntegrationTests.scala
index c1b79607..17ede755 100644
--- a/int-tests/src/test/scala/org/apache/pekko/kafka/IntegrationTests.scala
+++ b/int-tests/src/test/scala/org/apache/pekko/kafka/IntegrationTests.scala
@@ -17,11 +17,12 @@ package org.apache.pekko.kafka
 import org.apache.pekko
 import pekko.NotUsed
 import pekko.stream.scaladsl.Flow
-import pekko.util.ccompat.JavaConverters._
 import org.apache.kafka.common.TopicPartition
 import org.slf4j.Logger
 import org.testcontainers.containers.GenericContainer
 
+import scala.jdk.CollectionConverters._
+
 object IntegrationTests {
   val MessageLogInterval = 500L
 
diff --git 
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/KafkaTestkitTestcontainersSettings.scala
 
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/KafkaTestkitTestcontainersSettings.scala
index db802d37..94cc019b 100644
--- 
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/KafkaTestkitTestcontainersSettings.scala
+++ 
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/KafkaTestkitTestcontainersSettings.scala
@@ -16,14 +16,15 @@ package org.apache.pekko.kafka.testkit
 
 import java.time.Duration
 import java.util.function.Consumer
+
 import org.apache.pekko
 import pekko.actor.ActorSystem
-import pekko.util.JavaDurationConverters._
-import com.typesafe.config.Config
 import pekko.kafka.testkit.internal.PekkoConnectorsKafkaContainer
+import com.typesafe.config.Config
 import org.testcontainers.containers.GenericContainer
 
 import scala.concurrent.duration.FiniteDuration
+import scala.jdk.DurationConverters._
 
 final class KafkaTestkitTestcontainersSettings private (
     val zooKeeperImage: String,
@@ -103,12 +104,12 @@ final class KafkaTestkitTestcontainersSettings private (
   /**
    * Java Api
    */
-  def getClusterStartTimeout(): Duration = clusterStartTimeout.asJava
+  def getClusterStartTimeout(): Duration = clusterStartTimeout.toJava
 
   /**
    * Java Api
    */
-  def getReadinessCheckTimeout(): Duration = readinessCheckTimeout.asJava
+  def getReadinessCheckTimeout(): Duration = readinessCheckTimeout.toJava
 
   /**
    * Sets the ZooKeeper image
@@ -222,7 +223,7 @@ final class KafkaTestkitTestcontainersSettings private (
    * Kafka cluster start up timeout
    */
   def withClusterStartTimeout(timeout: Duration): 
KafkaTestkitTestcontainersSettings =
-    copy(clusterStartTimeout = timeout.asScala)
+    copy(clusterStartTimeout = timeout.toScala)
 
   /**
    * Kafka cluster readiness check timeout
@@ -236,7 +237,7 @@ final class KafkaTestkitTestcontainersSettings private (
    * Kafka cluster readiness check timeout
    */
   def withReadinessCheckTimeout(timeout: Duration): 
KafkaTestkitTestcontainersSettings =
-    copy(readinessCheckTimeout = timeout.asScala)
+    copy(readinessCheckTimeout = timeout.toScala)
 
   private def copy(
       zooKeeperImage: String = zooKeeperImage,
@@ -322,8 +323,8 @@ object KafkaTestkitTestcontainersSettings {
     val internalTopicsReplicationFactor = 
config.getInt("internal-topics-replication-factor")
     val useSchemaRegistry = config.getBoolean("use-schema-registry")
     val containerLogging = config.getBoolean("container-logging")
-    val clusterStartTimeout = 
config.getDuration("cluster-start-timeout").asScala
-    val readinessCheckTimeout = 
config.getDuration("readiness-check-timeout").asScala
+    val clusterStartTimeout = 
config.getDuration("cluster-start-timeout").toScala
+    val readinessCheckTimeout = 
config.getDuration("readiness-check-timeout").toScala
 
     new KafkaTestkitTestcontainersSettings(zooKeeperImage,
       zooKeeperImageTag,
diff --git 
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/ProducerResultFactory.scala
 
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/ProducerResultFactory.scala
index 7dab2a5b..66571808 100644
--- 
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/ProducerResultFactory.scala
+++ 
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/ProducerResultFactory.scala
@@ -17,11 +17,11 @@ package org.apache.pekko.kafka.testkit
 import org.apache.pekko
 import pekko.annotation.ApiMayChange
 import pekko.kafka.ProducerMessage
-import pekko.util.ccompat.JavaConverters._
 import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
 import org.apache.kafka.common.TopicPartition
 
 import scala.collection.immutable
+import scala.jdk.CollectionConverters._
 
 /**
  * Factory methods to create instances that normally are emitted by 
[[pekko.kafka.scaladsl.Producer]] and [[pekko.kafka.javadsl.Producer]] flows.
diff --git 
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/KafkaTestKit.scala
 
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/KafkaTestKit.scala
index 627dd273..6c0bc5b7 100644
--- 
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/KafkaTestKit.scala
+++ 
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/KafkaTestKit.scala
@@ -23,12 +23,13 @@ import org.apache.pekko
 import pekko.actor.ActorSystem
 import pekko.kafka.testkit.KafkaTestkitSettings
 import pekko.kafka.{ CommitterSettings, ConsumerSettings, ProducerSettings }
-import pekko.util.ccompat.JavaConverters._
 import org.apache.kafka.clients.admin.{ Admin, AdminClientConfig, NewTopic }
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.serialization.{ Deserializer, Serializer, 
StringDeserializer, StringSerializer }
 import org.slf4j.Logger
 
+import scala.jdk.CollectionConverters._
+
 /**
  * Common functions for scaladsl and javadsl Testkit.
  *
diff --git 
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/TestcontainersKafka.scala
 
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/TestcontainersKafka.scala
index 0f5c77c6..a32e8b77 100644
--- 
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/TestcontainersKafka.scala
+++ 
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/TestcontainersKafka.scala
@@ -17,12 +17,13 @@ package org.apache.pekko.kafka.testkit.internal
 import org.apache.pekko
 import pekko.kafka.testkit.KafkaTestkitTestcontainersSettings
 import pekko.kafka.testkit.scaladsl.KafkaSpec
-import pekko.util.JavaDurationConverters._
-import pekko.util.OptionConverters._
-import pekko.util.ccompat.JavaConverters._
 import org.testcontainers.containers.GenericContainer
 import org.testcontainers.utility.DockerImageName
 
+import scala.jdk.CollectionConverters._
+import scala.jdk.DurationConverters._
+import scala.jdk.OptionConverters._
+
 object TestcontainersKafka {
   trait Spec extends KafkaSpec {
     private var cluster: KafkaContainerCluster = _
@@ -85,8 +86,8 @@ object TestcontainersKafka {
           internalTopicsReplicationFactor,
           settings.useSchemaRegistry,
           settings.containerLogging,
-          settings.clusterStartTimeout.asJava,
-          settings.readinessCheckTimeout.asJava)
+          settings.clusterStartTimeout.toJava,
+          settings.readinessCheckTimeout.toJava)
         configureKafka(brokerContainers)
         configureKafkaConsumer.accept(brokerContainers.asJavaCollection)
         zookeeperContainer match {
diff --git 
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
 
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
index 4519a193..3ae23870 100644
--- 
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
+++ 
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
@@ -31,7 +31,6 @@ import pekko.stream.scaladsl.{ Keep, Source }
 import pekko.stream.testkit.TestSubscriber
 import pekko.stream.testkit.scaladsl.TestSink
 import pekko.testkit.TestKit
-import pekko.util.ccompat.JavaConverters._
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.{ Producer => KProducer, 
ProducerRecord }
 import org.apache.kafka.common.ConsumerGroupState
@@ -40,6 +39,7 @@ import org.slf4j.{ Logger, LoggerFactory }
 import scala.collection.immutable
 import scala.concurrent.duration._
 import scala.concurrent.{ Await, ExecutionContext, Future }
+import scala.jdk.CollectionConverters._
 import scala.util.Try
 
 abstract class KafkaSpec(_kafkaPort: Int, val zooKeeperPort: Int, actorSystem: 
ActorSystem)
diff --git 
a/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala 
b/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala
index 7817f985..e43ea558 100644
--- a/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala
+++ b/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala
@@ -25,7 +25,6 @@ import 
pekko.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike
 import pekko.stream.scaladsl.{ Keep, Sink, Source }
 import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
 import pekko.stream.testkit.scaladsl.TestSink
-import pekko.util.ccompat.JavaConverters._
 import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
 import org.apache.avro.specific.SpecificRecordBase
 import org.apache.avro.util.Utf8
@@ -34,6 +33,7 @@ import org.apache.kafka.common.TopicPartition
 
 import scala.collection.immutable
 import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
 // #imports
 import io.confluent.kafka.serializers.{ AbstractKafkaAvroSerDeConfig, 
KafkaAvroDeserializer, KafkaAvroSerializer }
 import org.apache.avro.specific.SpecificRecord
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/TransactionsOps.scala 
b/tests/src/test/scala/org/apache/pekko/kafka/TransactionsOps.scala
index 643365d5..17d592b9 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/TransactionsOps.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/TransactionsOps.scala
@@ -27,7 +27,6 @@ import pekko.stream.Materializer
 import pekko.stream.scaladsl.{ Flow, Sink, Source }
 import pekko.stream.testkit.TestSubscriber
 import pekko.stream.testkit.scaladsl.TestSink
-import pekko.util.ccompat._
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.{ ProducerConfig, ProducerRecord }
 import org.scalatest.TestSuite
diff --git 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
index 098de532..f0d54d35 100644
--- 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
+++ 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
@@ -30,7 +30,6 @@ import pekko.stream.scaladsl.{ Keep, Source }
 import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
 import pekko.stream.{ ActorAttributes, Supervision }
 import pekko.testkit.{ TestKit, TestProbe }
-import pekko.util.ccompat.JavaConverters._
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.TopicPartition
@@ -44,6 +43,7 @@ import org.slf4j.{ Logger, LoggerFactory }
 import scala.collection.immutable
 import scala.concurrent.ExecutionContext
 import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
 
 class CommittingProducerSinkSpec(_system: ActorSystem)
     extends TestKit(_system)
diff --git 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
index b6fd6d83..e922a8cd 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
@@ -18,8 +18,6 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import org.apache.pekko
 import pekko.testkit.TestKit
-import pekko.util.ccompat.JavaConverters._
-import pekko.util.JavaDurationConverters._
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.common.TopicPartition
 import org.mockito.Mockito._
@@ -30,6 +28,8 @@ import org.mockito.{ ArgumentMatchers, Mockito }
 
 import scala.collection.immutable.Seq
 import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
+import scala.jdk.DurationConverters._
 
 object ConsumerMock {
   type OnCompleteHandler = Map[TopicPartition, OffsetAndMetadata] => 
(Map[TopicPartition, OffsetAndMetadata], Exception)
@@ -169,7 +169,7 @@ class ConsumerMock[K, V](handler: 
ConsumerMock.CommitHandler = new ConsumerMock.
     }
 
   def verifyClosed(mode: VerificationMode = Mockito.times(1)) =
-    verify(mock, mode).close(ConsumerMock.closeTimeout.asJava)
+    verify(mock, mode).close(ConsumerMock.closeTimeout.toJava)
 
   def verifyPoll(mode: VerificationMode = Mockito.atLeastOnce()) =
     verify(mock, mode).poll(ArgumentMatchers.any[java.time.Duration])
diff --git 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
index 7bd3b16a..eca227ad 100644
--- 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
+++ 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
@@ -16,13 +16,13 @@ package org.apache.pekko.kafka.internal
 
 import org.apache.pekko
 import pekko.kafka.tests.scaladsl.LogCapturing
-import pekko.util.ccompat.JavaConverters._
 import org.apache.kafka.clients.consumer.{ Consumer, ConsumerRecord, 
ConsumerRecords, OffsetAndMetadata }
 import org.apache.kafka.common.TopicPartition
 import org.mockito.Mockito
 import org.scalatest.flatspec.AnyFlatSpecLike
 import org.scalatest.matchers.should.Matchers
 
+import scala.jdk.CollectionConverters._
 import scala.language.reflectiveCalls
 
 class ConsumerProgressTrackingSpec extends AnyFlatSpecLike with Matchers with 
LogCapturing {
diff --git 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerResetProtectionSpec.scala
 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerResetProtectionSpec.scala
index 6840827c..8da09266 100644
--- 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerResetProtectionSpec.scala
+++ 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerResetProtectionSpec.scala
@@ -22,7 +22,6 @@ import pekko.kafka.internal.KafkaConsumerActor.Internal.Seek
 import pekko.kafka.testkit.scaladsl.Slf4jToPekkoLoggingAdapter
 import pekko.kafka.tests.scaladsl.LogCapturing
 import pekko.testkit.{ ImplicitSender, TestKit }
-import pekko.util.ccompat.JavaConverters._
 import org.apache.kafka.clients.consumer.{ ConsumerRecord, ConsumerRecords }
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.header.internals.RecordHeaders
@@ -33,6 +32,7 @@ import org.slf4j.{ Logger, LoggerFactory }
 
 import java.util.Optional
 import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
 
 class ConsumerResetProtectionSpec
     extends TestKit(ActorSystem("ConsumerResetProtectionSpec"))
diff --git 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
index 8b1468dd..d92f55ed 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
@@ -26,7 +26,6 @@ import pekko.stream.scaladsl._
 import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
 import pekko.stream.testkit.scaladsl.TestSink
 import pekko.testkit.TestKit
-import pekko.util.ccompat.JavaConverters._
 import com.typesafe.config.ConfigFactory
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.common.serialization.StringDeserializer
@@ -39,6 +38,7 @@ import org.scalatest.matchers.should.Matchers
 import scala.collection.immutable.Seq
 import scala.concurrent.duration._
 import scala.concurrent.{ Await, ExecutionContext, Future }
+import scala.jdk.CollectionConverters._
 
 object ConsumerSpec {
   type K = String
diff --git 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
index 8d998210..36158400 100644
--- 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
+++ 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
@@ -28,7 +28,6 @@ import pekko.stream.scaladsl._
 import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
 import pekko.stream.testkit.scaladsl.TestSink
 import pekko.testkit.TestKit
-import pekko.util.ccompat.JavaConverters._
 import com.typesafe.config.ConfigFactory
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.common.TopicPartition
@@ -41,6 +40,7 @@ import org.slf4j.{ Logger, LoggerFactory }
 
 import scala.concurrent.{ ExecutionContext, Future }
 import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
 
 class PartitionedSourceSpec(_system: ActorSystem)
     extends TestKit(_system)
diff --git 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
index 5dd0df32..d6a0ec04 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
@@ -28,7 +28,6 @@ import pekko.stream.testkit.scaladsl.{ TestSink, TestSource }
 import pekko.stream.{ ActorAttributes, Supervision }
 import pekko.testkit.TestKit
 import pekko.{ Done, NotUsed }
-import pekko.util.ccompat.JavaConverters._
 import com.typesafe.config.ConfigFactory
 import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, 
OffsetAndMetadata }
 import org.apache.kafka.clients.producer._
@@ -46,6 +45,7 @@ import org.scalatest.matchers.should.Matchers
 
 import scala.concurrent.duration._
 import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
+import scala.jdk.CollectionConverters._
 import scala.util.{ Failure, Success, Try }
 
 class ProducerSpec(_system: ActorSystem)
diff --git 
a/tests/src/test/scala/org/apache/pekko/kafka/javadsl/ControlSpec.scala 
b/tests/src/test/scala/org/apache/pekko/kafka/javadsl/ControlSpec.scala
index f66bd4bd..22c98b9c 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/javadsl/ControlSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/javadsl/ControlSpec.scala
@@ -22,13 +22,13 @@ import org.apache.pekko
 import pekko.Done
 import pekko.kafka.internal.ConsumerControlAsJava
 import pekko.kafka.tests.scaladsl.LogCapturing
-import pekko.util.FutureConverters._
 import org.apache.kafka.common.{ Metric, MetricName }
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.wordspec.AnyWordSpec
 
 import scala.concurrent.Future
+import scala.jdk.FutureConverters._
 import scala.language.reflectiveCalls
 
 object ControlSpec {
diff --git 
a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceSpec.scala 
b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceSpec.scala
index a673428a..cd2471c1 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceSpec.scala
@@ -26,7 +26,6 @@ import 
pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
 import pekko.stream.testkit.scaladsl.TestSink
 import pekko.testkit.TestProbe
 import pekko.{ Done, NotUsed }
-import pekko.util.ccompat.JavaConverters._
 import org.apache.kafka.clients.consumer.{ ConsumerConfig, 
ConsumerPartitionAssignor, ConsumerRecord }
 import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
 import org.apache.kafka.common.TopicPartition
@@ -34,6 +33,7 @@ import org.scalatest.Inside
 import org.slf4j.{ Logger, LoggerFactory }
 
 import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
 import scala.util.Random
 
 class RebalanceSpec extends SpecBase with TestcontainersKafkaLike with Inside {
diff --git 
a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RetentionPeriodSpec.scala
 
b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RetentionPeriodSpec.scala
index 8017a783..870a120f 100644
--- 
a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RetentionPeriodSpec.scala
+++ 
b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RetentionPeriodSpec.scala
@@ -24,10 +24,10 @@ import 
pekko.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike
 import pekko.stream.scaladsl.Keep
 import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
 import pekko.stream.testkit.scaladsl.TestSink
-import pekko.util.ccompat.JavaConverters._
 
 import scala.concurrent.Await
 import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
 
 class RetentionPeriodSpec extends SpecBase with 
TestcontainersKafkaPerClassLike {
   // 
https://docs.confluent.io/current/installation/versions-interoperability.html
diff --git 
a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/TimestampSpec.scala 
b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/TimestampSpec.scala
index 2711a7b3..75267da4 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/TimestampSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/TimestampSpec.scala
@@ -19,13 +19,13 @@ import pekko.kafka.testkit.scaladsl.TestcontainersKafkaLike
 import pekko.kafka.Subscriptions
 import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
 import pekko.stream.testkit.scaladsl.TestSink
-import pekko.util.ccompat.JavaConverters._
 import org.apache.kafka.common.TopicPartition
 import org.scalatest.Inside
 import org.scalatest.concurrent.IntegrationPatience
 
 import scala.concurrent.Await
 import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
 
 class TimestampSpec extends SpecBase with TestcontainersKafkaLike with Inside 
with IntegrationPatience {
 
diff --git 
a/tests/src/test/scala/org/apache/pekko/kafka/tests/CapturingAppender.scala 
b/tests/src/test/scala/org/apache/pekko/kafka/tests/CapturingAppender.scala
index 5d4f2401..ffa4a2b3 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/tests/CapturingAppender.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/tests/CapturingAppender.scala
@@ -85,7 +85,7 @@ import ch.qos.logback.core.AppenderBase
    * Also clears the buffer..
    */
   def flush(): Unit = synchronized {
-    import pekko.util.ccompat.JavaConverters._
+    import scala.jdk.CollectionConverters._
     val logbackLogger = getLogbackLogger(classOf[CapturingAppender].getName + 
"Delegate")
     val appenders = logbackLogger.iteratorForAppenders().asScala.filterNot(_ 
== this).toList
     for (event <- buffer; appender <- appenders) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to