This is an automated email from the ASF dual-hosted git repository.
mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new 5880466e5 Integrate AWSS3IntegrationSpec with S3 Account
5880466e5 is described below
commit 5880466e51d1df2e8d8a82986324afd866de278f
Author: Matthew de Detrich <[email protected]>
AuthorDate: Wed Apr 19 00:34:15 2023 +0200
Integrate AWSS3IntegrationSpec with S3 Account
---
.github/workflows/nightly-builds.yaml | 36 +
project/Dependencies.scala | 21 +-
.../pekko/stream/connectors/s3/TestUtils.scala | 71 ++
.../stream/connectors/s3/scaladsl/Generators.scala | 114 +++
.../connectors/s3/scaladsl/S3IntegrationSpec.scala | 1005 ++++++++++++--------
5 files changed, 838 insertions(+), 409 deletions(-)
diff --git a/.github/workflows/nightly-builds.yaml b/.github/workflows/nightly-builds.yaml
new file mode 100644
index 000000000..d687e3f47
--- /dev/null
+++ b/.github/workflows/nightly-builds.yaml
@@ -0,0 +1,36 @@
+name: Nightly Builds
+
+on:
+ schedule:
+ - cron: "0 0 * * *"
+ workflow_dispatch:
+
+permissions: {}
+
+jobs:
+ integration-tests:
+ name: Pekko Connectors Integration tests
+ runs-on: ubuntu-20.04
+ if: github.repository == 'apache/incubator-pekko-connectors'
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v3
+ with:
+ fetch-depth: 0
+
+ - name: Setup Java 8
+ uses: actions/setup-java@v3
+ with:
+ distribution: temurin
+ java-version: 8
+
+ - name: Cache Coursier cache
+ uses: coursier/cache-action@v6.4.0
+
+ - name: S3 Integration tests
+ run: |-
+ sbt \
+ -Dpekko.connectors.s3.aws.credentials.provider=static \
+ -Dpekko.connectors.s3.aws.credentials.access-key-id=${{ secrets.AWS_ACCESS_KEY }} \
+ -Dpekko.connectors.s3.aws.credentials.secret-access-key=${{ secrets.AWS_SECRET_KEY }} \
+ + "s3/Test/runMain org.scalatest.tools.Runner -o -s org.apache.pekko.stream.connectors.s3.scaladsl.AWSS3IntegrationSpec"
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 483d587f6..f262215c0 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -33,6 +33,19 @@ object Dependencies {
val TestContainersScalaTestVersion = "0.40.3"
val mockitoVersion = "4.2.0" // check even https://github.com/scalatest/scalatestplus-mockito/releases
val hoverflyVersion = "0.14.1"
+ val scalaCheckVersion = "1.15.4"
+
+ /**
+ * Calculates the scalatest version in a format that is used for `org.scalatestplus` scalacheck artifacts
+ *
+ * @see
+ * https://www.scalatest.org/user_guide/property_based_testing
+ */
+ private def scalaTestPlusScalaCheckVersion(version: String) =
+ version.split('.').take(2).mkString("-")
+
+ val scalaTestScalaCheckArtifact = s"scalacheck-${scalaTestPlusScalaCheckVersion(scalaCheckVersion)}"
+ val scalaTestScalaCheckVersion = s"$ScalaTestVersion.0"
val CouchbaseVersion = "2.7.16"
val CouchbaseVersionForDocs = "2.7"
@@ -155,7 +168,7 @@ object Dependencies {
("org.apache.hadoop" % "hadoop-client" % "3.2.1" % Test).exclude("log4j", "log4j"), // Apache2
("org.apache.hadoop" % "hadoop-common" % "3.2.1" % Test).exclude("log4j", "log4j"), // Apache2
"com.sksamuel.avro4s" %% "avro4s-core" % "3.0.9" % Test,
- "org.scalacheck" %% "scalacheck" % "1.15.4" % Test,
+ "org.scalacheck" %% "scalacheck" % scalaCheckVersion % Test,
"org.specs2" %% "specs2-core" % "4.8.3" % Test, // MIT like: https://github.com/etorreborre/specs2/blob/master/LICENSE.txt
"org.slf4j" % "log4j-over-slf4j" % log4jOverSlf4jVersion % Test // MIT like: http://www.slf4j.org/license.html
))
@@ -375,8 +388,10 @@ object Dependencies {
"software.amazon.awssdk" % "auth" % AwsSdk2Version,
// in-memory filesystem for file related tests
"com.google.jimfs" % "jimfs" % "1.2" % Test, // ApacheV2
- "com.github.tomakehurst" % "wiremock-jre8" % "2.32.0" % Test // ApacheV2
- ))
+ "com.github.tomakehurst" % "wiremock-jre8" % "2.32.0" % Test, // ApacheV2
+ "org.scalacheck" %% "scalacheck" % scalaCheckVersion % Test,
+ "org.scalatestplus" %% scalaTestScalaCheckArtifact % scalaTestScalaCheckVersion % Test,
+ "com.markatta" %% "futiles" % "2.0.2" % Test))
val SpringWeb = {
val SpringVersion = "5.1.17.RELEASE"
diff --git a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/TestUtils.scala b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/TestUtils.scala
new file mode 100644
index 000000000..cec159ac3
--- /dev/null
+++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/TestUtils.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, derived from Akka.
+ */
+
+package org.apache.pekko.stream.connectors.s3
+
+import markatta.futiles.Retry
+import org.apache.commons.lang3.StringUtils
+import org.apache.pekko
+import org.apache.pekko.stream.connectors.s3.scaladsl.S3
+import org.apache.pekko.stream.scaladsl.Sink
+import org.scalacheck.Gen
+import pekko.actor.ActorSystem
+import pekko.stream.Attributes
+
+import scala.annotation.tailrec
+import scala.concurrent.duration._
+import scala.concurrent.{ ExecutionContext, Future }
+
+object TestUtils {
+ def checkEnvVarAvailable(env: String): Boolean =
+ try
+ sys.env.get(env) match {
+ case Some(value) if StringUtils.isBlank(value) => false
+ case Some(_) => true
+ case None => false
+ }
+ catch {
+ case _: NoSuchElementException => false
+ }
+
+ def cleanAndDeleteBucket(bucket: String, s3Attrs: Attributes)(implicit system: ActorSystem): Future[Unit] = {
+ implicit val ec: ExecutionContext = system.dispatcher
+ for {
+ bucketVersioningResult <- S3.getBucketVersioning(bucket)(implicitly, s3Attrs)
+ deleteAllVersions = bucketVersioningResult.status.contains(BucketVersioningStatus.Enabled)
+ _ <- S3.deleteBucketContents(bucket, deleteAllVersions = deleteAllVersions).withAttributes(s3Attrs).runWith(
+ Sink.ignore)
+ multiParts <-
+ S3.listMultipartUpload(bucket, None).withAttributes(s3Attrs).runWith(Sink.seq)
+ _ <- Future.sequence(multiParts.map { part =>
+ S3.deleteUpload(bucket, part.key, part.uploadId)(implicitly, s3Attrs)
+ })
+ _ <- Retry.retryWithBackOff(
+ 5,
+ 100.millis,
+ throwable => throwable.getMessage.contains("The bucket you tried to delete is not empty"))(
+ S3.deleteBucket(bucket)(implicitly, s3Attrs))
+ _ = system.log.info(s"Completed deleting bucket $bucket")
+ } yield ()
+ }
+
+ /**
+ * Will return a single value from a given generator. WARNING this will block the thread
+ * until a value is retrieved so only use this for generators that have a high chance of
+ * returning a value. If a [[Gen.filter]] ends up filtering out too many values this can
+ * cause the thread to be stuck for a long time
+ */
+ @tailrec
+ def loopUntilGenRetrievesValue[T](gen: Gen[T]): T =
+ gen.sample match {
+ case Some(value) => value
+ case None => loopUntilGenRetrievesValue(gen)
+ }
+
+}
diff --git a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/Generators.scala b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/Generators.scala
new file mode 100644
index 000000000..93cd5f390
--- /dev/null
+++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/Generators.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, derived from Akka.
+ */
+
+package org.apache.pekko.stream.connectors.s3.scaladsl
+
+import org.scalacheck.Gen
+
+import scala.annotation.nowarn
+import scala.language.postfixOps
+object Generators {
+ val MaxBucketLength: Int = 63
+
+ // See
https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
for valid
+ // bucketnames
+
+ lazy val bucketLetterOrNumberCharGen: Gen[Char] = Gen.frequency(
+ (1, Gen.numChar),
+ (1, Gen.alphaLowerChar))
+
+ def bucketAllCharGen(useVirtualDotHost: Boolean): Gen[Char] = {
+ val base = List(
+ (10, Gen.alphaLowerChar),
+ (1, Gen.const('-')),
+ (1, Gen.numChar))
+
+ val frequency = if (useVirtualDotHost) (1, Gen.const('.')) +: base else
base
+
+ Gen.frequency(frequency: _*)
+ }
+
+ @nowarn("msg=not.*?exhaustive")
+ private def checkInvalidDuplicateChars(chars: List[Char]): Boolean =
+ chars.sliding(2).forall { case Seq(before, after) =>
+ !(before == '.' && after == '.' || before == '-' && after == '.' ||
before == '.' && after == '-')
+ }
+
+ private def checkAlphaChar(c: Char): Boolean =
+ c >= 'a' && c <= 'z'
+
+ private def allCharCheck(useVirtualDotHost: Boolean, string: String):
Boolean =
+ if (useVirtualDotHost) {
+ string.forall(char => Character.isDigit(char) || checkAlphaChar(char) ||
char == '-' || char == '.') &&
+ checkInvalidDuplicateChars(string.toList)
+ } else
+ string.forall(char => Character.isDigit(char) || checkAlphaChar(char) ||
char == '-')
+
+ def validatePrefix(useVirtualDotHost: Boolean, prefix: Option[String]):
Option[String] = {
+ val withoutWhitespace = prefix match {
+ case Some(value) if value.trim == "" => None
+ case Some(value) => Some(value)
+ case None => None
+ }
+
+ withoutWhitespace match {
+ case Some(value) if !(Character.isDigit(value.head) ||
checkAlphaChar(value.head)) =>
+ throw new IllegalArgumentException(
+ s"Invalid starting digit for prefix $value, ${value.head} needs to
be an alpha char or digit")
+ case Some(value) if value.length > 1 =>
+ if (!allCharCheck(useVirtualDotHost, value.drop(1)))
+ throw new IllegalArgumentException(
+ s"Prefix $value contains invalid characters")
+ case Some(value) if value.length > MaxBucketLength - 1 =>
+ throw new IllegalArgumentException(
+ s"Prefix is too long, it has size ${value.length} where as the max
bucket size is $MaxBucketLength")
+ case _ => ()
+ }
+
+ withoutWhitespace
+ }
+
+ def bucketNameGen(useVirtualDotHost: Boolean, prefix: Option[String] =
None): Gen[String] = {
+ val finalPrefix = validatePrefix(useVirtualDotHost, prefix)
+
+ for {
+ range <- {
+ val maxLength = finalPrefix match {
+ case Some(p) => MaxBucketLength - p.length
+ case None => MaxBucketLength
+ }
+
+ if (maxLength > 3)
+ Gen.choose(3, maxLength)
+ else
+ Gen.const(maxLength)
+ }
+ startString = finalPrefix.getOrElse("")
+
+ bucketName <- range match {
+ case 3 =>
+ for {
+ first <- bucketLetterOrNumberCharGen
+ second <- bucketAllCharGen(useVirtualDotHost)
+ third <- bucketLetterOrNumberCharGen
+ } yield startString ++ List(first, second, third).mkString
+ case _ =>
+ for {
+ first <- bucketLetterOrNumberCharGen
+ last <- bucketLetterOrNumberCharGen
+ middle <- {
+ val gen = Gen.listOfN(range - 2,
bucketAllCharGen(useVirtualDotHost))
+ if (useVirtualDotHost) gen.filter(checkInvalidDuplicateChars)
else gen
+ }
+ } yield startString ++ first.toString ++ middle.mkString ++
last.toString
+ }
+ } yield bucketName
+ }
+
+}
diff --git a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
index 08025149c..30932cf3b 100644
--- a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
+++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
@@ -14,6 +14,7 @@
package org.apache.pekko.stream.connectors.s3.scaladsl
import org.apache.pekko
+import org.scalacheck.Gen
import pekko.actor.ActorSystem
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.model.{ ContentTypes, StatusCodes }
@@ -27,16 +28,17 @@ import pekko.testkit.{ TestKit, TestKitBase }
import pekko.util.ccompat.JavaConverters._
import pekko.util.ByteString
import pekko.{ Done, NotUsed }
-import org.scalatest.Inspectors.forEvery
import org.scalatest._
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers
+import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks
import software.amazon.awssdk.auth.credentials._
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.regions.providers._
import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.duration._
@@ -48,6 +50,7 @@ trait S3IntegrationSpec
with BeforeAndAfterAll
with Matchers
with ScalaFutures
+ with ScalaCheckPropertyChecks
with OptionValues
with LogCapturing {
@@ -55,23 +58,172 @@ trait S3IntegrationSpec
implicit val defaultPatience: PatienceConfig = PatienceConfig(3.minutes,
100.millis)
- val defaultBucket = "my-test-us-east-1"
- val nonExistingBucket = "nowhere"
+ /**
+ * A prefix that will get added to each generated bucket in the test, this
is to track the buckets that are
+ * specifically created by the test
+ */
+ lazy val bucketPrefix: Option[String] = None
+
+ /**
+ * Whether to randomly generate bucket names
+ */
+ val randomlyGenerateBucketNames: Boolean
+
+ implicit override val generatorDrivenConfig: PropertyCheckConfiguration =
+ PropertyCheckConfiguration(minSuccessful = 1)
+
+ /**
+ * Whether to enable cleanup of buckets after tests are run and if so the
initial delay to wait after the test
+ */
+ lazy val enableCleanup: Option[FiniteDuration] = None
+
+ /**
+ * The MaxTimeout when cleaning up all of the buckets during `afterAll`
+ */
+ lazy val maxCleanupTimeout: FiniteDuration = 10.minutes
+
+ /**
+ * @param constantBucketName The bucket name constant to use if we are not
randomly generating bucket names
+ */
+ def genBucketName(constantBucketName: String): Gen[String] =
+ if (randomlyGenerateBucketNames)
+ Generators.bucketNameGen(useVirtualDotHost = false, bucketPrefix)
+ else
+ Gen.const(bucketPrefix.getOrElse("") ++ constantBucketName)
+
+ def createBucket(bucket: String, versioning: Boolean, bucketReference:
AtomicReference[String], s3Attrs: Attributes)
+ : Future[Unit] =
+ for {
+ bucketResponse <- S3.checkIfBucketExists(bucket)(implicitly, s3Attrs)
+ _ <- bucketResponse match {
+ case BucketAccess.AccessDenied =>
+ throw new RuntimeException(
+ s"Unable to create bucket: $bucket since it already exists however
permissions are inadequate")
+ case BucketAccess.AccessGranted | BucketAccess.NotExists =>
+ for {
+ _ <- bucketResponse match {
+ case BucketAccess.AccessGranted =>
+ system.log.info(
+ s"Deleting and recreating bucket: $bucket since it already
exists with correct permissions")
+ TestUtils.cleanAndDeleteBucket(bucket, s3Attrs)
+ case _ =>
+ Future.successful(())
+ }
+ _ <- S3.makeBucket(bucket)(implicitly, s3Attrs)
+ // TODO: Figure out a way to properly test this with Minio, see
https://github.com/akka/alpakka/issues/2750
+ _ <- if (versioning && this.isInstanceOf[AWSS3IntegrationSpec])
+ S3.putBucketVersioning(bucket,
BucketVersioning().withStatus(BucketVersioningStatus.Enabled))(implicitly,
+ s3Attrs)
+ else
+ Future.successful(())
+ _ = bucketReference.set(bucket)
+ } yield ()
+ }
+ } yield ()
+
+ private val defaultBucketReference = new AtomicReference[String]()
+ def withDefaultBucket(testCode: String => Assertion): Assertion =
+ testCode(defaultBucketReference.get())
// with dots forcing path style access
- val bucketWithDots = "my.test.frankfurt"
+ private val bucketWithDotsReference = new AtomicReference[String]()
+ def withBucketWithDots(testCode: String => Assertion): Assertion =
+ testCode(bucketWithDotsReference.get())
+
+ private val nonExistingBucketReference = new AtomicReference[String]()
+ def withNonExistingBucket(testCode: String => Assertion): Assertion =
+ testCode(nonExistingBucketReference.get())
+
+ private val bucketWithVersioningReference = new AtomicReference[String]()
+ def withBucketWithVersioning(testCode: String => Assertion): Assertion =
+ testCode(bucketWithVersioningReference.get())
- val bucketWithVersioning = "my-bucket-with-versioning"
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ val (bucketWithDots, defaultBucket, bucketWithVersioning,
nonExistingBucket) = {
+ val baseGen = for {
+ bucketWithDots <- genBucketName(S3IntegrationSpec.BucketWithDots)
+ defaultBucket <- genBucketName(S3IntegrationSpec.DefaultBucket)
+ bucketWithVersioning <-
genBucketName(S3IntegrationSpec.BucketWithVersioning)
+ nonExistingBucket <- genBucketName(S3IntegrationSpec.NonExistingBucket)
+ } yield (bucketWithDots, defaultBucket, bucketWithVersioning,
nonExistingBucket)
+
+ if (randomlyGenerateBucketNames)
+ TestUtils.loopUntilGenRetrievesValue(baseGen.filter {
+ case (bucketWithDots, defaultBucket, bucketWithVersioning,
nonExistingBucket) =>
+ // Make sure that we don't somehow generate 2 or more buckets with
the same name
+ List(bucketWithDots, defaultBucket, bucketWithVersioning,
nonExistingBucket).distinct.size == 4
+ })
+ else
+ baseGen.sample.get
+ }
+
+ val makeBuckets = for {
+ _ <- {
+ createBucket(bucketWithDots, versioning = false,
bucketWithDotsReference,
+ attributes(
+ _.withS3RegionProvider(
+ new AwsRegionProvider {
+ val getRegion: Region = Region.EU_CENTRAL_1
+ })))
+ }
+ _ <- createBucket(defaultBucket, versioning = false,
defaultBucketReference, attributes)
+ _ <- createBucket(bucketWithVersioning, versioning = true,
bucketWithVersioningReference, attributes)
+ _ = nonExistingBucketReference.set(nonExistingBucket)
+ } yield ()
+
+ system.log.info(
+ s"Corresponding S3 bucket names are bucketWithDots: $bucketWithDots,
defaultBucket: $defaultBucket, bucketWithVersioning: $bucketWithVersioning,
nonExistingBucket: $nonExistingBucket")
+
+ Await.result(makeBuckets, 10.seconds)
+ }
val objectKey = "test"
val objectValue = "Some String"
val metaHeaders: Map[String, String] = Map("location" -> "Africa",
"datatype" -> "image")
- override protected def afterAll(): Unit =
+ private def cleanBucket(bucket: String, attributes: Attributes):
Future[Unit] = (for {
+ check <- S3.checkIfBucketExists(bucket)(implicitly, attributes)
+ _ <- check match {
+ case BucketAccess.AccessDenied =>
+ Future {
+ system.log.warning(
+ s"Cannot delete bucket: $bucket due to having access denied.
Please look into this as it can fill up your AWS account")
+ }
+ case BucketAccess.AccessGranted =>
+ system.log.info(s"Cleaning up bucket: $bucket")
+ TestUtils.cleanAndDeleteBucket(bucket, attributes)
+ case BucketAccess.NotExists =>
+ Future {
+ system.log.info(s"Not deleting bucket: $bucket since it no longer
exists")
+ }
+ }
+ } yield ()).recover { case util.control.NonFatal(error) =>
+ system.log.error(s"Error deleting bucket: $bucket", error)
+ }
+
+ override protected def afterAll(): Unit = {
+ super.afterAll()
+ enableCleanup.foreach { timeout =>
+ val bucketsWithAttributes = List(
+ Option(defaultBucketReference.get()).map(bucket => (bucket,
attributes)),
+ Option(bucketWithDotsReference.get()).map(bucket =>
+ (bucket,
+ attributes(
+ _.withS3RegionProvider(
+ new AwsRegionProvider {
+ val getRegion: Region = Region.EU_CENTRAL_1
+ })))),
+ Option(bucketWithVersioningReference.get()).map(bucket => (bucket,
attributes))).flatten
+
+ Await.result(Future.traverse(bucketsWithAttributes) { case (bucket,
attrs) => cleanBucket(bucket, attrs) },
+ timeout)
+ }
Http(system)
.shutdownAllConnectionPools()
.foreach(_ => TestKit.shutdownActorSystem(system))
+ }
lazy val defaultS3Settings: S3Settings =
S3Settings()
@@ -103,7 +255,7 @@ trait S3IntegrationSpec
def defaultRegionContentCount = 0
def otherRegionContentCount = 0
- it should "list buckets in current aws account" in {
+ it should "list buckets in current aws account" in withDefaultBucket {
defaultBucket =>
val result = for {
buckets <- S3.listBuckets().withAttributes(attributes).runWith(Sink.seq)
} yield buckets
@@ -113,9 +265,8 @@ trait S3IntegrationSpec
buckets.map(_.name) should contain(defaultBucket)
}
- it should "list buckets in current AWS account using non US_EAST_1 region"
in {
+ it should "list buckets in current AWS account using non US_EAST_1 region"
in withDefaultBucket { defaultBucket =>
// Its only AWS that complains if listBuckets is called from a non
US_EAST_1 region
- assume(this.isInstanceOf[AWSS3IntegrationSpec])
val result = for {
buckets <- S3.listBuckets().withAttributes(
S3Attributes.settings(defaultS3Settings.withS3RegionProvider(new
AwsRegionProvider {
@@ -128,7 +279,7 @@ trait S3IntegrationSpec
buckets.map(_.name) should contain(defaultBucket)
}
- it should "list with real credentials" in {
+ it should "list with real credentials" in withDefaultBucket { defaultBucket
=>
val result = S3
.listBucket(defaultBucket, None)
.withAttributes(attributes)
@@ -138,7 +289,7 @@ trait S3IntegrationSpec
listingResult.size shouldBe defaultRegionContentCount
}
- it should "list with real credentials using the Version 1 API" in {
+ it should "list with real credentials using the Version 1 API" in
withDefaultBucket { defaultBucket =>
val result = S3
.listBucket(defaultBucket, None)
.withAttributes(attributes(_.withListBucketApiVersion(ApiVersion.ListBucketVersion1)))
@@ -148,7 +299,7 @@ trait S3IntegrationSpec
listingResult.size shouldBe defaultRegionContentCount
}
- it should "list with real credentials in non us-east-1 zone" in {
+ it should "list with real credentials in non us-east-1 zone" in
withBucketWithDots { bucketWithDots =>
val result = S3
.listBucket(bucketWithDots, None)
.withAttributes(otherRegionSettingsPathStyleAccess)
@@ -157,7 +308,7 @@ trait S3IntegrationSpec
result.futureValue.size shouldBe otherRegionContentCount
}
- it should "upload with real credentials" in {
+ it should "upload with real credentials" in withDefaultBucket {
defaultBucket =>
val objectKey = "putTest"
val bytes = ByteString(objectValue)
val data = Source.single(ByteString(objectValue))
@@ -174,7 +325,7 @@ trait S3IntegrationSpec
result.futureValue.eTag should not be empty
}
- it should "upload and delete" in {
+ it should "upload and delete" in withDefaultBucket { defaultBucket =>
val objectKey = "putTest"
val bytes = ByteString(objectValue)
val data = Source.single(ByteString(objectValue))
@@ -202,7 +353,7 @@ trait S3IntegrationSpec
metaAfter shouldBe empty
}
- it should "upload multipart with real credentials" in {
+ it should "upload multipart with real credentials" in withDefaultBucket {
defaultBucket =>
val source: Source[ByteString, Any] = Source(ByteString(objectValue) ::
Nil)
val result =
@@ -216,7 +367,7 @@ trait S3IntegrationSpec
multipartUploadResult.key shouldBe objectKey
}
- it should "download with real credentials" in {
+ it should "download with real credentials" in withDefaultBucket {
defaultBucket =>
val (metaFuture, bodyFuture) = S3
.getObject(defaultBucket, objectKey)
.withAttributes(attributes)
@@ -230,7 +381,7 @@ trait S3IntegrationSpec
meta.contentType shouldBe
Some(ContentTypes.`application/octet-stream`.value)
}
- it should "delete with real credentials" in {
+ it should "delete with real credentials" in withDefaultBucket {
defaultBucket =>
val delete = S3
.deleteObject(defaultBucket, objectKey)
.withAttributes(attributes)
@@ -238,7 +389,7 @@ trait S3IntegrationSpec
delete.futureValue shouldEqual pekko.Done
}
- it should "upload huge multipart with real credentials" in {
+ it should "upload huge multipart with real credentials" in withDefaultBucket
{ defaultBucket =>
val objectKey = "huge"
val hugeString = "0123456789abcdef" * 64 * 1024 * 11
val result =
@@ -253,7 +404,7 @@ trait S3IntegrationSpec
multipartUploadResult.key shouldBe objectKey
}
- it should "upload, download and delete with spaces in the key" in {
+ it should "upload, download and delete with spaces in the key" in
withDefaultBucket { defaultBucket =>
val objectKey = "test folder/test file.txt"
val source: Source[ByteString, Any] = Source(ByteString(objectValue) ::
Nil)
@@ -281,7 +432,7 @@ trait S3IntegrationSpec
.futureValue shouldEqual pekko.Done
}
- it should "upload, download and delete with brackets in the key" in {
+ it should "upload, download and delete with brackets in the key" in
withDefaultBucket { defaultBucket =>
val objectKey = "abc/DEF/2017/06/15/1234 (1).TXT"
val source: Source[ByteString, Any] = Source(ByteString(objectValue) ::
Nil)
@@ -309,144 +460,161 @@ trait S3IntegrationSpec
.futureValue shouldEqual pekko.Done
}
- it should "upload, download and delete with spaces in the key in non
us-east-1 zone" in uploadDownloadAndDeleteInOtherRegionCase(
- "test folder/test file.txt")
+ it should "upload, download and delete with spaces in the key in non
us-east-1 zone" in withBucketWithDots {
+ bucketWithDots =>
+ uploadDownloadAndDeleteInOtherRegionCase(bucketWithDots,
+ "test folder/test file.txt")
+ }
// we want ASCII and other UTF-8 characters!
- it should "upload, download and delete with special characters in the key in
non us-east-1 zone" in uploadDownloadAndDeleteInOtherRegionCase(
- "føldęrü/1234()[]><!? .TXT")
+ it should "upload, download and delete with special characters in the key in
non us-east-1 zone" in withBucketWithDots {
+ bucketWithDots =>
+ uploadDownloadAndDeleteInOtherRegionCase(bucketWithDots,
+ "føldęrü/1234()[]><!? .TXT")
+ }
- it should "upload, download and delete with `+` character in the key in non
us-east-1 zone" in uploadDownloadAndDeleteInOtherRegionCase(
- "1 + 2 = 3")
+ it should "upload, download and delete with `+` character in the key in non
us-east-1 zone" in withBucketWithDots {
+ bucketWithDots =>
+ uploadDownloadAndDeleteInOtherRegionCase(bucketWithDots,
+ "1 + 2 = 3")
+ }
- it should "upload, copy, download the copy, and delete" in
uploadCopyDownload(
- "original/file.txt",
- "copy/file.txt")
+ it should "upload, copy, download the copy, and delete" in withDefaultBucket
{ defaultBucket =>
+ uploadCopyDownload(defaultBucket,
+ "original/file.txt",
+ "copy/file.txt")
+ }
// NOTE: MinIO currently has problems copying files with spaces.
- it should "upload, copy, download the copy, and delete with special
characters in key" in uploadCopyDownload(
- "original/føldęrü/1234()[]><!?.TXT",
- "copy/1 + 2 = 3")
-
- it should "upload 2 files with common prefix, 1 with different prefix and
delete by prefix" in {
- val sourceKey1 = "original/file1.txt"
- val sourceKey2 = "original/file2.txt"
- val sourceKey3 = "uploaded/file3.txt"
- val source: Source[ByteString, Any] = Source(ByteString(objectValue) ::
Nil)
-
- val results = for {
- upload1 <- source.runWith(S3.multipartUpload(defaultBucket,
sourceKey1).withAttributes(attributes))
- upload2 <- source.runWith(S3.multipartUpload(defaultBucket,
sourceKey2).withAttributes(attributes))
- upload3 <- source.runWith(S3.multipartUpload(defaultBucket,
sourceKey3).withAttributes(attributes))
- } yield (upload1, upload2, upload3)
-
- whenReady(results) {
- case (upload1, upload2, upload3) =>
- upload1.bucket shouldEqual defaultBucket
- upload1.key shouldEqual sourceKey1
- upload2.bucket shouldEqual defaultBucket
- upload2.key shouldEqual sourceKey2
- upload3.bucket shouldEqual defaultBucket
- upload3.key shouldEqual sourceKey3
+ it should "upload, copy, download the copy, and delete with special
characters in key" in withDefaultBucket {
+ defaultBucket =>
+ uploadCopyDownload(defaultBucket,
+ "original/føldęrü/1234()[]><!?.TXT",
+ "copy/1 + 2 = 3")
+ }
- S3.deleteObjectsByPrefix(defaultBucket, Some("original"))
- .withAttributes(attributes)
- .runWith(Sink.ignore)
- .futureValue shouldEqual pekko.Done
- val numOfKeysForPrefix =
- S3.listBucket(defaultBucket, Some("original"))
+ it should "upload 2 files with common prefix, 1 with different prefix and
delete by prefix" in withDefaultBucket {
+ defaultBucket =>
+ val sourceKey1 = "original/file1.txt"
+ val sourceKey2 = "original/file2.txt"
+ val sourceKey3 = "uploaded/file3.txt"
+ val source: Source[ByteString, Any] = Source(ByteString(objectValue) ::
Nil)
+
+ val results = for {
+ upload1 <- source.runWith(S3.multipartUpload(defaultBucket,
sourceKey1).withAttributes(attributes))
+ upload2 <- source.runWith(S3.multipartUpload(defaultBucket,
sourceKey2).withAttributes(attributes))
+ upload3 <- source.runWith(S3.multipartUpload(defaultBucket,
sourceKey3).withAttributes(attributes))
+ } yield (upload1, upload2, upload3)
+
+ whenReady(results) {
+ case (upload1, upload2, upload3) =>
+ upload1.bucket shouldEqual defaultBucket
+ upload1.key shouldEqual sourceKey1
+ upload2.bucket shouldEqual defaultBucket
+ upload2.key shouldEqual sourceKey2
+ upload3.bucket shouldEqual defaultBucket
+ upload3.key shouldEqual sourceKey3
+
+ S3.deleteObjectsByPrefix(defaultBucket, Some("original"))
.withAttributes(attributes)
- .runFold(0)((result, _) => result + 1)
- .futureValue
- numOfKeysForPrefix shouldEqual 0
- S3.deleteObject(defaultBucket, sourceKey3)
- .withAttributes(attributes)
- .runWith(Sink.head)
- .futureValue shouldEqual pekko.Done
- }
+ .runWith(Sink.ignore)
+ .futureValue shouldEqual pekko.Done
+ val numOfKeysForPrefix =
+ S3.listBucket(defaultBucket, Some("original"))
+ .withAttributes(attributes)
+ .runFold(0)((result, _) => result + 1)
+ .futureValue
+ numOfKeysForPrefix shouldEqual 0
+ S3.deleteObject(defaultBucket, sourceKey3)
+ .withAttributes(attributes)
+ .runWith(Sink.head)
+ .futureValue shouldEqual pekko.Done
+ }
}
- it should "create multiple versions of an object and successfully clean it
with deleteBucketContents" in {
- // TODO: Figure out a way to properly test this with Minio, see
https://github.com/akka/alpakka/issues/2750
- assume(this.isInstanceOf[AWSS3IntegrationSpec])
- val versionKey = "test-version"
- val one = ByteString("one")
- val two = ByteString("two")
- val three = ByteString("three")
-
- val results =
- for {
- // Clean the bucket just incase there is residual data in there
- _ <- S3
- .deleteBucketContents(bucketWithVersioning, deleteAllVersions = true)
- .withAttributes(attributes)
- .runWith(Sink.ignore)
- _ <- S3
- .putObject(bucketWithVersioning, versionKey, Source.single(one),
one.length, s3Headers = S3Headers())
- .withAttributes(attributes)
- .runWith(Sink.ignore)
- _ <- S3
- .putObject(bucketWithVersioning, versionKey, Source.single(two),
two.length, s3Headers = S3Headers())
- .withAttributes(attributes)
- .runWith(Sink.ignore)
- _ <- S3
- .putObject(bucketWithVersioning, versionKey, Source.single(three),
three.length, s3Headers = S3Headers())
- .withAttributes(attributes)
- .runWith(Sink.ignore)
- versionsBeforeDelete <- S3
- .listObjectVersions(bucketWithVersioning, None)
- .withAttributes(attributes)
- .runWith(Sink.seq)
- _ <- S3
- .deleteBucketContents(bucketWithVersioning, deleteAllVersions = true)
- .withAttributes(attributes)
- .runWith(Sink.ignore)
- versionsAfterDelete <- S3
- .listObjectVersions(bucketWithVersioning, None)
- .withAttributes(attributes)
- .runWith(Sink.seq)
- listBucketContentsAfterDelete <- S3
- .listBucket(bucketWithVersioning, None)
- .withAttributes(attributes)
- .runWith(Sink.seq)
+ it should "create multiple versions of an object and successfully clean it
with deleteBucketContents" in withBucketWithVersioning {
+ bucketWithVersioning =>
+ // TODO: Figure out a way to properly test this with Minio, see
https://github.com/akka/alpakka/issues/2750
+ assume(this.isInstanceOf[AWSS3IntegrationSpec])
+ val versionKey = "test-version"
+ val one = ByteString("one")
+ val two = ByteString("two")
+ val three = ByteString("three")
+
+ val results =
+ for {
+ // Clean the bucket just incase there is residual data in there
+ _ <- S3
+ .deleteBucketContents(bucketWithVersioning, deleteAllVersions =
true)
+ .withAttributes(attributes)
+ .runWith(Sink.ignore)
+ _ <- S3
+ .putObject(bucketWithVersioning, versionKey, Source.single(one),
one.length, s3Headers = S3Headers())
+ .withAttributes(attributes)
+ .runWith(Sink.ignore)
+ _ <- S3
+ .putObject(bucketWithVersioning, versionKey, Source.single(two),
two.length, s3Headers = S3Headers())
+ .withAttributes(attributes)
+ .runWith(Sink.ignore)
+ _ <- S3
+ .putObject(bucketWithVersioning, versionKey, Source.single(three),
three.length, s3Headers = S3Headers())
+ .withAttributes(attributes)
+ .runWith(Sink.ignore)
+ versionsBeforeDelete <- S3
+ .listObjectVersions(bucketWithVersioning, None)
+ .withAttributes(attributes)
+ .runWith(Sink.seq)
+ _ <- S3
+ .deleteBucketContents(bucketWithVersioning, deleteAllVersions =
true)
+ .withAttributes(attributes)
+ .runWith(Sink.ignore)
+ versionsAfterDelete <- S3
+ .listObjectVersions(bucketWithVersioning, None)
+ .withAttributes(attributes)
+ .runWith(Sink.seq)
+ listBucketContentsAfterDelete <- S3
+ .listBucket(bucketWithVersioning, None)
+ .withAttributes(attributes)
+ .runWith(Sink.seq)
- } yield (versionsBeforeDelete.flatMap { case (versions, _) => versions },
- versionsAfterDelete.flatMap {
- case (versions, _) => versions
- }, listBucketContentsAfterDelete)
+ } yield (versionsBeforeDelete.flatMap { case (versions, _) => versions
},
+ versionsAfterDelete.flatMap {
+ case (versions, _) => versions
+ }, listBucketContentsAfterDelete)
- val (versionsBeforeDelete, versionsAfterDelete, bucketContentsAfterDelete)
= results.futureValue
+ val (versionsBeforeDelete, versionsAfterDelete,
bucketContentsAfterDelete) = results.futureValue
- versionsBeforeDelete.size shouldEqual 3
- versionsAfterDelete.size shouldEqual 0
- bucketContentsAfterDelete.size shouldEqual 0
+ versionsBeforeDelete.size shouldEqual 3
+ versionsAfterDelete.size shouldEqual 0
+ bucketContentsAfterDelete.size shouldEqual 0
}
- it should "listing object versions for a non versioned bucket should return
None for versionId" in {
- // TODO: Figure out a way to properly test this with Minio, see
https://github.com/akka/alpakka/issues/2750
- assume(this.isInstanceOf[AWSS3IntegrationSpec])
- val objectKey = "listObjectVersionIdTest"
- val bytes = ByteString(objectValue)
- val data = Source.single(ByteString(objectValue))
- val results = for {
- _ <- S3
- .putObject(defaultBucket,
- objectKey,
- data,
- bytes.length,
- s3Headers = S3Headers().withMetaHeaders(MetaHeaders(metaHeaders)))
- .withAttributes(attributes)
- .runWith(Sink.ignore)
- result <- S3.listObjectVersions(defaultBucket,
None).withAttributes(attributes).runWith(Sink.seq)
- _ <- S3.deleteObject(defaultBucket,
objectKey).withAttributes(attributes).runWith(Sink.ignore)
- } yield result.flatMap { case (versions, _) => versions }
+ it should "listing object versions for a non versioned bucket should return
None for versionId" in withDefaultBucket {
+ defaultBucket =>
+ // TODO: Figure out a way to properly test this with Minio, see
https://github.com/akka/alpakka/issues/2750
+ assume(this.isInstanceOf[AWSS3IntegrationSpec])
+ val objectKey = "listObjectVersionIdTest"
+ val bytes = ByteString(objectValue)
+ val data = Source.single(ByteString(objectValue))
+ val results = for {
+ _ <- S3
+ .putObject(defaultBucket,
+ objectKey,
+ data,
+ bytes.length,
+ s3Headers = S3Headers().withMetaHeaders(MetaHeaders(metaHeaders)))
+ .withAttributes(attributes)
+ .runWith(Sink.ignore)
+ result <- S3.listObjectVersions(defaultBucket,
None).withAttributes(attributes).runWith(Sink.seq)
+ _ <- S3.deleteObject(defaultBucket,
objectKey).withAttributes(attributes).runWith(Sink.ignore)
+ } yield result.flatMap { case (versions, _) => versions }
- forEvery(results.futureValue) { version =>
- version.versionId shouldEqual None
- }
+ Inspectors.forEvery(results.futureValue) { version =>
+ version.versionId shouldEqual None
+ }
}
- it should "upload 2 files, delete all files in bucket" in {
+ it should "upload 2 files, delete all files in bucket" in withDefaultBucket
{ defaultBucket =>
val sourceKey1 = "original/file1.txt"
val sourceKey2 = "original/file2.txt"
val source: Source[ByteString, Any] = Source(ByteString(objectValue) ::
Nil)
@@ -550,154 +718,157 @@ trait S3IntegrationSpec
String.format("%032X", new BigInteger(1, digest.digest())).toLowerCase
}
- it should "upload 1 file slowly, cancel it and retrieve a multipart upload +
list part" in {
- // This test doesn't work on Minio since minio doesn't properly implement
this API, see
- // https://github.com/minio/minio/issues/13246
- assume(this.isInstanceOf[AWSS3IntegrationSpec])
- val sourceKey = "original/file-slow.txt"
- val sharedKillSwitch = KillSwitches.shared("abort-multipart-upload")
+ it should "upload 1 file slowly, cancel it and retrieve a multipart upload +
list part" in withDefaultBucket {
+ defaultBucket =>
+ // This test doesn't work on Minio since minio doesn't properly
implement this API, see
+ // https://github.com/minio/minio/issues/13246
+ assume(this.isInstanceOf[AWSS3IntegrationSpec])
+ val sourceKey = "original/file-slow.txt"
+ val sharedKillSwitch = KillSwitches.shared("abort-multipart-upload")
- val inputData = createStringCollectionWithMinChunkSize(5)
- val slowSource = createSlowSource(inputData, Some(sharedKillSwitch))
+ val inputData = createStringCollectionWithMinChunkSize(5)
+ val slowSource = createSlowSource(inputData, Some(sharedKillSwitch))
- val multiPartUpload =
- slowSource.toMat(S3.multipartUpload(defaultBucket,
sourceKey).withAttributes(attributes))(Keep.right).run()
+ val multiPartUpload =
+ slowSource.toMat(S3.multipartUpload(defaultBucket,
sourceKey).withAttributes(attributes))(Keep.right).run()
- val results = for {
- _ <- pekko.pattern.after(25.seconds)(Future {
- sharedKillSwitch.abort(AbortException)
- })
- _ <- multiPartUpload.recover {
- case AbortException => ()
- }
- incomplete <- S3.listMultipartUpload(defaultBucket,
None).withAttributes(attributes).runWith(Sink.seq)
- uploadIds = incomplete.collect {
- case uploadPart if uploadPart.key == sourceKey => uploadPart.uploadId
+ val results = for {
+ _ <- pekko.pattern.after(25.seconds)(Future {
+ sharedKillSwitch.abort(AbortException)
+ })
+ _ <- multiPartUpload.recover {
+ case AbortException => ()
+ }
+ incomplete <- S3.listMultipartUpload(defaultBucket,
None).withAttributes(attributes).runWith(Sink.seq)
+ uploadIds = incomplete.collect {
+ case uploadPart if uploadPart.key == sourceKey => uploadPart.uploadId
+ }
+ parts <- Future.sequence(uploadIds.map { uploadId =>
+ S3.listParts(defaultBucket, sourceKey,
uploadId).withAttributes(attributes).runWith(Sink.seq)
+ })
+ // Cleanup the uploads after
+ _ <- Future.sequence(uploadIds.map { uploadId =>
+ S3.deleteUpload(defaultBucket, sourceKey, uploadId)(implicitly,
attributes)
+ })
+ } yield (uploadIds, incomplete, parts.flatten)
+
+ whenReady(results) {
+ case (uploadIds, incompleteFiles, parts) =>
+ val inputsUntilAbort = inputData.slice(0, 3)
+ incompleteFiles.exists(_.key == sourceKey) shouldBe true
+ parts.nonEmpty shouldBe true
+ uploadIds.size shouldBe 1
+ parts.size shouldBe 3
+ parts.map(_.size) shouldBe
inputsUntilAbort.map(_.utf8String.getBytes("UTF-8").length)
+ // In S3 the etag's are actually an MD5 hash of the contents of the
part so we can use this to check
+ // that the data has been uploaded correctly and in the right order,
see
+ //
https://docs.aws.amazon.com/AmazonS3/latest/API/RESTCommonResponseHeaders.html
+ parts.map(_.eTag.replaceAll("\"", "")) shouldBe
inputsUntilAbort.map(byteStringToMD5)
}
- parts <- Future.sequence(uploadIds.map { uploadId =>
- S3.listParts(defaultBucket, sourceKey,
uploadId).withAttributes(attributes).runWith(Sink.seq)
- })
- // Cleanup the uploads after
- _ <- Future.sequence(uploadIds.map { uploadId =>
- S3.deleteUpload(defaultBucket, sourceKey, uploadId)(implicitly,
attributes)
- })
- } yield (uploadIds, incomplete, parts.flatten)
-
- whenReady(results) {
- case (uploadIds, incompleteFiles, parts) =>
- val inputsUntilAbort = inputData.slice(0, 3)
- incompleteFiles.exists(_.key == sourceKey) shouldBe true
- parts.nonEmpty shouldBe true
- uploadIds.size shouldBe 1
- parts.size shouldBe 3
- parts.map(_.size) shouldBe
inputsUntilAbort.map(_.utf8String.getBytes("UTF-8").length)
- // In S3 the etag's are actually an MD5 hash of the contents of the
part so we can use this to check
- // that the data has been uploaded correctly and in the right order,
see
- //
https://docs.aws.amazon.com/AmazonS3/latest/API/RESTCommonResponseHeaders.html
- parts.map(_.eTag.replaceAll("\"", "")) shouldBe
inputsUntilAbort.map(byteStringToMD5)
- }
}
- it should "upload 1 file slowly, cancel it and then resume it to complete
the upload" in {
- // This test doesn't work on Minio since minio doesn't properly implement
this API, see
- // https://github.com/minio/minio/issues/13246
- assume(this.isInstanceOf[AWSS3IntegrationSpec])
- val sourceKey = "original/file-slow-2.txt"
- val sharedKillSwitch = KillSwitches.shared("abort-multipart-upload-2")
+ it should "upload 1 file slowly, cancel it and then resume it to complete
the upload" in withDefaultBucket {
+ defaultBucket =>
+ // This test doesn't work on Minio since minio doesn't properly
implement this API, see
+ // https://github.com/minio/minio/issues/13246
+ assume(this.isInstanceOf[AWSS3IntegrationSpec])
+ val sourceKey = "original/file-slow-2.txt"
+ val sharedKillSwitch = KillSwitches.shared("abort-multipart-upload-2")
- val inputData = createStringCollectionWithMinChunkSize(6)
+ val inputData = createStringCollectionWithMinChunkSize(6)
- val slowSource = createSlowSource(inputData, Some(sharedKillSwitch))
+ val slowSource = createSlowSource(inputData, Some(sharedKillSwitch))
- val multiPartUpload =
- slowSource
- .toMat(S3.multipartUpload(defaultBucket,
sourceKey).withAttributes(attributes))(
- Keep.right)
- .run()
+ val multiPartUpload =
+ slowSource
+ .toMat(S3.multipartUpload(defaultBucket,
sourceKey).withAttributes(attributes))(
+ Keep.right)
+ .run()
- val results = for {
- _ <- pekko.pattern.after(25.seconds)(Future {
- sharedKillSwitch.abort(AbortException)
- })
- _ <- multiPartUpload.recover {
- case AbortException => ()
+ val results = for {
+ _ <- pekko.pattern.after(25.seconds)(Future {
+ sharedKillSwitch.abort(AbortException)
+ })
+ _ <- multiPartUpload.recover {
+ case AbortException => ()
+ }
+ incomplete <- S3.listMultipartUpload(defaultBucket,
None).withAttributes(attributes).runWith(Sink.seq)
+
+ uploadId = incomplete.collectFirst {
+ case uploadPart if uploadPart.key == sourceKey => uploadPart.uploadId
+ }.get
+
+ parts <- S3.listParts(defaultBucket, sourceKey,
uploadId).withAttributes(attributes).runWith(Sink.seq)
+
+ remainingData = inputData.slice(3, 6)
+ _ <- Source(remainingData)
+ .toMat(
+ S3.resumeMultipartUpload(defaultBucket, sourceKey, uploadId,
parts.map(_.toPart))
+ .withAttributes(attributes))(
+ Keep.right)
+ .run()
+
+ // This delay is here because sometimes there is a delay when you
complete a large file and its
+ // actually downloadable
+ downloaded <- pekko.pattern.after(5.seconds)(
+ S3.getObject(defaultBucket,
sourceKey).withAttributes(attributes).runWith(Sink.seq))
+
+ _ <- S3.deleteObject(defaultBucket,
sourceKey).withAttributes(attributes).runWith(Sink.head)
+ } yield downloaded
+
+ whenReady(results) { downloads =>
+ val fullDownloadedFile = downloads.fold(ByteString.empty)(_ ++ _)
+ val fullInputData = inputData.fold(ByteString.empty)(_ ++ _)
+
+ fullInputData.utf8String shouldEqual fullDownloadedFile.utf8String
}
- incomplete <- S3.listMultipartUpload(defaultBucket,
None).withAttributes(attributes).runWith(Sink.seq)
-
- uploadId = incomplete.collectFirst {
- case uploadPart if uploadPart.key == sourceKey => uploadPart.uploadId
- }.get
-
- parts <- S3.listParts(defaultBucket, sourceKey,
uploadId).withAttributes(attributes).runWith(Sink.seq)
-
- remainingData = inputData.slice(3, 6)
- _ <- Source(remainingData)
- .toMat(
- S3.resumeMultipartUpload(defaultBucket, sourceKey, uploadId,
parts.map(_.toPart))
- .withAttributes(attributes))(
- Keep.right)
- .run()
-
- // This delay is here because sometimes there is a delay when you
complete a large file and its
- // actually downloadable
- downloaded <- pekko.pattern.after(5.seconds)(
- S3.getObject(defaultBucket,
sourceKey).withAttributes(attributes).runWith(Sink.seq))
-
- _ <- S3.deleteObject(defaultBucket,
sourceKey).withAttributes(attributes).runWith(Sink.head)
- } yield downloaded
-
- whenReady(results) { downloads =>
- val fullDownloadedFile = downloads.fold(ByteString.empty)(_ ++ _)
- val fullInputData = inputData.fold(ByteString.empty)(_ ++ _)
-
- fullInputData.utf8String shouldEqual fullDownloadedFile.utf8String
- }
}
- it should "upload a full file but complete it manually with
S3.completeMultipartUploadSource" in {
- assume(this.isInstanceOf[AWSS3IntegrationSpec])
- val sourceKey = "original/file-slow-3.txt"
- val sharedKillSwitch = KillSwitches.shared("abort-multipart-upload-3")
+ it should "upload a full file but complete it manually with
S3.completeMultipartUploadSource" in withDefaultBucket {
+ defaultBucket =>
+ assume(this.isInstanceOf[AWSS3IntegrationSpec])
+ val sourceKey = "original/file-slow-3.txt"
+ val sharedKillSwitch = KillSwitches.shared("abort-multipart-upload-3")
- val inputData = createStringCollectionWithMinChunkSize(4)
+ val inputData = createStringCollectionWithMinChunkSize(4)
- val slowSource = createSlowSource(inputData, Some(sharedKillSwitch))
+ val slowSource = createSlowSource(inputData, Some(sharedKillSwitch))
- val multiPartUpload =
- slowSource
- .toMat(S3.multipartUpload(defaultBucket,
sourceKey).withAttributes(attributes))(
- Keep.right)
- .run()
+ val multiPartUpload =
+ slowSource
+ .toMat(S3.multipartUpload(defaultBucket,
sourceKey).withAttributes(attributes))(
+ Keep.right)
+ .run()
- val results = for {
- _ <- pekko.pattern.after(25.seconds)(Future {
- sharedKillSwitch.abort(AbortException)
- })
- _ <- multiPartUpload.recover {
- case AbortException => ()
+ val results = for {
+ _ <- pekko.pattern.after(25.seconds)(Future {
+ sharedKillSwitch.abort(AbortException)
+ })
+ _ <- multiPartUpload.recover {
+ case AbortException => ()
+ }
+ incomplete <- S3.listMultipartUpload(defaultBucket,
None).withAttributes(attributes).runWith(Sink.seq)
+
+ uploadId = incomplete.collectFirst {
+ case uploadPart if uploadPart.key == sourceKey => uploadPart.uploadId
+ }.get
+
+ parts <- S3.listParts(defaultBucket, sourceKey,
uploadId).withAttributes(attributes).runWith(Sink.seq)
+
+ _ <- S3.completeMultipartUpload(defaultBucket, sourceKey, uploadId,
parts.map(_.toPart))(implicitly, attributes)
+ // This delay is here because sometimes there is a delay when you
complete a large file and its
+ // actually downloadable
+ downloaded <- pekko.pattern.after(5.seconds)(
+ S3.getObject(defaultBucket,
sourceKey).withAttributes(attributes).runWith(Sink.seq))
+ _ <- S3.deleteObject(defaultBucket,
sourceKey).withAttributes(attributes).runWith(Sink.head)
+ } yield downloaded
+
+ whenReady(results) { downloads =>
+ val fullDownloadedFile = downloads.fold(ByteString.empty)(_ ++ _)
+ val fullInputData = inputData.slice(0, 3).fold(ByteString.empty)(_ ++
_)
+
+ fullInputData.utf8String shouldEqual fullDownloadedFile.utf8String
}
- incomplete <- S3.listMultipartUpload(defaultBucket,
None).withAttributes(attributes).runWith(Sink.seq)
-
- uploadId = incomplete.collectFirst {
- case uploadPart if uploadPart.key == sourceKey => uploadPart.uploadId
- }.get
-
- parts <- S3.listParts(defaultBucket, sourceKey,
uploadId).withAttributes(attributes).runWith(Sink.seq)
-
- _ <- S3.completeMultipartUpload(defaultBucket, sourceKey, uploadId,
parts.map(_.toPart))(implicitly, attributes)
- // This delay is here because sometimes there is a delay when you
complete a large file and its
- // actually downloadable
- downloaded <- pekko.pattern.after(5.seconds)(
- S3.getObject(defaultBucket,
sourceKey).withAttributes(attributes).runWith(Sink.seq))
- _ <- S3.deleteObject(defaultBucket,
sourceKey).withAttributes(attributes).runWith(Sink.head)
- } yield downloaded
-
- whenReady(results) { downloads =>
- val fullDownloadedFile = downloads.fold(ByteString.empty)(_ ++ _)
- val fullInputData = inputData.slice(0, 3).fold(ByteString.empty)(_ ++ _)
-
- fullInputData.utf8String shouldEqual fullDownloadedFile.utf8String
- }
}
@tailrec
final def createStringCollectionContextWithMinChunkSizeRec(
@@ -761,7 +932,7 @@ trait S3IntegrationSpec
def createStringCollectionContextWithMinChunkSize(numberOfChunks: Int):
List[List[(ByteString, BigInt)]] =
createStringCollectionContextWithMinChunkSizeRec(numberOfChunks).toList.map(_.toList)
- it should "perform a chunked multi-part upload with the correct context" in {
+ it should "perform a chunked multi-part upload with the correct context" in
withDefaultBucket { defaultBucket =>
val sourceKey = "original/file-context-1.txt"
val inputData = createStringCollectionContextWithMinChunkSize(4)
val source = Source(inputData.flatten)
@@ -798,130 +969,141 @@ trait S3IntegrationSpec
}
it should "make a bucket with given name" in {
- implicit val attr: Attributes = attributes
- val bucketName = "samplebucket1"
+ forAll(genBucketName("samplebucket1")) { bucketName =>
+ system.log.info(s"Created bucket with name: $bucketName")
- val request: Future[Done] = S3
- .makeBucket(bucketName)
+ implicit val attr: Attributes = attributes
+ val request: Future[Done] = S3
+ .makeBucket(bucketName)
- whenReady(request) { value =>
- value shouldEqual Done
+ whenReady(request) { value =>
+ value shouldEqual Done
- S3.deleteBucket(bucketName).futureValue shouldBe Done
+ S3.deleteBucket(bucketName).futureValue shouldBe Done
+ }
}
}
- it should "throw an exception while creating a bucket with the same name in
Minio" in {
- // S3 will only throw a BucketAlreadyExists exception if the owner of the
account creating the bucket with an
- // already existing name different from the owner of the already existing
bucket. On the other hand Minio will
- // always throw this exception. Since its hard to test the S3 case of
different owners we only run this test
- // against Minio.
- assume(this.isInstanceOf[MinioS3IntegrationSpec])
- implicit val attr: Attributes = attributes
- S3.makeBucket(defaultBucket).failed.futureValue shouldBe an[S3Exception]
+ it should "throw an exception while creating a bucket with the same name in
Minio" in withDefaultBucket {
+ defaultBucket =>
+ // S3 will only throw a BucketAlreadyExists exception if the owner of
the account creating the bucket with an
+ // already existing name different from the owner of the already
existing bucket. On the other hand Minio will
+ // always throw this exception. Since its hard to test the S3 case of
different owners we only run this test
+ // against Minio.
+ assume(this.isInstanceOf[MinioS3IntegrationSpec])
+ implicit val attr: Attributes = attributes
+ S3.makeBucket(defaultBucket).failed.futureValue shouldBe an[S3Exception]
}
- it should "Do nothing while creating a bucket with the same name and owner
in AWS S3" in {
- // Due to this being a PUT request, S3 doesn't actually throw an exception
unless
- // you are NOT the owner of the already existing bucket.
- // See https://github.com/aws/aws-sdk-go/issues/1362#issuecomment-722554726
- assume(this.isInstanceOf[AWSS3IntegrationSpec])
- implicit val attr: Attributes = attributes
- S3.makeBucket(defaultBucket).futureValue shouldBe Done
+ it should "Do nothing while creating a bucket with the same name and owner
in AWS S3" in withDefaultBucket {
+ defaultBucket =>
+ // Due to this being a PUT request, S3 doesn't actually throw an
exception unless
+ // you are NOT the owner of the already existing bucket.
+ // See
https://github.com/aws/aws-sdk-go/issues/1362#issuecomment-722554726
+ assume(this.isInstanceOf[AWSS3IntegrationSpec])
+ implicit val attr: Attributes = attributes
+ S3.makeBucket(defaultBucket).futureValue shouldBe Done
}
it should "enable and disable versioning for a bucket" in {
// TODO: Figure out a way to properly test this with Minio, see
https://github.com/akka/alpakka/issues/2750
assume(this.isInstanceOf[AWSS3IntegrationSpec])
- implicit val attr: Attributes = attributes
- val bucketName = "samplebucketversioning"
-
- val request = for {
- _ <- S3.makeBucket(bucketName)
- firstResult <- S3.getBucketVersioning(bucketName)
- _ <- S3.putBucketVersioning(bucketName,
BucketVersioning().withStatus(BucketVersioningStatus.Enabled))
- secondResult <- S3.getBucketVersioning(bucketName)
- } yield (firstResult, secondResult)
-
- whenReady(request) { case (firstValue, secondValue) =>
- firstValue shouldEqual BucketVersioningResult()
- firstValue.bucketVersioningEnabled shouldEqual false
- secondValue shouldEqual
BucketVersioningResult().withStatus(BucketVersioningStatus.Enabled)
- secondValue.bucketVersioningEnabled shouldEqual true
- S3.putBucketVersioning(bucketName,
-
BucketVersioning().withStatus(BucketVersioningStatus.Suspended)).futureValue
shouldBe Done
- S3.deleteBucket(bucketName).futureValue
+ forAll(genBucketName("samplebucket1")) { bucketName =>
+ implicit val attr: Attributes = attributes
+
+ val request = for {
+ _ <- S3.makeBucket(bucketName)
+ firstResult <- S3.getBucketVersioning(bucketName)
+ _ <- S3.putBucketVersioning(bucketName,
BucketVersioning().withStatus(BucketVersioningStatus.Enabled))
+ secondResult <- S3.getBucketVersioning(bucketName)
+ } yield (firstResult, secondResult)
+
+ whenReady(request) { case (firstValue, secondValue) =>
+ firstValue shouldEqual BucketVersioningResult()
+ firstValue.bucketVersioningEnabled shouldEqual false
+ secondValue shouldEqual
BucketVersioningResult().withStatus(BucketVersioningStatus.Enabled)
+ secondValue.bucketVersioningEnabled shouldEqual true
+ S3.putBucketVersioning(bucketName,
+
BucketVersioning().withStatus(BucketVersioningStatus.Suspended)).futureValue
shouldBe Done
+ S3.deleteBucket(bucketName).futureValue
+ }
}
}
it should "enable and disable versioning for a bucket with MFA delete
configured to false" in {
// TODO: Figure out a way to properly test this with Minio, see
https://github.com/akka/alpakka/issues/2750
assume(this.isInstanceOf[AWSS3IntegrationSpec])
- implicit val attr: Attributes = attributes
- val bucketName = "samplebucketversioningmfadeletefalse"
-
- val request = for {
- _ <- S3.makeBucket(bucketName)
- _ <- S3.putBucketVersioning(bucketName,
- BucketVersioning()
- .withStatus(BucketVersioningStatus.Enabled)
- .withMfaDelete(MFAStatus.Disabled))
- result <- S3.getBucketVersioning(bucketName)
- } yield result
-
- whenReady(request) { value =>
- value shouldEqual
BucketVersioningResult().withStatus(BucketVersioningStatus.Enabled).withMfaDelete(false)
- S3.putBucketVersioning(bucketName,
-
BucketVersioning().withStatus(BucketVersioningStatus.Suspended)).futureValue
shouldBe Done
- S3.deleteBucket(bucketName).futureValue
+ forAll(genBucketName("samplebucketversioningmfadeletefalse")) { bucketName
=>
+ implicit val attr: Attributes = attributes
+
+ val request = for {
+ _ <- S3.makeBucket(bucketName)
+ _ <- S3.putBucketVersioning(bucketName,
+ BucketVersioning()
+ .withStatus(BucketVersioningStatus.Enabled)
+ .withMfaDelete(MFAStatus.Disabled))
+ result <- S3.getBucketVersioning(bucketName)
+ } yield result
+
+ whenReady(request) { value =>
+ value shouldEqual
BucketVersioningResult().withStatus(BucketVersioningStatus.Enabled).withMfaDelete(false)
+ S3.putBucketVersioning(bucketName,
+
BucketVersioning().withStatus(BucketVersioningStatus.Suspended)).futureValue
shouldBe Done
+ S3.deleteBucket(bucketName).futureValue
+ }
}
}
it should "create and delete bucket with a given name" in {
- val bucketName = "samplebucket3"
+ forAll(genBucketName("samplebucket3")) { bucketName =>
+ system.log.info(s"Created bucket with name: $bucketName")
- val makeRequest: Source[Done, NotUsed] = S3
- .makeBucketSource(bucketName)
- .withAttributes(attributes)
- val deleteRequest: Source[Done, NotUsed] = S3
- .deleteBucketSource(bucketName)
- .withAttributes(attributes)
+ val makeRequest: Source[Done, NotUsed] = S3
+ .makeBucketSource(bucketName)
+ .withAttributes(attributes)
+ val deleteRequest: Source[Done, NotUsed] = S3
+ .deleteBucketSource(bucketName)
+ .withAttributes(attributes)
- val request = for {
- make <- makeRequest.runWith(Sink.ignore)
- delete <- deleteRequest.runWith(Sink.ignore)
- } yield (make, delete)
+ val request = for {
+ make <- makeRequest.runWith(Sink.ignore)
+ delete <- deleteRequest.runWith(Sink.ignore)
+ } yield (make, delete)
- request.futureValue should equal((Done, Done))
+ request.futureValue should equal((Done, Done))
+ }
}
it should "create a bucket in the non default us-east-1 region" in {
- val bucketName = "samplebucketotherregion"
+ forAll(genBucketName("samplebucketotherregion")) { bucketName =>
+ system.log.info(s"Created bucket with name: $bucketName")
- val request = for {
- _ <- S3
- .makeBucketSource(bucketName)
- .withAttributes(otherRegionSettingsPathStyleAccess)
- .runWith(Sink.head)
- result <- S3
- .checkIfBucketExistsSource(bucketName)
- .withAttributes(otherRegionSettingsPathStyleAccess)
- .runWith(Sink.head)
- _ <- S3
- .deleteBucketSource(bucketName)
- .withAttributes(otherRegionSettingsPathStyleAccess)
- .runWith(Sink.head)
- } yield result
+ val request = for {
+ _ <- S3
+ .makeBucketSource(bucketName)
+ .withAttributes(otherRegionSettingsPathStyleAccess)
+ .runWith(Sink.head)
+ result <- S3
+ .checkIfBucketExistsSource(bucketName)
+ .withAttributes(otherRegionSettingsPathStyleAccess)
+ .runWith(Sink.head)
+ _ <- S3
+ .deleteBucketSource(bucketName)
+ .withAttributes(otherRegionSettingsPathStyleAccess)
+ .runWith(Sink.head)
+ } yield result
- request.futureValue shouldEqual AccessGranted
+ request.futureValue shouldEqual AccessGranted
+ }
}
- it should "throw an exception while deleting bucket that doesn't exist" in {
- implicit val attr: Attributes = attributes
- S3.deleteBucket(nonExistingBucket).failed.futureValue shouldBe
an[S3Exception]
+ it should "throw an exception while deleting bucket that doesn't exist" in
withNonExistingBucket {
+ nonExistingBucket =>
+ implicit val attr: Attributes = attributes
+ S3.deleteBucket(nonExistingBucket).failed.futureValue shouldBe
an[S3Exception]
}
- it should "check if bucket exists" in {
+ it should "check if bucket exists" in withDefaultBucket { defaultBucket =>
implicit val attr: Attributes = attributes
val checkIfBucketExits: Future[BucketAccess] =
S3.checkIfBucketExists(defaultBucket)
@@ -930,7 +1112,7 @@ trait S3IntegrationSpec
}
}
- it should "check for non-existing bucket" in {
+ it should "check for non-existing bucket" in withNonExistingBucket {
nonExistingBucket =>
implicit val attr: Attributes = attributes
val request: Future[BucketAccess] =
S3.checkIfBucketExists(nonExistingBucket)
@@ -939,7 +1121,7 @@ trait S3IntegrationSpec
}
}
- it should "contain error code even if exception in empty" in {
+ it should "contain error code even if exception in empty" in
withDefaultBucket { defaultBucket =>
val exception =
S3.getObjectMetadata(defaultBucket, "sample")
.withAttributes(invalidCredentials)
@@ -953,24 +1135,26 @@ trait S3IntegrationSpec
private val chunk: ByteString =
ByteString.fromArray(Array.fill(S3.MinChunkSize)(0.toByte))
- it should "only upload single chunk when size of the ByteString equals chunk
size" in {
- val source: Source[ByteString, Any] = Source.single(chunk)
- uploadAndAndCheckParts(source, 1)
+ it should "only upload single chunk when size of the ByteString equals chunk
size" in withDefaultBucket {
+ defaultBucket =>
+ val source: Source[ByteString, Any] = Source.single(chunk)
+ uploadAndAndCheckParts(defaultBucket, source, 1)
}
- it should "only upload single chunk when exact chunk is followed by an empty
ByteString" in {
- val source: Source[ByteString, Any] = Source[ByteString](
- chunk :: ByteString.empty :: Nil)
+ it should "only upload single chunk when exact chunk is followed by an empty
ByteString" in withDefaultBucket {
+ defaultBucket =>
+ val source: Source[ByteString, Any] = Source[ByteString](
+ chunk :: ByteString.empty :: Nil)
- uploadAndAndCheckParts(source, 1)
+ uploadAndAndCheckParts(defaultBucket, source, 1)
}
- it should "upload two chunks size of ByteStrings equals chunk size" in {
+ it should "upload two chunks size of ByteStrings equals chunk size" in
withDefaultBucket { defaultBucket =>
val source: Source[ByteString, Any] = Source(chunk :: chunk :: Nil)
- uploadAndAndCheckParts(source, 2)
+ uploadAndAndCheckParts(defaultBucket, source, 2)
}
- it should "upload empty source" in {
+ it should "upload empty source" in withDefaultBucket { defaultBucket =>
val upload =
for {
upload <- Source
@@ -984,7 +1168,8 @@ trait S3IntegrationSpec
upload.futureValue.eTag should not be empty
}
- private def uploadAndAndCheckParts(source: Source[ByteString, _],
expectedParts: Int): Assertion = {
+ private def uploadAndAndCheckParts(
+ defaultBucket: String, source: Source[ByteString, _], expectedParts:
Int): Assertion = {
val metadata =
for {
_ <- source.runWith(
@@ -1001,7 +1186,7 @@ trait S3IntegrationSpec
etag.substring(etag.indexOf('-') + 1).toInt shouldBe expectedParts
}
- private def uploadDownloadAndDeleteInOtherRegionCase(objectKey: String):
Assertion = {
+ private def uploadDownloadAndDeleteInOtherRegionCase(bucketWithDots: String,
objectKey: String): Assertion = {
val source: Source[ByteString, Any] = Source(ByteString(objectValue) ::
Nil)
val results = for {
@@ -1028,7 +1213,7 @@ trait S3IntegrationSpec
.futureValue shouldEqual pekko.Done
}
- private def uploadCopyDownload(sourceKey: String, targetKey: String):
Assertion = {
+ private def uploadCopyDownload(defaultBucket: String, sourceKey: String,
targetKey: String): Assertion = {
val source: Source[ByteString, Any] =
Source.single(ByteString(objectValue))
val results = for {
@@ -1065,21 +1250,39 @@ trait S3IntegrationSpec
}
/*
- * This is an integration test and ignored by default
- *
- * For running the tests you need to create 2 buckets:
- * - one in region us-east-1
- * - one in an other region (eg eu-central-1)
- * Update the bucket name and regions in the code below
+ * This is an integration test. In order for the test suite to run make sure
you have the necessary
+ * permissions to create/delete buckets and objects in different regions.
*
* Set your keys aws access-key-id and secret-access-key in
src/test/resources/application.conf
*
- * Comment @ignore and run the tests
- * (tests that do listing counts might need some tweaking)
+ * Since this is a test that is marked with `@DoNotDiscover`, it will not run as
part of the test suite unless it's
+ * explicitly executed with
+ * `sbt "s3/Test/runMain org.scalatest.tools.Runner -o -s
org.apache.pekko.stream.connectors.s3.scaladsl.AWSS3IntegrationSpec"`
*
+ * See https://www.scalatest.org/user_guide/using_the_runner for more details
about the runner.
+ *
+ * Running tests directly via IDEs such as IntelliJ is also supported
*/
-@Ignore
-class AWSS3IntegrationSpec extends
TestKit(ActorSystem("AWSS3IntegrationSpec")) with S3IntegrationSpec
+@DoNotDiscover
+class AWSS3IntegrationSpec extends
TestKit(ActorSystem("AWSS3IntegrationSpec")) with S3IntegrationSpec {
+ // It's recommended when testing against S3 to use a specific prefix in order
to identify where the buckets
+ // are coming from, since typically S3 accounts can also be used in other contexts
+ override lazy val bucketPrefix: Option[String] =
+
sys.props.get("pekko.stream.connectors.s3.scaladsl.AWSS3IntegrationSpec.bucketPrefix").orElse(
+ Some("pekko-connectors-"))
+
+ override lazy val enableCleanup: Option[FiniteDuration] =
+
sys.props.get("pekko.stream.connectors.s3.scaladsl.AWSS3IntegrationSpec.enableCleanup").map(
+ Duration.apply).collect {
+ case finiteDuration: FiniteDuration => finiteDuration
+ }.orElse(Some(1.minute))
+
+ // Since S3 accounts share global state, we should randomly generate bucket
names so concurrent tests
+ // against an S3 account don't conflict with each other
+ override lazy val randomlyGenerateBucketNames: Boolean =
+
sys.props.get("pekko.stream.connectors.s3.scaladsl.AWSS3IntegrationSpec.randomlyGenerateBucketNames")
+ .map(_.toBoolean).getOrElse(true)
+}
/*
* For this test, you need a have docker installed and running.
@@ -1093,6 +1296,10 @@ class MinioS3IntegrationSpec
with S3IntegrationSpec
with MinioS3Test {
+ // Since a unique new Minio container is started with each test run there is
no point in making random
+ // bucket names
+ override lazy val randomlyGenerateBucketNames: Boolean = false
+
override lazy val defaultS3Settings: S3Settings = s3Settings
.withS3RegionProvider(
new AwsRegionProvider {
@@ -1100,27 +1307,6 @@ class MinioS3IntegrationSpec
})
.withAccessStyle(PathAccessStyle)
- override def beforeAll(): Unit = {
- super.beforeAll()
-
- val makeBuckets = for {
- _ <- S3.makeBucket(bucketWithDots)(
- system,
- attributes(
- _.withS3RegionProvider(
- new AwsRegionProvider {
- val getRegion: Region = Region.EU_CENTRAL_1
- })))
- _ <- S3.makeBucket(defaultBucket)(
- system,
- attributes)
- _ <- S3.makeBucket(bucketWithVersioning)(
- system,
- attributes)
- } yield ()
- Await.result(makeBuckets, 10.seconds)
- }
-
it should "properly set the endpointUrl using VirtualHostAccessStyle" in {
s3Settings
.withAccessStyle(VirtualHostAccessStyle)
@@ -1129,3 +1315,10 @@ class MinioS3IntegrationSpec
.value shouldEqual container.getVirtualHost
}
}
+
+object S3IntegrationSpec {
+ val BucketWithDots = "my.test.frankfurt"
+ val DefaultBucket = "my-test-us-east-1"
+ val BucketWithVersioning = "my-bucket-with-versioning"
+ val NonExistingBucket = "nowhere"
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]