This is an automated email from the ASF dual-hosted git repository.

mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new ae4567ec2 Add S3 put plus get bucketWithVersioning API
ae4567ec2 is described below

commit ae4567ec2538c050f62a535acedf27b6f449a269
Author: Matthew de Detrich <[email protected]>
AuthorDate: Thu Apr 20 17:56:59 2023 +0200

    Add S3 put plus get bucketWithVersioning API
---
 .../stream/connectors/s3/impl/HttpRequests.scala   |  51 ++++-
 .../stream/connectors/s3/impl/Marshalling.scala    |  13 ++
 .../stream/connectors/s3/impl/S3Request.scala      |  10 +
 .../pekko/stream/connectors/s3/impl/S3Stream.scala |  41 ++++
 .../pekko/stream/connectors/s3/javadsl/S3.scala    | 120 ++++++++++++
 .../apache/pekko/stream/connectors/s3/model.scala  | 213 +++++++++++++++++++++
 .../pekko/stream/connectors/s3/scaladsl/S3.scala   |  99 ++++++++++
 .../connectors/s3/impl/HttpRequestsSpec.scala      |  15 ++
 .../connectors/s3/scaladsl/S3IntegrationSpec.scala |  45 +++++
 9 files changed, 606 insertions(+), 1 deletion(-)

diff --git 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequests.scala
 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequests.scala
index 3418dab83..b9d92d399 100644
--- 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequests.scala
+++ 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequests.scala
@@ -24,13 +24,21 @@ import pekko.http.scaladsl.model.Uri.{ Authority, Query }
 import pekko.http.scaladsl.model.headers.{ `Raw-Request-URI`, Host, RawHeader }
 import pekko.http.scaladsl.model.{ RequestEntity, _ }
 import pekko.stream.connectors.s3.AccessStyle.{ PathAccessStyle, 
VirtualHostAccessStyle }
-import pekko.stream.connectors.s3.{ ApiVersion, MultipartUpload, S3Settings }
+import pekko.stream.connectors.s3.{
+  ApiVersion,
+  BucketVersioning,
+  BucketVersioningStatus,
+  MFAStatus,
+  MultipartUpload,
+  S3Settings
+}
 import pekko.stream.scaladsl.Source
 import pekko.util.ByteString
 import software.amazon.awssdk.regions.Region
 
 import scala.collection.immutable.Seq
 import scala.concurrent.{ ExecutionContext, Future }
+import scala.xml.NodeSeq
 
 /**
  * Internal Api
@@ -178,6 +186,47 @@ import scala.concurrent.{ ExecutionContext, Future }
     s3Request(s3Location = s3Location, method = method)
       .withDefaultHeaders(headers)
 
+  def bucketVersioningRequest(bucket: String, mfaStatus: Option[MFAStatus], 
method: HttpMethod,
+      headers: Seq[HttpHeader] = Nil)(
+      implicit conf: S3Settings): HttpRequest = {
+
+    val confWithVirtualHost = conf.withAccessStyle(VirtualHostAccessStyle)
+    val authority = requestAuthority(bucket, 
conf.s3RegionProvider.getRegion)(confWithVirtualHost)
+
+    val finalHeaders = (mfaStatus, method) match {
+      case (Some(mfaEnabled: MFAStatus.Enabled), HttpMethods.PUT) =>
+        RawHeader("x-amz-mfa", s"${mfaEnabled.mfa.serialNumber} 
${mfaEnabled.mfa.tokenCode}") +: headers
+      case _ => headers
+    }
+
+    HttpRequest(method)
+      .withHeaders(Host(authority) +: finalHeaders)
+      .withUri(requestUri(bucket, 
None)(confWithVirtualHost).withAuthority(authority).withQuery(Query("versioning")))
+  }
+
+  def putBucketVersioningPayload(bucketVersioning: BucketVersioning)(
+      implicit ec: ExecutionContext): Future[RequestEntity] = {
+
+    val status = bucketVersioning.status.map {
+      case BucketVersioningStatus.Enabled   => <Status>Enabled</Status>
+      case BucketVersioningStatus.Suspended => <Status>Suspended</Status>
+    }.getOrElse(NodeSeq.Empty)
+
+    val mfaDelete = bucketVersioning.mfaDelete.map {
+      case _: MFAStatus.Enabled => <MfaDelete>Enabled</MfaDelete>
+      case MFAStatus.Disabled   => <MfaDelete>Disabled</MfaDelete>
+    }.getOrElse(NodeSeq.Empty)
+
+    // @formatter:off
+    val payload = <VersioningConfiguration 
xmlns="http://s3.amazonaws.com/doc/2006-03-01/";>
+      {status}
+      {mfaDelete}
+    </VersioningConfiguration>
+    // @formatter:on
+
+    Marshal(payload).to[RequestEntity]
+  }
+
   def uploadManagementRequest(
       s3Location: S3Location,
       uploadId: String,
diff --git 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/Marshalling.scala
 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/Marshalling.scala
index 1fd5a75d3..de168389e 100644
--- 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/Marshalling.scala
+++ 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/Marshalling.scala
@@ -32,6 +32,19 @@ import scala.xml.NodeSeq
 @InternalApi private[impl] object Marshalling {
   import ScalaXmlSupport._
 
+  implicit val bucketVersioningUnmarshaller: 
FromEntityUnmarshaller[BucketVersioningResult] = {
+    nodeSeqUnmarshaller(MediaTypes.`application/xml`, 
ContentTypes.`application/octet-stream`).map {
+      case NodeSeq.Empty => throw Unmarshaller.NoContentException
+      case x =>
+        val status = (x \ "Status").headOption.map(_.text match {
+          case "Enabled"   => BucketVersioningStatus.Enabled
+          case "Suspended" => BucketVersioningStatus.Suspended
+        })
+        val MFADelete = (x \ "MfaDelete").headOption.map(_.exists(_.text == 
"Enabled"))
+        BucketVersioningResult(status, MFADelete)
+    }
+  }
+
   implicit val multipartUploadUnmarshaller: 
FromEntityUnmarshaller[MultipartUpload] = {
     nodeSeqUnmarshaller(MediaTypes.`application/xml`, 
ContentTypes.`application/octet-stream`).map {
       case NodeSeq.Empty => throw Unmarshaller.NoContentException
diff --git 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Request.scala 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Request.scala
index ccd8407f9..bc9c5d6db 100644
--- 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Request.scala
+++ 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Request.scala
@@ -74,3 +74,13 @@ import org.apache.pekko.annotation.InternalApi
  * Internal Api
  */
 @InternalApi private[s3] case object CheckBucket extends S3Request
+
+/**
+ * Internal Api
+ */
+@InternalApi private[s3] case object PutBucketVersioning extends S3Request
+
+/**
+ * Internal Api
+ */
+@InternalApi private[s3] case object GetBucketVersioning extends S3Request
diff --git 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
index 956d86bb0..e03fbbb80 100644
--- 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
+++ 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
@@ -779,6 +779,47 @@ import scala.util.{ Failure, Success, Try }
       attr: Attributes): Future[Done] =
     deleteUploadSource(bucket, key, uploadId, 
headers).withAttributes(attr).runWith(Sink.ignore)
 
+  private def bucketVersioningRequest(bucket: String, mfaStatus: 
Option[MFAStatus], headers: S3Headers)(
+      method: HttpMethod,
+      conf: S3Settings): HttpRequest =
+    HttpRequests.bucketVersioningRequest(bucket, mfaStatus, method, 
headers.headers)(conf)
+
+  def putBucketVersioningSource(
+      bucket: String, bucketVersioning: BucketVersioning, headers: S3Headers): 
Source[Done, NotUsed] =
+    s3ManagementRequest[Done](
+      bucket = bucket,
+      method = HttpMethods.PUT,
+      httpRequest = bucketVersioningRequest(bucket, 
bucketVersioning.mfaDelete, headers),
+      headers.headersFor(PutBucketVersioning),
+      process = processS3LifecycleResponse,
+      httpEntity = 
Some(putBucketVersioningPayload(bucketVersioning)(ExecutionContexts.parasitic)))
+
+  def putBucketVersioning(bucket: String, bucketVersioning: BucketVersioning, 
headers: S3Headers)(
+      implicit mat: Materializer,
+      attr: Attributes): Future[Done] =
+    putBucketVersioningSource(bucket, bucketVersioning, 
headers).withAttributes(attr).runWith(Sink.ignore)
+
+  def getBucketVersioningSource(
+      bucket: String, headers: S3Headers): Source[BucketVersioningResult, 
NotUsed] =
+    s3ManagementRequest[BucketVersioningResult](
+      bucket = bucket,
+      method = HttpMethods.GET,
+      httpRequest = bucketVersioningRequest(bucket, None, headers),
+      headers.headersFor(GetBucketVersioning),
+      process = { (response: HttpResponse, mat: Materializer) =>
+        response match {
+          case HttpResponse(status, _, entity, _) if status.isSuccess() =>
+            Unmarshal(entity).to[BucketVersioningResult](implicitly, 
ExecutionContexts.parasitic, mat)
+          case response: HttpResponse =>
+            unmarshalError(response.status, response.entity)(mat)
+        }
+      })
+
+  def getBucketVersioning(bucket: String, headers: S3Headers)(
+      implicit mat: Materializer,
+      attr: Attributes): Future[BucketVersioningResult] =
+    getBucketVersioningSource(bucket, 
headers).withAttributes(attr).runWith(Sink.head)
+
   private def s3ManagementRequest[T](
       bucket: String,
       method: HttpMethod,
diff --git 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/javadsl/S3.scala 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/javadsl/S3.scala
index 7de398111..ee649545e 100644
--- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/javadsl/S3.scala
+++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/javadsl/S3.scala
@@ -1633,4 +1633,124 @@ object S3 {
   private def func[T, R](f: T => R) = new pekko.japi.function.Function[T, R] {
     override def apply(param: T): R = f(param)
   }
+
+  /**
+   * Sets the versioning state of an existing bucket.
+   *
+   * @see 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
+   * @param bucketName       Bucket name
+   * @param bucketVersioning The state that you want to update
+   * @return [[java.util.concurrent.CompletionStage CompletionStage]] of type 
[[Done]] as API doesn't return any additional information
+   */
+  def putBucketVersioning(bucketName: String, bucketVersioning: 
BucketVersioning)(
+      implicit system: ClassicActorSystemProvider,
+      attributes: Attributes = Attributes()): CompletionStage[Done] =
+    putBucketVersioning(bucketName, bucketVersioning, S3Headers.empty)
+
+  /**
+   * Sets the versioning state of an existing bucket.
+   *
+   * @see 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
+   * @param bucketName       Bucket name
+   * @param bucketVersioning The state that you want to update
+   * @param s3Headers  any headers you want to add
+   * @return [[java.util.concurrent.CompletionStage CompletionStage]] of type 
[[Done]] as API doesn't return any additional information
+   */
+  def putBucketVersioning(
+      bucketName: String,
+      bucketVersioning: BucketVersioning,
+      s3Headers: S3Headers)(
+      implicit system: ClassicActorSystemProvider, attributes: Attributes): 
CompletionStage[Done] =
+    S3Stream
+      .putBucketVersioning(bucketName, bucketVersioning, 
s3Headers)(SystemMaterializer(system).materializer, attributes)
+      .toJava
+
+  /**
+   * Sets the versioning state of an existing bucket.
+   *
+   * @see 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
+   * @param bucketName       Bucket name
+   * @param bucketVersioning The state that you want to update
+   * @return [[pekko.stream.javadsl.Source Source]] of type [[Done]] as API 
doesn't return any additional information
+   */
+  def putBucketVersioningSource(bucketName: String, bucketVersioning: 
BucketVersioning): Source[Done, NotUsed] =
+    putBucketVersioningSource(bucketName, bucketVersioning, S3Headers.empty)
+
+  /**
+   * Sets the versioning state of an existing bucket.
+   *
+   * @see 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
+   * @param bucketName       Bucket name
+   * @param bucketVersioning The state that you want to update
+   * @param s3Headers  any headers you want to add
+   * @return [[pekko.stream.javadsl.Source Source]] of type [[Done]] as API 
doesn't return any additional information
+   */
+  def putBucketVersioningSource(bucketName: String,
+      bucketVersioning: BucketVersioning,
+      s3Headers: S3Headers): Source[Done, NotUsed] =
+    S3Stream.putBucketVersioningSource(bucketName, bucketVersioning, 
s3Headers).asJava
+
+  /**
+   * Gets the versioning of an existing bucket
+   *
+   * @see 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
+   * @param bucketName Bucket name
+   * @param system     the actor system which provides the materializer to run 
with
+   * @param attributes attributes to run request with
+   * @return [[java.util.concurrent.CompletionStage CompletionStage]] of type 
[[BucketVersioningResult]]
+   */
+  def getBucketVersioning(bucketName: String,
+      system: ClassicActorSystemProvider,
+      attributes: Attributes): CompletionStage[BucketVersioningResult] =
+    getBucketVersioning(bucketName, system, attributes, S3Headers.empty)
+
+  /**
+   * Gets the versioning of an existing bucket
+   *
+   * @see 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
+   * @param bucketName Bucket name
+   * @param system     the actor system which provides the materializer to run 
with
+   * @param attributes attributes to run request with
+   * @param s3Headers  any headers you want to add
+   * @return [[java.util.concurrent.CompletionStage CompletionStage]] of type 
[[BucketVersioningResult]]
+   */
+  def getBucketVersioning(bucketName: String,
+      system: ClassicActorSystemProvider,
+      attributes: Attributes,
+      s3Headers: S3Headers): CompletionStage[BucketVersioningResult] =
+    S3Stream.getBucketVersioning(bucketName, 
s3Headers)(SystemMaterializer(system).materializer, attributes).toJava
+
+  /**
+   * Gets the versioning of an existing bucket
+   *
+   * @see 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
+   * @param bucketName Bucket name
+   * @param system     the actor system which provides the materializer to run 
with
+   * @return [[java.util.concurrent.CompletionStage CompletionStage]] of type 
[[BucketVersioningResult]]
+   */
+  def getBucketVersioning(
+      bucketName: String, system: ClassicActorSystemProvider): 
CompletionStage[BucketVersioningResult] =
+    getBucketVersioning(bucketName, system, Attributes(), S3Headers.empty)
+
+  /**
+   * Gets the versioning of an existing bucket
+   *
+   * @see 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
+   * @param bucketName Bucket name
+   * @return [[pekko.stream.javadsl.Source Source]] of type 
[[BucketVersioningResult]]
+   */
+  def getBucketVersioningSource(bucketName: String): 
Source[BucketVersioningResult, NotUsed] =
+    getBucketVersioningSource(bucketName, S3Headers.empty)
+
+  /**
+   * Gets the versioning of an existing bucket
+   *
+   * @see 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
+   * @param bucketName Bucket name
+   * @param s3Headers  any headers you want to add
+   * @return [[pekko.stream.javadsl.Source Source]] of type 
[[BucketVersioningResult]]
+   */
+  def getBucketVersioningSource(bucketName: String, s3Headers: S3Headers): 
Source[BucketVersioningResult, NotUsed] =
+    S3Stream.getBucketVersioningSource(bucketName, s3Headers).asJava
+
 }
diff --git 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/model.scala 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/model.scala
index ecd177e22..d4c522846 100644
--- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/model.scala
+++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/model.scala
@@ -26,6 +26,219 @@ import scala.collection.immutable.Seq
 import scala.collection.immutable
 import scala.compat.java8.OptionConverters._
 
+final class MFA private (val serialNumber: String, val tokenCode: String) {
+
+  /** Java API */
+  def getSerialNumber: String = serialNumber
+
+  /** Java API */
+  def getTokenCode: String = tokenCode
+
+  def withSerialNumber(value: String): MFA = copy(serialNumber = value)
+
+  def withTokenCode(value: String): MFA = copy(tokenCode = value)
+
+  private def copy(serialNumber: String = serialNumber, tokenCode: String = 
tokenCode): MFA =
+    new MFA(
+      serialNumber,
+      tokenCode)
+
+  override def toString: String =
+    "MFA(" +
+    s"serialNumber=$serialNumber," +
+    s"tokenCode=$tokenCode" +
+    ")"
+
+  override def equals(other: Any): Boolean = other match {
+    case that: MFA =>
+      Objects.equals(this.serialNumber, that.serialNumber) &&
+      Objects.equals(this.tokenCode, that.tokenCode)
+    case _ => false
+  }
+
+  override def hashCode(): Int =
+    Objects.hash(this.serialNumber, this.tokenCode)
+}
+
+object MFA {
+
+  /** Scala API */
+  def apply(serialNumber: String, tokenCode: String): MFA =
+    new MFA(serialNumber, tokenCode)
+
+  /** Java API */
+  def create(serialNumber: String, tokenCode: String): MFA =
+    apply(serialNumber, tokenCode)
+}
+
+sealed trait MFAStatus
+
+object MFAStatus {
+  final class Enabled private (val mfa: MFA) extends MFAStatus {
+
+    /** Java API */
+    def getMfa: MFA = mfa
+
+    // Warning is only being generated here because there is a single argument 
in the parameter list. If more fields
+    // get added to CommonPrefixes then the `@nowarn` is no longer needed
+    @nowarn
+    def withMfa(value: MFA): Enabled = copy(mfa = value)
+
+    private def copy(mfa: MFA): Enabled = new Enabled(mfa)
+
+    override def toString: String =
+      "Enabled(" +
+      s"mfa=$mfa" +
+      ")"
+
+    override def equals(other: Any): Boolean = other match {
+      case that: Enabled =>
+        Objects.equals(this.mfa, that.mfa)
+      case _ => false
+    }
+
+    override def hashCode(): Int = Objects.hash(this.mfa)
+
+  }
+
+  object Enabled {
+
+    /** Scala API */
+    def apply(mfa: MFA): Enabled = new Enabled(mfa)
+
+    /** Java API */
+    def create(mfa: MFA): Enabled = apply(mfa)
+  }
+
+  case object Disabled extends MFAStatus
+
+  /** Java API */
+  val disabled = Disabled
+
+}
+
+sealed trait BucketVersioningStatus
+
+object BucketVersioningStatus {
+  case object Enabled extends BucketVersioningStatus
+
+  case object Suspended extends BucketVersioningStatus
+
+  /** Java API */
+  val enabled = Enabled
+
+  /** Java API */
+  val suspended = Suspended
+}
+
+final class BucketVersioningResult private (val status: 
Option[BucketVersioningStatus],
+    val mfaDelete: Option[Boolean]) {
+
+  /** Java API */
+  def getStatus: Option[BucketVersioningStatus] = status
+
+  /** Java API */
+  def getMfaDelete: java.util.Optional[Boolean] = mfaDelete.asJava
+
+  def withStatus(value: BucketVersioningStatus): BucketVersioningResult =
+    copy(status = Some(value))
+
+  def withMfaDelete(value: Boolean): BucketVersioningResult =
+    copy(mfaDelete = Some(value))
+
+  private def copy(
+      status: Option[BucketVersioningStatus] = status, mfaDelete: 
Option[Boolean] = mfaDelete): BucketVersioningResult =
+    new BucketVersioningResult(
+      status,
+      mfaDelete)
+
+  override def toString: String =
+    "BucketVersioningResult(" +
+    s"status=$status," +
+    s"mfaDelete=$mfaDelete" +
+    ")"
+
+  override def equals(other: Any): Boolean = other match {
+    case that: BucketVersioningResult =>
+      Objects.equals(this.status, that.status) &&
+      Objects.equals(this.mfaDelete, that.mfaDelete)
+    case _ => false
+  }
+
+  override def hashCode(): Int =
+    Objects.hash(this.status, this.mfaDelete)
+}
+
+object BucketVersioningResult {
+
+  def apply(): BucketVersioningResult = apply(None, None)
+
+  /** Scala API */
+  def apply(status: Option[BucketVersioningStatus], mfaDelete: 
Option[Boolean]): BucketVersioningResult =
+    new BucketVersioningResult(status, mfaDelete)
+
+  /** Java API */
+  def create(): BucketVersioningResult = apply()
+
+  def create(status: java.util.Optional[BucketVersioningStatus], mfaDelete: 
java.util.Optional[Boolean])
+      : BucketVersioningResult =
+    apply(status.asScala, mfaDelete.asScala)
+
+}
+
+final class BucketVersioning private (val status: 
Option[BucketVersioningStatus], val mfaDelete: Option[MFAStatus]) {
+
+  /** Java API */
+  def getStatus: java.util.Optional[BucketVersioningStatus] = status.asJava
+
+  /** Java API */
+  def getMfaDelete: java.util.Optional[MFAStatus] = mfaDelete.asJava
+
+  def withStatus(value: BucketVersioningStatus): BucketVersioning = 
copy(status = Some(value))
+
+  def withMfaDelete(value: MFAStatus): BucketVersioning = copy(mfaDelete = 
Some(value))
+
+  private def copy(
+      status: Option[BucketVersioningStatus] = status, mfaDelete: 
Option[MFAStatus] = mfaDelete): BucketVersioning =
+    new BucketVersioning(
+      status,
+      mfaDelete)
+
+  override def toString: String =
+    "BucketVersioning(" +
+    s"status=$status," +
+    s"mfaDelete=$mfaDelete" +
+    ")"
+
+  override def equals(other: Any): Boolean = other match {
+    case that: BucketVersioning =>
+      Objects.equals(this.status, that.status) &&
+      Objects.equals(this.mfaDelete, that.mfaDelete)
+    case _ => false
+  }
+
+  override def hashCode(): Int =
+    Objects.hash(this.status, this.mfaDelete)
+}
+
+object BucketVersioning {
+
+  /** Scala API */
+  def apply(): BucketVersioning = apply(None, None)
+
+  /** Scala API */
+  def apply(status: Option[BucketVersioningStatus], mfaDelete: 
Option[MFAStatus]): BucketVersioning =
+    new BucketVersioning(status, mfaDelete)
+
+  /** Java API */
+  def create(): BucketVersioning = apply()
+
+  def create(
+      status: java.util.Optional[BucketVersioningStatus], mfaDelete: 
java.util.Optional[MFAStatus]): BucketVersioning =
+    apply(status.asScala, mfaDelete.asScala)
+
+}
+
 final class MultipartUpload private (val bucket: String, val key: String, val 
uploadId: String) {
 
   /** Java API */
diff --git 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3.scala 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3.scala
index 03a11fcb4..fe53fce1d 100644
--- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3.scala
+++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3.scala
@@ -1092,4 +1092,103 @@ object S3 {
       uploadId: String,
       s3Headers: S3Headers): Source[Done, NotUsed] =
     S3Stream.deleteUploadSource(bucketName, key, uploadId, s3Headers)
+
+  /**
+   * Sets the versioning state of an existing bucket.
+   *
+   * @see 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
+   * @param bucketName Bucket name
+   * @param bucketVersioning The state that you want to update
+   * @return [[scala.concurrent.Future Future]] of type [[Done]] as API 
doesn't return any additional information
+   */
+  def putBucketVersioning(bucketName: String, bucketVersioning: 
BucketVersioning)(
+      implicit system: ClassicActorSystemProvider,
+      attributes: Attributes = Attributes()): Future[Done] =
+    putBucketVersioning(bucketName, bucketVersioning, S3Headers.empty)
+
+  /**
+   * Sets the versioning state of an existing bucket.
+   *
+   * @see 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
+   * @param bucketName Bucket name
+   * @param bucketVersioning The state that you want to update
+   * @param s3Headers  any headers you want to add
+   * @return [[scala.concurrent.Future Future]] of type [[Done]] as API 
doesn't return any additional information
+   */
+  def putBucketVersioning(
+      bucketName: String,
+      bucketVersioning: BucketVersioning,
+      s3Headers: S3Headers)(implicit system: ClassicActorSystemProvider, 
attributes: Attributes): Future[Done] =
+    S3Stream.putBucketVersioning(bucketName, bucketVersioning, s3Headers)
+
+  /**
+   * Delete all existing parts for a specific upload
+   *
+   * @see 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
+   * @param bucketName Bucket name
+   * @param bucketVersioning The state that you want to update
+   * @return [[pekko.stream.scaladsl.Source Source]] of type [[Done]] as API 
doesn't return any additional information
+   */
+  def putBucketVersioningSource(bucketName: String, bucketVersioning: 
BucketVersioning): Source[Done, NotUsed] =
+    putBucketVersioningSource(bucketName, bucketVersioning, S3Headers.empty)
+
+  /**
+   * Sets the versioning state of an existing bucket.
+   *
+   * @see 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
+   * @param bucketName Bucket name
+   * @param bucketVersioning The state that you want to update
+   * @param s3Headers  any headers you want to add
+   * @return [[pekko.stream.scaladsl.Source Source]] of type [[Done]] as API 
doesn't return any additional information
+   */
+  def putBucketVersioningSource(bucketName: String,
+      bucketVersioning: BucketVersioning,
+      s3Headers: S3Headers): Source[Done, NotUsed] =
+    S3Stream.putBucketVersioningSource(bucketName, bucketVersioning, s3Headers)
+
+  /**
+   * Gets the versioning of an existing bucket
+   *
+   * @see 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
+   * @param bucketName Bucket name
+   * @return [[scala.concurrent.Future Future]] of type 
[[BucketVersioningResult]]
+   */
+  def getBucketVersioning(bucketName: String)(implicit system: 
ClassicActorSystemProvider,
+      attributes: Attributes = Attributes()): Future[BucketVersioningResult] =
+    S3Stream.getBucketVersioning(bucketName, S3Headers.empty)
+
+  /**
+   * Gets the versioning of an existing bucket
+   *
+   * @see 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
+   * @param bucketName Bucket name
+   * @param s3Headers  any headers you want to add
+   * @return [[scala.concurrent.Future Future]] of type 
[[BucketVersioningResult]]
+   */
+  def getBucketVersioning(
+      bucketName: String,
+      s3Headers: S3Headers)(
+      implicit system: ClassicActorSystemProvider, attributes: Attributes): 
Future[BucketVersioningResult] =
+    S3Stream.getBucketVersioning(bucketName, s3Headers)
+
+  /**
+   * Gets the versioning of an existing bucket
+   *
+   * @see 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
+   * @param bucketName Bucket name
+   * @return [[pekko.stream.scaladsl.Source Source]] of type 
[[BucketVersioningResult]]
+   */
+  def getBucketVersioningSource(bucketName: String): 
Source[BucketVersioningResult, NotUsed] =
+    getBucketVersioningSource(bucketName, S3Headers.empty)
+
+  /**
+   * Gets the versioning of an existing bucket
+   *
+   * @see 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
+   * @param bucketName Bucket name
+   * @param s3Headers  any headers you want to add
+   * @return [[pekko.stream.scaladsl.Source Source]] of type 
[[BucketVersioningResult]]
+   */
+  def getBucketVersioningSource(bucketName: String, s3Headers: S3Headers): 
Source[BucketVersioningResult, NotUsed] =
+    S3Stream.getBucketVersioningSource(bucketName, s3Headers)
 }
diff --git 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequestsSpec.scala
 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequestsSpec.scala
index 558eb2e52..7c7564b1e 100644
--- 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequestsSpec.scala
+++ 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequestsSpec.scala
@@ -500,4 +500,19 @@ class HttpRequestsSpec extends AnyFlatSpec with Matchers 
with ScalaFutures with
     request.uri.queryString() should equal(None)
     request.method should equal(HttpMethods.HEAD)
   }
+
+  it should "add x-amz-mfa headers for a putBucketVersioning request" in {
+    implicit val settings: S3Settings = getSettings()
+
+    val serialNumber = "serial-number"
+    val tokenCode = "token-code"
+
+    val request: HttpRequest = 
HttpRequests.bucketVersioningRequest("target-bucket",
+      Some(MFAStatus.Enabled(MFA(serialNumber, tokenCode))),
+      HttpMethods.PUT)
+
+    request.headers.collectFirst {
+      case httpHeader if httpHeader.is("x-amz-mfa") => httpHeader.value()
+    } should equal(Some(s"$serialNumber $tokenCode"))
+  }
 }
diff --git 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
index 21259e275..dffa40cc7 100644
--- 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
+++ 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
@@ -828,6 +828,51 @@ trait S3IntegrationSpec
     S3.makeBucket(defaultBucket).futureValue shouldBe Done
   }
 
+  it should "enable and disable versioning for a bucket" in {
+    // TODO: Figure out a way to properly test this with Minio, see 
https://github.com/akka/alpakka/issues/2750
+    assume(this.isInstanceOf[AWSS3IntegrationSpec])
+    implicit val attr: Attributes = attributes
+    val bucketName = "samplebucketversioning"
+
+    val request = for {
+      _ <- S3.makeBucket(bucketName)
+      firstResult <- S3.getBucketVersioning(bucketName)
+      _ <- S3.putBucketVersioning(bucketName, 
BucketVersioning().withStatus(BucketVersioningStatus.Enabled))
+      secondResult <- S3.getBucketVersioning(bucketName)
+    } yield (firstResult, secondResult)
+
+    whenReady(request) { case (firstValue, secondValue) =>
+      firstValue shouldEqual BucketVersioningResult()
+      secondValue shouldEqual 
BucketVersioningResult().withStatus(BucketVersioningStatus.Enabled)
+      S3.putBucketVersioning(bucketName,
+        
BucketVersioning().withStatus(BucketVersioningStatus.Suspended)).futureValue 
shouldBe Done
+      S3.deleteBucket(bucketName).futureValue
+    }
+  }
+
+  it should "enable and disable versioning for a bucket with MFA delete 
configured to false" in {
+    // TODO: Figure out a way to properly test this with Minio, see 
https://github.com/akka/alpakka/issues/2750
+    assume(this.isInstanceOf[AWSS3IntegrationSpec])
+    implicit val attr: Attributes = attributes
+    val bucketName = "samplebucketversioningmfadeletefalse"
+
+    val request = for {
+      _ <- S3.makeBucket(bucketName)
+      _ <- S3.putBucketVersioning(bucketName,
+        BucketVersioning()
+          .withStatus(BucketVersioningStatus.Enabled)
+          .withMfaDelete(MFAStatus.Disabled))
+      result <- S3.getBucketVersioning(bucketName)
+    } yield result
+
+    whenReady(request) { value =>
+      value shouldEqual 
BucketVersioningResult().withStatus(BucketVersioningStatus.Enabled).withMfaDelete(false)
+      S3.putBucketVersioning(bucketName,
+        
BucketVersioning().withStatus(BucketVersioningStatus.Suspended)).futureValue 
shouldBe Done
+      S3.deleteBucket(bucketName).futureValue
+    }
+  }
+
   it should "create and delete bucket with a given name" in {
     val bucketName = "samplebucket3"
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to