This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git

commit 8c5d65051cfeb3df4a0eb87039c33131b6b21ccc
Author: PJ Fanning <[email protected]>
AuthorDate: Fri Aug 18 20:06:49 2023 +0100

    add scala3 support for s3 (#167)
    
    * add futiles license
    
    * some more compile issues
    
    * more compile issues
    
    * Update S3.scala
    
    * try .toInt
    
    * remove ASF header from futiles code
    
    * Final touches to get S3 working for Scala 3
    
    ---------
    
    Co-authored-by: Matthew de Detrich <[email protected]>
---
 LICENSE                                            | 15 +++-
 legal/S3License.txt                                |  7 ++
 project/Dependencies.scala                         |  4 +-
 .../pekko/stream/connectors/s3/impl/S3Stream.scala | 33 ++++++---
 .../connectors/s3/impl/auth/CanonicalRequest.scala |  4 +-
 .../pekko/stream/connectors/s3/javadsl/S3.scala    |  1 +
 .../pekko/stream/connectors/s3/TestUtils.scala     |  6 +-
 .../connectors/s3/impl/SplitAfterSizeSpec.scala    |  3 +-
 .../s3/impl/auth/CanonicalRequestSpec.scala        |  2 +-
 .../impl/auth/SplitAfterSizeWithContextSpec.scala  |  3 +-
 .../stream/connectors/s3/impl/retry/Retry.scala    | 86 ++++++++++++++++++++++
 .../stream/connectors/s3/impl/retry/Timeouts.scala | 45 +++++++++++
 .../connectors/s3/scaladsl/S3IntegrationSpec.scala |  6 +-
 .../connectors/s3/scaladsl/S3WireMockBase.scala    |  3 +-
 14 files changed, 188 insertions(+), 30 deletions(-)

diff --git a/LICENSE b/LICENSE
index e8e5b8a10..0889548ba 100644
--- a/LICENSE
+++ b/LICENSE
@@ -202,7 +202,14 @@
 
 ---------------
 
-pekko-mqtt-streaming contains code from paho.mqtt.java 
<https://github.com/eclipse/paho.mqtt.java>.
+pekko-connectors-google-common contains 
`org.apache.pekko.stream.connectors.google.jwt.JwtSprayJson.scala`
+which is copied from jwt-scala <https://github.com/jwt-scala/jwt-scala>.
+The original code was released under the Apache 2.0 license.
+Copyright 2021 JWT-Scala Contributors.
+
+---------------
+
+pekko-connectors-mqtt-streaming contains code from paho.mqtt.java 
<https://github.com/eclipse/paho.mqtt.java>.
 This code was released under a dual license:
 Eclipse Public License version 2.0 and Eclipse Distribution License.
 We choose to use the code under the Eclipse Distribution License.
@@ -235,7 +242,7 @@ Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All 
Rights Reserved.
 
 ---------------
 
-pekko-connectors-google-common contains 
`org.apache.pekko.stream.connectors.google.jwt.JwtSprayJson.scala`
-which is copied from jwt-scala <https://github.com/jwt-scala/jwt-scala>.
+pekko-connectors-s3 contains test code in 
`org.apache.pekko.stream.connectors.s3.impl.retry` package
+which is copied from futiles <https://github.com/johanandren/futiles>.
 The original code was released under the Apache 2.0 license.
-Copyright 2021 JWT-Scala Contributors.
+Copyright 2015 Johan Andrén.
diff --git a/legal/S3License.txt b/legal/S3License.txt
index e64484eab..5ec7aa63f 100644
--- a/legal/S3License.txt
+++ b/legal/S3License.txt
@@ -209,3 +209,10 @@ This code was released under an Apache 2.0 license.
 
 AWS SDK for Java
 Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+---------------
+
+pekko-connectors-s3 contains test code in 
`org.apache.pekko.stream.connectors.s3.impl.retry` package
+which is copied from futiles <https://github.com/johanandren/futiles>.
+The original code was released under the Apache 2.0 license.
+Copyright 2015 Johan Andrén.
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 0708a1c6f..2effbd4fc 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -364,7 +364,6 @@ object Dependencies {
     ))
 
   val S3 = Seq(
-    crossScalaVersions -= Scala3,
     libraryDependencies ++= Seq(
       "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
       "org.apache.pekko" %% "pekko-http-xml" % PekkoHttpVersion,
@@ -373,8 +372,7 @@ object Dependencies {
       "com.google.jimfs" % "jimfs" % "1.2" % Test,
       "com.github.tomakehurst" % "wiremock-jre8" % "2.32.0" % Test,
       "org.scalacheck" %% "scalacheck" % scalaCheckVersion % Test,
-      "org.scalatestplus" %% scalaTestScalaCheckArtifact % 
scalaTestScalaCheckVersion % Test,
-      "com.markatta" %% "futiles" % "2.0.2" % Test))
+      "org.scalatestplus" %% scalaTestScalaCheckArtifact % 
scalaTestScalaCheckVersion % Test))
 
   val SpringWeb = {
     val SpringVersion = "5.1.17.RELEASE"
diff --git 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
index 3ba8f8cf2..78c47caa6 100644
--- 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
+++ 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
@@ -29,7 +29,7 @@ import pekko.http.scaladsl.{ ClientTransport, Http }
 import pekko.stream.connectors.s3.BucketAccess.{ AccessDenied, AccessGranted, 
NotExists }
 import pekko.stream.connectors.s3._
 import pekko.stream.connectors.s3.impl.auth.{ CredentialScope, Signer, 
SigningKey }
-import pekko.stream.scaladsl.{ Flow, Keep, RetryFlow, RunnableGraph, Sink, 
Source, Tcp }
+import pekko.stream.scaladsl.{ Flow, Keep, RetryFlow, RunnableGraph, Sink, 
Source, SubFlow, Tcp }
 import pekko.stream.{ Attributes, Materializer }
 import pekko.util.ByteString
 import pekko.{ Done, NotUsed }
@@ -1177,11 +1177,15 @@ import scala.util.{ Failure, Success, Try }
 
         import conf.multipartUploadSettings.retrySettings._
 
-        SplitAfterSize(chunkSize, chunkBufferSize)(atLeastOneByteString)
-          .via(getChunkBuffer(chunkSize, chunkBufferSize, maxRetries)) // 
creates the chunks
-          .mergeSubstreamsWithParallelism(parallelism)
+        val source1: SubFlow[Chunk, NotUsed, Flow[ByteString, ByteString, 
NotUsed]#Repr, Sink[ByteString, NotUsed]] =
+          SplitAfterSize(chunkSize, chunkBufferSize)(atLeastOneByteString)
+            .via(getChunkBuffer(chunkSize, chunkBufferSize, maxRetries)) // 
creates the chunks
+
+        val source2 = source1.mergeSubstreamsWithParallelism(parallelism)
           .filter(_.size > 0)
           .via(atLeastOne)
+
+        source2
           .zip(requestInfoOrUploadState(s3Location, contentType, s3Headers, 
initialUploadState))
           .groupBy(parallelism, { case (_, (_, chunkIndex)) => chunkIndex % 
parallelism })
           // Allow requests that fail with transient errors to be retried, 
using the already buffered chunk.
@@ -1278,11 +1282,18 @@ import scala.util.{ Failure, Success, Try }
           Flow[(ByteString, C)].orElse(
             Source.single((ByteString.empty, null.asInstanceOf[C])))
 
-        
SplitAfterSizeWithContext(chunkSize)(atLeastOneByteStringAndEmptyContext)
-          .via(getChunk(chunkBufferSize))
-          .mergeSubstreamsWithParallelism(parallelism)
-          .filter { case (chunk, _) => chunk.size > 0 }
-          .via(atLeastOne)
+        val source1: SubFlow[(Chunk, immutable.Iterable[C]), NotUsed, 
Flow[(ByteString, C), (ByteString, C),
+            NotUsed]#Repr, Sink[(ByteString, C), NotUsed]] =
+          
SplitAfterSizeWithContext(chunkSize)(atLeastOneByteStringAndEmptyContext)
+            .via(getChunk(chunkBufferSize))
+
+        val source2: Flow[(ByteString, C), (Chunk, immutable.Iterable[C]), 
NotUsed] =
+          source1
+            .mergeSubstreamsWithParallelism(parallelism)
+            .filter { case (chunk, _) => chunk.size > 0 }
+            .via(atLeastOne)
+
+        source2
           .zip(requestInfoOrUploadState(s3Location, contentType, s3Headers, 
initialUploadState))
           .groupBy(parallelism, { case (_, (_, chunkIndex)) => chunkIndex % 
parallelism })
           .map {
@@ -1379,9 +1390,9 @@ import scala.util.{ Failure, Success, Try }
         import mat.executionContext
         Sink
           .seq[UploadPartResponse]
-          .mapMaterializedValue { responseFuture: 
Future[immutable.Seq[UploadPartResponse]] =>
+          .mapMaterializedValue { (responseFuture: 
Future[immutable.Seq[UploadPartResponse]]) =>
             responseFuture
-              .flatMap { responses: immutable.Seq[UploadPartResponse] =>
+              .flatMap { (responses: immutable.Seq[UploadPartResponse]) =>
                 val successes = responses.collect { case r: 
SuccessfulUploadPart => r }
                 val failures = responses.collect { case r: FailedUploadPart => 
r }
                 if (responses.isEmpty) {
diff --git 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/auth/CanonicalRequest.scala
 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/auth/CanonicalRequest.scala
index 87617f42d..f8270026f 100644
--- 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/auth/CanonicalRequest.scala
+++ 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/auth/CanonicalRequest.scala
@@ -69,7 +69,7 @@ import pekko.http.scaladsl.model.{ HttpHeader, HttpRequest }
   def canonicalQueryString(query: Query): String = {
     def uriEncode(s: String): String = s.flatMap {
       case c if isUnreservedCharacter(c) => c.toString
-      case c                             => "%" + c.toHexString.toUpperCase
+      case c                             => "%" + 
Integer.toHexString(c).toUpperCase
     }
 
     query
@@ -99,7 +99,7 @@ import pekko.http.scaladsl.model.{ HttpHeader, HttpRequest }
     if (path.isEmpty) "/"
     else {
       path.toString.flatMap {
-        case c if isReservedCharacter(c) => "%" + c.toHexString.toUpperCase
+        case c if isReservedCharacter(c) => "%" + 
Integer.toHexString(c).toUpperCase
         case c                           => c.toString
       }
     }
diff --git 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/javadsl/S3.scala 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/javadsl/S3.scala
index b7392a073..41078d8f0 100644
--- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/javadsl/S3.scala
+++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/javadsl/S3.scala
@@ -29,6 +29,7 @@ import pekko.stream.connectors.s3.headers.{ CannedAcl, 
ServerSideEncryption }
 import pekko.stream.connectors.s3._
 import pekko.stream.connectors.s3.impl._
 import pekko.stream.javadsl.{ RunnableGraph, Sink, Source }
+import pekko.stream.scaladsl.SourceToCompletionStage
 import pekko.util.ccompat.JavaConverters._
 import pekko.util.ByteString
 import pekko.util.OptionConverters._
diff --git 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/TestUtils.scala 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/TestUtils.scala
index 85fb79a05..a16a28cff 100644
--- a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/TestUtils.scala
+++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/TestUtils.scala
@@ -17,11 +17,11 @@
 
 package org.apache.pekko.stream.connectors.s3
 
-import markatta.futiles.Retry
 import org.apache.commons.lang3.StringUtils
 import org.apache.pekko
-import org.apache.pekko.stream.connectors.s3.scaladsl.S3
-import org.apache.pekko.stream.scaladsl.Sink
+import pekko.stream.connectors.s3.impl.retry.Retry
+import pekko.stream.connectors.s3.scaladsl.S3
+import pekko.stream.scaladsl.Sink
 import org.scalacheck.Gen
 import pekko.actor.ActorSystem
 import pekko.stream.Attributes
diff --git 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/SplitAfterSizeSpec.scala
 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/SplitAfterSizeSpec.scala
index 5733b28f9..2871e6537 100644
--- 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/SplitAfterSizeSpec.scala
+++ 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/SplitAfterSizeSpec.scala
@@ -121,6 +121,7 @@ class SplitAfterSizeSpec(_system: ActorSystem)
         Seq(ByteString(16), ByteString(17, 18))))
   }
 
-  def bytes(start: Byte, end: Byte): Array[Byte] = (start to 
end).map(_.toByte).toArray[Byte]
+  // https://github.com/lampepfl/dotty/issues/18068
+  def bytes(start: Byte, end: Byte): Array[Byte] = (start.toInt to 
end).map(_.toByte).toArray[Byte]
 
 }
diff --git 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/auth/CanonicalRequestSpec.scala
 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/auth/CanonicalRequestSpec.scala
index 9658056da..f17877cd0 100644
--- 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/auth/CanonicalRequestSpec.scala
+++ 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/auth/CanonicalRequestSpec.scala
@@ -128,7 +128,7 @@ class CanonicalRequestSpec extends AnyFlatSpec with 
Matchers {
     val reservedCharacters = ":?#[]@!$&'()*+,;="
     reservedCharacters.foreach { char =>
       withClue(s"failed for path containing reserved character [$char]:") {
-        val expectedCharEncoding = "%" + char.toHexString.toUpperCase
+        val expectedCharEncoding = "%" + Integer.toHexString(char).toUpperCase
 
         val request =
           HttpRequest(
diff --git 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/auth/SplitAfterSizeWithContextSpec.scala
 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/auth/SplitAfterSizeWithContextSpec.scala
index 3f18c84e1..740062365 100644
--- 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/auth/SplitAfterSizeWithContextSpec.scala
+++ 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/auth/SplitAfterSizeWithContextSpec.scala
@@ -79,6 +79,7 @@ class SplitAfterSizeWithContextSpec(_system: ActorSystem)
         Seq((ByteString(17, 18), 2))))
   }
 
-  def bytes(start: Byte, end: Byte): Array[Byte] = (start to 
end).map(_.toByte).toArray[Byte]
+  // https://github.com/lampepfl/dotty/issues/18068
+  def bytes(start: Byte, end: Byte): Array[Byte] = (start.toInt to 
end).map(_.toByte).toArray[Byte]
 
 }
diff --git 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/retry/Retry.scala
 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/retry/Retry.scala
new file mode 100644
index 000000000..bed649e60
--- /dev/null
+++ 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/retry/Retry.scala
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2015 Johan Andrén
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.s3.impl.retry
+
+import java.util.concurrent.{ ThreadLocalRandom, TimeUnit }
+import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.{ ExecutionContext, Future }
+import scala.util.Random
+
+// copied from 
https://github.com/johanandren/futiles/blob/18868f252bbf5dd71d2cd0fc67e7eb39863b686a/src/main/scala/markatta/futiles/Retry.scala
+object Retry {
+
+  private val alwaysRetry: Throwable => Boolean = _ => true
+
+  /**
+   * Evaluate a block that creates a future up to a specific number of times, 
if the future fails, decide about
+   * retrying using a predicate, if it should retry an exponential back off is 
applied so that the retry waits longer
+   * and longer for every retry it makes. A jitter is also added so that the 
exact timing of the retry isn't exactly
+   * the same for all calls with the same backOffUnit
+   *
+   * Any exception in the block creating the future will also be returned as a 
failed future Default is to retry for
+   * all throwables.
+   *
+   * Based on this wikipedia article: 
http://en.wikipedia.org/wiki/Truncated_binary_exponential_backoff
+   */
+  def retryWithBackOff[A](
+      times: Int,
+      backOffUnit: FiniteDuration,
+      shouldRetry: Throwable => Boolean = alwaysRetry)(fBlock: => 
Future[A])(implicit ec: ExecutionContext): Future[A] =
+    try
+      if (times <= 1) fBlock
+      else retryWithBackOffLoop(times, 1, backOffUnit, shouldRetry)(fBlock)
+    catch {
+      // failure to actually create the future
+      case x: Throwable => Future.failed(x)
+    }
+
+  private def retryWithBackOffLoop[A](
+      totalTimes: Int,
+      timesTried: Int,
+      backOffUnit: FiniteDuration,
+      shouldRetry: Throwable => Boolean)(fBlock: => Future[A])(implicit ec: 
ExecutionContext): Future[A] =
+    if (totalTimes <= timesTried) fBlock
+    else
+      fBlock.recoverWith {
+        case ex: Throwable if shouldRetry(ex) =>
+          val timesTriedNow = timesTried + 1
+          val backOff = nextBackOff(timesTriedNow, backOffUnit)
+          Timeouts
+            .timeout(backOff)(())
+            .flatMap(_ =>
+              retryWithBackOffLoop(
+                totalTimes,
+                timesTriedNow,
+                backOffUnit,
+                shouldRetry)(fBlock))
+      }
+
+  private def nextBackOff(
+      tries: Int,
+      backOffUnit: FiniteDuration): FiniteDuration = {
+    require(tries > 0, "tries should start from 1")
+    val rng = new Random(ThreadLocalRandom.current())
+    // jitter between 0.5 and 1.5
+    val jitter = 0.5 + rng.nextDouble()
+    val factor = math.pow(2, tries) * jitter
+    FiniteDuration(
+      (backOffUnit.toMillis * factor).toLong,
+      TimeUnit.MILLISECONDS)
+  }
+
+}
diff --git 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/retry/Timeouts.scala
 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/retry/Timeouts.scala
new file mode 100644
index 000000000..f8be639d4
--- /dev/null
+++ 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/retry/Timeouts.scala
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2015 Johan Andrén
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.s3.impl.retry
+
+import java.util.{ Timer, TimerTask }
+import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.{ ExecutionContext, Future, Promise }
+import scala.util.Try
+
+// copied from 
https://github.com/johanandren/futiles/blob/18868f252bbf5dd71d2cd0fc67e7eb39863b686a/src/main/scala/markatta/futiles/Timeouts.scala
+object Timeouts {
+
+  private val timer = new Timer()
+
+  /**
+   * When ```waitFor``` has passed, evaluate ```what``` on the given execution 
context and complete the future
+   */
+  def timeout[A](waitFor: FiniteDuration)(what: => A)(implicit ec: 
ExecutionContext): Future[A] = {
+    val promise = Promise[A]()
+    timer.schedule(new TimerTask {
+        override def run(): Unit =
+          // make sure we do not block the timer thread
+          Future {
+            promise.complete(Try(what))
+          }
+      },
+      waitFor.toMillis)
+
+    promise.future
+  }
+}
diff --git 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
index 9989117c9..e13410191 100644
--- 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
+++ 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
@@ -1293,8 +1293,8 @@ class AWSS3IntegrationSpec extends 
TestKit(ActorSystem("AWSS3IntegrationSpec"))
     }.orElse(Some(1.minute))
 
   // Since S3 accounts share global state, we should randomly generate bucket 
names so concurrent tests
-  // against an S3 account don't conflict with eachother
-  override lazy val randomlyGenerateBucketNames: Boolean =
+  // against an S3 account don't conflict with each other
+  override val randomlyGenerateBucketNames: Boolean =
     
sys.props.get("pekko.stream.connectors.s3.scaladsl.AWSS3IntegrationSpec.randomlyGenerateBucketNames")
       .map(_.toBoolean).getOrElse(true)
 }
@@ -1313,7 +1313,7 @@ class MinioS3IntegrationSpec
 
   // Since a unique new Minio container is started with each test run there is 
no point in making random
   // bucket names
-  override lazy val randomlyGenerateBucketNames: Boolean = false
+  override val randomlyGenerateBucketNames: Boolean = false
 
   override lazy val defaultS3Settings: S3Settings = s3Settings
     .withS3RegionProvider(
diff --git 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3WireMockBase.scala
 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3WireMockBase.scala
index 522ed3a96..7a6bb91f1 100644
--- 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3WireMockBase.scala
+++ 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3WireMockBase.scala
@@ -33,7 +33,8 @@ import software.amazon.awssdk.regions.Region
 abstract class S3WireMockBase(_system: ActorSystem, val _wireMockServer: 
WireMockServer) extends TestKit(_system) {
 
   private def this(mock: WireMockServer) =
-    this(ActorSystem(getCallerName(getClass), 
config(mock.port()).withFallback(ConfigFactory.load())), mock)
+    this(ActorSystem(getCallerName(classOf[S3WireMockBase]), 
config(mock.port()).withFallback(ConfigFactory.load())),
+      mock)
 
   def this() = {
     this(initServer())


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to