This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new 416d354f use scala converters (#367)
416d354f is described below
commit 416d354fa83162467527d4b96d9c1010762c2dd4
Author: PJ Fanning <[email protected]>
AuthorDate: Sun Sep 21 13:15:23 2025 +0100
use scala converters (#367)
* use scala converters
* next batch
* more
* more updates
* scalafmt
---
.../pekko/kafka/benchmarks/InflightMetrics.scala | 2 +-
.../kafka/benchmarks/KafkaConsumerBenchmarks.scala | 2 +-
.../kafka/benchmarks/KafkaConsumerFixtureGen.scala | 3 +-
.../benchmarks/KafkaTransactionBenchmarks.scala | 2 +-
.../benchmarks/KafkaTransactionFixtureGen.scala | 3 +-
.../ReactiveKafkaConsumerBenchmarks.scala | 3 +-
.../cluster/sharding/KafkaClusterSharding.scala | 10 ++--
.../org/apache/pekko/kafka/CommitterSettings.scala | 5 +-
.../pekko/kafka/ConnectionCheckerSettings.scala | 10 ++--
.../org/apache/pekko/kafka/ConsumerMessage.scala | 2 +-
.../org/apache/pekko/kafka/ConsumerSettings.scala | 66 +++++++++++-----------
.../scala/org/apache/pekko/kafka/Metadata.scala | 3 +-
.../kafka/OffsetResetProtectionSettings.scala | 9 +--
.../org/apache/pekko/kafka/ProducerMessage.scala | 2 +-
.../org/apache/pekko/kafka/ProducerSettings.scala | 17 +++---
.../org/apache/pekko/kafka/Subscriptions.scala | 2 +-
.../pekko/kafka/internal/CommittableSources.scala | 5 +-
.../pekko/kafka/internal/ConfigSettings.scala | 8 +--
.../kafka/internal/ConsumerProgressTracking.scala | 2 +-
.../kafka/internal/ConsumerResetProtection.scala | 2 +-
.../kafka/internal/ControlImplementations.scala | 9 ++-
.../pekko/kafka/internal/DeferredProducer.scala | 7 +--
.../pekko/kafka/internal/KafkaConsumerActor.scala | 17 ++++--
.../pekko/kafka/internal/MessageBuilder.scala | 5 +-
.../internal/PartitionAssignmentHelpers.scala | 2 +-
.../pekko/kafka/internal/SubSourceLogic.scala | 1 -
.../internal/TransactionalProducerStage.scala | 2 +-
.../kafka/internal/TransactionalSources.scala | 1 -
.../org/apache/pekko/kafka/javadsl/Committer.scala | 2 +-
.../org/apache/pekko/kafka/javadsl/Consumer.scala | 14 ++---
.../pekko/kafka/javadsl/DiscoverySupport.scala | 4 +-
.../pekko/kafka/javadsl/MetadataClient.scala | 24 ++++----
.../org/apache/pekko/kafka/javadsl/Producer.scala | 3 +-
.../apache/pekko/kafka/javadsl/SendProducer.scala | 3 +-
.../apache/pekko/kafka/javadsl/Transactional.scala | 3 +-
.../apache/pekko/kafka/scaladsl/Committer.scala | 5 +-
.../org/apache/pekko/kafka/scaladsl/Consumer.scala | 11 ++--
.../pekko/kafka/scaladsl/DiscoverySupport.scala | 4 +-
.../pekko/kafka/scaladsl/MetadataClient.scala | 13 ++---
.../apache/pekko/kafka/scaladsl/SendProducer.scala | 4 +-
.../org/apache/pekko/kafka/IntegrationTests.scala | 3 +-
.../KafkaTestkitTestcontainersSettings.scala | 17 +++---
.../kafka/testkit/ProducerResultFactory.scala | 2 +-
.../kafka/testkit/internal/KafkaTestKit.scala | 3 +-
.../testkit/internal/TestcontainersKafka.scala | 11 ++--
.../pekko/kafka/testkit/scaladsl/KafkaSpec.scala | 2 +-
.../scaladsl/SchemaRegistrySerializationSpec.scala | 2 +-
.../org/apache/pekko/kafka/TransactionsOps.scala | 1 -
.../internal/CommittingProducerSinkSpec.scala | 2 +-
.../apache/pekko/kafka/internal/ConsumerMock.scala | 6 +-
.../internal/ConsumerProgressTrackingSpec.scala | 2 +-
.../internal/ConsumerResetProtectionSpec.scala | 2 +-
.../apache/pekko/kafka/internal/ConsumerSpec.scala | 2 +-
.../kafka/internal/PartitionedSourceSpec.scala | 2 +-
.../apache/pekko/kafka/internal/ProducerSpec.scala | 2 +-
.../apache/pekko/kafka/javadsl/ControlSpec.scala | 2 +-
.../pekko/kafka/scaladsl/RebalanceSpec.scala | 2 +-
.../pekko/kafka/scaladsl/RetentionPeriodSpec.scala | 2 +-
.../pekko/kafka/scaladsl/TimestampSpec.scala | 2 +-
.../pekko/kafka/tests/CapturingAppender.scala | 2 +-
60 files changed, 182 insertions(+), 179 deletions(-)
diff --git
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/InflightMetrics.scala
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/InflightMetrics.scala
index 69a69462..3d5e5f98 100644
---
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/InflightMetrics.scala
+++
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/InflightMetrics.scala
@@ -22,13 +22,13 @@ import pekko.actor.Cancellable
import pekko.kafka.scaladsl.Consumer.Control
import pekko.stream.Materializer
import pekko.stream.scaladsl.{ Keep, Sink, Source }
-import pekko.util.ccompat.JavaConverters._
import com.codahale.metrics.{ Histogram, MetricRegistry }
import javax.management.remote.{ JMXConnectorFactory, JMXServiceURL }
import javax.management.{ Attribute, MBeanServerConnection, ObjectName }
import scala.concurrent.duration.{ FiniteDuration, _ }
import scala.concurrent.{ ExecutionContext, Future }
+import scala.jdk.CollectionConverters._
private[benchmarks] trait InflightMetrics {
import InflightMetrics._
diff --git
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaConsumerBenchmarks.scala
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaConsumerBenchmarks.scala
index 3246c973..2dde81a4 100644
---
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaConsumerBenchmarks.scala
+++
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaConsumerBenchmarks.scala
@@ -21,9 +21,9 @@ import com.codahale.metrics.Meter
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.consumer.{ OffsetAndMetadata,
OffsetCommitCallback }
import org.apache.kafka.common.TopicPartition
-import org.apache.pekko.util.ccompat.JavaConverters._
import scala.annotation.tailrec
+import scala.jdk.CollectionConverters._
object KafkaConsumerBenchmarks extends LazyLogging {
val pollTimeoutMs: Duration = Duration.ofMillis(50L)
diff --git
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaConsumerFixtureGen.scala
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaConsumerFixtureGen.scala
index a5e7646d..45301dfc 100644
---
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaConsumerFixtureGen.scala
+++
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaConsumerFixtureGen.scala
@@ -16,10 +16,11 @@ package org.apache.pekko.kafka.benchmarks
import org.apache.pekko
import pekko.kafka.benchmarks.app.RunTestCommand
-import pekko.util.ccompat.JavaConverters._
import org.apache.kafka.clients.consumer.{ ConsumerConfig, KafkaConsumer }
import org.apache.kafka.common.serialization.{ ByteArrayDeserializer,
StringDeserializer }
+import scala.jdk.CollectionConverters._
+
case class KafkaConsumerTestFixture(topic: String, msgCount: Int, consumer:
KafkaConsumer[Array[Byte], String]) {
def close(): Unit = consumer.close()
}
diff --git
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala
index f94da98f..78be5908 100644
---
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala
+++
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala
@@ -16,7 +16,6 @@ package org.apache.pekko.kafka.benchmarks
import org.apache.pekko
import pekko.kafka.benchmarks.KafkaConsumerBenchmarks.pollTimeoutMs
-import pekko.util.ccompat.JavaConverters._
import com.codahale.metrics.Meter
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.consumer._
@@ -25,6 +24,7 @@ import org.apache.kafka.common.TopicPartition
import scala.annotation.tailrec
import scala.concurrent.duration.FiniteDuration
+import scala.jdk.CollectionConverters._
object KafkaTransactionBenchmarks extends LazyLogging {
diff --git
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionFixtureGen.scala
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionFixtureGen.scala
index ead493f0..52451061 100644
---
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionFixtureGen.scala
+++
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionFixtureGen.scala
@@ -18,7 +18,6 @@ import java.util.Locale
import org.apache.pekko
import pekko.kafka.benchmarks.app.RunTestCommand
-import pekko.util.ccompat.JavaConverters._
import org.apache.kafka.clients.consumer.{ ConsumerConfig, KafkaConsumer }
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig }
import org.apache.kafka.common.IsolationLevel
@@ -29,6 +28,8 @@ import org.apache.kafka.common.serialization.{
StringSerializer
}
+import scala.jdk.CollectionConverters._
+
case class KafkaTransactionTestFixture(sourceTopic: String,
sinkTopic: String,
msgCount: Int,
diff --git
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/ReactiveKafkaConsumerBenchmarks.scala
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/ReactiveKafkaConsumerBenchmarks.scala
index a912cbf5..afff0aa4 100644
---
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/ReactiveKafkaConsumerBenchmarks.scala
+++
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/ReactiveKafkaConsumerBenchmarks.scala
@@ -18,7 +18,6 @@ import java.util.concurrent.atomic.AtomicInteger
import org.apache.pekko
import pekko.actor.ActorSystem
-import pekko.dispatch.ExecutionContexts
import pekko.kafka.ConsumerMessage.CommittableMessage
import pekko.kafka.benchmarks.InflightMetrics.{ BrokerMetricRequest,
ConsumerMetricRequest }
import pekko.kafka.scaladsl.Committer
@@ -172,7 +171,7 @@ object ReactiveKafkaConsumerBenchmarks extends LazyLogging
with InflightMetrics
val control = fixture.source
.mapAsync(1) { m =>
meter.mark()
- m.committableOffset.commitInternal().map(_ =>
m)(ExecutionContexts.parasitic)
+ m.committableOffset.commitInternal().map(_ =>
m)(ExecutionContext.parasitic)
}
.toMat(Sink.foreach { msg =>
if (msg.committableOffset.partitionOffset.offset >= fixture.msgCount -
1)
diff --git
a/cluster-sharding/src/main/scala/org/apache/pekko/kafka/cluster/sharding/KafkaClusterSharding.scala
b/cluster-sharding/src/main/scala/org/apache/pekko/kafka/cluster/sharding/KafkaClusterSharding.scala
index 18a22dea..7dca330b 100644
---
a/cluster-sharding/src/main/scala/org/apache/pekko/kafka/cluster/sharding/KafkaClusterSharding.scala
+++
b/cluster-sharding/src/main/scala/org/apache/pekko/kafka/cluster/sharding/KafkaClusterSharding.scala
@@ -31,13 +31,13 @@ import pekko.kafka.scaladsl.MetadataClient
import pekko.kafka._
import pekko.util.Timeout._
import org.apache.kafka.common.utils.Utils
+import org.slf4j.LoggerFactory
import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContextExecutor, Future }
+import scala.jdk.FutureConverters._
+import scala.jdk.DurationConverters._
import scala.util.{ Failure, Success }
-import pekko.util.FutureConverters._
-import pekko.util.JavaDurationConverters._
-import org.slf4j.LoggerFactory
/**
* API MAY CHANGE
@@ -88,7 +88,7 @@ final class KafkaClusterSharding(system: ExtendedActorSystem)
extends Extension
def messageExtractor[M](topic: String,
timeout: java.time.Duration,
settings: ConsumerSettings[_, _]):
CompletionStage[KafkaShardingMessageExtractor[M]] =
- getPartitionCount(topic, timeout.asScala, settings)
+ getPartitionCount(topic, timeout.toScala, settings)
.map(new KafkaShardingMessageExtractor[M](_))(system.dispatcher)
.asJava
@@ -153,7 +153,7 @@ final class KafkaClusterSharding(system:
ExtendedActorSystem) extends Extension
timeout: java.time.Duration,
entityIdExtractor: java.util.function.Function[M, String],
settings: ConsumerSettings[_, _]):
CompletionStage[KafkaShardingNoEnvelopeExtractor[M]] =
- getPartitionCount(topic, timeout.asScala, settings)
+ getPartitionCount(topic, timeout.toScala, settings)
.map(partitions => new KafkaShardingNoEnvelopeExtractor[M](partitions, e
=> entityIdExtractor.apply(e)))(
system.dispatcher)
.asJava
diff --git a/core/src/main/scala/org/apache/pekko/kafka/CommitterSettings.scala
b/core/src/main/scala/org/apache/pekko/kafka/CommitterSettings.scala
index f45a3268..08edde4d 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/CommitterSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/CommitterSettings.scala
@@ -13,14 +13,15 @@
*/
package org.apache.pekko.kafka
+
import java.util.concurrent.TimeUnit
import org.apache.pekko
import pekko.annotation.ApiMayChange
-import pekko.util.JavaDurationConverters._
import com.typesafe.config.Config
import scala.concurrent.duration._
+import scala.jdk.DurationConverters._
@ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/882")
sealed trait CommitDelivery
@@ -184,7 +185,7 @@ class CommitterSettings private (
copy(maxInterval = maxInterval)
def withMaxInterval(maxInterval: java.time.Duration): CommitterSettings =
- copy(maxInterval = maxInterval.asScala)
+ copy(maxInterval = maxInterval.toScala)
def withParallelism(parallelism: Int): CommitterSettings =
copy(parallelism = parallelism)
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/ConnectionCheckerSettings.scala
b/core/src/main/scala/org/apache/pekko/kafka/ConnectionCheckerSettings.scala
index acd93392..a3fdc3b1 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ConnectionCheckerSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ConnectionCheckerSettings.scala
@@ -14,12 +14,12 @@
package org.apache.pekko.kafka
-import org.apache.pekko.util.JavaDurationConverters._
+import java.time.{ Duration => JDuration }
+
import com.typesafe.config.Config
import scala.concurrent.duration._
-
-import java.time.{ Duration => JDuration }
+import scala.jdk.DurationConverters._
class ConnectionCheckerSettings private[kafka] (val enable: Boolean,
val maxRetries: Int,
@@ -48,7 +48,7 @@ class ConnectionCheckerSettings private[kafka] (val enable:
Boolean,
/** Java API */
def withCheckInterval(checkInterval: JDuration): ConnectionCheckerSettings =
- copy(checkInterval = checkInterval.asScala)
+ copy(checkInterval = checkInterval.toScala)
override def toString: String =
s"org.apache.pekko.kafka.ConnectionCheckerSettings(" +
@@ -78,7 +78,7 @@ object ConnectionCheckerSettings {
if (enable) {
val retries = config.getInt("max-retries")
val factor = config.getDouble("backoff-factor")
- val checkInterval = config.getDuration("check-interval").asScala
+ val checkInterval = config.getDuration("check-interval").toScala
apply(retries, checkInterval, factor)
} else Disabled
}
diff --git a/core/src/main/scala/org/apache/pekko/kafka/ConsumerMessage.scala
b/core/src/main/scala/org/apache/pekko/kafka/ConsumerMessage.scala
index 4f9e0b1e..c6bedd92 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ConsumerMessage.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ConsumerMessage.scala
@@ -195,7 +195,7 @@ object ConsumerMessage {
* Create an offset batch out of a list of offsets.
*/
def createCommittableOffsetBatch[T <: Committable](offsets:
java.util.List[T]): CommittableOffsetBatch = {
- import pekko.util.ccompat.JavaConverters._
+ import scala.jdk.CollectionConverters._
CommittableOffsetBatch(offsets.asScala.toList)
}
diff --git a/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
b/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
index fab05f0f..99aaa4e0 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
@@ -20,16 +20,16 @@ import java.util.concurrent.{ CompletionStage, Executor }
import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.kafka.internal._
-import pekko.util.JavaDurationConverters._
-import pekko.util.ccompat.JavaConverters._
-import pekko.util.OptionConverters._
-import pekko.util.FutureConverters._
import com.typesafe.config.Config
import org.apache.kafka.clients.consumer.{ Consumer, ConsumerConfig,
KafkaConsumer }
import org.apache.kafka.common.serialization.Deserializer
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
+import scala.jdk.DurationConverters._
+import scala.jdk.FutureConverters._
+import scala.jdk.OptionConverters._
object ConsumerSettings {
@@ -79,21 +79,21 @@ object ConsumerSettings {
valueDeserializer != null &&
(valueDeserializer.isDefined ||
properties.contains(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)),
"Value deserializer should be defined or declared in configuration")
- val pollInterval = config.getDuration("poll-interval").asScala
- val pollTimeout = config.getDuration("poll-timeout").asScala
- val stopTimeout = config.getDuration("stop-timeout").asScala
- val closeTimeout = config.getDuration("close-timeout").asScala
- val commitTimeout = config.getDuration("commit-timeout").asScala
- val commitTimeWarning = config.getDuration("commit-time-warning").asScala
+ val pollInterval = config.getDuration("poll-interval").toScala
+ val pollTimeout = config.getDuration("poll-timeout").toScala
+ val stopTimeout = config.getDuration("stop-timeout").toScala
+ val closeTimeout = config.getDuration("close-timeout").toScala
+ val commitTimeout = config.getDuration("commit-timeout").toScala
+ val commitTimeWarning = config.getDuration("commit-time-warning").toScala
val commitRefreshInterval =
ConfigSettings.getPotentiallyInfiniteDuration(config, "commit-refresh-interval")
val dispatcher = config.getString("use-dispatcher")
- val waitClosePartition = config.getDuration("wait-close-partition").asScala
- val positionTimeout = config.getDuration("position-timeout").asScala
- val offsetForTimesTimeout =
config.getDuration("offset-for-times-timeout").asScala
- val metadataRequestTimeout =
config.getDuration("metadata-request-timeout").asScala
- val drainingCheckInterval =
config.getDuration("eos-draining-check-interval").asScala
+ val waitClosePartition = config.getDuration("wait-close-partition").toScala
+ val positionTimeout = config.getDuration("position-timeout").toScala
+ val offsetForTimesTimeout =
config.getDuration("offset-for-times-timeout").toScala
+ val metadataRequestTimeout =
config.getDuration("metadata-request-timeout").toScala
+ val drainingCheckInterval =
config.getDuration("eos-draining-check-interval").toScala
val connectionCheckerSettings =
ConnectionCheckerSettings(config.getConfig(ConnectionCheckerSettings.configPath))
- val partitionHandlerWarning =
config.getDuration("partition-handler-warning").asScala
+ val partitionHandlerWarning =
config.getDuration("partition-handler-warning").toScala
val resetProtectionThreshold = OffsetResetProtectionSettings(
config.getConfig(OffsetResetProtectionSettings.configPath))
@@ -339,7 +339,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
* Set the maximum duration a poll to the Kafka broker is allowed to take.
*/
def withPollTimeout(pollTimeout: java.time.Duration): ConsumerSettings[K, V]
=
- copy(pollTimeout = pollTimeout.asScala)
+ copy(pollTimeout = pollTimeout.toScala)
/**
* Set the interval from one scheduled poll to the next.
@@ -352,7 +352,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
* Set the interval from one scheduled poll to the next.
*/
def withPollInterval(pollInterval: java.time.Duration): ConsumerSettings[K,
V] =
- copy(pollInterval = pollInterval.asScala)
+ copy(pollInterval = pollInterval.toScala)
/**
* The stage will await outstanding offset commit requests before
@@ -369,7 +369,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
* stop forcefully.
*/
def withStopTimeout(stopTimeout: java.time.Duration): ConsumerSettings[K, V]
=
- copy(stopTimeout = stopTimeout.asScala)
+ copy(stopTimeout = stopTimeout.toScala)
/**
* Set duration to wait for `KafkaConsumer.close` to finish.
@@ -382,7 +382,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
* Set duration to wait for `KafkaConsumer.close` to finish.
*/
def withCloseTimeout(closeTimeout: java.time.Duration): ConsumerSettings[K,
V] =
- copy(closeTimeout = closeTimeout.asScala)
+ copy(closeTimeout = closeTimeout.toScala)
/**
* If offset commit requests are not completed within this timeout
@@ -397,7 +397,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
* the returned Future is completed with
[[pekko.kafka.CommitTimeoutException]].
*/
def withCommitTimeout(commitTimeout: java.time.Duration):
ConsumerSettings[K, V] =
- copy(commitTimeout = commitTimeout.asScala)
+ copy(commitTimeout = commitTimeout.toScala)
/**
* If commits take longer than this time a warning is logged
@@ -410,7 +410,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
* If commits take longer than this time a warning is logged
*/
def withCommitWarning(commitTimeWarning: java.time.Duration):
ConsumerSettings[K, V] =
- copy(commitTimeWarning = commitTimeWarning.asScala)
+ copy(commitTimeWarning = commitTimeWarning.toScala)
/**
* Fully qualified config path which holds the dispatcher configuration
@@ -438,7 +438,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
*/
def withCommitRefreshInterval(commitRefreshInterval: java.time.Duration):
ConsumerSettings[K, V] =
if (commitRefreshInterval.isZero) copy(commitRefreshInterval =
Duration.Inf)
- else copy(commitRefreshInterval = commitRefreshInterval.asScala)
+ else copy(commitRefreshInterval = commitRefreshInterval.toScala)
/**
* Time to wait for pending requests when a partition is closed.
@@ -458,7 +458,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
* Time to wait for pending requests when a partition is closed.
*/
def withWaitClosePartition(waitClosePartition: java.time.Duration):
ConsumerSettings[K, V] =
- copy(waitClosePartition = waitClosePartition.asScala)
+ copy(waitClosePartition = waitClosePartition.toScala)
/** Scala API: Limits the blocking on Kafka consumer position calls. */
def withPositionTimeout(positionTimeout: FiniteDuration):
ConsumerSettings[K, V] =
@@ -466,7 +466,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
/** Java API: Limits the blocking on Kafka consumer position calls. */
def withPositionTimeout(positionTimeout: java.time.Duration):
ConsumerSettings[K, V] =
- copy(positionTimeout = positionTimeout.asScala)
+ copy(positionTimeout = positionTimeout.toScala)
/** Scala API: Limits the blocking on Kafka consumer offsetForTimes calls. */
def withOffsetForTimesTimeout(offsetForTimesTimeout: FiniteDuration):
ConsumerSettings[K, V] =
@@ -474,7 +474,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
/** Java API: Limits the blocking on Kafka consumer offsetForTimes calls. */
def withOffsetForTimesTimeout(offsetForTimesTimeout: java.time.Duration):
ConsumerSettings[K, V] =
- copy(offsetForTimesTimeout = offsetForTimesTimeout.asScala)
+ copy(offsetForTimesTimeout = offsetForTimesTimeout.toScala)
/** Scala API */
def withMetadataRequestTimeout(metadataRequestTimeout: FiniteDuration):
ConsumerSettings[K, V] =
@@ -482,7 +482,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
/** Java API */
def withMetadataRequestTimeout(metadataRequestTimeout: java.time.Duration):
ConsumerSettings[K, V] =
- copy(metadataRequestTimeout = metadataRequestTimeout.asScala)
+ copy(metadataRequestTimeout = metadataRequestTimeout.toScala)
/** Scala API: Check interval for TransactionalProducer when finishing
transaction before shutting down consumer */
def withDrainingCheckInterval(drainingCheckInterval: FiniteDuration):
ConsumerSettings[K, V] =
@@ -490,7 +490,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
/** Java API: Check interval for TransactionalProducer when finishing
transaction before shutting down consumer */
def withDrainingCheckInterval(drainingCheckInterval: java.time.Duration):
ConsumerSettings[K, V] =
- copy(drainingCheckInterval = drainingCheckInterval.asScala)
+ copy(drainingCheckInterval = drainingCheckInterval.toScala)
/** Scala API */
def withPartitionHandlerWarning(partitionHandlerWarning: FiniteDuration):
ConsumerSettings[K, V] =
@@ -498,7 +498,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
/** Java API */
def withPartitionHandlerWarning(partitionHandlerWarning:
java.time.Duration): ConsumerSettings[K, V] =
- copy(partitionHandlerWarning = partitionHandlerWarning.asScala)
+ copy(partitionHandlerWarning = partitionHandlerWarning.toScala)
/**
* Scala API.
@@ -535,10 +535,10 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
*/
def getProperties: java.util.Map[String, AnyRef] =
properties.asInstanceOf[Map[String, AnyRef]].asJava
- def getCloseTimeout: java.time.Duration = closeTimeout.asJava
- def getPositionTimeout: java.time.Duration = positionTimeout.asJava
- def getOffsetForTimesTimeout: java.time.Duration =
offsetForTimesTimeout.asJava
- def getMetadataRequestTimeout: java.time.Duration =
metadataRequestTimeout.asJava
+ def getCloseTimeout: java.time.Duration = closeTimeout.toJava
+ def getPositionTimeout: java.time.Duration = positionTimeout.toJava
+ def getOffsetForTimesTimeout: java.time.Duration =
offsetForTimesTimeout.toJava
+ def getMetadataRequestTimeout: java.time.Duration =
metadataRequestTimeout.toJava
private def copy(
properties: Map[String, String] = properties,
diff --git a/core/src/main/scala/org/apache/pekko/kafka/Metadata.scala
b/core/src/main/scala/org/apache/pekko/kafka/Metadata.scala
index 3fdd5cb9..453a6d03 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/Metadata.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/Metadata.scala
@@ -18,12 +18,11 @@ import java.util.Optional
import org.apache.pekko
import pekko.actor.NoSerializationVerificationNeeded
-import pekko.util.ccompat._
-import pekko.util.ccompat.JavaConverters._
import org.apache.kafka.clients.consumer.{ OffsetAndMetadata,
OffsetAndTimestamp }
import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
import scala.util.Try
+import scala.jdk.CollectionConverters._
/**
* Messages for Kafka metadata fetching via [[KafkaConsumerActor]].
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/OffsetResetProtectionSettings.scala
b/core/src/main/scala/org/apache/pekko/kafka/OffsetResetProtectionSettings.scala
index 1e935145..c3d0dc95 100644
---
a/core/src/main/scala/org/apache/pekko/kafka/OffsetResetProtectionSettings.scala
+++
b/core/src/main/scala/org/apache/pekko/kafka/OffsetResetProtectionSettings.scala
@@ -13,14 +13,15 @@
*/
package org.apache.pekko.kafka
+
import java.time.{ Duration => JDuration }
import org.apache.pekko
import pekko.annotation.InternalApi
-import pekko.util.JavaDurationConverters._
import com.typesafe.config.Config
import scala.concurrent.duration._
+import scala.jdk.DurationConverters._
class OffsetResetProtectionSettings @InternalApi private[kafka] (val enable:
Boolean,
val offsetThreshold: Long,
@@ -60,7 +61,7 @@ class OffsetResetProtectionSettings @InternalApi
private[kafka] (val enable: Boo
* If the record is more than this duration earlier the last received
record, it is considered a reset
*/
def withTimeThreshold(timeThreshold: JDuration):
OffsetResetProtectionSettings =
- copy(timeThreshold = timeThreshold.asScala)
+ copy(timeThreshold = timeThreshold.toScala)
override def toString: String =
s"org.apache.pekko.kafka.OffsetResetProtectionSettings(" +
@@ -89,7 +90,7 @@ object OffsetResetProtectionSettings {
* threshold are considered indicative of an offset reset.
*/
def apply(offsetThreshold: Long, timeThreshold: java.time.Duration):
OffsetResetProtectionSettings =
- new OffsetResetProtectionSettings(true, offsetThreshold,
timeThreshold.asScala)
+ new OffsetResetProtectionSettings(true, offsetThreshold,
timeThreshold.toScala)
/**
* Create settings from a configuration with layout `connection-checker`.
@@ -98,7 +99,7 @@ object OffsetResetProtectionSettings {
val enable = config.getBoolean("enable")
if (enable) {
val offsetThreshold = config.getLong("offset-threshold")
- val timeThreshold = config.getDuration("time-threshold").asScala
+ val timeThreshold = config.getDuration("time-threshold").toScala
apply(offsetThreshold, timeThreshold)
} else Disabled
}
diff --git a/core/src/main/scala/org/apache/pekko/kafka/ProducerMessage.scala
b/core/src/main/scala/org/apache/pekko/kafka/ProducerMessage.scala
index 53a1f3f9..0690403f 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ProducerMessage.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ProducerMessage.scala
@@ -16,10 +16,10 @@ package org.apache.pekko.kafka
import org.apache.pekko
import pekko.NotUsed
-import pekko.util.ccompat.JavaConverters._
import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
import scala.collection.immutable
+import scala.jdk.CollectionConverters._
/**
* Classes that are used in both [[javadsl.Producer]] and
diff --git a/core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala
b/core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala
index f8f5d855..eee0dfc9 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala
@@ -25,12 +25,11 @@ import org.apache.kafka.clients.producer.{ KafkaProducer,
Producer, ProducerConf
import org.apache.kafka.common.serialization.Serializer
import scala.concurrent.duration._
-import pekko.util.ccompat.JavaConverters._
-import pekko.util.FutureConverters._
-import pekko.util.JavaDurationConverters._
-import pekko.util.OptionConverters._
-
import scala.concurrent.{ ExecutionContext, Future }
+import scala.jdk.CollectionConverters._
+import scala.jdk.DurationConverters._
+import scala.jdk.FutureConverters._
+import scala.jdk.OptionConverters._
object ProducerSettings {
@@ -78,11 +77,11 @@ object ProducerSettings {
valueSerializer != null &&
(valueSerializer.isDefined ||
properties.contains(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)),
"Value serializer should be defined or declared in configuration")
- val closeTimeout = config.getDuration("close-timeout").asScala
+ val closeTimeout = config.getDuration("close-timeout").toScala
val closeOnProducerStop = config.getBoolean("close-on-producer-stop")
val parallelism = config.getInt("parallelism")
val dispatcher = config.getString("use-dispatcher")
- val eosCommitInterval = config.getDuration("eos-commit-interval").asScala
+ val eosCommitInterval = config.getDuration("eos-commit-interval").toScala
new ProducerSettings[K, V](
properties,
keySerializer,
@@ -295,7 +294,7 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
* Duration to wait for `KafkaProducer.close` to finish.
*/
def withCloseTimeout(closeTimeout: java.time.Duration): ProducerSettings[K,
V] =
- copy(closeTimeout = closeTimeout.asScala)
+ copy(closeTimeout = closeTimeout.toScala)
/**
* Call `KafkaProducer.close` on the
[[org.apache.kafka.clients.producer.KafkaProducer]] when the producer stage
@@ -330,7 +329,7 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
* The time interval to commit a transaction when using the
`Transactional.sink` or `Transactional.flow`.
*/
def withEosCommitInterval(eosCommitInterval: java.time.Duration):
ProducerSettings[K, V] =
- copy(eosCommitInterval = eosCommitInterval.asScala)
+ copy(eosCommitInterval = eosCommitInterval.toScala)
/**
* Scala API.
diff --git a/core/src/main/scala/org/apache/pekko/kafka/Subscriptions.scala
b/core/src/main/scala/org/apache/pekko/kafka/Subscriptions.scala
index 15ce108a..07a913f1 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/Subscriptions.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/Subscriptions.scala
@@ -19,10 +19,10 @@ import pekko.actor.ActorRef
import pekko.annotation.{ ApiMayChange, InternalApi }
import pekko.kafka.internal.PartitionAssignmentHelpers
import
pekko.kafka.internal.PartitionAssignmentHelpers.EmptyPartitionAssignmentHandler
-import pekko.util.ccompat.JavaConverters._
import org.apache.kafka.common.TopicPartition
import scala.annotation.varargs
+import scala.jdk.CollectionConverters._
sealed trait Subscription {
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/CommittableSources.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/CommittableSources.scala
index 6d2a88cf..f5934482 100644
---
a/core/src/main/scala/org/apache/pekko/kafka/internal/CommittableSources.scala
+++
b/core/src/main/scala/org/apache/pekko/kafka/internal/CommittableSources.scala
@@ -17,7 +17,6 @@ package org.apache.pekko.kafka.internal
import org.apache.pekko
import pekko.actor.ActorRef
import pekko.annotation.InternalApi
-import pekko.dispatch.ExecutionContexts
import pekko.kafka.ConsumerMessage.{ CommittableMessage, CommittableOffset }
import pekko.kafka._
import pekko.kafka.internal.KafkaConsumerActor.Internal.{ Commit,
CommitSingle, CommitWithoutReply }
@@ -153,7 +152,7 @@ private[kafka] object KafkaAsyncConsumerCommitterRef {
}
getFirstExecutionContext(batch)
.map { implicit ec =>
- Future.sequence(futures).map(_ => Done)(ExecutionContexts.parasitic)
+ Future.sequence(futures).map(_ => Done)(ExecutionContext.parasitic)
}
.getOrElse(Future.successful(Done))
}
@@ -211,7 +210,7 @@ private[kafka] class KafkaAsyncConsumerCommitterRef(private
val consumerActor: A
import pekko.pattern.ask
consumerActor
.ask(msg)(Timeout(commitTimeout))
- .map(_ => Done)(ExecutionContexts.parasitic)
+ .map(_ => Done)(ExecutionContext.parasitic)
.recoverWith {
case e: AskTimeoutException =>
Future.failed(new CommitTimeoutException(s"Kafka commit took longer
than: $commitTimeout (${e.getMessage})"))
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/ConfigSettings.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/ConfigSettings.scala
index 6f2f8efc..70ecca8a 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/ConfigSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/ConfigSettings.scala
@@ -22,8 +22,8 @@ import com.typesafe.config.{ Config, ConfigObject }
import scala.annotation.tailrec
import scala.concurrent.duration.Duration
-import pekko.util.ccompat.JavaConverters._
-import pekko.util.JavaDurationConverters._
+import scala.jdk.CollectionConverters._
+import scala.jdk.DurationConverters._
/**
* INTERNAL API
@@ -52,7 +52,7 @@ import pekko.util.JavaDurationConverters._
}
import org.apache.kafka
- import org.apache.pekko.util.ccompat.JavaConverters._
+ import scala.jdk.CollectionConverters._
def serializeAndMaskKafkaProperties[A <: kafka.common.config.AbstractConfig](
properties: Map[String, AnyRef], constructor: java.util.Map[String,
AnyRef] => A): String = {
@@ -69,7 +69,7 @@ import pekko.util.JavaDurationConverters._
def getPotentiallyInfiniteDuration(underlying: Config, path: String):
Duration = underlying.getString(path) match {
case "infinite" => Duration.Inf
- case _ => underlying.getDuration(path).asScala
+ case _ => underlying.getDuration(path).toScala
}
}
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerProgressTracking.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerProgressTracking.scala
index 4e693899..7ca6a03e 100644
---
a/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerProgressTracking.scala
+++
b/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerProgressTracking.scala
@@ -16,7 +16,7 @@ package org.apache.pekko.kafka.internal
import org.apache.pekko
import pekko.annotation.InternalApi
-import pekko.util.ccompat.JavaConverters._
+import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.{ Consumer, ConsumerRecords,
OffsetAndMetadata }
import org.apache.kafka.common.TopicPartition
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala
index 504053db..55006009 100644
---
a/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala
+++
b/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala
@@ -22,7 +22,7 @@ import pekko.annotation.InternalApi
import pekko.event.LoggingAdapter
import pekko.kafka.OffsetResetProtectionSettings
import pekko.kafka.internal.KafkaConsumerActor.Internal.Seek
-import pekko.util.ccompat.JavaConverters._
+import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.{ ConsumerRecord, ConsumerRecords,
OffsetAndMetadata }
import org.apache.kafka.common.TopicPartition
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/ControlImplementations.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/ControlImplementations.scala
index 624d3cff..2b192559 100644
---
a/core/src/main/scala/org/apache/pekko/kafka/internal/ControlImplementations.scala
+++
b/core/src/main/scala/org/apache/pekko/kafka/internal/ControlImplementations.scala
@@ -19,17 +19,16 @@ import org.apache.pekko
import pekko.Done
import pekko.actor.ActorRef
import pekko.annotation.InternalApi
-import pekko.dispatch.ExecutionContexts
import pekko.kafka.internal.KafkaConsumerActor.Internal.{ ConsumerMetrics,
RequestMetrics }
import pekko.kafka.{ javadsl, scaladsl }
import pekko.stream.SourceShape
import pekko.stream.stage.GraphStageLogic
-import pekko.util.ccompat.JavaConverters._
-import pekko.util.FutureConverters._
import pekko.util.Timeout
import org.apache.kafka.common.{ Metric, MetricName }
import scala.concurrent.{ ExecutionContext, Future, Promise }
+import scala.jdk.CollectionConverters._
+import scala.jdk.FutureConverters._
private object PromiseControl {
sealed trait ControlOperation
@@ -96,7 +95,7 @@ private trait MetricsControl extends
scaladsl.Consumer.Control {
consumer
.ask(RequestMetrics)(Timeout(1.minute))
.mapTo[ConsumerMetrics]
- .map(_.metrics)(ExecutionContexts.parasitic)
+ .map(_.metrics)(ExecutionContext.parasitic)
}(executionContext)
}
}
@@ -115,7 +114,7 @@ final private[kafka] class
ConsumerControlAsJava(underlying: scaladsl.Consumer.C
override def isShutdown: CompletionStage[Done] = underlying.isShutdown.asJava
override def getMetrics: CompletionStage[java.util.Map[MetricName, Metric]] =
- underlying.metrics.map(_.asJava)(ExecutionContexts.parasitic).asJava
+ underlying.metrics.map(_.asJava)(ExecutionContext.parasitic).asJava
}
/** Internal API */
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala
index 944f39e0..746690f8 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala
@@ -16,13 +16,12 @@ package org.apache.pekko.kafka.internal
import org.apache.pekko
import pekko.annotation.InternalApi
-import pekko.dispatch.ExecutionContexts
import pekko.kafka.ProducerSettings
import pekko.stream.stage._
-import pekko.util.JavaDurationConverters._
import org.apache.kafka.clients.producer.Producer
import scala.concurrent.ExecutionContext
+import scala.jdk.DurationConverters._
import scala.util.control.NonFatal
import scala.util.{ Failure, Success }
@@ -89,7 +88,7 @@ private[kafka] trait DeferredProducer[K, V] {
log.error(e, "producer creation failed")
closeAndFailStageCb.invoke(e)
e
- })(ExecutionContexts.parasitic)
+ })(ExecutionContext.parasitic)
changeProducerAssignmentLifecycle(AsyncCreateRequestSent)
}
}
@@ -112,7 +111,7 @@ private[kafka] trait DeferredProducer[K, V] {
try {
// we do not have to check if producer was already closed in
send-callback as `flush()` and `close()` are effectively no-ops in this case
producer.flush()
- producer.close(producerSettings.closeTimeout.asJava)
+ producer.close(producerSettings.closeTimeout.toJava)
log.debug("Producer closed")
} catch {
case NonFatal(ex) => log.error(ex, "Problem occurred during producer
close")
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
index 87a56426..df6e6b61 100644
---
a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
+++
b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
@@ -31,13 +31,10 @@ import pekko.actor.{
Timers
}
import pekko.annotation.InternalApi
-import pekko.util.JavaDurationConverters._
import pekko.event.LoggingReceive
import pekko.kafka.KafkaConsumerActor.{ StopLike, StoppingException }
import pekko.kafka._
import pekko.kafka.scaladsl.PartitionAssignmentHandler
-import pekko.util.ccompat._
-import pekko.util.ccompat.JavaConverters._
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.errors.RebalanceInProgressException
import org.apache.kafka.common.{ Metric, MetricName, TopicPartition }
@@ -45,6 +42,8 @@ import org.apache.kafka.common.{ Metric, MetricName,
TopicPartition }
import scala.annotation.nowarn
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
+import scala.jdk.DurationConverters._
import scala.util.{ Success, Try }
import scala.util.control.NonFatal
@@ -420,7 +419,7 @@ import scala.util.control.NonFatal
this.settings = updatedSettings
if (settings.connectionCheckerSettings.enable)
context.actorOf(ConnectionChecker.props(settings.connectionCheckerSettings))
- pollTimeout = settings.pollTimeout.asJava
+ pollTimeout = settings.pollTimeout.toJava
offsetForTimesTimeout = settings.getOffsetForTimesTimeout
positionTimeout = settings.getPositionTimeout
val progressTrackingFactory: () => ConsumerProgressTracking = () =>
ensureProgressTracker()
@@ -763,7 +762,8 @@ import scala.util.control.NonFatal
private[KafkaConsumerActor] final class RebalanceListenerImpl(
partitionAssignmentHandler: PartitionAssignmentHandler) extends
RebalanceListener {
- private val restrictedConsumer = new RestrictedConsumer(consumer,
settings.partitionHandlerWarning.*(0.95d).asJava)
+ private val restrictedConsumer =
+ new RestrictedConsumer(consumer,
toJavaDuration(settings.partitionHandlerWarning.*(0.95d)))
private val warningDuration = settings.partitionHandlerWarning.toNanos
override def onPartitionsAssigned(partitions:
java.util.Collection[TopicPartition]): Unit = {
@@ -807,6 +807,13 @@ import scala.util.control.NonFatal
duration / 1000000L)
}
}
+
+ // scala.jdk.DurationConverters only works with FiniteDuration
+ // the value here should always be finite
+ private def toJavaDuration(d: Duration): java.time.Duration = d match {
+ case fd: FiniteDuration => fd.toJava
+ case _ => throw new IllegalArgumentException(s"Expected
FiniteDuration, got $d")
+ }
}
}
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala
index e83aa9b4..9c060377 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala
@@ -27,14 +27,13 @@ import pekko.kafka.ConsumerMessage.{
TransactionalMessage,
_
}
-import pekko.util.ccompat._
-import pekko.util.ccompat.JavaConverters._
-import pekko.util.FutureConverters._
import org.apache.kafka.clients.consumer.{ ConsumerRecord, OffsetAndMetadata }
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.OffsetFetchResponse
import scala.concurrent.Future
+import scala.jdk.CollectionConverters._
+import scala.jdk.FutureConverters._
/** Internal API */
@InternalApi
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/PartitionAssignmentHelpers.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/PartitionAssignmentHelpers.scala
index ccd2569e..f4aafd8a 100644
---
a/core/src/main/scala/org/apache/pekko/kafka/internal/PartitionAssignmentHelpers.scala
+++
b/core/src/main/scala/org/apache/pekko/kafka/internal/PartitionAssignmentHelpers.scala
@@ -21,7 +21,7 @@ import pekko.kafka.scaladsl.PartitionAssignmentHandler
import pekko.kafka.javadsl
import pekko.kafka.{ AutoSubscription, RestrictedConsumer,
TopicPartitionsAssigned, TopicPartitionsRevoked }
import pekko.stream.stage.AsyncCallback
-import pekko.util.ccompat.JavaConverters._
+import scala.jdk.CollectionConverters._
import org.apache.kafka.common.TopicPartition
/**
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala
index dff06e84..4aada7fa 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala
@@ -29,7 +29,6 @@ import pekko.stream.scaladsl.Source
import pekko.stream.stage.GraphStageLogic.StageActor
import pekko.stream.stage._
import pekko.stream.{ Attributes, Outlet, SourceShape }
-import pekko.util.ccompat._
import pekko.util.Timeout
import org.apache.kafka.common.TopicPartition
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala
index 6050cd18..31d8464e 100644
---
a/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala
+++
b/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala
@@ -24,13 +24,13 @@ import
pekko.kafka.internal.ProducerStage.ProducerCompletionState
import pekko.kafka.{ ConsumerMessage, ProducerSettings }
import pekko.stream.stage._
import pekko.stream.{ Attributes, FlowShape }
-import pekko.util.ccompat.JavaConverters._
import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata,
OffsetAndMetadata }
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.TopicPartition
import scala.concurrent.Future
import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
/**
* INTERNAL API
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalSources.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalSources.scala
index df251cca..718fc8f7 100644
---
a/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalSources.scala
+++
b/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalSources.scala
@@ -31,7 +31,6 @@ import pekko.kafka.{ AutoSubscription, ConsumerFailed,
ConsumerSettings, Restric
import pekko.stream.SourceShape
import pekko.stream.scaladsl.Source
import pekko.stream.stage.{ AsyncCallback, GraphStageLogic }
-import pekko.util.ccompat._
import pekko.util.Timeout
import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord,
OffsetAndMetadata }
import org.apache.kafka.common.{ IsolationLevel, TopicPartition }
diff --git a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Committer.scala
b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Committer.scala
index a68dc9b2..d34b4022 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Committer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Committer.scala
@@ -23,7 +23,7 @@ import pekko.{ Done, NotUsed }
import pekko.kafka.ConsumerMessage.{ Committable, CommittableOffsetBatch }
import pekko.kafka.{ scaladsl, CommitterSettings }
import pekko.stream.javadsl.{ Flow, FlowWithContext, Sink }
-import pekko.util.FutureConverters._
+import scala.jdk.FutureConverters._
object Committer {
diff --git a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Consumer.scala
b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Consumer.scala
index 65553714..e7359a3f 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Consumer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Consumer.scala
@@ -19,19 +19,19 @@ import java.util.concurrent.{ CompletionStage, Executor }
import org.apache.pekko
import pekko.actor.ActorRef
import pekko.annotation.ApiMayChange
-import pekko.dispatch.ExecutionContexts
import pekko.japi.Pair
import pekko.kafka.ConsumerMessage.{ CommittableMessage, CommittableOffset }
import pekko.kafka._
import pekko.kafka.internal.{ ConsumerControlAsJava, SourceWithOffsetContext }
import pekko.stream.javadsl.{ Source, SourceWithContext }
import pekko.{ Done, NotUsed }
-import pekko.util.FutureConverters._
-import pekko.util.ccompat.JavaConverters._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.{ Metric, MetricName, TopicPartition }
+import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
+import scala.jdk.CollectionConverters._
+import scala.jdk.FutureConverters._
/**
* Apache Pekko Stream connector for subscribing to Kafka topics.
@@ -287,7 +287,7 @@ object Consumer {
settings,
subscription,
(tps: Set[TopicPartition]) =>
-
getOffsetsOnAssign(tps.asJava).asScala.map(_.asScala.toMap)(ExecutionContexts.parasitic),
+
getOffsetsOnAssign(tps.asJava).asScala.map(_.asScala.toMap)(ExecutionContext.parasitic),
_ => ())
.map {
case (tp, source) => Pair(tp, source.asJava)
@@ -317,7 +317,7 @@ object Consumer {
settings,
subscription,
(tps: Set[TopicPartition]) =>
-
getOffsetsOnAssign(tps.asJava).asScala.map(_.asScala.toMap)(ExecutionContexts.parasitic),
+
getOffsetsOnAssign(tps.asJava).asScala.map(_.asScala.toMap)(ExecutionContext.parasitic),
(tps: Set[TopicPartition]) => onRevoke.accept(tps.asJava))
.map {
case (tp, source) => Pair(tp, source.asJava)
@@ -355,7 +355,7 @@ object Consumer {
settings,
subscription,
(tps: Set[TopicPartition]) =>
-
getOffsetsOnAssign(tps.asJava).asScala.map(_.asScala.toMap)(ExecutionContexts.parasitic),
+
getOffsetsOnAssign(tps.asJava).asScala.map(_.asScala.toMap)(ExecutionContext.parasitic),
_ => ())
.map {
case (tp, source) => Pair(tp, source.asJava)
@@ -379,7 +379,7 @@ object Consumer {
settings,
subscription,
(tps: Set[TopicPartition]) =>
-
getOffsetsOnAssign(tps.asJava).asScala.map(_.asScala.toMap)(ExecutionContexts.parasitic),
+
getOffsetsOnAssign(tps.asJava).asScala.map(_.asScala.toMap)(ExecutionContext.parasitic),
(tps: Set[TopicPartition]) => onRevoke.accept(tps.asJava))
.map {
case (tp, source) => Pair(tp, source.asJava)
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/javadsl/DiscoverySupport.scala
b/core/src/main/scala/org/apache/pekko/kafka/javadsl/DiscoverySupport.scala
index 7d6c422f..2b412b80 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/javadsl/DiscoverySupport.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/javadsl/DiscoverySupport.scala
@@ -19,11 +19,11 @@ import java.util.concurrent.CompletionStage
import org.apache.pekko
import pekko.actor.{ ActorSystem, ClassicActorSystemProvider }
import pekko.kafka.{ scaladsl, ConsumerSettings, ProducerSettings }
-import pekko.util.FunctionConverters._
-import pekko.util.FutureConverters._
import com.typesafe.config.Config
import scala.concurrent.Future
+import scala.jdk.FunctionConverters._
+import scala.jdk.FutureConverters._
/**
* Scala API.
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/javadsl/MetadataClient.scala
b/core/src/main/scala/org/apache/pekko/kafka/javadsl/MetadataClient.scala
index c474e03c..14f7bda7 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/javadsl/MetadataClient.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/javadsl/MetadataClient.scala
@@ -18,16 +18,14 @@ import java.util.concurrent.{ CompletionStage, Executor }
import org.apache.pekko
import pekko.actor.{ ActorRef, ActorSystem }
-import pekko.dispatch.ExecutionContexts
import pekko.kafka.ConsumerSettings
-import pekko.util.ccompat._
-import pekko.util.ccompat.JavaConverters._
-import pekko.util.FutureConverters._
import pekko.util.Timeout
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
import scala.concurrent.ExecutionContext
+import scala.jdk.CollectionConverters._
+import scala.jdk.FutureConverters._
class MetadataClient private (metadataClient:
pekko.kafka.scaladsl.MetadataClient) {
@@ -37,13 +35,13 @@ class MetadataClient private (metadataClient:
pekko.kafka.scaladsl.MetadataClien
.getBeginningOffsets(partitions.asScala.toSet)
.map { beginningOffsets =>
beginningOffsets.view.mapValues(Long.box).toMap.asJava
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)
.asJava
def getBeginningOffsetForPartition[K, V](partition: TopicPartition):
CompletionStage[java.lang.Long] =
metadataClient
.getBeginningOffsetForPartition(partition)
- .map(Long.box)(ExecutionContexts.parasitic)
+ .map(Long.box)(ExecutionContext.parasitic)
.asJava
def getEndOffsets(
@@ -52,13 +50,13 @@ class MetadataClient private (metadataClient:
pekko.kafka.scaladsl.MetadataClien
.getEndOffsets(partitions.asScala.toSet)
.map { endOffsets =>
endOffsets.view.mapValues(Long.box).toMap.asJava
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)
.asJava
def getEndOffsetForPartition(partition: TopicPartition):
CompletionStage[java.lang.Long] =
metadataClient
.getEndOffsetForPartition(partition)
- .map(Long.box)(ExecutionContexts.parasitic)
+ .map(Long.box)(ExecutionContext.parasitic)
.asJava
def listTopics(): CompletionStage[java.util.Map[java.lang.String,
java.util.List[PartitionInfo]]] =
@@ -66,7 +64,7 @@ class MetadataClient private (metadataClient:
pekko.kafka.scaladsl.MetadataClien
.listTopics()
.map { topics =>
topics.view.mapValues(partitionsInfo =>
partitionsInfo.asJava).toMap.asJava
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)
.asJava
def getPartitionsFor(topic: java.lang.String):
CompletionStage[java.util.List[PartitionInfo]] =
@@ -74,7 +72,7 @@ class MetadataClient private (metadataClient:
pekko.kafka.scaladsl.MetadataClien
.getPartitionsFor(topic)
.map { partitionsInfo =>
partitionsInfo.asJava
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)
.asJava
@deprecated("use `getCommittedOffsets`", "Alpakka Kafka 2.0.3")
@@ -89,7 +87,7 @@ class MetadataClient private (metadataClient:
pekko.kafka.scaladsl.MetadataClien
.getCommittedOffsets(partitions.asScala.toSet)
.map { committedOffsets =>
committedOffsets.asJava
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)
.asJava
def close(): Unit =
@@ -99,7 +97,7 @@ class MetadataClient private (metadataClient:
pekko.kafka.scaladsl.MetadataClien
object MetadataClient {
def create(consumerActor: ActorRef, timeout: Timeout, executor: Executor):
MetadataClient = {
- implicit val ec: ExecutionContext =
ExecutionContexts.fromExecutor(executor)
+ implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
val metadataClient =
pekko.kafka.scaladsl.MetadataClient.create(consumerActor, timeout)
new MetadataClient(metadataClient)
}
@@ -109,7 +107,7 @@ object MetadataClient {
system: ActorSystem,
executor: Executor): MetadataClient = {
val metadataClient = pekko.kafka.scaladsl.MetadataClient
- .create(consumerSettings, timeout)(system,
ExecutionContexts.fromExecutor(executor))
+ .create(consumerSettings, timeout)(system,
ExecutionContext.fromExecutor(executor))
new MetadataClient(metadataClient)
}
}
diff --git a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Producer.scala
b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Producer.scala
index f551a122..5fcdfdbd 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Producer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Producer.scala
@@ -15,6 +15,7 @@
package org.apache.pekko.kafka.javadsl
import java.util.concurrent.CompletionStage
+
import org.apache.pekko
import pekko.annotation.ApiMayChange
import pekko.kafka.ConsumerMessage.Committable
@@ -22,10 +23,10 @@ import pekko.kafka.ProducerMessage._
import pekko.kafka.{ scaladsl, CommitterSettings, ConsumerMessage,
ProducerSettings }
import pekko.stream.javadsl.{ Flow, FlowWithContext, Sink }
import pekko.{ japi, Done, NotUsed }
-import pekko.util.FutureConverters._
import org.apache.kafka.clients.producer.ProducerRecord
import scala.annotation.nowarn
+import scala.jdk.FutureConverters._
/**
* Apache Pekko Stream connector for publishing messages to Kafka topics.
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/javadsl/SendProducer.scala
b/core/src/main/scala/org/apache/pekko/kafka/javadsl/SendProducer.scala
index b266af7c..efd992be 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/javadsl/SendProducer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/javadsl/SendProducer.scala
@@ -21,9 +21,10 @@ import pekko.Done
import pekko.actor.{ ActorSystem, ClassicActorSystemProvider }
import pekko.kafka.ProducerMessage._
import pekko.kafka.{ scaladsl, ProducerSettings }
-import pekko.util.FutureConverters._
import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
+import scala.jdk.FutureConverters._
+
/**
* Utility class for producing to Kafka without using Apache Pekko Streams.
*/
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Transactional.scala
b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Transactional.scala
index 1ef5afd9..f0445143 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Transactional.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Transactional.scala
@@ -26,9 +26,10 @@ import pekko.kafka.internal.{ ConsumerControlAsJava,
TransactionalSourceWithOffs
import pekko.kafka.javadsl.Consumer.Control
import pekko.stream.javadsl._
import pekko.{ Done, NotUsed }
-import pekko.util.FutureConverters._
import org.apache.kafka.clients.consumer.ConsumerRecord
+import scala.jdk.FutureConverters._
+
/**
* Apache Pekko Stream connector to support transactions between Kafka topics.
*/
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Committer.scala
b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Committer.scala
index 2fc07110..43edd5b7 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Committer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Committer.scala
@@ -16,14 +16,13 @@ package org.apache.pekko.kafka.scaladsl
import org.apache.pekko
import pekko.annotation.ApiMayChange
-import pekko.dispatch.ExecutionContexts
import pekko.kafka.CommitterSettings
import pekko.kafka.ConsumerMessage.{ Committable, CommittableOffsetBatch }
import pekko.kafka.internal.CommitCollectorStage
import pekko.stream.scaladsl.{ Flow, FlowWithContext, Keep, Sink }
import pekko.{ Done, NotUsed }
-import scala.concurrent.Future
+import scala.concurrent.{ ExecutionContext, Future }
object Committer {
@@ -47,7 +46,7 @@ object Committer {
case WaitForAck =>
offsetBatches
.mapAsyncUnordered(settings.parallelism) { batch =>
- batch.commitInternal().map(_ => batch)(ExecutionContexts.parasitic)
+ batch.commitInternal().map(_ => batch)(ExecutionContext.parasitic)
}
case SendAndForget =>
offsetBatches.map(_.tellCommit())
diff --git a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Consumer.scala
b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Consumer.scala
index e98caf1e..95e5482e 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Consumer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Consumer.scala
@@ -17,7 +17,6 @@ package org.apache.pekko.kafka.scaladsl
import org.apache.pekko
import pekko.actor.ActorRef
import pekko.annotation.ApiMayChange
-import pekko.dispatch.ExecutionContexts
import pekko.kafka.ConsumerMessage.{ CommittableMessage, CommittableOffset }
import pekko.kafka._
import pekko.kafka.internal._
@@ -114,8 +113,8 @@ object Consumer {
override def shutdown(): Future[Done] =
control
.shutdown()
- .flatMap(_ => streamCompletion)(ExecutionContexts.parasitic)
- .map(_ => Done)(ExecutionContexts.parasitic)
+ .flatMap(_ => streamCompletion)(ExecutionContext.parasitic)
+ .map(_ => Done)(ExecutionContext.parasitic)
override def drainAndShutdown[S](streamCompletion: Future[S])(implicit ec:
ExecutionContext): Future[S] =
control.drainAndShutdown(streamCompletion)
@@ -129,8 +128,8 @@ object Consumer {
override def isShutdown: Future[Done] =
control.isShutdown
- .flatMap(_ => streamCompletion)(ExecutionContexts.parasitic)
- .map(_ => Done)(ExecutionContexts.parasitic)
+ .flatMap(_ => streamCompletion)(ExecutionContext.parasitic)
+ .map(_ => Done)(ExecutionContext.parasitic)
override def metrics: Future[Map[MetricName, Metric]] = control.metrics
}
@@ -265,7 +264,7 @@ object Consumer {
def atMostOnceSource[K, V](settings: ConsumerSettings[K, V],
subscription: Subscription): Source[ConsumerRecord[K, V], Control] =
committableSource[K, V](settings, subscription).mapAsync(1) { m =>
- m.committableOffset.commitInternal().map(_ =>
m.record)(ExecutionContexts.parasitic)
+ m.committableOffset.commitInternal().map(_ =>
m.record)(ExecutionContext.parasitic)
}
/**
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/DiscoverySupport.scala
b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/DiscoverySupport.scala
index d39b5926..0d6647bf 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/DiscoverySupport.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/DiscoverySupport.scala
@@ -19,11 +19,11 @@ import pekko.actor.{ ActorSystem, ActorSystemImpl,
ClassicActorSystemProvider }
import pekko.annotation.InternalApi
import pekko.discovery.{ Discovery, ServiceDiscovery }
import pekko.kafka.{ ConsumerSettings, ProducerSettings }
-import pekko.util.JavaDurationConverters._
import com.typesafe.config.Config
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
+import scala.jdk.DurationConverters._
import scala.util.Failure
/**
@@ -72,7 +72,7 @@ object DiscoverySupport {
checkClassOrThrow(system.asInstanceOf[ActorSystemImpl])
val serviceName = config.getString("service-name")
if (serviceName.nonEmpty) {
- val lookupTimeout = config.getDuration("resolve-timeout").asScala
+ val lookupTimeout = config.getDuration("resolve-timeout").toScala
bootstrapServers(discovery(config, system), serviceName, lookupTimeout)
} else throw new IllegalArgumentException(s"value for `service-name` in
$config is empty")
}
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/MetadataClient.scala
b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/MetadataClient.scala
index 5796d4a5..62e5aecf 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/MetadataClient.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/MetadataClient.scala
@@ -18,7 +18,6 @@ import java.util.concurrent.atomic.AtomicLong
import org.apache.pekko
import pekko.actor.{ ActorRef, ActorSystem, ExtendedActorSystem }
-import pekko.dispatch.ExecutionContexts
import pekko.kafka.Metadata._
import pekko.kafka.{ ConsumerSettings, KafkaConsumerActor }
import pekko.pattern.ask
@@ -39,7 +38,7 @@ class MetadataClient private (consumerActor: ActorRef,
timeout: Timeout, managed
.flatMap {
case Success(res) => Future.successful(res)
case Failure(e) => Future.failed(e)
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)
def getBeginningOffsetForPartition(partition: TopicPartition): Future[Long] =
getBeginningOffsets(Set(partition))
@@ -52,7 +51,7 @@ class MetadataClient private (consumerActor: ActorRef,
timeout: Timeout, managed
.flatMap {
case Success(res) => Future.successful(res)
case Failure(e) => Future.failed(e)
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)
def getEndOffsetForPartition(partition: TopicPartition): Future[Long] =
getEndOffsets(Set(partition))
@@ -65,7 +64,7 @@ class MetadataClient private (consumerActor: ActorRef,
timeout: Timeout, managed
.flatMap {
case Success(res) => Future.successful(res)
case Failure(e) => Future.failed(e)
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)
def getPartitionsFor(topic: String): Future[List[PartitionInfo]] =
(consumerActor ? GetPartitionsFor(topic))(timeout)
@@ -74,7 +73,7 @@ class MetadataClient private (consumerActor: ActorRef,
timeout: Timeout, managed
.flatMap {
case Success(res) => Future.successful(res)
case Failure(e) => Future.failed(e)
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)
@deprecated("use `getCommittedOffsets`", "Alpakka Kafka 2.0.3")
def getCommittedOffset(partition: TopicPartition): Future[OffsetAndMetadata]
=
@@ -84,7 +83,7 @@ class MetadataClient private (consumerActor: ActorRef,
timeout: Timeout, managed
.flatMap {
case Success(res) => Future.successful(res)
case Failure(e) => Future.failed(e)
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)
def getCommittedOffsets(partitions: Set[TopicPartition]):
Future[Map[TopicPartition, OffsetAndMetadata]] =
(consumerActor ? GetCommittedOffsets(partitions))(timeout)
@@ -93,7 +92,7 @@ class MetadataClient private (consumerActor: ActorRef,
timeout: Timeout, managed
.flatMap {
case Success(res) => Future.successful(res)
case Failure(e) => Future.failed(e)
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)
def close(): Unit =
if (managedActor) {
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/SendProducer.scala
b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/SendProducer.scala
index d1e6d867..97fd4f6c 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/SendProducer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/SendProducer.scala
@@ -19,10 +19,10 @@ import pekko.Done
import pekko.actor.{ ActorSystem, ClassicActorSystemProvider }
import pekko.kafka.ProducerMessage._
import pekko.kafka.ProducerSettings
-import pekko.util.JavaDurationConverters._
import org.apache.kafka.clients.producer.{ Callback, ProducerRecord,
RecordMetadata }
import scala.concurrent.{ ExecutionContext, Future, Promise }
+import scala.jdk.DurationConverters._
/**
* Utility class for producing to Kafka without using Apache Pekko Streams.
@@ -96,7 +96,7 @@ final class SendProducer[K, V] private (val settings:
ProducerSettings[K, V], sy
if (settings.closeProducerOnStop) producerFuture.map { producer =>
// we do not have to check if producer was already closed in send-callback as `flush()` and `close()` are effectively no-ops in this case
producer.flush()
- producer.close(settings.closeTimeout.asJava)
+ producer.close(settings.closeTimeout.toJava)
Done
}
else Future.successful(Done)
diff --git
a/int-tests/src/test/scala/org/apache/pekko/kafka/IntegrationTests.scala
b/int-tests/src/test/scala/org/apache/pekko/kafka/IntegrationTests.scala
index c1b79607..17ede755 100644
--- a/int-tests/src/test/scala/org/apache/pekko/kafka/IntegrationTests.scala
+++ b/int-tests/src/test/scala/org/apache/pekko/kafka/IntegrationTests.scala
@@ -17,11 +17,12 @@ package org.apache.pekko.kafka
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.scaladsl.Flow
-import pekko.util.ccompat.JavaConverters._
import org.apache.kafka.common.TopicPartition
import org.slf4j.Logger
import org.testcontainers.containers.GenericContainer
+import scala.jdk.CollectionConverters._
+
object IntegrationTests {
val MessageLogInterval = 500L
diff --git
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/KafkaTestkitTestcontainersSettings.scala
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/KafkaTestkitTestcontainersSettings.scala
index db802d37..94cc019b 100644
---
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/KafkaTestkitTestcontainersSettings.scala
+++
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/KafkaTestkitTestcontainersSettings.scala
@@ -16,14 +16,15 @@ package org.apache.pekko.kafka.testkit
import java.time.Duration
import java.util.function.Consumer
+
import org.apache.pekko
import pekko.actor.ActorSystem
-import pekko.util.JavaDurationConverters._
-import com.typesafe.config.Config
import pekko.kafka.testkit.internal.PekkoConnectorsKafkaContainer
+import com.typesafe.config.Config
import org.testcontainers.containers.GenericContainer
import scala.concurrent.duration.FiniteDuration
+import scala.jdk.DurationConverters._
final class KafkaTestkitTestcontainersSettings private (
val zooKeeperImage: String,
@@ -103,12 +104,12 @@ final class KafkaTestkitTestcontainersSettings private (
/**
* Java Api
*/
- def getClusterStartTimeout(): Duration = clusterStartTimeout.asJava
+ def getClusterStartTimeout(): Duration = clusterStartTimeout.toJava
/**
* Java Api
*/
- def getReadinessCheckTimeout(): Duration = readinessCheckTimeout.asJava
+ def getReadinessCheckTimeout(): Duration = readinessCheckTimeout.toJava
/**
* Sets the ZooKeeper image
@@ -222,7 +223,7 @@ final class KafkaTestkitTestcontainersSettings private (
* Kafka cluster start up timeout
*/
def withClusterStartTimeout(timeout: Duration):
KafkaTestkitTestcontainersSettings =
- copy(clusterStartTimeout = timeout.asScala)
+ copy(clusterStartTimeout = timeout.toScala)
/**
* Kafka cluster readiness check timeout
@@ -236,7 +237,7 @@ final class KafkaTestkitTestcontainersSettings private (
* Kafka cluster readiness check timeout
*/
def withReadinessCheckTimeout(timeout: Duration):
KafkaTestkitTestcontainersSettings =
- copy(readinessCheckTimeout = timeout.asScala)
+ copy(readinessCheckTimeout = timeout.toScala)
private def copy(
zooKeeperImage: String = zooKeeperImage,
@@ -322,8 +323,8 @@ object KafkaTestkitTestcontainersSettings {
val internalTopicsReplicationFactor =
config.getInt("internal-topics-replication-factor")
val useSchemaRegistry = config.getBoolean("use-schema-registry")
val containerLogging = config.getBoolean("container-logging")
- val clusterStartTimeout =
config.getDuration("cluster-start-timeout").asScala
- val readinessCheckTimeout =
config.getDuration("readiness-check-timeout").asScala
+ val clusterStartTimeout =
config.getDuration("cluster-start-timeout").toScala
+ val readinessCheckTimeout =
config.getDuration("readiness-check-timeout").toScala
new KafkaTestkitTestcontainersSettings(zooKeeperImage,
zooKeeperImageTag,
diff --git
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/ProducerResultFactory.scala
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/ProducerResultFactory.scala
index 7dab2a5b..66571808 100644
---
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/ProducerResultFactory.scala
+++
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/ProducerResultFactory.scala
@@ -17,11 +17,11 @@ package org.apache.pekko.kafka.testkit
import org.apache.pekko
import pekko.annotation.ApiMayChange
import pekko.kafka.ProducerMessage
-import pekko.util.ccompat.JavaConverters._
import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
import org.apache.kafka.common.TopicPartition
import scala.collection.immutable
+import scala.jdk.CollectionConverters._
/**
* Factory methods to create instances that normally are emitted by [[pekko.kafka.scaladsl.Producer]] and [[pekko.kafka.javadsl.Producer]] flows.
diff --git
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/KafkaTestKit.scala
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/KafkaTestKit.scala
index 627dd273..6c0bc5b7 100644
---
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/KafkaTestKit.scala
+++
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/KafkaTestKit.scala
@@ -23,12 +23,13 @@ import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.kafka.testkit.KafkaTestkitSettings
import pekko.kafka.{ CommitterSettings, ConsumerSettings, ProducerSettings }
-import pekko.util.ccompat.JavaConverters._
import org.apache.kafka.clients.admin.{ Admin, AdminClientConfig, NewTopic }
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ Deserializer, Serializer,
StringDeserializer, StringSerializer }
import org.slf4j.Logger
+import scala.jdk.CollectionConverters._
+
/**
* Common functions for scaladsl and javadsl Testkit.
*
diff --git
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/TestcontainersKafka.scala
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/TestcontainersKafka.scala
index 0f5c77c6..a32e8b77 100644
---
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/TestcontainersKafka.scala
+++
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/TestcontainersKafka.scala
@@ -17,12 +17,13 @@ package org.apache.pekko.kafka.testkit.internal
import org.apache.pekko
import pekko.kafka.testkit.KafkaTestkitTestcontainersSettings
import pekko.kafka.testkit.scaladsl.KafkaSpec
-import pekko.util.JavaDurationConverters._
-import pekko.util.OptionConverters._
-import pekko.util.ccompat.JavaConverters._
import org.testcontainers.containers.GenericContainer
import org.testcontainers.utility.DockerImageName
+import scala.jdk.CollectionConverters._
+import scala.jdk.DurationConverters._
+import scala.jdk.OptionConverters._
+
object TestcontainersKafka {
trait Spec extends KafkaSpec {
private var cluster: KafkaContainerCluster = _
@@ -85,8 +86,8 @@ object TestcontainersKafka {
internalTopicsReplicationFactor,
settings.useSchemaRegistry,
settings.containerLogging,
- settings.clusterStartTimeout.asJava,
- settings.readinessCheckTimeout.asJava)
+ settings.clusterStartTimeout.toJava,
+ settings.readinessCheckTimeout.toJava)
configureKafka(brokerContainers)
configureKafkaConsumer.accept(brokerContainers.asJavaCollection)
zookeeperContainer match {
diff --git
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
index 4519a193..3ae23870 100644
---
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
+++
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
@@ -31,7 +31,6 @@ import pekko.stream.scaladsl.{ Keep, Source }
import pekko.stream.testkit.TestSubscriber
import pekko.stream.testkit.scaladsl.TestSink
import pekko.testkit.TestKit
-import pekko.util.ccompat.JavaConverters._
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.producer.{ Producer => KProducer,
ProducerRecord }
import org.apache.kafka.common.ConsumerGroupState
@@ -40,6 +39,7 @@ import org.slf4j.{ Logger, LoggerFactory }
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext, Future }
+import scala.jdk.CollectionConverters._
import scala.util.Try
abstract class KafkaSpec(_kafkaPort: Int, val zooKeeperPort: Int, actorSystem:
ActorSystem)
diff --git
a/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala
b/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala
index 7817f985..e43ea558 100644
--- a/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala
+++ b/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala
@@ -25,7 +25,6 @@ import
pekko.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike
import pekko.stream.scaladsl.{ Keep, Sink, Source }
import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import pekko.stream.testkit.scaladsl.TestSink
-import pekko.util.ccompat.JavaConverters._
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
import org.apache.avro.specific.SpecificRecordBase
import org.apache.avro.util.Utf8
@@ -34,6 +33,7 @@ import org.apache.kafka.common.TopicPartition
import scala.collection.immutable
import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
// #imports
import io.confluent.kafka.serializers.{ AbstractKafkaAvroSerDeConfig,
KafkaAvroDeserializer, KafkaAvroSerializer }
import org.apache.avro.specific.SpecificRecord
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/TransactionsOps.scala
b/tests/src/test/scala/org/apache/pekko/kafka/TransactionsOps.scala
index 643365d5..17d592b9 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/TransactionsOps.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/TransactionsOps.scala
@@ -27,7 +27,6 @@ import pekko.stream.Materializer
import pekko.stream.scaladsl.{ Flow, Sink, Source }
import pekko.stream.testkit.TestSubscriber
import pekko.stream.testkit.scaladsl.TestSink
-import pekko.util.ccompat._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.{ ProducerConfig, ProducerRecord }
import org.scalatest.TestSuite
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
index 098de532..f0d54d35 100644
---
a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
+++
b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
@@ -30,7 +30,6 @@ import pekko.stream.scaladsl.{ Keep, Source }
import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import pekko.stream.{ ActorAttributes, Supervision }
import pekko.testkit.{ TestKit, TestProbe }
-import pekko.util.ccompat.JavaConverters._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.TopicPartition
@@ -44,6 +43,7 @@ import org.slf4j.{ Logger, LoggerFactory }
import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
class CommittingProducerSinkSpec(_system: ActorSystem)
extends TestKit(_system)
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
index b6fd6d83..e922a8cd 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
@@ -18,8 +18,6 @@ import java.util.concurrent.atomic.AtomicBoolean
import org.apache.pekko
import pekko.testkit.TestKit
-import pekko.util.ccompat.JavaConverters._
-import pekko.util.JavaDurationConverters._
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
import org.mockito.Mockito._
@@ -30,6 +28,8 @@ import org.mockito.{ ArgumentMatchers, Mockito }
import scala.collection.immutable.Seq
import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
+import scala.jdk.DurationConverters._
object ConsumerMock {
type OnCompleteHandler = Map[TopicPartition, OffsetAndMetadata] =>
(Map[TopicPartition, OffsetAndMetadata], Exception)
@@ -169,7 +169,7 @@ class ConsumerMock[K, V](handler:
ConsumerMock.CommitHandler = new ConsumerMock.
}
def verifyClosed(mode: VerificationMode = Mockito.times(1)) =
- verify(mock, mode).close(ConsumerMock.closeTimeout.asJava)
+ verify(mock, mode).close(ConsumerMock.closeTimeout.toJava)
def verifyPoll(mode: VerificationMode = Mockito.atLeastOnce()) =
verify(mock, mode).poll(ArgumentMatchers.any[java.time.Duration])
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
index 7bd3b16a..eca227ad 100644
---
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
+++
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
@@ -16,13 +16,13 @@ package org.apache.pekko.kafka.internal
import org.apache.pekko
import pekko.kafka.tests.scaladsl.LogCapturing
-import pekko.util.ccompat.JavaConverters._
import org.apache.kafka.clients.consumer.{ Consumer, ConsumerRecord,
ConsumerRecords, OffsetAndMetadata }
import org.apache.kafka.common.TopicPartition
import org.mockito.Mockito
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers
+import scala.jdk.CollectionConverters._
import scala.language.reflectiveCalls
class ConsumerProgressTrackingSpec extends AnyFlatSpecLike with Matchers with
LogCapturing {
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerResetProtectionSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerResetProtectionSpec.scala
index 6840827c..8da09266 100644
---
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerResetProtectionSpec.scala
+++
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerResetProtectionSpec.scala
@@ -22,7 +22,6 @@ import pekko.kafka.internal.KafkaConsumerActor.Internal.Seek
import pekko.kafka.testkit.scaladsl.Slf4jToPekkoLoggingAdapter
import pekko.kafka.tests.scaladsl.LogCapturing
import pekko.testkit.{ ImplicitSender, TestKit }
-import pekko.util.ccompat.JavaConverters._
import org.apache.kafka.clients.consumer.{ ConsumerRecord, ConsumerRecords }
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.header.internals.RecordHeaders
@@ -33,6 +32,7 @@ import org.slf4j.{ Logger, LoggerFactory }
import java.util.Optional
import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
class ConsumerResetProtectionSpec
extends TestKit(ActorSystem("ConsumerResetProtectionSpec"))
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
index 8b1468dd..d92f55ed 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
@@ -26,7 +26,6 @@ import pekko.stream.scaladsl._
import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import pekko.stream.testkit.scaladsl.TestSink
import pekko.testkit.TestKit
-import pekko.util.ccompat.JavaConverters._
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.serialization.StringDeserializer
@@ -39,6 +38,7 @@ import org.scalatest.matchers.should.Matchers
import scala.collection.immutable.Seq
import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext, Future }
+import scala.jdk.CollectionConverters._
object ConsumerSpec {
type K = String
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
index 8d998210..36158400 100644
---
a/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
+++
b/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
@@ -28,7 +28,6 @@ import pekko.stream.scaladsl._
import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import pekko.stream.testkit.scaladsl.TestSink
import pekko.testkit.TestKit
-import pekko.util.ccompat.JavaConverters._
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
@@ -41,6 +40,7 @@ import org.slf4j.{ Logger, LoggerFactory }
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
class PartitionedSourceSpec(_system: ActorSystem)
extends TestKit(_system)
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
index 5dd0df32..d6a0ec04 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
@@ -28,7 +28,6 @@ import pekko.stream.testkit.scaladsl.{ TestSink, TestSource }
import pekko.stream.{ ActorAttributes, Supervision }
import pekko.testkit.TestKit
import pekko.{ Done, NotUsed }
-import pekko.util.ccompat.JavaConverters._
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata,
OffsetAndMetadata }
import org.apache.kafka.clients.producer._
@@ -46,6 +45,7 @@ import org.scalatest.matchers.should.Matchers
import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
+import scala.jdk.CollectionConverters._
import scala.util.{ Failure, Success, Try }
class ProducerSpec(_system: ActorSystem)
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/javadsl/ControlSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/javadsl/ControlSpec.scala
index f66bd4bd..22c98b9c 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/javadsl/ControlSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/javadsl/ControlSpec.scala
@@ -22,13 +22,13 @@ import org.apache.pekko
import pekko.Done
import pekko.kafka.internal.ConsumerControlAsJava
import pekko.kafka.tests.scaladsl.LogCapturing
-import pekko.util.FutureConverters._
import org.apache.kafka.common.{ Metric, MetricName }
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import scala.concurrent.Future
+import scala.jdk.FutureConverters._
import scala.language.reflectiveCalls
object ControlSpec {
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceSpec.scala
index a673428a..cd2471c1 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceSpec.scala
@@ -26,7 +26,6 @@ import
pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import pekko.stream.testkit.scaladsl.TestSink
import pekko.testkit.TestProbe
import pekko.{ Done, NotUsed }
-import pekko.util.ccompat.JavaConverters._
import org.apache.kafka.clients.consumer.{ ConsumerConfig,
ConsumerPartitionAssignor, ConsumerRecord }
import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
import org.apache.kafka.common.TopicPartition
@@ -34,6 +33,7 @@ import org.scalatest.Inside
import org.slf4j.{ Logger, LoggerFactory }
import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
import scala.util.Random
class RebalanceSpec extends SpecBase with TestcontainersKafkaLike with Inside {
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RetentionPeriodSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RetentionPeriodSpec.scala
index 8017a783..870a120f 100644
---
a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RetentionPeriodSpec.scala
+++
b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RetentionPeriodSpec.scala
@@ -24,10 +24,10 @@ import
pekko.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike
import pekko.stream.scaladsl.Keep
import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import pekko.stream.testkit.scaladsl.TestSink
-import pekko.util.ccompat.JavaConverters._
import scala.concurrent.Await
import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
class RetentionPeriodSpec extends SpecBase with
TestcontainersKafkaPerClassLike {
// https://docs.confluent.io/current/installation/versions-interoperability.html
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/TimestampSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/TimestampSpec.scala
index 2711a7b3..75267da4 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/TimestampSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/TimestampSpec.scala
@@ -19,13 +19,13 @@ import pekko.kafka.testkit.scaladsl.TestcontainersKafkaLike
import pekko.kafka.Subscriptions
import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import pekko.stream.testkit.scaladsl.TestSink
-import pekko.util.ccompat.JavaConverters._
import org.apache.kafka.common.TopicPartition
import org.scalatest.Inside
import org.scalatest.concurrent.IntegrationPatience
import scala.concurrent.Await
import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
class TimestampSpec extends SpecBase with TestcontainersKafkaLike with Inside
with IntegrationPatience {
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/tests/CapturingAppender.scala
b/tests/src/test/scala/org/apache/pekko/kafka/tests/CapturingAppender.scala
index 5d4f2401..ffa4a2b3 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/tests/CapturingAppender.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/tests/CapturingAppender.scala
@@ -85,7 +85,7 @@ import ch.qos.logback.core.AppenderBase
* Also clears the buffer..
*/
def flush(): Unit = synchronized {
- import pekko.util.ccompat.JavaConverters._
+ import scala.jdk.CollectionConverters._
val logbackLogger = getLogbackLogger(classOf[CapturingAppender].getName +
"Delegate")
val appenders = logbackLogger.iteratorForAppenders().asScala.filterNot(_
== this).toList
for (event <- buffer; appender <- appenders) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]