This is an automated email from the ASF dual-hosted git repository.
mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new ae4567ec2 Add S3 put plus get bucketWithVersioning API
ae4567ec2 is described below
commit ae4567ec2538c050f62a535acedf27b6f449a269
Author: Matthew de Detrich <[email protected]>
AuthorDate: Thu Apr 20 17:56:59 2023 +0200
Add S3 put plus get bucketWithVersioning API
---
.../stream/connectors/s3/impl/HttpRequests.scala | 51 ++++-
.../stream/connectors/s3/impl/Marshalling.scala | 13 ++
.../stream/connectors/s3/impl/S3Request.scala | 10 +
.../pekko/stream/connectors/s3/impl/S3Stream.scala | 41 ++++
.../pekko/stream/connectors/s3/javadsl/S3.scala | 120 ++++++++++++
.../apache/pekko/stream/connectors/s3/model.scala | 213 +++++++++++++++++++++
.../pekko/stream/connectors/s3/scaladsl/S3.scala | 99 ++++++++++
.../connectors/s3/impl/HttpRequestsSpec.scala | 15 ++
.../connectors/s3/scaladsl/S3IntegrationSpec.scala | 45 +++++
9 files changed, 606 insertions(+), 1 deletion(-)
diff --git
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequests.scala
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequests.scala
index 3418dab83..b9d92d399 100644
---
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequests.scala
+++
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequests.scala
@@ -24,13 +24,21 @@ import pekko.http.scaladsl.model.Uri.{ Authority, Query }
import pekko.http.scaladsl.model.headers.{ `Raw-Request-URI`, Host, RawHeader }
import pekko.http.scaladsl.model.{ RequestEntity, _ }
import pekko.stream.connectors.s3.AccessStyle.{ PathAccessStyle,
VirtualHostAccessStyle }
-import pekko.stream.connectors.s3.{ ApiVersion, MultipartUpload, S3Settings }
+import pekko.stream.connectors.s3.{
+ ApiVersion,
+ BucketVersioning,
+ BucketVersioningStatus,
+ MFAStatus,
+ MultipartUpload,
+ S3Settings
+}
import pekko.stream.scaladsl.Source
import pekko.util.ByteString
import software.amazon.awssdk.regions.Region
import scala.collection.immutable.Seq
import scala.concurrent.{ ExecutionContext, Future }
+import scala.xml.NodeSeq
/**
* Internal Api
@@ -178,6 +186,47 @@ import scala.concurrent.{ ExecutionContext, Future }
s3Request(s3Location = s3Location, method = method)
.withDefaultHeaders(headers)
+ /**
+  * Builds the HTTP request addressing the `versioning` sub-resource of a bucket.
+  *
+  * The request is always built with [[VirtualHostAccessStyle]], regardless of the access
+  * style configured in `conf`.
+  *
+  * @param bucket    name of the bucket whose versioning configuration is addressed
+  * @param mfaStatus when this is an `MFAStatus.Enabled` and `method` is `PUT`, an `x-amz-mfa`
+  *                  header is prepended to `headers`; otherwise `headers` is used unchanged
+  * @param method    HTTP method of the request
+  * @param headers   additional headers to send, defaults to none
+  * @param conf      S3 settings supplying the region provider
+  * @return the request with a `Host` header and a `?versioning` query
+  */
+ def bucketVersioningRequest(bucket: String, mfaStatus: Option[MFAStatus], method: HttpMethod,
+     headers: Seq[HttpHeader] = Nil)(
+     implicit conf: S3Settings): HttpRequest = {
+
+   val confWithVirtualHost = conf.withAccessStyle(VirtualHostAccessStyle)
+   val authority = requestAuthority(bucket, conf.s3RegionProvider.getRegion)(confWithVirtualHost)
+
+   val finalHeaders = (mfaStatus, method) match {
+     case (Some(mfaEnabled: MFAStatus.Enabled), HttpMethods.PUT) =>
+       // Header value is the device serial number and the token code separated by exactly one space.
+       RawHeader("x-amz-mfa", s"${mfaEnabled.mfa.serialNumber} ${mfaEnabled.mfa.tokenCode}") +: headers
+     case _ => headers
+   }
+
+   HttpRequest(method)
+     .withHeaders(Host(authority) +: finalHeaders)
+     .withUri(
+       requestUri(bucket, None)(confWithVirtualHost).withAuthority(authority).withQuery(Query("versioning")))
+ }
+
+ /**
+  * Renders the XML request body for a bucket versioning update.
+  *
+  * The body is a `VersioningConfiguration` element containing a `Status` child when
+  * `bucketVersioning.status` is defined and an `MfaDelete` child when
+  * `bucketVersioning.mfaDelete` is defined; an undefined option contributes no element.
+  *
+  * @param bucketVersioning the desired versioning state
+  * @param ec execution context used when marshalling the XML to a [[RequestEntity]]
+  * @return the marshalled request entity
+  */
+ def putBucketVersioningPayload(bucketVersioning: BucketVersioning)(
+ implicit ec: ExecutionContext): Future[RequestEntity] = {
+
+ // Optional <Status> element; empty node sequence when no status was requested.
+ val status = bucketVersioning.status.map {
+ case BucketVersioningStatus.Enabled => <Status>Enabled</Status>
+ case BucketVersioningStatus.Suspended => <Status>Suspended</Status>
+ }.getOrElse(NodeSeq.Empty)
+
+ // Optional <MfaDelete> element; only the Enabled/Disabled state is written to the body,
+ // the MFA credentials carried by MFAStatus.Enabled are not part of the payload.
+ val mfaDelete = bucketVersioning.mfaDelete.map {
+ case _: MFAStatus.Enabled => <MfaDelete>Enabled</MfaDelete>
+ case MFAStatus.Disabled => <MfaDelete>Disabled</MfaDelete>
+ }.getOrElse(NodeSeq.Empty)
+
+ // @formatter:off
+ val payload = <VersioningConfiguration
xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+ {status}
+ {mfaDelete}
+ </VersioningConfiguration>
+ // @formatter:on
+
+ Marshal(payload).to[RequestEntity]
+ }
+
def uploadManagementRequest(
s3Location: S3Location,
uploadId: String,
diff --git
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/Marshalling.scala
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/Marshalling.scala
index 1fd5a75d3..de168389e 100644
---
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/Marshalling.scala
+++
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/Marshalling.scala
@@ -32,6 +32,19 @@ import scala.xml.NodeSeq
@InternalApi private[impl] object Marshalling {
import ScalaXmlSupport._
+ /**
+  * Unmarshals the XML body of a bucket versioning response into a [[BucketVersioningResult]].
+  *
+  * `Status` and `MfaDelete` are both optional: a missing element yields `None` for the
+  * corresponding field. `MfaDelete` is `true` only when its text is exactly `Enabled`.
+  * An empty document fails with `Unmarshaller.NoContentException`; a `Status` text other
+  * than `Enabled` or `Suspended` fails with an `IllegalArgumentException` naming the value.
+  */
+ implicit val bucketVersioningUnmarshaller: FromEntityUnmarshaller[BucketVersioningResult] = {
+   nodeSeqUnmarshaller(MediaTypes.`application/xml`, ContentTypes.`application/octet-stream`).map {
+     case NodeSeq.Empty => throw Unmarshaller.NoContentException
+     case x =>
+       val status = (x \ "Status").headOption.map(_.text match {
+         case "Enabled"   => BucketVersioningStatus.Enabled
+         case "Suspended" => BucketVersioningStatus.Suspended
+         // Explicit fallback so an unexpected value is reported by name rather than as a bare MatchError.
+         case other =>
+           throw new IllegalArgumentException(s"Unexpected bucket versioning status: $other")
+       })
+       val mfaDelete = (x \ "MfaDelete").headOption.map(_.text == "Enabled")
+       BucketVersioningResult(status, mfaDelete)
+   }
+ }
+
implicit val multipartUploadUnmarshaller:
FromEntityUnmarshaller[MultipartUpload] = {
nodeSeqUnmarshaller(MediaTypes.`application/xml`,
ContentTypes.`application/octet-stream`).map {
case NodeSeq.Empty => throw Unmarshaller.NoContentException
diff --git
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Request.scala
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Request.scala
index ccd8407f9..bc9c5d6db 100644
---
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Request.scala
+++
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Request.scala
@@ -74,3 +74,13 @@ import org.apache.pekko.annotation.InternalApi
* Internal Api
*/
@InternalApi private[s3] case object CheckBucket extends S3Request
+
+/**
+ * Internal Api
+ */
+@InternalApi private[s3] case object PutBucketVersioning extends S3Request
+
+/**
+ * Internal Api
+ */
+@InternalApi private[s3] case object GetBucketVersioning extends S3Request
diff --git
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
index 956d86bb0..e03fbbb80 100644
---
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
+++
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
@@ -779,6 +779,47 @@ import scala.util.{ Failure, Success, Try }
attr: Attributes): Future[Done] =
deleteUploadSource(bucket, key, uploadId,
headers).withAttributes(attr).runWith(Sink.ignore)
+ // Adapter around HttpRequests.bucketVersioningRequest: the first parameter list fixes the
+ // bucket, MFA status and headers, the second takes the method and settings. The partially
+ // applied form is what gets passed as `httpRequest` to s3ManagementRequest below.
+ private def bucketVersioningRequest(bucket: String, mfaStatus:
Option[MFAStatus], headers: S3Headers)(
+ method: HttpMethod,
+ conf: S3Settings): HttpRequest =
+ HttpRequests.bucketVersioningRequest(bucket, mfaStatus, method,
headers.headers)(conf)
+
+ /**
+  * Source that issues a PUT on the bucket's versioning sub-resource.
+  *
+  * The request body is rendered from `bucketVersioning` by `putBucketVersioningPayload`, and
+  * `bucketVersioning.mfaDelete` is forwarded to the request builder, which uses it to decide
+  * whether to add MFA credentials as a header.
+  *
+  * @param bucket bucket to update
+  * @param bucketVersioning desired versioning state
+  * @param headers additional headers for the request
+  */
+ def putBucketVersioningSource(
+ bucket: String, bucketVersioning: BucketVersioning, headers: S3Headers):
Source[Done, NotUsed] =
+ s3ManagementRequest[Done](
+ bucket = bucket,
+ method = HttpMethods.PUT,
+ httpRequest = bucketVersioningRequest(bucket,
bucketVersioning.mfaDelete, headers),
+ headers.headersFor(PutBucketVersioning),
+ process = processS3LifecycleResponse,
+ httpEntity =
Some(putBucketVersioningPayload(bucketVersioning)(ExecutionContexts.parasitic)))
+
+ /**
+  * Runs [[putBucketVersioningSource]] with the given attributes and discards its elements.
+  *
+  * @return the materialized value of `Sink.ignore` for that stream
+  */
+ def putBucketVersioning(bucket: String, bucketVersioning: BucketVersioning,
headers: S3Headers)(
+ implicit mat: Materializer,
+ attr: Attributes): Future[Done] =
+ putBucketVersioningSource(bucket, bucketVersioning,
headers).withAttributes(attr).runWith(Sink.ignore)
+
+ /**
+  * Source that issues a GET on the bucket's versioning sub-resource and parses the answer.
+  *
+  * A response with a successful status is unmarshalled into a [[BucketVersioningResult]];
+  * any other response is handed to `unmarshalError`.
+  *
+  * @param bucket bucket to query
+  * @param headers additional headers for the request
+  */
+ def getBucketVersioningSource(
+ bucket: String, headers: S3Headers): Source[BucketVersioningResult,
NotUsed] =
+ s3ManagementRequest[BucketVersioningResult](
+ bucket = bucket,
+ method = HttpMethods.GET,
+ // No MFA status is passed for reads, so no x-amz-mfa header is requested.
+ httpRequest = bucketVersioningRequest(bucket, None, headers),
+ headers.headersFor(GetBucketVersioning),
+ process = { (response: HttpResponse, mat: Materializer) =>
+ response match {
+ case HttpResponse(status, _, entity, _) if status.isSuccess() =>
+ Unmarshal(entity).to[BucketVersioningResult](implicitly,
ExecutionContexts.parasitic, mat)
+ case response: HttpResponse =>
+ unmarshalError(response.status, response.entity)(mat)
+ }
+ })
+
+ /**
+  * Runs [[getBucketVersioningSource]] with the given attributes and returns its first element.
+  */
+ def getBucketVersioning(bucket: String, headers: S3Headers)(
+ implicit mat: Materializer,
+ attr: Attributes): Future[BucketVersioningResult] =
+ getBucketVersioningSource(bucket,
headers).withAttributes(attr).runWith(Sink.head)
+
private def s3ManagementRequest[T](
bucket: String,
method: HttpMethod,
diff --git
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/javadsl/S3.scala
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/javadsl/S3.scala
index 7de398111..ee649545e 100644
--- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/javadsl/S3.scala
+++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/javadsl/S3.scala
@@ -1633,4 +1633,124 @@ object S3 {
private def func[T, R](f: T => R) = new pekko.japi.function.Function[T, R] {
override def apply(param: T): R = f(param)
}
+
+ /**
+ * Sets the versioning state of an existing bucket.
+ *
+ * @see
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
+ * @param bucketName Bucket name
+ * @param bucketVersioning The state that you want to update
+ * @return [[java.util.concurrent.CompletionStage CompletionStage]] of type
[[Done]] as API doesn't return any additional information
+ */
+ def putBucketVersioning(bucketName: String, bucketVersioning:
BucketVersioning)(
+ implicit system: ClassicActorSystemProvider,
+ attributes: Attributes = Attributes()): CompletionStage[Done] =
+ putBucketVersioning(bucketName, bucketVersioning, S3Headers.empty)
+
+ /**
+ * Sets the versioning state of an existing bucket.
+ *
+ * @see
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
+ * @param bucketName Bucket name
+ * @param bucketVersioning The state that you want to update
+ * @param s3Headers any headers you want to add
+ * @return [[java.util.concurrent.CompletionStage CompletionStage]] of type
[[Done]] as API doesn't return any additional information
+ */
+ def putBucketVersioning(
+ bucketName: String,
+ bucketVersioning: BucketVersioning,
+ s3Headers: S3Headers)(
+ implicit system: ClassicActorSystemProvider, attributes: Attributes):
CompletionStage[Done] =
+ S3Stream
+ .putBucketVersioning(bucketName, bucketVersioning,
s3Headers)(SystemMaterializer(system).materializer, attributes)
+ .toJava
+
+ /**
+ * Sets the versioning state of an existing bucket.
+ *
+ * @see
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
+ * @param bucketName Bucket name
+ * @param bucketVersioning The state that you want to update
+ * @return [[pekko.stream.javadsl.Source Source]] of type [[Done]] as API
doesn't return any additional information
+ */
+ def putBucketVersioningSource(bucketName: String, bucketVersioning:
BucketVersioning): Source[Done, NotUsed] =
+ putBucketVersioningSource(bucketName, bucketVersioning, S3Headers.empty)
+
+ /**
+ * Sets the versioning state of an existing bucket.
+ *
+ * @see
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
+ * @param bucketName Bucket name
+ * @param bucketVersioning The state that you want to update
+ * @param s3Headers any headers you want to add
+ * @return [[pekko.stream.javadsl.Source Source]] of type [[Done]] as API
doesn't return any additional information
+ */
+ def putBucketVersioningSource(bucketName: String,
+ bucketVersioning: BucketVersioning,
+ s3Headers: S3Headers): Source[Done, NotUsed] =
+ S3Stream.putBucketVersioningSource(bucketName, bucketVersioning,
s3Headers).asJava
+
+ /**
+ * Gets the versioning of an existing bucket
+ *
+ * @see
https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
+ * @param bucketName Bucket name
+ * @param system the actor system which provides the materializer to run
with
+ * @param attributes attributes to run request with
+ * @return [[java.util.concurrent.CompletionStage CompletionStage]] of type
[[BucketVersioningResult]]
+ */
+ def getBucketVersioning(bucketName: String,
+ system: ClassicActorSystemProvider,
+ attributes: Attributes): CompletionStage[BucketVersioningResult] =
+ getBucketVersioning(bucketName, system, attributes, S3Headers.empty)
+
+ /**
+ * Gets the versioning of an existing bucket
+ *
+ * @see
https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
+ * @param bucketName Bucket name
+ * @param system the actor system which provides the materializer to run
with
+ * @param attributes attributes to run request with
+ * @param s3Headers any headers you want to add
+ * @return [[java.util.concurrent.CompletionStage CompletionStage]] of type
[[BucketVersioningResult]]
+ */
+ def getBucketVersioning(bucketName: String,
+ system: ClassicActorSystemProvider,
+ attributes: Attributes,
+ s3Headers: S3Headers): CompletionStage[BucketVersioningResult] =
+ S3Stream.getBucketVersioning(bucketName,
s3Headers)(SystemMaterializer(system).materializer, attributes).toJava
+
+ /**
+ * Gets the versioning of an existing bucket
+ *
+ * @see
https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
+ * @param bucketName Bucket name
+ * @param system the actor system which provides the materializer to run
with
+ * @return [[java.util.concurrent.CompletionStage CompletionStage]] of type
[[BucketVersioningResult]]
+ */
+ def getBucketVersioning(
+ bucketName: String, system: ClassicActorSystemProvider):
CompletionStage[BucketVersioningResult] =
+ getBucketVersioning(bucketName, system, Attributes(), S3Headers.empty)
+
+ /**
+ * Gets the versioning of an existing bucket
+ *
+ * @see
https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
+ * @param bucketName Bucket name
+ * @return [[pekko.stream.javadsl.Source Source]] of type
[[BucketVersioningResult]]
+ */
+ def getBucketVersioningSource(bucketName: String):
Source[BucketVersioningResult, NotUsed] =
+ getBucketVersioningSource(bucketName, S3Headers.empty)
+
+ /**
+ * Gets the versioning of an existing bucket
+ *
+ * @see
https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
+ * @param bucketName Bucket name
+ * @param s3Headers any headers you want to add
+ * @return [[pekko.stream.javadsl.Source Source]] of type
[[BucketVersioningResult]]
+ */
+ def getBucketVersioningSource(bucketName: String, s3Headers: S3Headers):
Source[BucketVersioningResult, NotUsed] =
+ S3Stream.getBucketVersioningSource(bucketName, s3Headers).asJava
+
}
diff --git
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/model.scala
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/model.scala
index ecd177e22..d4c522846 100644
--- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/model.scala
+++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/model.scala
@@ -26,6 +26,219 @@ import scala.collection.immutable.Seq
import scala.collection.immutable
import scala.compat.java8.OptionConverters._
+final class MFA private (val serialNumber: String, val tokenCode: String) {
+
+ /** Java API */
+ def getSerialNumber: String = serialNumber
+
+ /** Java API */
+ def getTokenCode: String = tokenCode
+
+ def withSerialNumber(value: String): MFA = copy(serialNumber = value)
+
+ def withTokenCode(value: String): MFA = copy(tokenCode = value)
+
+ private def copy(serialNumber: String = serialNumber, tokenCode: String =
tokenCode): MFA =
+ new MFA(
+ serialNumber,
+ tokenCode)
+
+ override def toString: String =
+ "MFA(" +
+ s"serialNumber=$serialNumber," +
+ s"tokenCode=$tokenCode" +
+ ")"
+
+ override def equals(other: Any): Boolean = other match {
+ case that: MFA =>
+ Objects.equals(this.serialNumber, that.serialNumber) &&
+ Objects.equals(this.tokenCode, that.tokenCode)
+ case _ => false
+ }
+
+ override def hashCode(): Int =
+ Objects.hash(this.serialNumber, this.tokenCode)
+}
+
+object MFA {
+
+ /** Scala API */
+ def apply(serialNumber: String, tokenCode: String): MFA =
+ new MFA(serialNumber, tokenCode)
+
+ /** Java API */
+ def create(serialNumber: String, tokenCode: String): MFA =
+ apply(serialNumber, tokenCode)
+}
+
+sealed trait MFAStatus
+
+object MFAStatus {
+ final class Enabled private (val mfa: MFA) extends MFAStatus {
+
+ /** Java API */
+ def getMfa: MFA = mfa
+
+ // Warning is only being generated here because there is a single argument in the parameter list. If more fields
+ // get added to Enabled then the `@nowarn` is no longer needed
+ @nowarn
+ def withMfa(value: MFA): Enabled = copy(mfa = value)
+
+ private def copy(mfa: MFA): Enabled = new Enabled(mfa)
+
+ override def toString: String =
+ "Enabled(" +
+ s"mfa=$mfa" +
+ ")"
+
+ override def equals(other: Any): Boolean = other match {
+ case that: Enabled =>
+ Objects.equals(this.mfa, that.mfa)
+ case _ => false
+ }
+
+ override def hashCode(): Int = Objects.hash(this.mfa)
+
+ }
+
+ object Enabled {
+
+ /** Scala API */
+ def apply(mfa: MFA): Enabled = new Enabled(mfa)
+
+ /** Java API */
+ def create(mfa: MFA): Enabled = apply(mfa)
+ }
+
+ case object Disabled extends MFAStatus
+
+ /** Java API */
+ val disabled = Disabled
+
+}
+
+sealed trait BucketVersioningStatus
+
+object BucketVersioningStatus {
+ case object Enabled extends BucketVersioningStatus
+
+ case object Suspended extends BucketVersioningStatus
+
+ /** Java API */
+ val enabled = Enabled
+
+ /** Java API */
+ val suspended = Suspended
+}
+
+/**
+ * Versioning state reported for a bucket.
+ *
+ * Instances are immutable; the `with...` methods return modified copies. Equality and hash
+ * code are defined over both fields.
+ *
+ * @param status versioning status, if one was reported
+ * @param mfaDelete whether MFA delete is enabled, if that was reported
+ */
+final class BucketVersioningResult private (val status:
Option[BucketVersioningStatus],
+ val mfaDelete: Option[Boolean]) {
+
+ /** Java API */
+ // NOTE(review): this returns a scala.Option although it is labelled Java API, whereas
+ // getMfaDelete below and BucketVersioning.getStatus return java.util.Optional — confirm
+ // whether java.util.Optional was intended here.
+ def getStatus: Option[BucketVersioningStatus] = status
+
+ /** Java API */
+ def getMfaDelete: java.util.Optional[Boolean] = mfaDelete.asJava
+
+ /** Returns a copy with `status` set to `Some(value)`. */
+ def withStatus(value: BucketVersioningStatus): BucketVersioningResult =
+ copy(status = Some(value))
+
+ /** Returns a copy with `mfaDelete` set to `Some(value)`. */
+ def withMfaDelete(value: Boolean): BucketVersioningResult =
+ copy(mfaDelete = Some(value))
+
+ private def copy(
+ status: Option[BucketVersioningStatus] = status, mfaDelete:
Option[Boolean] = mfaDelete): BucketVersioningResult =
+ new BucketVersioningResult(
+ status,
+ mfaDelete)
+
+ override def toString: String =
+ "BucketVersioningResult(" +
+ s"status=$status," +
+ s"mfaDelete=$mfaDelete" +
+ ")"
+
+ override def equals(other: Any): Boolean = other match {
+ case that: BucketVersioningResult =>
+ Objects.equals(this.status, that.status) &&
+ Objects.equals(this.mfaDelete, that.mfaDelete)
+ case _ => false
+ }
+
+ override def hashCode(): Int =
+ Objects.hash(this.status, this.mfaDelete)
+}
+
+object BucketVersioningResult {
+
+ def apply(): BucketVersioningResult = apply(None, None)
+
+ /** Scala API */
+ def apply(status: Option[BucketVersioningStatus], mfaDelete:
Option[Boolean]): BucketVersioningResult =
+ new BucketVersioningResult(status, mfaDelete)
+
+ /** Java API */
+ def create(): BucketVersioningResult = apply()
+
+ def create(status: java.util.Optional[BucketVersioningStatus], mfaDelete:
java.util.Optional[Boolean])
+ : BucketVersioningResult =
+ apply(status.asScala, mfaDelete.asScala)
+
+}
+
+final class BucketVersioning private (val status:
Option[BucketVersioningStatus], val mfaDelete: Option[MFAStatus]) {
+
+ /** Java API */
+ def getStatus: java.util.Optional[BucketVersioningStatus] = status.asJava
+
+ /** Java API */
+ def getMfaDelete: java.util.Optional[MFAStatus] = mfaDelete.asJava
+
+ def withStatus(value: BucketVersioningStatus): BucketVersioning =
copy(status = Some(value))
+
+ def withMfaDelete(value: MFAStatus): BucketVersioning = copy(mfaDelete =
Some(value))
+
+ private def copy(
+ status: Option[BucketVersioningStatus] = status, mfaDelete:
Option[MFAStatus] = mfaDelete): BucketVersioning =
+ new BucketVersioning(
+ status,
+ mfaDelete)
+
+ override def toString: String =
+ "BucketVersioning(" +
+ s"status=$status," +
+ s"mfaDelete=$mfaDelete" +
+ ")"
+
+ override def equals(other: Any): Boolean = other match {
+ case that: BucketVersioning =>
+ Objects.equals(this.status, that.status) &&
+ Objects.equals(this.mfaDelete, that.mfaDelete)
+ case _ => false
+ }
+
+ override def hashCode(): Int =
+ Objects.hash(this.status, this.mfaDelete)
+}
+
+object BucketVersioning {
+
+ /** Scala API */
+ def apply(): BucketVersioning = apply(None, None)
+
+ /** Scala API */
+ def apply(status: Option[BucketVersioningStatus], mfaDelete:
Option[MFAStatus]): BucketVersioning =
+ new BucketVersioning(status, mfaDelete)
+
+ /** Java API */
+ def create(): BucketVersioning = apply()
+
+ def create(
+ status: java.util.Optional[BucketVersioningStatus], mfaDelete:
java.util.Optional[MFAStatus]): BucketVersioning =
+ apply(status.asScala, mfaDelete.asScala)
+
+}
+
final class MultipartUpload private (val bucket: String, val key: String, val
uploadId: String) {
/** Java API */
diff --git
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3.scala
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3.scala
index 03a11fcb4..fe53fce1d 100644
--- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3.scala
+++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3.scala
@@ -1092,4 +1092,103 @@ object S3 {
uploadId: String,
s3Headers: S3Headers): Source[Done, NotUsed] =
S3Stream.deleteUploadSource(bucketName, key, uploadId, s3Headers)
+
+ /**
+ * Sets the versioning state of an existing bucket.
+ *
+ * @see
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
+ * @param bucketName Bucket name
+ * @param bucketVersioning The state that you want to update
+ * @return [[scala.concurrent.Future Future]] of type [[Done]] as API
doesn't return any additional information
+ */
+ def putBucketVersioning(bucketName: String, bucketVersioning:
BucketVersioning)(
+ implicit system: ClassicActorSystemProvider,
+ attributes: Attributes = Attributes()): Future[Done] =
+ putBucketVersioning(bucketName, bucketVersioning, S3Headers.empty)
+
+ /**
+ * Sets the versioning state of an existing bucket.
+ *
+ * @see
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
+ * @param bucketName Bucket name
+ * @param bucketVersioning The state that you want to update
+ * @param s3Headers any headers you want to add
+ * @return [[scala.concurrent.Future Future]] of type [[Done]] as API
doesn't return any additional information
+ */
+ def putBucketVersioning(
+ bucketName: String,
+ bucketVersioning: BucketVersioning,
+ s3Headers: S3Headers)(implicit system: ClassicActorSystemProvider,
attributes: Attributes): Future[Done] =
+ S3Stream.putBucketVersioning(bucketName, bucketVersioning, s3Headers)
+
+ /**
+ * Sets the versioning state of an existing bucket.
+ *
+ * @see
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
+ * @param bucketName Bucket name
+ * @param bucketVersioning The state that you want to update
+ * @return [[pekko.stream.scaladsl.Source Source]] of type [[Done]] as API
doesn't return any additional information
+ */
+ def putBucketVersioningSource(bucketName: String, bucketVersioning:
BucketVersioning): Source[Done, NotUsed] =
+ putBucketVersioningSource(bucketName, bucketVersioning, S3Headers.empty)
+
+ /**
+ * Sets the versioning state of an existing bucket.
+ *
+ * @see
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
+ * @param bucketName Bucket name
+ * @param bucketVersioning The state that you want to update
+ * @param s3Headers any headers you want to add
+ * @return [[pekko.stream.scaladsl.Source Source]] of type [[Done]] as API
doesn't return any additional information
+ */
+ def putBucketVersioningSource(bucketName: String,
+ bucketVersioning: BucketVersioning,
+ s3Headers: S3Headers): Source[Done, NotUsed] =
+ S3Stream.putBucketVersioningSource(bucketName, bucketVersioning, s3Headers)
+
+ /**
+ * Gets the versioning of an existing bucket
+ *
+ * @see
https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
+ * @param bucketName Bucket name
+ * @return [[scala.concurrent.Future Future]] of type
[[BucketVersioningResult]]
+ */
+ def getBucketVersioning(bucketName: String)(implicit system:
ClassicActorSystemProvider,
+ attributes: Attributes = Attributes()): Future[BucketVersioningResult] =
+ S3Stream.getBucketVersioning(bucketName, S3Headers.empty)
+
+ /**
+ * Gets the versioning of an existing bucket
+ *
+ * @see
https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
+ * @param bucketName Bucket name
+ * @param s3Headers any headers you want to add
+ * @return [[scala.concurrent.Future Future]] of type
[[BucketVersioningResult]]
+ */
+ def getBucketVersioning(
+ bucketName: String,
+ s3Headers: S3Headers)(
+ implicit system: ClassicActorSystemProvider, attributes: Attributes):
Future[BucketVersioningResult] =
+ S3Stream.getBucketVersioning(bucketName, s3Headers)
+
+ /**
+ * Gets the versioning of an existing bucket
+ *
+ * @see
https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
+ * @param bucketName Bucket name
+ * @return [[pekko.stream.scaladsl.Source Source]] of type
[[BucketVersioningResult]]
+ */
+ def getBucketVersioningSource(bucketName: String):
Source[BucketVersioningResult, NotUsed] =
+ getBucketVersioningSource(bucketName, S3Headers.empty)
+
+ /**
+ * Gets the versioning of an existing bucket
+ *
+ * @see
https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
+ * @param bucketName Bucket name
+ * @param s3Headers any headers you want to add
+ * @return [[pekko.stream.scaladsl.Source Source]] of type
[[BucketVersioningResult]]
+ */
+ def getBucketVersioningSource(bucketName: String, s3Headers: S3Headers):
Source[BucketVersioningResult, NotUsed] =
+ S3Stream.getBucketVersioningSource(bucketName, s3Headers)
}
diff --git
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequestsSpec.scala
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequestsSpec.scala
index 558eb2e52..7c7564b1e 100644
---
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequestsSpec.scala
+++
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequestsSpec.scala
@@ -500,4 +500,19 @@ class HttpRequestsSpec extends AnyFlatSpec with Matchers
with ScalaFutures with
request.uri.queryString() should equal(None)
request.method should equal(HttpMethods.HEAD)
}
+
+ it should "add x-amz-mfa headers for a putBucketVersioning request" in {
+ implicit val settings: S3Settings = getSettings()
+
+ val serialNumber = "serial-number"
+ val tokenCode = "token-code"
+
+ val request: HttpRequest =
HttpRequests.bucketVersioningRequest("target-bucket",
+ Some(MFAStatus.Enabled(MFA(serialNumber, tokenCode))),
+ HttpMethods.PUT)
+
+ request.headers.collectFirst {
+ case httpHeader if httpHeader.is("x-amz-mfa") => httpHeader.value()
+ } should equal(Some(s"$serialNumber $tokenCode"))
+ }
}
diff --git
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
index 21259e275..dffa40cc7 100644
---
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
+++
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
@@ -828,6 +828,51 @@ trait S3IntegrationSpec
S3.makeBucket(defaultBucket).futureValue shouldBe Done
}
+ it should "enable and disable versioning for a bucket" in {
+ // TODO: Figure out a way to properly test this with Minio, see
https://github.com/akka/alpakka/issues/2750
+ assume(this.isInstanceOf[AWSS3IntegrationSpec])
+ implicit val attr: Attributes = attributes
+ val bucketName = "samplebucketversioning"
+
+ val request = for {
+ _ <- S3.makeBucket(bucketName)
+ firstResult <- S3.getBucketVersioning(bucketName)
+ _ <- S3.putBucketVersioning(bucketName,
BucketVersioning().withStatus(BucketVersioningStatus.Enabled))
+ secondResult <- S3.getBucketVersioning(bucketName)
+ } yield (firstResult, secondResult)
+
+ whenReady(request) { case (firstValue, secondValue) =>
+ firstValue shouldEqual BucketVersioningResult()
+ secondValue shouldEqual
BucketVersioningResult().withStatus(BucketVersioningStatus.Enabled)
+ S3.putBucketVersioning(bucketName,
+
BucketVersioning().withStatus(BucketVersioningStatus.Suspended)).futureValue
shouldBe Done
+ S3.deleteBucket(bucketName).futureValue
+ }
+ }
+
+ it should "enable and disable versioning for a bucket with MFA delete
configured to false" in {
+ // TODO: Figure out a way to properly test this with Minio, see
https://github.com/akka/alpakka/issues/2750
+ assume(this.isInstanceOf[AWSS3IntegrationSpec])
+ implicit val attr: Attributes = attributes
+ val bucketName = "samplebucketversioningmfadeletefalse"
+
+ val request = for {
+ _ <- S3.makeBucket(bucketName)
+ _ <- S3.putBucketVersioning(bucketName,
+ BucketVersioning()
+ .withStatus(BucketVersioningStatus.Enabled)
+ .withMfaDelete(MFAStatus.Disabled))
+ result <- S3.getBucketVersioning(bucketName)
+ } yield result
+
+ whenReady(request) { value =>
+ value shouldEqual
BucketVersioningResult().withStatus(BucketVersioningStatus.Enabled).withMfaDelete(false)
+ S3.putBucketVersioning(bucketName,
+
BucketVersioning().withStatus(BucketVersioningStatus.Suspended)).futureValue
shouldBe Done
+ S3.deleteBucket(bucketName).futureValue
+ }
+ }
+
it should "create and delete bucket with a given name" in {
val bucketName = "samplebucket3"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]