This is an automated email from the ASF dual-hosted git repository.

mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new 5880466e5 Integrate AWSS3IntegrationSpec with S3 Account
5880466e5 is described below

commit 5880466e51d1df2e8d8a82986324afd866de278f
Author: Matthew de Detrich <[email protected]>
AuthorDate: Wed Apr 19 00:34:15 2023 +0200

    Integrate AWSS3IntegrationSpec with S3 Account
---
 .github/workflows/nightly-builds.yaml              |   36 +
 project/Dependencies.scala                         |   21 +-
 .../pekko/stream/connectors/s3/TestUtils.scala     |   71 ++
 .../stream/connectors/s3/scaladsl/Generators.scala |  114 +++
 .../connectors/s3/scaladsl/S3IntegrationSpec.scala | 1005 ++++++++++++--------
 5 files changed, 838 insertions(+), 409 deletions(-)

diff --git a/.github/workflows/nightly-builds.yaml 
b/.github/workflows/nightly-builds.yaml
new file mode 100644
index 000000000..d687e3f47
--- /dev/null
+++ b/.github/workflows/nightly-builds.yaml
@@ -0,0 +1,36 @@
+name: Nightly Builds
+
+on:
+  schedule:
+    - cron: "0 0 * * *"
+  workflow_dispatch:
+
+permissions: {}
+
+jobs:
+  integration-tests:
+    name: Pekko Connectors Integration tests
+    runs-on: ubuntu-20.04
+    if: github.repository == 'apache/incubator-pekko-connectors'
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v3
+        with:
+          fetch-depth: 0
+
+      - name: Setup Java 8
+        uses: actions/setup-java@v3
+        with:
+          distribution: temurin
+          java-version: 8
+
+      - name: Cache Coursier cache
+        uses: coursier/[email protected]
+
+      - name: S3 Integration tests
+        run:  |-
+          sbt \
+          -Dpekko.connectors.s3.aws.credentials.provider=static \
+          -Dpekko.connectors.s3.aws.credentials.access-key-id=${{ 
secrets.AWS_ACCESS_KEY }} \
+          -Dpekko.connectors.s3.aws.credentials.secret-access-key=${{ 
secrets.AWS_SECRET_KEY }} \
+          + "s3/Test/runMain org.scalatest.tools.Runner -o -s 
org.apache.pekko.stream.connectors.s3.scaladsl.AWSS3IntegrationSpec"
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 483d587f6..f262215c0 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -33,6 +33,19 @@ object Dependencies {
   val TestContainersScalaTestVersion = "0.40.3"
   val mockitoVersion = "4.2.0" // check even 
https://github.com/scalatest/scalatestplus-mockito/releases
   val hoverflyVersion = "0.14.1"
+  val scalaCheckVersion = "1.15.4"
+
+  /**
+   * Calculates the scalatest version in a format that is used for 
`org.scalatestplus` scalacheck artifacts
+   *
+   * @see
+   * https://www.scalatest.org/user_guide/property_based_testing
+   */
+  private def scalaTestPlusScalaCheckVersion(version: String) =
+    version.split('.').take(2).mkString("-")
+
+  val scalaTestScalaCheckArtifact = 
s"scalacheck-${scalaTestPlusScalaCheckVersion(scalaCheckVersion)}"
+  val scalaTestScalaCheckVersion = s"$ScalaTestVersion.0"
 
   val CouchbaseVersion = "2.7.16"
   val CouchbaseVersionForDocs = "2.7"
@@ -155,7 +168,7 @@ object Dependencies {
       ("org.apache.hadoop" % "hadoop-client" % "3.2.1" % 
Test).exclude("log4j", "log4j"), // Apache2
       ("org.apache.hadoop" % "hadoop-common" % "3.2.1" % 
Test).exclude("log4j", "log4j"), // Apache2
       "com.sksamuel.avro4s" %% "avro4s-core" % "3.0.9" % Test,
-      "org.scalacheck" %% "scalacheck" % "1.15.4" % Test,
+      "org.scalacheck" %% "scalacheck" % scalaCheckVersion % Test,
       "org.specs2" %% "specs2-core" % "4.8.3" % Test, // MIT like: 
https://github.com/etorreborre/specs2/blob/master/LICENSE.txt
       "org.slf4j" % "log4j-over-slf4j" % log4jOverSlf4jVersion % Test // MIT 
like: http://www.slf4j.org/license.html
     ))
@@ -375,8 +388,10 @@ object Dependencies {
       "software.amazon.awssdk" % "auth" % AwsSdk2Version,
       // in-memory filesystem for file related tests
       "com.google.jimfs" % "jimfs" % "1.2" % Test, // ApacheV2
-      "com.github.tomakehurst" % "wiremock-jre8" % "2.32.0" % Test // ApacheV2
-    ))
+      "com.github.tomakehurst" % "wiremock-jre8" % "2.32.0" % Test, // ApacheV2
+      "org.scalacheck" %% "scalacheck" % scalaCheckVersion % Test,
+      "org.scalatestplus" %% scalaTestScalaCheckArtifact % 
scalaTestScalaCheckVersion % Test,
+      "com.markatta" %% "futiles" % "2.0.2" % Test))
 
   val SpringWeb = {
     val SpringVersion = "5.1.17.RELEASE"
diff --git 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/TestUtils.scala 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/TestUtils.scala
new file mode 100644
index 000000000..cec159ac3
--- /dev/null
+++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/TestUtils.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, derived from Akka.
+ */
+
+package org.apache.pekko.stream.connectors.s3
+
+import markatta.futiles.Retry
+import org.apache.commons.lang3.StringUtils
+import org.apache.pekko
+import org.apache.pekko.stream.connectors.s3.scaladsl.S3
+import org.apache.pekko.stream.scaladsl.Sink
+import org.scalacheck.Gen
+import pekko.actor.ActorSystem
+import pekko.stream.Attributes
+
+import scala.annotation.tailrec
+import scala.concurrent.duration._
+import scala.concurrent.{ ExecutionContext, Future }
+
+object TestUtils {
+  def checkEnvVarAvailable(env: String): Boolean =
+    try
+      sys.env.get(env) match {
+        case Some(value) if StringUtils.isBlank(value) => false
+        case Some(_)                                   => true
+        case None                                      => false
+      }
+    catch {
+      case _: NoSuchElementException => false
+    }
+
+  def cleanAndDeleteBucket(bucket: String, s3Attrs: Attributes)(implicit 
system: ActorSystem): Future[Unit] = {
+    implicit val ec: ExecutionContext = system.dispatcher
+    for {
+      bucketVersioningResult <- S3.getBucketVersioning(bucket)(implicitly, 
s3Attrs)
+      deleteAllVersions = 
bucketVersioningResult.status.contains(BucketVersioningStatus.Enabled)
+      _ <- S3.deleteBucketContents(bucket, deleteAllVersions = 
deleteAllVersions).withAttributes(s3Attrs).runWith(
+        Sink.ignore)
+      multiParts <-
+        S3.listMultipartUpload(bucket, 
None).withAttributes(s3Attrs).runWith(Sink.seq)
+      _ <- Future.sequence(multiParts.map { part =>
+        S3.deleteUpload(bucket, part.key, part.uploadId)(implicitly, s3Attrs)
+      })
+      _ <- Retry.retryWithBackOff(
+        5,
+        100.millis,
+        throwable => throwable.getMessage.contains("The bucket you tried to 
delete is not empty"))(
+        S3.deleteBucket(bucket)(implicitly, s3Attrs))
+      _ = system.log.info(s"Completed deleting bucket $bucket")
+    } yield ()
+  }
+
+  /**
+   * Will return a single value from a given generator. WARNING this will 
block the thread
+   * until a value is retrieved so only use this for generators that have a 
high chance of
+   * returning a value. If a [[Gen.filter]] ends up filtering out too many 
values this can
+   * cause the thread to be stuck for a long time
+   */
+  @tailrec
+  def loopUntilGenRetrievesValue[T](gen: Gen[T]): T =
+    gen.sample match {
+      case Some(value) => value
+      case None        => loopUntilGenRetrievesValue(gen)
+    }
+
+}
diff --git 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/Generators.scala
 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/Generators.scala
new file mode 100644
index 000000000..93cd5f390
--- /dev/null
+++ 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/Generators.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, derived from Akka.
+ */
+
+package org.apache.pekko.stream.connectors.s3.scaladsl
+
+import org.scalacheck.Gen
+
+import scala.annotation.nowarn
+import scala.language.postfixOps
+object Generators {
+  val MaxBucketLength: Int = 63
+
+  // See 
https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html 
for valid
+  // bucketnames
+
+  lazy val bucketLetterOrNumberCharGen: Gen[Char] = Gen.frequency(
+    (1, Gen.numChar),
+    (1, Gen.alphaLowerChar))
+
+  def bucketAllCharGen(useVirtualDotHost: Boolean): Gen[Char] = {
+    val base = List(
+      (10, Gen.alphaLowerChar),
+      (1, Gen.const('-')),
+      (1, Gen.numChar))
+
+    val frequency = if (useVirtualDotHost) (1, Gen.const('.')) +: base else 
base
+
+    Gen.frequency(frequency: _*)
+  }
+
+  @nowarn("msg=not.*?exhaustive")
+  private def checkInvalidDuplicateChars(chars: List[Char]): Boolean =
+    chars.sliding(2).forall { case Seq(before, after) =>
+      !(before == '.' && after == '.' || before == '-' && after == '.' || 
before == '.' && after == '-')
+    }
+
+  private def checkAlphaChar(c: Char): Boolean =
+    c >= 'a' && c <= 'z'
+
+  private def allCharCheck(useVirtualDotHost: Boolean, string: String): 
Boolean =
+    if (useVirtualDotHost) {
+      string.forall(char => Character.isDigit(char) || checkAlphaChar(char) || 
char == '-' || char == '.') &&
+      checkInvalidDuplicateChars(string.toList)
+    } else
+      string.forall(char => Character.isDigit(char) || checkAlphaChar(char) || 
char == '-')
+
+  def validatePrefix(useVirtualDotHost: Boolean, prefix: Option[String]): 
Option[String] = {
+    val withoutWhitespace = prefix match {
+      case Some(value) if value.trim == "" => None
+      case Some(value)                     => Some(value)
+      case None                            => None
+    }
+
+    withoutWhitespace match {
+      case Some(value) if !(Character.isDigit(value.head) || 
checkAlphaChar(value.head)) =>
+        throw new IllegalArgumentException(
+          s"Invalid starting digit for prefix $value, ${value.head} needs to 
be an alpha char or digit")
+      case Some(value) if value.length > 1 =>
+        if (!allCharCheck(useVirtualDotHost, value.drop(1)))
+          throw new IllegalArgumentException(
+            s"Prefix $value contains invalid characters")
+      case Some(value) if value.length > MaxBucketLength - 1 =>
+        throw new IllegalArgumentException(
+          s"Prefix is too long, it has size ${value.length} where as the max 
bucket size is $MaxBucketLength")
+      case _ => ()
+    }
+
+    withoutWhitespace
+  }
+
+  def bucketNameGen(useVirtualDotHost: Boolean, prefix: Option[String] = 
None): Gen[String] = {
+    val finalPrefix = validatePrefix(useVirtualDotHost, prefix)
+
+    for {
+      range <- {
+        val maxLength = finalPrefix match {
+          case Some(p) => MaxBucketLength - p.length
+          case None    => MaxBucketLength
+        }
+
+        if (maxLength > 3)
+          Gen.choose(3, maxLength)
+        else
+          Gen.const(maxLength)
+      }
+      startString = finalPrefix.getOrElse("")
+
+      bucketName <- range match {
+        case 3 =>
+          for {
+            first <- bucketLetterOrNumberCharGen
+            second <- bucketAllCharGen(useVirtualDotHost)
+            third <- bucketLetterOrNumberCharGen
+          } yield startString ++ List(first, second, third).mkString
+        case _ =>
+          for {
+            first <- bucketLetterOrNumberCharGen
+            last <- bucketLetterOrNumberCharGen
+            middle <- {
+              val gen = Gen.listOfN(range - 2, 
bucketAllCharGen(useVirtualDotHost))
+              if (useVirtualDotHost) gen.filter(checkInvalidDuplicateChars) 
else gen
+            }
+          } yield startString ++ first.toString ++ middle.mkString ++ 
last.toString
+      }
+    } yield bucketName
+  }
+
+}
diff --git 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
index 08025149c..30932cf3b 100644
--- 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
+++ 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
@@ -14,6 +14,7 @@
 package org.apache.pekko.stream.connectors.s3.scaladsl
 
 import org.apache.pekko
+import org.scalacheck.Gen
 import pekko.actor.ActorSystem
 import pekko.http.scaladsl.Http
 import pekko.http.scaladsl.model.{ ContentTypes, StatusCodes }
@@ -27,16 +28,17 @@ import pekko.testkit.{ TestKit, TestKitBase }
 import pekko.util.ccompat.JavaConverters._
 import pekko.util.ByteString
 import pekko.{ Done, NotUsed }
-import org.scalatest.Inspectors.forEvery
 import org.scalatest._
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.flatspec.AnyFlatSpecLike
 import org.scalatest.matchers.should.Matchers
+import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks
 import software.amazon.awssdk.auth.credentials._
 import software.amazon.awssdk.regions.Region
 import software.amazon.awssdk.regions.providers._
 
 import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
 import scala.annotation.tailrec
 import scala.collection.immutable
 import scala.concurrent.duration._
@@ -48,6 +50,7 @@ trait S3IntegrationSpec
     with BeforeAndAfterAll
     with Matchers
     with ScalaFutures
+    with ScalaCheckPropertyChecks
     with OptionValues
     with LogCapturing {
 
@@ -55,23 +58,172 @@ trait S3IntegrationSpec
 
   implicit val defaultPatience: PatienceConfig = PatienceConfig(3.minutes, 
100.millis)
 
-  val defaultBucket = "my-test-us-east-1"
-  val nonExistingBucket = "nowhere"
+  /**
+   * A prefix that will get added to each generated bucket in the test, this 
is to track the buckets that are
+   * specifically created by the test
+   */
+  lazy val bucketPrefix: Option[String] = None
+
+  /**
+   * Whether to randomly generate bucket names
+   */
+  val randomlyGenerateBucketNames: Boolean
+
+  implicit override val generatorDrivenConfig: PropertyCheckConfiguration =
+    PropertyCheckConfiguration(minSuccessful = 1)
+
+  /**
+   * Whether to enable cleanup of buckets after tests are run and if so the 
initial delay to wait after the test
+   */
+  lazy val enableCleanup: Option[FiniteDuration] = None
+
+  /**
+   * The MaxTimeout when cleaning up all of the buckets during `afterAll`
+   */
+  lazy val maxCleanupTimeout: FiniteDuration = 10.minutes
+
+  /**
+   * @param constantBucketName The bucket name constant to use if we are not 
randomly generating bucket names
+   */
+  def genBucketName(constantBucketName: String): Gen[String] =
+    if (randomlyGenerateBucketNames)
+      Generators.bucketNameGen(useVirtualDotHost = false, bucketPrefix)
+    else
+      Gen.const(bucketPrefix.getOrElse("") ++ constantBucketName)
+
+  def createBucket(bucket: String, versioning: Boolean, bucketReference: 
AtomicReference[String], s3Attrs: Attributes)
+      : Future[Unit] =
+    for {
+      bucketResponse <- S3.checkIfBucketExists(bucket)(implicitly, s3Attrs)
+      _ <- bucketResponse match {
+        case BucketAccess.AccessDenied =>
+          throw new RuntimeException(
+            s"Unable to create bucket: $bucket since it already exists however 
permissions are inadequate")
+        case BucketAccess.AccessGranted | BucketAccess.NotExists =>
+          for {
+            _ <- bucketResponse match {
+              case BucketAccess.AccessGranted =>
+                system.log.info(
+                  s"Deleting and recreating bucket: $bucket since it already 
exists with correct permissions")
+                TestUtils.cleanAndDeleteBucket(bucket, s3Attrs)
+              case _ =>
+                Future.successful(())
+            }
+            _ <- S3.makeBucket(bucket)(implicitly, s3Attrs)
+            // TODO: Figure out a way to properly test this with Minio, see 
https://github.com/akka/alpakka/issues/2750
+            _ <- if (versioning && this.isInstanceOf[AWSS3IntegrationSpec])
+              S3.putBucketVersioning(bucket, 
BucketVersioning().withStatus(BucketVersioningStatus.Enabled))(implicitly,
+                s3Attrs)
+            else
+              Future.successful(())
+            _ = bucketReference.set(bucket)
+          } yield ()
+      }
+    } yield ()
+
+  private val defaultBucketReference = new AtomicReference[String]()
+  def withDefaultBucket(testCode: String => Assertion): Assertion =
+    testCode(defaultBucketReference.get())
 
   // with dots forcing path style access
-  val bucketWithDots = "my.test.frankfurt"
+  private val bucketWithDotsReference = new AtomicReference[String]()
+  def withBucketWithDots(testCode: String => Assertion): Assertion =
+    testCode(bucketWithDotsReference.get())
+
+  private val nonExistingBucketReference = new AtomicReference[String]()
+  def withNonExistingBucket(testCode: String => Assertion): Assertion =
+    testCode(nonExistingBucketReference.get())
+
+  private val bucketWithVersioningReference = new AtomicReference[String]()
+  def withBucketWithVersioning(testCode: String => Assertion): Assertion =
+    testCode(bucketWithVersioningReference.get())
 
-  val bucketWithVersioning = "my-bucket-with-versioning"
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    val (bucketWithDots, defaultBucket, bucketWithVersioning, 
nonExistingBucket) = {
+      val baseGen = for {
+        bucketWithDots <- genBucketName(S3IntegrationSpec.BucketWithDots)
+        defaultBucket <- genBucketName(S3IntegrationSpec.DefaultBucket)
+        bucketWithVersioning <- 
genBucketName(S3IntegrationSpec.BucketWithVersioning)
+        nonExistingBucket <- genBucketName(S3IntegrationSpec.NonExistingBucket)
+      } yield (bucketWithDots, defaultBucket, bucketWithVersioning, 
nonExistingBucket)
+
+      if (randomlyGenerateBucketNames)
+        TestUtils.loopUntilGenRetrievesValue(baseGen.filter {
+          case (bucketWithDots, defaultBucket, bucketWithVersioning, 
nonExistingBucket) =>
+            // Make sure that we don't somehow generate 2 or more buckets with 
the same name
+            List(bucketWithDots, defaultBucket, bucketWithVersioning, 
nonExistingBucket).distinct.size == 4
+        })
+      else
+        baseGen.sample.get
+    }
+
+    val makeBuckets = for {
+      _ <- {
+        createBucket(bucketWithDots, versioning = false, 
bucketWithDotsReference,
+          attributes(
+            _.withS3RegionProvider(
+              new AwsRegionProvider {
+                val getRegion: Region = Region.EU_CENTRAL_1
+              })))
+      }
+      _ <- createBucket(defaultBucket, versioning = false, 
defaultBucketReference, attributes)
+      _ <- createBucket(bucketWithVersioning, versioning = true, 
bucketWithVersioningReference, attributes)
+      _ = nonExistingBucketReference.set(nonExistingBucket)
+    } yield ()
+
+    system.log.info(
+      s"Corresponding S3 bucket names are bucketWithDots: $bucketWithDots, 
defaultBucket: $defaultBucket, bucketWithVersioning: $bucketWithVersioning, 
nonExistingBucket: $nonExistingBucket")
+
+    Await.result(makeBuckets, 10.seconds)
+  }
 
   val objectKey = "test"
 
   val objectValue = "Some String"
   val metaHeaders: Map[String, String] = Map("location" -> "Africa", 
"datatype" -> "image")
 
-  override protected def afterAll(): Unit =
+  private def cleanBucket(bucket: String, attributes: Attributes): 
Future[Unit] = (for {
+    check <- S3.checkIfBucketExists(bucket)(implicitly, attributes)
+    _ <- check match {
+      case BucketAccess.AccessDenied =>
+        Future {
+          system.log.warning(
+            s"Cannot delete bucket: $bucket due to having access denied. 
Please look into this as it can fill up your AWS account")
+        }
+      case BucketAccess.AccessGranted =>
+        system.log.info(s"Cleaning up bucket: $bucket")
+        TestUtils.cleanAndDeleteBucket(bucket, attributes)
+      case BucketAccess.NotExists =>
+        Future {
+          system.log.info(s"Not deleting bucket: $bucket since it no longer 
exists")
+        }
+    }
+  } yield ()).recover { case util.control.NonFatal(error) =>
+    system.log.error(s"Error deleting bucket: $bucket", error)
+  }
+
+  override protected def afterAll(): Unit = {
+    super.afterAll()
+    enableCleanup.foreach { timeout =>
+      val bucketsWithAttributes = List(
+        Option(defaultBucketReference.get()).map(bucket => (bucket, 
attributes)),
+        Option(bucketWithDotsReference.get()).map(bucket =>
+          (bucket,
+            attributes(
+              _.withS3RegionProvider(
+                new AwsRegionProvider {
+                  val getRegion: Region = Region.EU_CENTRAL_1
+                })))),
+        Option(bucketWithVersioningReference.get()).map(bucket => (bucket, 
attributes))).flatten
+
+      Await.result(Future.traverse(bucketsWithAttributes) { case (bucket, 
attrs) => cleanBucket(bucket, attrs) },
+        timeout)
+    }
     Http(system)
       .shutdownAllConnectionPools()
       .foreach(_ => TestKit.shutdownActorSystem(system))
+  }
 
   lazy val defaultS3Settings: S3Settings =
     S3Settings()
@@ -103,7 +255,7 @@ trait S3IntegrationSpec
   def defaultRegionContentCount = 0
   def otherRegionContentCount = 0
 
-  it should "list buckets in current aws account" in {
+  it should "list buckets in current aws account" in withDefaultBucket { 
defaultBucket =>
     val result = for {
       buckets <- S3.listBuckets().withAttributes(attributes).runWith(Sink.seq)
     } yield buckets
@@ -113,9 +265,8 @@ trait S3IntegrationSpec
     buckets.map(_.name) should contain(defaultBucket)
   }
 
-  it should "list buckets in current AWS account using non US_EAST_1 region" 
in {
+  it should "list buckets in current AWS account using non US_EAST_1 region" 
in withDefaultBucket { defaultBucket =>
     // Its only AWS that complains if listBuckets is called from a non 
US_EAST_1 region
-    assume(this.isInstanceOf[AWSS3IntegrationSpec])
     val result = for {
       buckets <- S3.listBuckets().withAttributes(
         S3Attributes.settings(defaultS3Settings.withS3RegionProvider(new 
AwsRegionProvider {
@@ -128,7 +279,7 @@ trait S3IntegrationSpec
     buckets.map(_.name) should contain(defaultBucket)
   }
 
-  it should "list with real credentials" in {
+  it should "list with real credentials" in withDefaultBucket { defaultBucket 
=>
     val result = S3
       .listBucket(defaultBucket, None)
       .withAttributes(attributes)
@@ -138,7 +289,7 @@ trait S3IntegrationSpec
     listingResult.size shouldBe defaultRegionContentCount
   }
 
-  it should "list with real credentials using the Version 1 API" in {
+  it should "list with real credentials using the Version 1 API" in 
withDefaultBucket { defaultBucket =>
     val result = S3
       .listBucket(defaultBucket, None)
       
.withAttributes(attributes(_.withListBucketApiVersion(ApiVersion.ListBucketVersion1)))
@@ -148,7 +299,7 @@ trait S3IntegrationSpec
     listingResult.size shouldBe defaultRegionContentCount
   }
 
-  it should "list with real credentials in non us-east-1 zone" in {
+  it should "list with real credentials in non us-east-1 zone" in 
withBucketWithDots { bucketWithDots =>
     val result = S3
       .listBucket(bucketWithDots, None)
       .withAttributes(otherRegionSettingsPathStyleAccess)
@@ -157,7 +308,7 @@ trait S3IntegrationSpec
     result.futureValue.size shouldBe otherRegionContentCount
   }
 
-  it should "upload with real credentials" in {
+  it should "upload with real credentials" in withDefaultBucket { 
defaultBucket =>
     val objectKey = "putTest"
     val bytes = ByteString(objectValue)
     val data = Source.single(ByteString(objectValue))
@@ -174,7 +325,7 @@ trait S3IntegrationSpec
     result.futureValue.eTag should not be empty
   }
 
-  it should "upload and delete" in {
+  it should "upload and delete" in withDefaultBucket { defaultBucket =>
     val objectKey = "putTest"
     val bytes = ByteString(objectValue)
     val data = Source.single(ByteString(objectValue))
@@ -202,7 +353,7 @@ trait S3IntegrationSpec
     metaAfter shouldBe empty
   }
 
-  it should "upload multipart with real credentials" in {
+  it should "upload multipart with real credentials" in withDefaultBucket { 
defaultBucket =>
     val source: Source[ByteString, Any] = Source(ByteString(objectValue) :: 
Nil)
 
     val result =
@@ -216,7 +367,7 @@ trait S3IntegrationSpec
     multipartUploadResult.key shouldBe objectKey
   }
 
-  it should "download with real credentials" in {
+  it should "download with real credentials" in withDefaultBucket { 
defaultBucket =>
     val (metaFuture, bodyFuture) = S3
       .getObject(defaultBucket, objectKey)
       .withAttributes(attributes)
@@ -230,7 +381,7 @@ trait S3IntegrationSpec
     meta.contentType shouldBe 
Some(ContentTypes.`application/octet-stream`.value)
   }
 
-  it should "delete with real credentials" in {
+  it should "delete with real credentials" in withDefaultBucket { 
defaultBucket =>
     val delete = S3
       .deleteObject(defaultBucket, objectKey)
       .withAttributes(attributes)
@@ -238,7 +389,7 @@ trait S3IntegrationSpec
     delete.futureValue shouldEqual pekko.Done
   }
 
-  it should "upload huge multipart with real credentials" in {
+  it should "upload huge multipart with real credentials" in withDefaultBucket 
{ defaultBucket =>
     val objectKey = "huge"
     val hugeString = "0123456789abcdef" * 64 * 1024 * 11
     val result =
@@ -253,7 +404,7 @@ trait S3IntegrationSpec
     multipartUploadResult.key shouldBe objectKey
   }
 
-  it should "upload, download and delete with spaces in the key" in {
+  it should "upload, download and delete with spaces in the key" in 
withDefaultBucket { defaultBucket =>
     val objectKey = "test folder/test file.txt"
     val source: Source[ByteString, Any] = Source(ByteString(objectValue) :: 
Nil)
 
@@ -281,7 +432,7 @@ trait S3IntegrationSpec
       .futureValue shouldEqual pekko.Done
   }
 
-  it should "upload, download and delete with brackets in the key" in {
+  it should "upload, download and delete with brackets in the key" in 
withDefaultBucket { defaultBucket =>
     val objectKey = "abc/DEF/2017/06/15/1234 (1).TXT"
     val source: Source[ByteString, Any] = Source(ByteString(objectValue) :: 
Nil)
 
@@ -309,144 +460,161 @@ trait S3IntegrationSpec
       .futureValue shouldEqual pekko.Done
   }
 
-  it should "upload, download and delete with spaces in the key in non 
us-east-1 zone" in uploadDownloadAndDeleteInOtherRegionCase(
-    "test folder/test file.txt")
+  it should "upload, download and delete with spaces in the key in non 
us-east-1 zone" in withBucketWithDots {
+    bucketWithDots =>
+      uploadDownloadAndDeleteInOtherRegionCase(bucketWithDots,
+        "test folder/test file.txt")
+  }
 
   // we want ASCII and other UTF-8 characters!
-  it should "upload, download and delete with special characters in the key in 
non us-east-1 zone" in uploadDownloadAndDeleteInOtherRegionCase(
-    "føldęrü/1234()[]><!? .TXT")
+  it should "upload, download and delete with special characters in the key in 
non us-east-1 zone" in withBucketWithDots {
+    bucketWithDots =>
+      uploadDownloadAndDeleteInOtherRegionCase(bucketWithDots,
+        "føldęrü/1234()[]><!? .TXT")
+  }
 
-  it should "upload, download and delete with `+` character in the key in non 
us-east-1 zone" in uploadDownloadAndDeleteInOtherRegionCase(
-    "1 + 2 = 3")
+  it should "upload, download and delete with `+` character in the key in non 
us-east-1 zone" in withBucketWithDots {
+    bucketWithDots =>
+      uploadDownloadAndDeleteInOtherRegionCase(bucketWithDots,
+        "1 + 2 = 3")
+  }
 
-  it should "upload, copy, download the copy, and delete" in 
uploadCopyDownload(
-    "original/file.txt",
-    "copy/file.txt")
+  it should "upload, copy, download the copy, and delete" in withDefaultBucket 
{ defaultBucket =>
+    uploadCopyDownload(defaultBucket,
+      "original/file.txt",
+      "copy/file.txt")
+  }
 
   // NOTE: MinIO currently has problems copying files with spaces.
-  it should "upload, copy, download the copy, and delete with special 
characters in key" in uploadCopyDownload(
-    "original/føldęrü/1234()[]><!?.TXT",
-    "copy/1 + 2 = 3")
-
-  it should "upload 2 files with common prefix, 1 with different prefix and 
delete by prefix" in {
-    val sourceKey1 = "original/file1.txt"
-    val sourceKey2 = "original/file2.txt"
-    val sourceKey3 = "uploaded/file3.txt"
-    val source: Source[ByteString, Any] = Source(ByteString(objectValue) :: 
Nil)
-
-    val results = for {
-      upload1 <- source.runWith(S3.multipartUpload(defaultBucket, 
sourceKey1).withAttributes(attributes))
-      upload2 <- source.runWith(S3.multipartUpload(defaultBucket, 
sourceKey2).withAttributes(attributes))
-      upload3 <- source.runWith(S3.multipartUpload(defaultBucket, 
sourceKey3).withAttributes(attributes))
-    } yield (upload1, upload2, upload3)
-
-    whenReady(results) {
-      case (upload1, upload2, upload3) =>
-        upload1.bucket shouldEqual defaultBucket
-        upload1.key shouldEqual sourceKey1
-        upload2.bucket shouldEqual defaultBucket
-        upload2.key shouldEqual sourceKey2
-        upload3.bucket shouldEqual defaultBucket
-        upload3.key shouldEqual sourceKey3
+  it should "upload, copy, download the copy, and delete with special 
characters in key" in withDefaultBucket {
+    defaultBucket =>
+      uploadCopyDownload(defaultBucket,
+        "original/føldęrü/1234()[]><!?.TXT",
+        "copy/1 + 2 = 3")
+  }
 
-        S3.deleteObjectsByPrefix(defaultBucket, Some("original"))
-          .withAttributes(attributes)
-          .runWith(Sink.ignore)
-          .futureValue shouldEqual pekko.Done
-        val numOfKeysForPrefix =
-          S3.listBucket(defaultBucket, Some("original"))
+  it should "upload 2 files with common prefix, 1 with different prefix and 
delete by prefix" in withDefaultBucket {
+    defaultBucket =>
+      val sourceKey1 = "original/file1.txt"
+      val sourceKey2 = "original/file2.txt"
+      val sourceKey3 = "uploaded/file3.txt"
+      val source: Source[ByteString, Any] = Source(ByteString(objectValue) :: 
Nil)
+
+      val results = for {
+        upload1 <- source.runWith(S3.multipartUpload(defaultBucket, 
sourceKey1).withAttributes(attributes))
+        upload2 <- source.runWith(S3.multipartUpload(defaultBucket, 
sourceKey2).withAttributes(attributes))
+        upload3 <- source.runWith(S3.multipartUpload(defaultBucket, 
sourceKey3).withAttributes(attributes))
+      } yield (upload1, upload2, upload3)
+
+      whenReady(results) {
+        case (upload1, upload2, upload3) =>
+          upload1.bucket shouldEqual defaultBucket
+          upload1.key shouldEqual sourceKey1
+          upload2.bucket shouldEqual defaultBucket
+          upload2.key shouldEqual sourceKey2
+          upload3.bucket shouldEqual defaultBucket
+          upload3.key shouldEqual sourceKey3
+
+          S3.deleteObjectsByPrefix(defaultBucket, Some("original"))
             .withAttributes(attributes)
-            .runFold(0)((result, _) => result + 1)
-            .futureValue
-        numOfKeysForPrefix shouldEqual 0
-        S3.deleteObject(defaultBucket, sourceKey3)
-          .withAttributes(attributes)
-          .runWith(Sink.head)
-          .futureValue shouldEqual pekko.Done
-    }
+            .runWith(Sink.ignore)
+            .futureValue shouldEqual pekko.Done
+          val numOfKeysForPrefix =
+            S3.listBucket(defaultBucket, Some("original"))
+              .withAttributes(attributes)
+              .runFold(0)((result, _) => result + 1)
+              .futureValue
+          numOfKeysForPrefix shouldEqual 0
+          S3.deleteObject(defaultBucket, sourceKey3)
+            .withAttributes(attributes)
+            .runWith(Sink.head)
+            .futureValue shouldEqual pekko.Done
+      }
   }
 
-  it should "create multiple versions of an object and successfully clean it 
with deleteBucketContents" in {
-    // TODO: Figure out a way to properly test this with Minio, see 
https://github.com/akka/alpakka/issues/2750
-    assume(this.isInstanceOf[AWSS3IntegrationSpec])
-    val versionKey = "test-version"
-    val one = ByteString("one")
-    val two = ByteString("two")
-    val three = ByteString("three")
-
-    val results =
-      for {
-        // Clean the bucket just incase there is residual data in there
-        _ <- S3
-          .deleteBucketContents(bucketWithVersioning, deleteAllVersions = true)
-          .withAttributes(attributes)
-          .runWith(Sink.ignore)
-        _ <- S3
-          .putObject(bucketWithVersioning, versionKey, Source.single(one), 
one.length, s3Headers = S3Headers())
-          .withAttributes(attributes)
-          .runWith(Sink.ignore)
-        _ <- S3
-          .putObject(bucketWithVersioning, versionKey, Source.single(two), 
two.length, s3Headers = S3Headers())
-          .withAttributes(attributes)
-          .runWith(Sink.ignore)
-        _ <- S3
-          .putObject(bucketWithVersioning, versionKey, Source.single(three), 
three.length, s3Headers = S3Headers())
-          .withAttributes(attributes)
-          .runWith(Sink.ignore)
-        versionsBeforeDelete <- S3
-          .listObjectVersions(bucketWithVersioning, None)
-          .withAttributes(attributes)
-          .runWith(Sink.seq)
-        _ <- S3
-          .deleteBucketContents(bucketWithVersioning, deleteAllVersions = true)
-          .withAttributes(attributes)
-          .runWith(Sink.ignore)
-        versionsAfterDelete <- S3
-          .listObjectVersions(bucketWithVersioning, None)
-          .withAttributes(attributes)
-          .runWith(Sink.seq)
-        listBucketContentsAfterDelete <- S3
-          .listBucket(bucketWithVersioning, None)
-          .withAttributes(attributes)
-          .runWith(Sink.seq)
+  it should "create multiple versions of an object and successfully clean it 
with deleteBucketContents" in withBucketWithVersioning {
+    bucketWithVersioning =>
+      // TODO: Figure out a way to properly test this with Minio, see 
https://github.com/akka/alpakka/issues/2750
+      assume(this.isInstanceOf[AWSS3IntegrationSpec])
+      val versionKey = "test-version"
+      val one = ByteString("one")
+      val two = ByteString("two")
+      val three = ByteString("three")
+
+      val results =
+        for {
+          // Clean the bucket just in case there is residual data in there
+          _ <- S3
+            .deleteBucketContents(bucketWithVersioning, deleteAllVersions = 
true)
+            .withAttributes(attributes)
+            .runWith(Sink.ignore)
+          _ <- S3
+            .putObject(bucketWithVersioning, versionKey, Source.single(one), 
one.length, s3Headers = S3Headers())
+            .withAttributes(attributes)
+            .runWith(Sink.ignore)
+          _ <- S3
+            .putObject(bucketWithVersioning, versionKey, Source.single(two), 
two.length, s3Headers = S3Headers())
+            .withAttributes(attributes)
+            .runWith(Sink.ignore)
+          _ <- S3
+            .putObject(bucketWithVersioning, versionKey, Source.single(three), 
three.length, s3Headers = S3Headers())
+            .withAttributes(attributes)
+            .runWith(Sink.ignore)
+          versionsBeforeDelete <- S3
+            .listObjectVersions(bucketWithVersioning, None)
+            .withAttributes(attributes)
+            .runWith(Sink.seq)
+          _ <- S3
+            .deleteBucketContents(bucketWithVersioning, deleteAllVersions = 
true)
+            .withAttributes(attributes)
+            .runWith(Sink.ignore)
+          versionsAfterDelete <- S3
+            .listObjectVersions(bucketWithVersioning, None)
+            .withAttributes(attributes)
+            .runWith(Sink.seq)
+          listBucketContentsAfterDelete <- S3
+            .listBucket(bucketWithVersioning, None)
+            .withAttributes(attributes)
+            .runWith(Sink.seq)
 
-      } yield (versionsBeforeDelete.flatMap { case (versions, _) => versions },
-        versionsAfterDelete.flatMap {
-          case (versions, _) => versions
-        }, listBucketContentsAfterDelete)
+        } yield (versionsBeforeDelete.flatMap { case (versions, _) => versions 
},
+          versionsAfterDelete.flatMap {
+            case (versions, _) => versions
+          }, listBucketContentsAfterDelete)
 
-    val (versionsBeforeDelete, versionsAfterDelete, bucketContentsAfterDelete) 
= results.futureValue
+      val (versionsBeforeDelete, versionsAfterDelete, 
bucketContentsAfterDelete) = results.futureValue
 
-    versionsBeforeDelete.size shouldEqual 3
-    versionsAfterDelete.size shouldEqual 0
-    bucketContentsAfterDelete.size shouldEqual 0
+      versionsBeforeDelete.size shouldEqual 3
+      versionsAfterDelete.size shouldEqual 0
+      bucketContentsAfterDelete.size shouldEqual 0
   }
 
-  it should "listing object versions for a non versioned bucket should return 
None for versionId" in {
-    // TODO: Figure out a way to properly test this with Minio, see 
https://github.com/akka/alpakka/issues/2750
-    assume(this.isInstanceOf[AWSS3IntegrationSpec])
-    val objectKey = "listObjectVersionIdTest"
-    val bytes = ByteString(objectValue)
-    val data = Source.single(ByteString(objectValue))
-    val results = for {
-      _ <- S3
-        .putObject(defaultBucket,
-          objectKey,
-          data,
-          bytes.length,
-          s3Headers = S3Headers().withMetaHeaders(MetaHeaders(metaHeaders)))
-        .withAttributes(attributes)
-        .runWith(Sink.ignore)
-      result <- S3.listObjectVersions(defaultBucket, 
None).withAttributes(attributes).runWith(Sink.seq)
-      _ <- S3.deleteObject(defaultBucket, 
objectKey).withAttributes(attributes).runWith(Sink.ignore)
-    } yield result.flatMap { case (versions, _) => versions }
+  it should "listing object versions for a non versioned bucket should return 
None for versionId" in withDefaultBucket {
+    defaultBucket =>
+      // TODO: Figure out a way to properly test this with Minio, see 
https://github.com/akka/alpakka/issues/2750
+      assume(this.isInstanceOf[AWSS3IntegrationSpec])
+      val objectKey = "listObjectVersionIdTest"
+      val bytes = ByteString(objectValue)
+      val data = Source.single(ByteString(objectValue))
+      val results = for {
+        _ <- S3
+          .putObject(defaultBucket,
+            objectKey,
+            data,
+            bytes.length,
+            s3Headers = S3Headers().withMetaHeaders(MetaHeaders(metaHeaders)))
+          .withAttributes(attributes)
+          .runWith(Sink.ignore)
+        result <- S3.listObjectVersions(defaultBucket, 
None).withAttributes(attributes).runWith(Sink.seq)
+        _ <- S3.deleteObject(defaultBucket, 
objectKey).withAttributes(attributes).runWith(Sink.ignore)
+      } yield result.flatMap { case (versions, _) => versions }
 
-    forEvery(results.futureValue) { version =>
-      version.versionId shouldEqual None
-    }
+      Inspectors.forEvery(results.futureValue) { version =>
+        version.versionId shouldEqual None
+      }
   }
 
-  it should "upload 2 files, delete all files in bucket" in {
+  it should "upload 2 files, delete all files in bucket" in withDefaultBucket 
{ defaultBucket =>
     val sourceKey1 = "original/file1.txt"
     val sourceKey2 = "original/file2.txt"
     val source: Source[ByteString, Any] = Source(ByteString(objectValue) :: 
Nil)
@@ -550,154 +718,157 @@ trait S3IntegrationSpec
     String.format("%032X", new BigInteger(1, digest.digest())).toLowerCase
   }
 
-  it should "upload 1 file slowly, cancel it and retrieve a multipart upload + 
list part" in {
-    // This test doesn't work on Minio since minio doesn't properly implement 
this API, see
-    // https://github.com/minio/minio/issues/13246
-    assume(this.isInstanceOf[AWSS3IntegrationSpec])
-    val sourceKey = "original/file-slow.txt"
-    val sharedKillSwitch = KillSwitches.shared("abort-multipart-upload")
+  it should "upload 1 file slowly, cancel it and retrieve a multipart upload + 
list part" in withDefaultBucket {
+    defaultBucket =>
+      // This test doesn't work on Minio since minio doesn't properly 
implement this API, see
+      // https://github.com/minio/minio/issues/13246
+      assume(this.isInstanceOf[AWSS3IntegrationSpec])
+      val sourceKey = "original/file-slow.txt"
+      val sharedKillSwitch = KillSwitches.shared("abort-multipart-upload")
 
-    val inputData = createStringCollectionWithMinChunkSize(5)
-    val slowSource = createSlowSource(inputData, Some(sharedKillSwitch))
+      val inputData = createStringCollectionWithMinChunkSize(5)
+      val slowSource = createSlowSource(inputData, Some(sharedKillSwitch))
 
-    val multiPartUpload =
-      slowSource.toMat(S3.multipartUpload(defaultBucket, 
sourceKey).withAttributes(attributes))(Keep.right).run()
+      val multiPartUpload =
+        slowSource.toMat(S3.multipartUpload(defaultBucket, 
sourceKey).withAttributes(attributes))(Keep.right).run()
 
-    val results = for {
-      _ <- pekko.pattern.after(25.seconds)(Future {
-        sharedKillSwitch.abort(AbortException)
-      })
-      _ <- multiPartUpload.recover {
-        case AbortException => ()
-      }
-      incomplete <- S3.listMultipartUpload(defaultBucket, 
None).withAttributes(attributes).runWith(Sink.seq)
-      uploadIds = incomplete.collect {
-        case uploadPart if uploadPart.key == sourceKey => uploadPart.uploadId
+      val results = for {
+        _ <- pekko.pattern.after(25.seconds)(Future {
+          sharedKillSwitch.abort(AbortException)
+        })
+        _ <- multiPartUpload.recover {
+          case AbortException => ()
+        }
+        incomplete <- S3.listMultipartUpload(defaultBucket, 
None).withAttributes(attributes).runWith(Sink.seq)
+        uploadIds = incomplete.collect {
+          case uploadPart if uploadPart.key == sourceKey => uploadPart.uploadId
+        }
+        parts <- Future.sequence(uploadIds.map { uploadId =>
+          S3.listParts(defaultBucket, sourceKey, 
uploadId).withAttributes(attributes).runWith(Sink.seq)
+        })
+        // Cleanup the uploads after
+        _ <- Future.sequence(uploadIds.map { uploadId =>
+          S3.deleteUpload(defaultBucket, sourceKey, uploadId)(implicitly, 
attributes)
+        })
+      } yield (uploadIds, incomplete, parts.flatten)
+
+      whenReady(results) {
+        case (uploadIds, incompleteFiles, parts) =>
+          val inputsUntilAbort = inputData.slice(0, 3)
+          incompleteFiles.exists(_.key == sourceKey) shouldBe true
+          parts.nonEmpty shouldBe true
+          uploadIds.size shouldBe 1
+          parts.size shouldBe 3
+          parts.map(_.size) shouldBe 
inputsUntilAbort.map(_.utf8String.getBytes("UTF-8").length)
+          // In S3 each part's ETag is actually an MD5 hash of the contents of the 
part so we can use this to check
+          // that the data has been uploaded correctly and in the right order, 
see
+          // 
https://docs.aws.amazon.com/AmazonS3/latest/API/RESTCommonResponseHeaders.html
+          parts.map(_.eTag.replaceAll("\"", "")) shouldBe 
inputsUntilAbort.map(byteStringToMD5)
       }
-      parts <- Future.sequence(uploadIds.map { uploadId =>
-        S3.listParts(defaultBucket, sourceKey, 
uploadId).withAttributes(attributes).runWith(Sink.seq)
-      })
-      // Cleanup the uploads after
-      _ <- Future.sequence(uploadIds.map { uploadId =>
-        S3.deleteUpload(defaultBucket, sourceKey, uploadId)(implicitly, 
attributes)
-      })
-    } yield (uploadIds, incomplete, parts.flatten)
-
-    whenReady(results) {
-      case (uploadIds, incompleteFiles, parts) =>
-        val inputsUntilAbort = inputData.slice(0, 3)
-        incompleteFiles.exists(_.key == sourceKey) shouldBe true
-        parts.nonEmpty shouldBe true
-        uploadIds.size shouldBe 1
-        parts.size shouldBe 3
-        parts.map(_.size) shouldBe 
inputsUntilAbort.map(_.utf8String.getBytes("UTF-8").length)
-        // In S3 the etag's are actually an MD5 hash of the contents of the 
part so we can use this to check
-        // that the data has been uploaded correctly and in the right order, 
see
-        // 
https://docs.aws.amazon.com/AmazonS3/latest/API/RESTCommonResponseHeaders.html
-        parts.map(_.eTag.replaceAll("\"", "")) shouldBe 
inputsUntilAbort.map(byteStringToMD5)
-    }
   }
 
-  it should "upload 1 file slowly, cancel it and then resume it to complete 
the upload" in {
-    // This test doesn't work on Minio since minio doesn't properly implement 
this API, see
-    // https://github.com/minio/minio/issues/13246
-    assume(this.isInstanceOf[AWSS3IntegrationSpec])
-    val sourceKey = "original/file-slow-2.txt"
-    val sharedKillSwitch = KillSwitches.shared("abort-multipart-upload-2")
+  it should "upload 1 file slowly, cancel it and then resume it to complete 
the upload" in withDefaultBucket {
+    defaultBucket =>
+      // This test doesn't work on Minio since minio doesn't properly 
implement this API, see
+      // https://github.com/minio/minio/issues/13246
+      assume(this.isInstanceOf[AWSS3IntegrationSpec])
+      val sourceKey = "original/file-slow-2.txt"
+      val sharedKillSwitch = KillSwitches.shared("abort-multipart-upload-2")
 
-    val inputData = createStringCollectionWithMinChunkSize(6)
+      val inputData = createStringCollectionWithMinChunkSize(6)
 
-    val slowSource = createSlowSource(inputData, Some(sharedKillSwitch))
+      val slowSource = createSlowSource(inputData, Some(sharedKillSwitch))
 
-    val multiPartUpload =
-      slowSource
-        .toMat(S3.multipartUpload(defaultBucket, 
sourceKey).withAttributes(attributes))(
-          Keep.right)
-        .run()
+      val multiPartUpload =
+        slowSource
+          .toMat(S3.multipartUpload(defaultBucket, 
sourceKey).withAttributes(attributes))(
+            Keep.right)
+          .run()
 
-    val results = for {
-      _ <- pekko.pattern.after(25.seconds)(Future {
-        sharedKillSwitch.abort(AbortException)
-      })
-      _ <- multiPartUpload.recover {
-        case AbortException => ()
+      val results = for {
+        _ <- pekko.pattern.after(25.seconds)(Future {
+          sharedKillSwitch.abort(AbortException)
+        })
+        _ <- multiPartUpload.recover {
+          case AbortException => ()
+        }
+        incomplete <- S3.listMultipartUpload(defaultBucket, 
None).withAttributes(attributes).runWith(Sink.seq)
+
+        uploadId = incomplete.collectFirst {
+          case uploadPart if uploadPart.key == sourceKey => uploadPart.uploadId
+        }.get
+
+        parts <- S3.listParts(defaultBucket, sourceKey, 
uploadId).withAttributes(attributes).runWith(Sink.seq)
+
+        remainingData = inputData.slice(3, 6)
+        _ <- Source(remainingData)
+          .toMat(
+            S3.resumeMultipartUpload(defaultBucket, sourceKey, uploadId, 
parts.map(_.toPart))
+              .withAttributes(attributes))(
+            Keep.right)
+          .run()
+
+        // This delay is here because sometimes there is a delay between 
completing a large file and it
+        // actually being downloadable
+        downloaded <- pekko.pattern.after(5.seconds)(
+          S3.getObject(defaultBucket, 
sourceKey).withAttributes(attributes).runWith(Sink.seq))
+
+        _ <- S3.deleteObject(defaultBucket, 
sourceKey).withAttributes(attributes).runWith(Sink.head)
+      } yield downloaded
+
+      whenReady(results) { downloads =>
+        val fullDownloadedFile = downloads.fold(ByteString.empty)(_ ++ _)
+        val fullInputData = inputData.fold(ByteString.empty)(_ ++ _)
+
+        fullInputData.utf8String shouldEqual fullDownloadedFile.utf8String
       }
-      incomplete <- S3.listMultipartUpload(defaultBucket, 
None).withAttributes(attributes).runWith(Sink.seq)
-
-      uploadId = incomplete.collectFirst {
-        case uploadPart if uploadPart.key == sourceKey => uploadPart.uploadId
-      }.get
-
-      parts <- S3.listParts(defaultBucket, sourceKey, 
uploadId).withAttributes(attributes).runWith(Sink.seq)
-
-      remainingData = inputData.slice(3, 6)
-      _ <- Source(remainingData)
-        .toMat(
-          S3.resumeMultipartUpload(defaultBucket, sourceKey, uploadId, 
parts.map(_.toPart))
-            .withAttributes(attributes))(
-          Keep.right)
-        .run()
-
-      // This delay is here because sometimes there is a delay when you 
complete a large file and its
-      // actually downloadable
-      downloaded <- pekko.pattern.after(5.seconds)(
-        S3.getObject(defaultBucket, 
sourceKey).withAttributes(attributes).runWith(Sink.seq))
-
-      _ <- S3.deleteObject(defaultBucket, 
sourceKey).withAttributes(attributes).runWith(Sink.head)
-    } yield downloaded
-
-    whenReady(results) { downloads =>
-      val fullDownloadedFile = downloads.fold(ByteString.empty)(_ ++ _)
-      val fullInputData = inputData.fold(ByteString.empty)(_ ++ _)
-
-      fullInputData.utf8String shouldEqual fullDownloadedFile.utf8String
-    }
   }
 
-  it should "upload a full file but complete it manually with 
S3.completeMultipartUploadSource" in {
-    assume(this.isInstanceOf[AWSS3IntegrationSpec])
-    val sourceKey = "original/file-slow-3.txt"
-    val sharedKillSwitch = KillSwitches.shared("abort-multipart-upload-3")
+  it should "upload a full file but complete it manually with 
S3.completeMultipartUploadSource" in withDefaultBucket {
+    defaultBucket =>
+      assume(this.isInstanceOf[AWSS3IntegrationSpec])
+      val sourceKey = "original/file-slow-3.txt"
+      val sharedKillSwitch = KillSwitches.shared("abort-multipart-upload-3")
 
-    val inputData = createStringCollectionWithMinChunkSize(4)
+      val inputData = createStringCollectionWithMinChunkSize(4)
 
-    val slowSource = createSlowSource(inputData, Some(sharedKillSwitch))
+      val slowSource = createSlowSource(inputData, Some(sharedKillSwitch))
 
-    val multiPartUpload =
-      slowSource
-        .toMat(S3.multipartUpload(defaultBucket, 
sourceKey).withAttributes(attributes))(
-          Keep.right)
-        .run()
+      val multiPartUpload =
+        slowSource
+          .toMat(S3.multipartUpload(defaultBucket, 
sourceKey).withAttributes(attributes))(
+            Keep.right)
+          .run()
 
-    val results = for {
-      _ <- pekko.pattern.after(25.seconds)(Future {
-        sharedKillSwitch.abort(AbortException)
-      })
-      _ <- multiPartUpload.recover {
-        case AbortException => ()
+      val results = for {
+        _ <- pekko.pattern.after(25.seconds)(Future {
+          sharedKillSwitch.abort(AbortException)
+        })
+        _ <- multiPartUpload.recover {
+          case AbortException => ()
+        }
+        incomplete <- S3.listMultipartUpload(defaultBucket, 
None).withAttributes(attributes).runWith(Sink.seq)
+
+        uploadId = incomplete.collectFirst {
+          case uploadPart if uploadPart.key == sourceKey => uploadPart.uploadId
+        }.get
+
+        parts <- S3.listParts(defaultBucket, sourceKey, 
uploadId).withAttributes(attributes).runWith(Sink.seq)
+
+        _ <- S3.completeMultipartUpload(defaultBucket, sourceKey, uploadId, 
parts.map(_.toPart))(implicitly, attributes)
+        // This delay is here because sometimes there is a delay between 
completing a large file and it
+        // actually being downloadable
+        downloaded <- pekko.pattern.after(5.seconds)(
+          S3.getObject(defaultBucket, 
sourceKey).withAttributes(attributes).runWith(Sink.seq))
+        _ <- S3.deleteObject(defaultBucket, 
sourceKey).withAttributes(attributes).runWith(Sink.head)
+      } yield downloaded
+
+      whenReady(results) { downloads =>
+        val fullDownloadedFile = downloads.fold(ByteString.empty)(_ ++ _)
+        val fullInputData = inputData.slice(0, 3).fold(ByteString.empty)(_ ++ 
_)
+
+        fullInputData.utf8String shouldEqual fullDownloadedFile.utf8String
       }
-      incomplete <- S3.listMultipartUpload(defaultBucket, 
None).withAttributes(attributes).runWith(Sink.seq)
-
-      uploadId = incomplete.collectFirst {
-        case uploadPart if uploadPart.key == sourceKey => uploadPart.uploadId
-      }.get
-
-      parts <- S3.listParts(defaultBucket, sourceKey, 
uploadId).withAttributes(attributes).runWith(Sink.seq)
-
-      _ <- S3.completeMultipartUpload(defaultBucket, sourceKey, uploadId, 
parts.map(_.toPart))(implicitly, attributes)
-      // This delay is here because sometimes there is a delay when you 
complete a large file and its
-      // actually downloadable
-      downloaded <- pekko.pattern.after(5.seconds)(
-        S3.getObject(defaultBucket, 
sourceKey).withAttributes(attributes).runWith(Sink.seq))
-      _ <- S3.deleteObject(defaultBucket, 
sourceKey).withAttributes(attributes).runWith(Sink.head)
-    } yield downloaded
-
-    whenReady(results) { downloads =>
-      val fullDownloadedFile = downloads.fold(ByteString.empty)(_ ++ _)
-      val fullInputData = inputData.slice(0, 3).fold(ByteString.empty)(_ ++ _)
-
-      fullInputData.utf8String shouldEqual fullDownloadedFile.utf8String
-    }
   }
   @tailrec
   final def createStringCollectionContextWithMinChunkSizeRec(
@@ -761,7 +932,7 @@ trait S3IntegrationSpec
   def createStringCollectionContextWithMinChunkSize(numberOfChunks: Int): 
List[List[(ByteString, BigInt)]] =
     
createStringCollectionContextWithMinChunkSizeRec(numberOfChunks).toList.map(_.toList)
 
-  it should "perform a chunked multi-part upload with the correct context" in {
+  it should "perform a chunked multi-part upload with the correct context" in 
withDefaultBucket { defaultBucket =>
     val sourceKey = "original/file-context-1.txt"
     val inputData = createStringCollectionContextWithMinChunkSize(4)
     val source = Source(inputData.flatten)
@@ -798,130 +969,141 @@ trait S3IntegrationSpec
   }
 
   it should "make a bucket with given name" in {
-    implicit val attr: Attributes = attributes
-    val bucketName = "samplebucket1"
+    forAll(genBucketName("samplebucket1")) { bucketName =>
+      system.log.info(s"Created bucket with name: $bucketName")
 
-    val request: Future[Done] = S3
-      .makeBucket(bucketName)
+      implicit val attr: Attributes = attributes
+      val request: Future[Done] = S3
+        .makeBucket(bucketName)
 
-    whenReady(request) { value =>
-      value shouldEqual Done
+      whenReady(request) { value =>
+        value shouldEqual Done
 
-      S3.deleteBucket(bucketName).futureValue shouldBe Done
+        S3.deleteBucket(bucketName).futureValue shouldBe Done
+      }
     }
   }
 
-  it should "throw an exception while creating a bucket with the same name in 
Minio" in {
-    // S3 will only throw a BucketAlreadyExists exception if the owner of the 
account creating the bucket with an
-    // already existing name different from the owner of the already existing 
bucket. On the other hand Minio will
-    // always throw this exception. Since its hard to test the S3 case of 
different owners we only run this test
-    // against Minio.
-    assume(this.isInstanceOf[MinioS3IntegrationSpec])
-    implicit val attr: Attributes = attributes
-    S3.makeBucket(defaultBucket).failed.futureValue shouldBe an[S3Exception]
+  it should "throw an exception while creating a bucket with the same name in 
Minio" in withDefaultBucket {
+    defaultBucket =>
+      // S3 will only throw a BucketAlreadyExists exception if the owner of 
the account creating the bucket with an
+      // already existing name is different from the owner of the already 
existing bucket. On the other hand Minio will
+      // always throw this exception. Since it's hard to test the S3 case of 
different owners we only run this test
+      // against Minio.
+      assume(this.isInstanceOf[MinioS3IntegrationSpec])
+      implicit val attr: Attributes = attributes
+      S3.makeBucket(defaultBucket).failed.futureValue shouldBe an[S3Exception]
   }
 
-  it should "Do nothing while creating a bucket with the same name and owner 
in AWS S3" in {
-    // Due to this being a PUT request, S3 doesn't actually throw an exception 
unless
-    // you are NOT the owner of the already existing bucket.
-    // See https://github.com/aws/aws-sdk-go/issues/1362#issuecomment-722554726
-    assume(this.isInstanceOf[AWSS3IntegrationSpec])
-    implicit val attr: Attributes = attributes
-    S3.makeBucket(defaultBucket).futureValue shouldBe Done
+  it should "Do nothing while creating a bucket with the same name and owner 
in AWS S3" in withDefaultBucket {
+    defaultBucket =>
+      // Due to this being a PUT request, S3 doesn't actually throw an 
exception unless
+      // you are NOT the owner of the already existing bucket.
+      // See 
https://github.com/aws/aws-sdk-go/issues/1362#issuecomment-722554726
+      assume(this.isInstanceOf[AWSS3IntegrationSpec])
+      implicit val attr: Attributes = attributes
+      S3.makeBucket(defaultBucket).futureValue shouldBe Done
   }
 
   it should "enable and disable versioning for a bucket" in {
     // TODO: Figure out a way to properly test this with Minio, see 
https://github.com/akka/alpakka/issues/2750
     assume(this.isInstanceOf[AWSS3IntegrationSpec])
-    implicit val attr: Attributes = attributes
-    val bucketName = "samplebucketversioning"
-
-    val request = for {
-      _ <- S3.makeBucket(bucketName)
-      firstResult <- S3.getBucketVersioning(bucketName)
-      _ <- S3.putBucketVersioning(bucketName, 
BucketVersioning().withStatus(BucketVersioningStatus.Enabled))
-      secondResult <- S3.getBucketVersioning(bucketName)
-    } yield (firstResult, secondResult)
-
-    whenReady(request) { case (firstValue, secondValue) =>
-      firstValue shouldEqual BucketVersioningResult()
-      firstValue.bucketVersioningEnabled shouldEqual false
-      secondValue shouldEqual 
BucketVersioningResult().withStatus(BucketVersioningStatus.Enabled)
-      secondValue.bucketVersioningEnabled shouldEqual true
-      S3.putBucketVersioning(bucketName,
-        
BucketVersioning().withStatus(BucketVersioningStatus.Suspended)).futureValue 
shouldBe Done
-      S3.deleteBucket(bucketName).futureValue
+    forAll(genBucketName("samplebucketversioning")) { bucketName =>
+      implicit val attr: Attributes = attributes
+
+      val request = for {
+        _ <- S3.makeBucket(bucketName)
+        firstResult <- S3.getBucketVersioning(bucketName)
+        _ <- S3.putBucketVersioning(bucketName, 
BucketVersioning().withStatus(BucketVersioningStatus.Enabled))
+        secondResult <- S3.getBucketVersioning(bucketName)
+      } yield (firstResult, secondResult)
+
+      whenReady(request) { case (firstValue, secondValue) =>
+        firstValue shouldEqual BucketVersioningResult()
+        firstValue.bucketVersioningEnabled shouldEqual false
+        secondValue shouldEqual 
BucketVersioningResult().withStatus(BucketVersioningStatus.Enabled)
+        secondValue.bucketVersioningEnabled shouldEqual true
+        S3.putBucketVersioning(bucketName,
+          
BucketVersioning().withStatus(BucketVersioningStatus.Suspended)).futureValue 
shouldBe Done
+        S3.deleteBucket(bucketName).futureValue
+      }
     }
   }
 
   it should "enable and disable versioning for a bucket with MFA delete 
configured to false" in {
     // TODO: Figure out a way to properly test this with Minio, see 
https://github.com/akka/alpakka/issues/2750
     assume(this.isInstanceOf[AWSS3IntegrationSpec])
-    implicit val attr: Attributes = attributes
-    val bucketName = "samplebucketversioningmfadeletefalse"
-
-    val request = for {
-      _ <- S3.makeBucket(bucketName)
-      _ <- S3.putBucketVersioning(bucketName,
-        BucketVersioning()
-          .withStatus(BucketVersioningStatus.Enabled)
-          .withMfaDelete(MFAStatus.Disabled))
-      result <- S3.getBucketVersioning(bucketName)
-    } yield result
-
-    whenReady(request) { value =>
-      value shouldEqual 
BucketVersioningResult().withStatus(BucketVersioningStatus.Enabled).withMfaDelete(false)
-      S3.putBucketVersioning(bucketName,
-        
BucketVersioning().withStatus(BucketVersioningStatus.Suspended)).futureValue 
shouldBe Done
-      S3.deleteBucket(bucketName).futureValue
+    forAll(genBucketName("samplebucketversioningmfadeletefalse")) { bucketName 
=>
+      implicit val attr: Attributes = attributes
+
+      val request = for {
+        _ <- S3.makeBucket(bucketName)
+        _ <- S3.putBucketVersioning(bucketName,
+          BucketVersioning()
+            .withStatus(BucketVersioningStatus.Enabled)
+            .withMfaDelete(MFAStatus.Disabled))
+        result <- S3.getBucketVersioning(bucketName)
+      } yield result
+
+      whenReady(request) { value =>
+        value shouldEqual 
BucketVersioningResult().withStatus(BucketVersioningStatus.Enabled).withMfaDelete(false)
+        S3.putBucketVersioning(bucketName,
+          
BucketVersioning().withStatus(BucketVersioningStatus.Suspended)).futureValue 
shouldBe Done
+        S3.deleteBucket(bucketName).futureValue
+      }
     }
   }
 
   it should "create and delete bucket with a given name" in {
-    val bucketName = "samplebucket3"
+    forAll(genBucketName("samplebucket3")) { bucketName =>
+      system.log.info(s"Created bucket with name: $bucketName")
 
-    val makeRequest: Source[Done, NotUsed] = S3
-      .makeBucketSource(bucketName)
-      .withAttributes(attributes)
-    val deleteRequest: Source[Done, NotUsed] = S3
-      .deleteBucketSource(bucketName)
-      .withAttributes(attributes)
+      val makeRequest: Source[Done, NotUsed] = S3
+        .makeBucketSource(bucketName)
+        .withAttributes(attributes)
+      val deleteRequest: Source[Done, NotUsed] = S3
+        .deleteBucketSource(bucketName)
+        .withAttributes(attributes)
 
-    val request = for {
-      make <- makeRequest.runWith(Sink.ignore)
-      delete <- deleteRequest.runWith(Sink.ignore)
-    } yield (make, delete)
+      val request = for {
+        make <- makeRequest.runWith(Sink.ignore)
+        delete <- deleteRequest.runWith(Sink.ignore)
+      } yield (make, delete)
 
-    request.futureValue should equal((Done, Done))
+      request.futureValue should equal((Done, Done))
+    }
   }
 
   it should "create a bucket in the non default us-east-1 region" in {
-    val bucketName = "samplebucketotherregion"
+    forAll(genBucketName("samplebucketotherregion")) { bucketName =>
+      system.log.info(s"Created bucket with name: $bucketName")
 
-    val request = for {
-      _ <- S3
-        .makeBucketSource(bucketName)
-        .withAttributes(otherRegionSettingsPathStyleAccess)
-        .runWith(Sink.head)
-      result <- S3
-        .checkIfBucketExistsSource(bucketName)
-        .withAttributes(otherRegionSettingsPathStyleAccess)
-        .runWith(Sink.head)
-      _ <- S3
-        .deleteBucketSource(bucketName)
-        .withAttributes(otherRegionSettingsPathStyleAccess)
-        .runWith(Sink.head)
-    } yield result
+      val request = for {
+        _ <- S3
+          .makeBucketSource(bucketName)
+          .withAttributes(otherRegionSettingsPathStyleAccess)
+          .runWith(Sink.head)
+        result <- S3
+          .checkIfBucketExistsSource(bucketName)
+          .withAttributes(otherRegionSettingsPathStyleAccess)
+          .runWith(Sink.head)
+        _ <- S3
+          .deleteBucketSource(bucketName)
+          .withAttributes(otherRegionSettingsPathStyleAccess)
+          .runWith(Sink.head)
+      } yield result
 
-    request.futureValue shouldEqual AccessGranted
+      request.futureValue shouldEqual AccessGranted
+    }
   }
 
-  it should "throw an exception while deleting bucket that doesn't exist" in {
-    implicit val attr: Attributes = attributes
-    S3.deleteBucket(nonExistingBucket).failed.futureValue shouldBe 
an[S3Exception]
+  it should "throw an exception while deleting bucket that doesn't exist" in 
withNonExistingBucket {
+    nonExistingBucket =>
+      implicit val attr: Attributes = attributes
+      S3.deleteBucket(nonExistingBucket).failed.futureValue shouldBe 
an[S3Exception]
   }
 
-  it should "check if bucket exists" in {
+  it should "check if bucket exists" in withDefaultBucket { defaultBucket =>
     implicit val attr: Attributes = attributes
     val checkIfBucketExits: Future[BucketAccess] = 
S3.checkIfBucketExists(defaultBucket)
 
@@ -930,7 +1112,7 @@ trait S3IntegrationSpec
     }
   }
 
-  it should "check for non-existing bucket" in {
+  it should "check for non-existing bucket" in withNonExistingBucket { 
nonExistingBucket =>
     implicit val attr: Attributes = attributes
     val request: Future[BucketAccess] = 
S3.checkIfBucketExists(nonExistingBucket)
 
@@ -939,7 +1121,7 @@ trait S3IntegrationSpec
     }
   }
 
-  it should "contain error code even if exception in empty" in {
+  it should "contain error code even if exception in empty" in 
withDefaultBucket { defaultBucket =>
     val exception =
       S3.getObjectMetadata(defaultBucket, "sample")
         .withAttributes(invalidCredentials)
@@ -953,24 +1135,26 @@ trait S3IntegrationSpec
 
   private val chunk: ByteString = 
ByteString.fromArray(Array.fill(S3.MinChunkSize)(0.toByte))
 
-  it should "only upload single chunk when size of the ByteString equals chunk 
size" in {
-    val source: Source[ByteString, Any] = Source.single(chunk)
-    uploadAndAndCheckParts(source, 1)
+  it should "only upload single chunk when size of the ByteString equals chunk 
size" in withDefaultBucket {
+    defaultBucket =>
+      val source: Source[ByteString, Any] = Source.single(chunk)
+      uploadAndAndCheckParts(defaultBucket, source, 1)
   }
 
-  it should "only upload single chunk when exact chunk is followed by an empty 
ByteString" in {
-    val source: Source[ByteString, Any] = Source[ByteString](
-      chunk :: ByteString.empty :: Nil)
+  it should "only upload single chunk when exact chunk is followed by an empty 
ByteString" in withDefaultBucket {
+    defaultBucket =>
+      val source: Source[ByteString, Any] = Source[ByteString](
+        chunk :: ByteString.empty :: Nil)
 
-    uploadAndAndCheckParts(source, 1)
+      uploadAndAndCheckParts(defaultBucket, source, 1)
   }
 
-  it should "upload two chunks size of ByteStrings equals chunk size" in {
+  it should "upload two chunks size of ByteStrings equals chunk size" in 
withDefaultBucket { defaultBucket =>
     val source: Source[ByteString, Any] = Source(chunk :: chunk :: Nil)
-    uploadAndAndCheckParts(source, 2)
+    uploadAndAndCheckParts(defaultBucket, source, 2)
   }
 
-  it should "upload empty source" in {
+  it should "upload empty source" in withDefaultBucket { defaultBucket =>
     val upload =
       for {
         upload <- Source
@@ -984,7 +1168,8 @@ trait S3IntegrationSpec
     upload.futureValue.eTag should not be empty
   }
 
-  private def uploadAndAndCheckParts(source: Source[ByteString, _], 
expectedParts: Int): Assertion = {
+  private def uploadAndAndCheckParts(
+      defaultBucket: String, source: Source[ByteString, _], expectedParts: 
Int): Assertion = {
     val metadata =
       for {
         _ <- source.runWith(
@@ -1001,7 +1186,7 @@ trait S3IntegrationSpec
     etag.substring(etag.indexOf('-') + 1).toInt shouldBe expectedParts
   }
 
-  private def uploadDownloadAndDeleteInOtherRegionCase(objectKey: String): 
Assertion = {
+  private def uploadDownloadAndDeleteInOtherRegionCase(bucketWithDots: String, 
objectKey: String): Assertion = {
     val source: Source[ByteString, Any] = Source(ByteString(objectValue) :: 
Nil)
 
     val results = for {
@@ -1028,7 +1213,7 @@ trait S3IntegrationSpec
       .futureValue shouldEqual pekko.Done
   }
 
-  private def uploadCopyDownload(sourceKey: String, targetKey: String): 
Assertion = {
+  private def uploadCopyDownload(defaultBucket: String, sourceKey: String, 
targetKey: String): Assertion = {
     val source: Source[ByteString, Any] = 
Source.single(ByteString(objectValue))
 
     val results = for {
@@ -1065,21 +1250,39 @@ trait S3IntegrationSpec
 }
 
 /*
- * This is an integration test and ignored by default
- *
- * For running the tests you need to create 2 buckets:
- *  - one in region us-east-1
- *  - one in an other region (eg eu-central-1)
- * Update the bucket name and regions in the code below
+ * This is an integration test. In order for the test suite to run make sure 
you have the necessary
+ * permissions to create/delete buckets and objects in different regions.
  *
  * Set your keys aws access-key-id and secret-access-key in 
src/test/resources/application.conf
  *
- * Comment @ignore and run the tests
- * (tests that do listing counts might need some tweaking)
+ * Since this is a test that is marked with `@DoNotDiscover` it will not run as 
part of the test suite unless it's
+ * explicitly executed with
+ * `sbt "s3/Test/runMain org.scalatest.tools.Runner -o -s 
org.apache.pekko.stream.connectors.s3.scaladsl.AWSS3IntegrationSpec"`
  *
+ * See https://www.scalatest.org/user_guide/using_the_runner for more details 
about the runner.
+ *
+ * Running tests directly via IDEs such as IntelliJ is also supported
  */
-@Ignore
-class AWSS3IntegrationSpec extends 
TestKit(ActorSystem("AWSS3IntegrationSpec")) with S3IntegrationSpec
+@DoNotDiscover
+class AWSS3IntegrationSpec extends 
TestKit(ActorSystem("AWSS3IntegrationSpec")) with S3IntegrationSpec {
+  // It's recommended when testing against S3 to use a specific prefix in order 
to identify where the buckets
+  // are coming from since typically S3 accounts can also be used in other contexts
+  override lazy val bucketPrefix: Option[String] =
+    
sys.props.get("pekko.stream.connectors.s3.scaladsl.AWSS3IntegrationSpec.bucketPrefix").orElse(
+      Some("pekko-connectors-"))
+
+  override lazy val enableCleanup: Option[FiniteDuration] =
+    
sys.props.get("pekko.stream.connectors.s3.scaladsl.AWSS3IntegrationSpec.enableCleanup").map(
+      Duration.apply).collect {
+      case finiteDuration: FiniteDuration => finiteDuration
+    }.orElse(Some(1.minute))
+
+  // Since S3 accounts share global state, we should randomly generate bucket 
names so concurrent tests
+  // against an S3 account don't conflict with each other
+  override lazy val randomlyGenerateBucketNames: Boolean =
+    
sys.props.get("pekko.stream.connectors.s3.scaladsl.AWSS3IntegrationSpec.randomlyGenerateBucketNames")
+      .map(_.toBoolean).getOrElse(true)
+}
 
 /*
  * For this test, you need a have docker installed and running.
@@ -1093,6 +1296,10 @@ class MinioS3IntegrationSpec
     with S3IntegrationSpec
     with MinioS3Test {
 
+  // Since a unique new Minio container is started with each test run there is 
no point in making random
+  // bucket names
+  override lazy val randomlyGenerateBucketNames: Boolean = false
+
   override lazy val defaultS3Settings: S3Settings = s3Settings
     .withS3RegionProvider(
       new AwsRegionProvider {
@@ -1100,27 +1307,6 @@ class MinioS3IntegrationSpec
       })
     .withAccessStyle(PathAccessStyle)
 
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-
-    val makeBuckets = for {
-      _ <- S3.makeBucket(bucketWithDots)(
-        system,
-        attributes(
-          _.withS3RegionProvider(
-            new AwsRegionProvider {
-              val getRegion: Region = Region.EU_CENTRAL_1
-            })))
-      _ <- S3.makeBucket(defaultBucket)(
-        system,
-        attributes)
-      _ <- S3.makeBucket(bucketWithVersioning)(
-        system,
-        attributes)
-    } yield ()
-    Await.result(makeBuckets, 10.seconds)
-  }
-
   it should "properly set the endpointUrl using VirtualHostAccessStyle" in {
     s3Settings
       .withAccessStyle(VirtualHostAccessStyle)
@@ -1129,3 +1315,10 @@ class MinioS3IntegrationSpec
       .value shouldEqual container.getVirtualHost
   }
 }
+
+object S3IntegrationSpec {
+  val BucketWithDots = "my.test.frankfurt"
+  val DefaultBucket = "my-test-us-east-1"
+  val BucketWithVersioning = "my-bucket-with-versioning"
+  val NonExistingBucket = "nowhere"
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to