This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new ed7ac3647 Move AWS SPI code into Pekko repo (#648)
ed7ac3647 is described below

commit ed7ac36475745d32204df403252ee6d576daf1bd
Author: PJ Fanning <[email protected]>
AuthorDate: Wed Jun 12 17:56:03 2024 +0100

    Move AWS SPI code into Pekko repo (#648)
    
    * move aws-spi to pekko-connectors
    
    Co-authored-by: Matthias Lüneberg <[email protected]>
    
    * rename packages
    
    import order
    
    scalafmt
    
    Update KinesisFirehoseSnippets.java
    
    imports
    
    * compile warnings
    
    * try to suppress warning
    
    * exclude aws-spi-pekko-http from mima check
    
    * scalafmt
    
    * Update build.sbt
    
    * add build job
    
    * compile issues
    
    * Update application.conf
    
    * fix module name of awslambda
    
    * rename files
    
    ---------
    
    Co-authored-by: Matthias Lüneberg <[email protected]>
---
 .github/workflows/check-build-test.yml             |   4 +
 NOTICE                                             |   7 +
 .../docs/javadsl/EventBridgePublisherTest.java     |   2 +-
 .../aws/eventbridge/IntegrationTestContext.scala   |   2 +-
 .../src/it/resources/application.conf              |   3 +
 .../pekko/stream/connectors/awsspi/TestBase.scala  |  47 +++++
 .../awsspi/dynamodb/DynamoDBITTest.scala           |  91 ++++++++++
 .../connectors/awsspi/kinesis/KinesisITTest.scala  | 111 ++++++++++++
 .../stream/connectors/awsspi/s3/S3ITTest.scala     | 137 ++++++++++++++
 .../stream/connectors/awsspi/sqs/SQSITTest.scala   |  75 ++++++++
 ...re.amazon.awssdk.http.async.SdkAsyncHttpService |   1 +
 .../awsspi/PekkoHttpAsyncHttpService.scala         |  25 +++
 .../stream/connectors/awsspi/PekkoHttpClient.scala | 198 +++++++++++++++++++++
 .../stream/connectors/awsspi/RequestRunner.scala   |  87 +++++++++
 .../pekko/stream/connectors/awsspi/S3Test.java     | 144 +++++++++++++++
 .../src/test/resources/application.conf            |   3 +
 .../src/test/resources/logback-test.xml            |  43 +++++
 .../connectors/awsspi/BaseAwsClientTest.scala      |  74 ++++++++
 .../connectors/awsspi/PekkoHttpClientSpec.scala    |  51 ++++++
 .../connectors/awsspi/RequestRunnerSpec.scala      |  63 +++++++
 .../connectors/awsspi/dynamodb/TestDynamoDB.scala  |  78 ++++++++
 .../pekko/stream/connectors/awsspi/s3/TestS3.scala | 188 +++++++++++++++++++
 .../stream/connectors/awsspi/sns/TestSNS.scala     |  58 ++++++
 .../stream/connectors/awsspi/sqs/TestSQS.scala     |  94 ++++++++++
 .../LocalStackReadyLogWaitStrategy.scala           |  48 +++++
 .../testcontainers/TimeoutWaitStrategy.scala       |  30 ++++
 awslambda/src/test/java/docs/javadsl/Examples.java |   2 +-
 .../src/test/scala/docs/scaladsl/Examples.scala    |   2 +-
 build.sbt                                          |  19 +-
 .../src/test/java/docs/javadsl/ExampleTest.java    |   2 +-
 dynamodb/src/test/java/docs/javadsl/RetryTest.java |   2 +-
 .../src/test/scala/docs/scaladsl/ExampleSpec.scala |   2 +-
 .../src/test/scala/docs/scaladsl/RetrySpec.scala   |   2 +-
 .../stream/connectors/dynamodb/ItemSpec.scala      |   2 +-
 .../stream/connectors/dynamodb/TableSpec.scala     |   2 +-
 .../java/docs/javadsl/KinesisFirehoseSnippets.java |   2 +-
 .../test/java/docs/javadsl/KinesisSnippets.java    |   2 +-
 .../docs/scaladsl/KinesisFirehoseSnippets.scala    |   2 +-
 .../test/scala/docs/scaladsl/KinesisSnippets.scala |   2 +-
 project/Dependencies.scala                         |  33 ++--
 .../test/java/docs/javadsl/SnsPublisherTest.java   |   2 +-
 .../connectors/sns/IntegrationTestContext.scala    |   2 +-
 sqs/src/test/java/docs/javadsl/SqsSourceTest.java  |   2 +-
 .../stream/connectors/sqs/javadsl/BaseSqsTest.java |   2 +-
 .../test/scala/docs/scaladsl/SqsSourceSpec.scala   |   2 +-
 .../sqs/scaladsl/DefaultTestContext.scala          |   2 +-
 46 files changed, 1716 insertions(+), 36 deletions(-)

diff --git a/.github/workflows/check-build-test.yml b/.github/workflows/check-build-test.yml
index 96d9b7bb5..5b153a643 100644
--- a/.github/workflows/check-build-test.yml
+++ b/.github/workflows/check-build-test.yml
@@ -40,6 +40,9 @@ jobs:
       - name: "Code style, compile tests, MiMa. Run locally with: sbt \"javafmtCheckAll; +Test/compile; +mimaReportBinaryIssues\""
         run: sbt "javafmtCheckAll; +Test/compile; +mimaReportBinaryIssues"
 
+      - name: "Integration Test Compile"
+        run: sbt "+IntegrationTest/compile"
+
   documentation:
     name: ScalaDoc, Documentation with Paradox
     runs-on: ubuntu-20.04
@@ -78,6 +81,7 @@ jobs:
         include:
           - { connector: amqp,                         pre_cmd: 'docker-compose up -d amqp' }
           - { connector: avroparquet }
+          - { connector: aws-spi-pekko-http }
           - { connector: awslambda }
           - { connector: aws-event-bridge,             pre_cmd: 'docker-compose up -d amazoneventbridge' }
           - { connector: azure-storage-queue }
diff --git a/NOTICE b/NOTICE
index fe0645e02..0794f4e7c 100644
--- a/NOTICE
+++ b/NOTICE
@@ -12,6 +12,13 @@ Apache License, Version 2.0 License.
 
 ---------------
 
+pekko-connectors-aws-spi-pekko-http is based on aws-spi-akka-http
+<https://github.com/matsluni/aws-spi-akka-http>.
+This code was released under an Apache 2.0 license and a Software Grant Agreement
+has been registered with the ASF secretary.
+
+---------------
+
 pekko-connectors-ftp contains code from Apache Commons Net
 <https://commons.apache.org/proper/commons-net/>.
 This code was released under an Apache 2.0 license.
diff --git a/aws-event-bridge/src/test/java/docs/javadsl/EventBridgePublisherTest.java b/aws-event-bridge/src/test/java/docs/javadsl/EventBridgePublisherTest.java
index cc13e08d7..47e303b02 100644
--- a/aws-event-bridge/src/test/java/docs/javadsl/EventBridgePublisherTest.java
+++ b/aws-event-bridge/src/test/java/docs/javadsl/EventBridgePublisherTest.java
@@ -18,13 +18,13 @@ import org.apache.pekko.Done;
 import org.apache.pekko.actor.ActorSystem;
 // #init-system
 import 
org.apache.pekko.stream.connectors.aws.eventbridge.javadsl.EventBridgePublisher;
+import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
 import org.apache.pekko.stream.javadsl.Sink;
 import org.apache.pekko.stream.javadsl.Source;
 import org.apache.pekko.testkit.javadsl.TestKit;
 
 // #init-client
 import java.net.URI;
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 import software.amazon.awssdk.regions.Region;
diff --git a/aws-event-bridge/src/test/scala/org/apache/pekko/stream/connectors/aws/eventbridge/IntegrationTestContext.scala b/aws-event-bridge/src/test/scala/org/apache/pekko/stream/connectors/aws/eventbridge/IntegrationTestContext.scala
index 43cd1ff88..338554ea2 100644
--- a/aws-event-bridge/src/test/scala/org/apache/pekko/stream/connectors/aws/eventbridge/IntegrationTestContext.scala
+++ b/aws-event-bridge/src/test/scala/org/apache/pekko/stream/connectors/aws/eventbridge/IntegrationTestContext.scala
@@ -55,7 +55,7 @@ trait IntegrationTestContext extends BeforeAndAfterAll with ScalaFutures {
     // #init-client
     import java.net.URI
 
-    import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
+    import pekko.stream.connectors.awsspi.PekkoHttpClient
     import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, 
StaticCredentialsProvider }
     import software.amazon.awssdk.regions.Region
     import software.amazon.awssdk.services.eventbridge.EventBridgeAsyncClient
diff --git a/aws-spi-pekko-http/src/it/resources/application.conf b/aws-spi-pekko-http/src/it/resources/application.conf
new file mode 100644
index 000000000..8a2cb9651
--- /dev/null
+++ b/aws-spi-pekko-http/src/it/resources/application.conf
@@ -0,0 +1,3 @@
+# SPDX-License-Identifier: Apache-2.0
+
+pekko.http.client.parsing.max-content-length = 15m
diff --git a/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/TestBase.scala b/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/TestBase.scala
new file mode 100644
index 000000000..d0494ad9a
--- /dev/null
+++ b/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/TestBase.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi
+
+import software.amazon.awssdk.auth.credentials.{
+  AwsCredentialsProviderChain,
+  EnvironmentVariableCredentialsProvider,
+  ProfileCredentialsProvider,
+  SystemPropertyCredentialsProvider
+}
+import software.amazon.awssdk.regions.Region
+
+import scala.util.Random
+
+trait TestBase {
+
+  private lazy val credentialProfileName = "spi-test-account"
+
+  lazy val defaultRegion = Region.EU_WEST_1
+
+  def randomIdentifier(length: Int): String = 
Random.alphanumeric.take(length).mkString
+
+  lazy val credentialProviderChain =
+    AwsCredentialsProviderChain.builder
+      .credentialsProviders(
+        EnvironmentVariableCredentialsProvider.create,
+        SystemPropertyCredentialsProvider.create,
+        ProfileCredentialsProvider.builder
+          .profileName(credentialProfileName)
+          .build)
+      .build
+}
diff --git a/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/dynamodb/DynamoDBITTest.scala b/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/dynamodb/DynamoDBITTest.scala
new file mode 100644
index 000000000..1ab2c0b06
--- /dev/null
+++ b/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/dynamodb/DynamoDBITTest.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi.dynamodb
+
+import org.apache.pekko
+import pekko.stream.connectors.awsspi.{ PekkoHttpAsyncHttpService, TestBase }
+import pekko.util.FutureConverters
+import org.scalatest.concurrent.{ Eventually, Futures, IntegrationPatience }
+import org.scalatest.concurrent.ScalaFutures._
+import org.scalatest.wordspec.AnyWordSpec
+import org.scalatest.matchers.should.Matchers
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
+import software.amazon.awssdk.services.dynamodb.model._
+
+class DynamoDBITTest
+    extends AnyWordSpec
+    with Matchers
+    with Futures
+    with Eventually
+    with IntegrationPatience
+    with TestBase {
+
+  def withClient(testCode: DynamoDbAsyncClient => Any): Any = {
+
+    val pekkoClient = new 
PekkoHttpAsyncHttpService().createAsyncHttpClientFactory().build()
+
+    val client = DynamoDbAsyncClient
+      .builder()
+      .credentialsProvider(credentialProviderChain)
+      .region(defaultRegion)
+      .httpClient(pekkoClient)
+      .build()
+
+    try
+      testCode(client)
+    finally { // clean up
+      pekkoClient.close()
+      client.close()
+    }
+  }
+
+  "DynamoDB" should {
+    "create a table" in withClient { implicit client =>
+      val tableName = s"Movies-${randomIdentifier(5)}"
+      val attributes = 
AttributeDefinition.builder.attributeName("film_id").attributeType(ScalarAttributeType.S).build()
+      val keySchema = 
KeySchemaElement.builder.attributeName("film_id").keyType(KeyType.HASH).build()
+
+      val result = client
+        .createTable(
+          CreateTableRequest
+            .builder()
+            .tableName(tableName)
+            .attributeDefinitions(attributes)
+            .keySchema(keySchema)
+            .provisionedThroughput(
+              ProvisionedThroughput.builder
+                .readCapacityUnits(1L)
+                .writeCapacityUnits(1L)
+                .build())
+            .build())
+        .join
+
+      val desc = result.tableDescription()
+      desc.tableName() should be(tableName)
+
+      eventually {
+        val response =
+          
FutureConverters.asScala(client.describeTable(DescribeTableRequest.builder().tableName(tableName).build()))
+        response.futureValue.table().tableStatus() should 
be(TableStatus.ACTIVE)
+      }
+      
FutureConverters.asScala(client.deleteTable(DeleteTableRequest.builder().tableName(tableName).build()))
+
+    }
+  }
+
+}
diff --git a/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/kinesis/KinesisITTest.scala b/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/kinesis/KinesisITTest.scala
new file mode 100644
index 000000000..348987b62
--- /dev/null
+++ b/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/kinesis/KinesisITTest.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi.kinesis
+
+import org.apache.pekko
+import pekko.stream.connectors.awsspi.{ PekkoHttpAsyncHttpService, TestBase }
+import org.scalatest.wordspec.AnyWordSpec
+import org.scalatest.matchers.should.Matchers
+import software.amazon.awssdk.core.SdkBytes
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
+import software.amazon.awssdk.services.kinesis.model._
+
+import scala.util.Random
+
+class KinesisITTest extends AnyWordSpec with Matchers with TestBase {
+
+  def withClient(testCode: KinesisAsyncClient => Any): Any = {
+
+    val pekkoClient = new 
PekkoHttpAsyncHttpService().createAsyncHttpClientFactory().build()
+
+    val client = KinesisAsyncClient
+      .builder()
+      .credentialsProvider(credentialProviderChain)
+      .region(defaultRegion)
+      .httpClient(pekkoClient)
+      .build()
+
+    try
+      testCode(client)
+    finally { // clean up
+      pekkoClient.close()
+      client.close()
+    }
+  }
+
+  "Kinesis async client" should {
+
+    "use a data stream: create + put + get + delete" in withClient { implicit 
client =>
+      val streamName = "aws-spi-test-" + 
Random.alphanumeric.take(10).filterNot(_.isUpper).mkString
+      val data = "123"
+
+      val createRequest = CreateStreamRequest
+        .builder()
+        .streamName(streamName)
+        .shardCount(1)
+        .build()
+
+      val _ = client.createStream(createRequest).join()
+      val describeStreamRequest = 
DescribeStreamRequest.builder().streamName(streamName).build()
+
+      Thread.sleep(5000)
+      val streamArn = waitToBeCreated(client, describeStreamRequest, 15)
+
+      val putRequest = PutRecordRequest
+        .builder()
+        .streamName(streamName)
+        .partitionKey("partitionKey")
+        .data(SdkBytes.fromUtf8String(data))
+        .build()
+
+      val putResponse = client.putRecord(putRequest).join()
+
+      val getShardIterator = GetShardIteratorRequest
+        .builder()
+        .streamName(streamName)
+        .shardId(putResponse.shardId())
+        .shardIteratorType("TRIM_HORIZON")
+        .build()
+      val shardIterator = client.getShardIterator(getShardIterator).join()
+
+      val getRecordsRequest = 
GetRecordsRequest.builder().shardIterator(shardIterator.shardIterator()).limit(1).build()
+
+      Thread.sleep(3000)
+
+      val res = client.getRecords(getRecordsRequest).join()
+
+      res.records().get(0).data().asUtf8String() shouldBe data
+
+      
client.deleteStream(DeleteStreamRequest.builder().streamName(streamName).build()).join()
+
+    }
+  }
+
+  private def waitToBeCreated(client: KinesisAsyncClient, req: 
DescribeStreamRequest, tries: Int): String =
+    if (tries == 0) "error"
+    else {
+      val r = client.describeStream(req).join()
+      if (r.streamDescription().streamStatus() == StreamStatus.ACTIVE) {
+        println(s"Current status: ${r.streamDescription().streamStatus()}")
+        r.streamDescription().streamARN()
+      } else {
+        Thread.sleep(1000)
+        waitToBeCreated(client, req, tries - 1)
+      }
+    }
+}
diff --git a/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/s3/S3ITTest.scala b/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/s3/S3ITTest.scala
new file mode 100644
index 000000000..b13f3a0d2
--- /dev/null
+++ b/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/s3/S3ITTest.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi.s3
+
+import java.io.{ File, FileWriter }
+import org.apache.pekko
+import pekko.stream.connectors.awsspi.{ PekkoHttpAsyncHttpService, TestBase }
+import org.scalatest.wordspec.AnyWordSpec
+import org.scalatest.matchers.should.Matchers
+import software.amazon.awssdk.core.async.{ AsyncRequestBody, 
AsyncResponseTransformer }
+import software.amazon.awssdk.services.s3.{ S3AsyncClient, S3Configuration }
+import software.amazon.awssdk.services.s3.model._
+
+import scala.util.Random
+
+class S3ITTest extends AnyWordSpec with Matchers with TestBase {
+
+  def withClient(checksumEnabled: Boolean = false)(testCode: S3AsyncClient => 
Any): Any = {
+
+    val pekkoClient = new 
PekkoHttpAsyncHttpService().createAsyncHttpClientFactory().build()
+
+    val client = S3AsyncClient
+      .builder()
+      
.serviceConfiguration(S3Configuration.builder().checksumValidationEnabled(checksumEnabled).build())
+      .credentialsProvider(credentialProviderChain)
+      .region(defaultRegion)
+      .httpClient(pekkoClient)
+      .build()
+
+    try
+      testCode(client)
+    finally { // clean up
+      pekkoClient.close()
+      client.close()
+    }
+  }
+
+  "S3 async client" should {
+    "upload and download a file to a bucket + cleanup" in 
withClient(checksumEnabled = true) { implicit client =>
+      val bucketName = "aws-spi-test-" + 
Random.alphanumeric.take(10).filterNot(_.isUpper).mkString
+      createBucket(bucketName)
+      val randomFile = File.createTempFile("aws", 
Random.alphanumeric.take(5).mkString)
+      val fileContent = Random.alphanumeric.take(1000).mkString
+      val fileWriter = new FileWriter(randomFile)
+      fileWriter.write(fileContent)
+      fileWriter.flush()
+      
client.putObject(PutObjectRequest.builder().bucket(bucketName).key("my-file").build(),
 randomFile.toPath).join
+
+      val result = client
+        
.getObject(GetObjectRequest.builder().bucket(bucketName).key("my-file").build(),
+          AsyncResponseTransformer.toBytes[GetObjectResponse]())
+        .join
+      result.asUtf8String() should be(fileContent)
+
+      
client.deleteObject(DeleteObjectRequest.builder().bucket(bucketName).key("my-file").build()).join()
+
+      
client.deleteBucket(DeleteBucketRequest.builder().bucket(bucketName).build()).join()
+    }
+
+    "multipart upload" in withClient() { implicit client =>
+      val bucketName = "aws-spi-test-" + 
Random.alphanumeric.take(5).map(_.toLower).mkString
+      createBucket(bucketName)
+      val fileContent = (0 to 1000000).mkString
+      val createMultipartUploadResponse = client
+        .createMultipartUpload(
+          
CreateMultipartUploadRequest.builder().bucket(bucketName).key("bar").contentType("text/plain").build())
+        .join()
+
+      val p1 = client
+        .uploadPart(
+          UploadPartRequest
+            .builder()
+            .bucket(bucketName)
+            .key("bar")
+            .partNumber(1)
+            .uploadId(createMultipartUploadResponse.uploadId())
+            .build(),
+          AsyncRequestBody.fromString(fileContent))
+        .join
+      val p2 = client
+        .uploadPart(
+          UploadPartRequest
+            .builder()
+            .bucket(bucketName)
+            .key("bar")
+            .partNumber(2)
+            .uploadId(createMultipartUploadResponse.uploadId())
+            .build(),
+          AsyncRequestBody.fromString(fileContent))
+        .join
+
+      client
+        .completeMultipartUpload(
+          CompleteMultipartUploadRequest
+            .builder()
+            .bucket(bucketName)
+            .key("bar")
+            .uploadId(createMultipartUploadResponse.uploadId())
+            .multipartUpload(
+              CompletedMultipartUpload
+                .builder()
+                
.parts(CompletedPart.builder().partNumber(1).eTag(p1.eTag()).build(),
+                  
CompletedPart.builder().partNumber(2).eTag(p2.eTag()).build())
+                .build())
+            .build())
+        .join
+
+      val result = client
+        
.getObject(GetObjectRequest.builder().bucket(bucketName).key("bar").build(),
+          AsyncResponseTransformer.toBytes[GetObjectResponse]())
+        .join
+      result.asUtf8String() should be(fileContent + fileContent)
+
+      
client.deleteObject(DeleteObjectRequest.builder().bucket(bucketName).key("bar").build()).join()
+      
client.deleteBucket(DeleteBucketRequest.builder().bucket(bucketName).build()).join()
+    }
+  }
+
+  def createBucket(name: String)(implicit client: S3AsyncClient): Unit =
+    
client.createBucket(CreateBucketRequest.builder().bucket(name).build()).join
+
+}
diff --git a/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/sqs/SQSITTest.scala b/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/sqs/SQSITTest.scala
new file mode 100644
index 000000000..4aa7f562e
--- /dev/null
+++ b/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/sqs/SQSITTest.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi.sqs
+
+import org.apache.pekko
+import pekko.stream.connectors.awsspi.{ PekkoHttpAsyncHttpService, TestBase }
+import org.scalatest.wordspec.AnyWordSpec
+import org.scalatest.matchers.should.Matchers
+import software.amazon.awssdk.services.sqs.SqsAsyncClient
+import software.amazon.awssdk.services.sqs.model.{
+  CreateQueueRequest,
+  DeleteQueueRequest,
+  ReceiveMessageRequest,
+  SendMessageRequest
+}
+
+import scala.util.Random
+
+class SQSITTest extends AnyWordSpec with Matchers with TestBase {
+
+  def withClient(testCode: SqsAsyncClient => Any): Any = {
+
+    val pekkoClient = new 
PekkoHttpAsyncHttpService().createAsyncHttpClientFactory().build()
+
+    val client = SqsAsyncClient
+      .builder()
+      .credentialsProvider(credentialProviderChain)
+      .httpClient(pekkoClient)
+      .region(defaultRegion)
+      .build()
+
+    try
+      testCode(client)
+    finally { // clean up
+      pekkoClient.close()
+      client.close()
+    }
+  }
+
+  "Async SQS client" should {
+
+    "publish a message to a queue" in withClient { implicit client =>
+      val queueName = "aws-spi-test-" + 
Random.alphanumeric.take(10).filterNot(_.isUpper).mkString
+      val queueResponse = 
client.createQueue(CreateQueueRequest.builder().queueName(queueName).build()).join()
+      val queueUrl = queueResponse.queueUrl()
+      
client.sendMessage(SendMessageRequest.builder().queueUrl(queueUrl).messageBody("123").build()).join()
+      val receivedMessage =
+        
client.receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueUrl).maxNumberOfMessages(1).build()).join()
+      receivedMessage.messages().get(0).body() should be("123")
+
+      // deleteQueue
+      
client.deleteQueue(DeleteQueueRequest.builder().queueUrl(queueUrl).build()).join()
+
+      val queueListing = client.listQueues().join()
+      queueListing.queueUrls() shouldBe java.util.Collections.EMPTY_LIST
+
+    }
+  }
+
+}
diff --git a/aws-spi-pekko-http/src/main/resources/META-INF/services/software.amazon.awssdk.http.async.SdkAsyncHttpService b/aws-spi-pekko-http/src/main/resources/META-INF/services/software.amazon.awssdk.http.async.SdkAsyncHttpService
new file mode 100644
index 000000000..6118ec68b
--- /dev/null
+++ b/aws-spi-pekko-http/src/main/resources/META-INF/services/software.amazon.awssdk.http.async.SdkAsyncHttpService
@@ -0,0 +1 @@
+org.apache.pekko.stream.connectors.awsspi.PekkoHttpAsyncHttpService
diff --git a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpAsyncHttpService.scala b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpAsyncHttpService.scala
new file mode 100644
index 000000000..9d0d66eef
--- /dev/null
+++ b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpAsyncHttpService.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi
+
+import PekkoHttpClient.PekkoHttpClientBuilder
+import software.amazon.awssdk.http.async.SdkAsyncHttpService
+
+class PekkoHttpAsyncHttpService extends SdkAsyncHttpService {
+  override def createAsyncHttpClientFactory(): PekkoHttpClientBuilder = 
PekkoHttpClient.builder()
+}
diff --git a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala
new file mode 100644
index 000000000..9aacfd2b5
--- /dev/null
+++ b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi
+
+import java.util.concurrent.{ CompletableFuture, TimeUnit }
+
+import org.apache.pekko
+import pekko.actor.{ ActorSystem, ClassicActorSystemProvider }
+import pekko.http.scaladsl.Http
+import pekko.http.scaladsl.model.HttpHeader.ParsingResult
+import pekko.http.scaladsl.model.HttpHeader.ParsingResult.Ok
+import pekko.http.scaladsl.model.MediaType.Compressible
+import pekko.http.scaladsl.model.RequestEntityAcceptance.Expected
+import pekko.http.scaladsl.model._
+import pekko.http.scaladsl.model.headers.{ `Content-Length`, `Content-Type` }
+import pekko.http.scaladsl.settings.ConnectionPoolSettings
+import pekko.stream.scaladsl.Source
+import pekko.stream.{ Materializer, SystemMaterializer }
+import pekko.util.ByteString
+import pekko.util.OptionConverters
+import org.slf4j.LoggerFactory
+import software.amazon.awssdk.http.async._
+import software.amazon.awssdk.http.SdkHttpRequest
+import software.amazon.awssdk.utils.AttributeMap
+
+import scala.collection.immutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.{ Await, ExecutionContext }
+
+/**
+ * [[SdkAsyncHttpClient]] that executes AWS SDK requests through Pekko HTTP.
+ *
+ * @param shutdownHandle callback run by `close()`
+ * @param connectionSettings connection pool settings passed to every `singleRequest` call
+ */
+class PekkoHttpClient(shutdownHandle: () => Unit, connectionSettings: ConnectionPoolSettings)(implicit
+    actorSystem: ActorSystem,
+    ec: ExecutionContext,
+    mat: Materializer) extends SdkAsyncHttpClient {
+  import PekkoHttpClient._
+
+  // Created on first access; picks up this client's implicit actor system, execution context and materializer.
+  lazy val runner = new RequestRunner()
+
+  /**
+   * Translates the SDK request into a Pekko HTTP request, sends it and lets the
+   * SDK response handler consume the response.
+   */
+  override def execute(request: AsyncExecuteRequest): CompletableFuture[Void] = {
+    val sdkRequest = request.request()
+    val bodyPublisher = request.requestContentPublisher()
+    val pekkoHttpRequest = toPekkoRequest(sdkRequest, bodyPublisher)
+
+    def send() = Http().singleRequest(pekkoHttpRequest, settings = connectionSettings)
+
+    runner.run(() => send(), request.responseHandler())
+  }
+
+  /** Runs the shutdown callback supplied at construction time. */
+  override def close(): Unit = {
+    shutdownHandle()
+  }
+
+  override def clientName(): String = "pekko-http"
+}
+
+object PekkoHttpClient {
+
+  val logger = LoggerFactory.getLogger(this.getClass)
+
+  /**
+   * Builds the Pekko HTTP request for an SDK request.
+   *
+   * Method, URI and headers are taken from `request`; the entity is created from
+   * `contentPublisher` using the content type found in the request headers.
+   * The protocol is always HTTP/1.1.
+   */
+  private[awsspi] def toPekkoRequest(request: SdkHttpRequest,
+      contentPublisher: SdkHttpContentPublisher): HttpRequest = {
+    val (contentTypeHeader, otherHeaders) = convertHeaders(request.headers())
+    val httpMethod = convertMethod(request.method().name())
+    val target = Uri(request.getUri.toString)
+    val contentType = contentTypeHeaderToContentType(contentTypeHeader)
+    val requestEntity = entityForMethodAndContentType(httpMethod, contentType, contentPublisher)
+    HttpRequest(httpMethod, target, otherHeaders, requestEntity, HttpProtocols.`HTTP/1.1`)
+  }
+
+  /**
+   * Creates the request entity for the given method.
+   *
+   * Methods that do not expect a request entity get `HttpEntity.Empty`. Otherwise the
+   * publisher is streamed as the body, with a fixed length when the publisher reports
+   * one and without a length when it does not.
+   */
+  private[awsspi] def entityForMethodAndContentType(method: HttpMethod,
+      contentType: ContentType,
+      contentPublisher: SdkHttpContentPublisher): RequestEntity =
+    if (method.requestEntityAcceptance != Expected) HttpEntity.Empty
+    else {
+      val declaredLength = OptionConverters.toScala(contentPublisher.contentLength())
+      val body = Source.fromPublisher(contentPublisher).map(ByteString(_))
+      declaredLength match {
+        case None         => HttpEntity(contentType, body)
+        case Some(length) => HttpEntity(contentType, length, body)
+      }
+    }
+
+  /**
+   * Looks up the Pekko HTTP method for a method name, ignoring case.
+   *
+   * @throws IllegalArgumentException if the name is not a known method
+   */
+  private[awsspi] def convertMethod(method: String): HttpMethod =
+    HttpMethods.getForKeyCaseInsensitive(method) match {
+      case Some(known) => known
+      case None        => throw new IllegalArgumentException(s"Method not configured: $method")
+    }
+
+  /**
+   * Resolves the `ContentType` for an optional Content-Type header.
+   *
+   * The header value is looked up in `contentTypeMap` first; values not listed there
+   * are turned into a custom content type. Without a header the result is
+   * `ContentTypes.NoContentType`.
+   */
+  private[awsspi] def contentTypeHeaderToContentType(contentTypeHeader: Option[HttpHeader]): ContentType =
+    contentTypeHeader match {
+      case Some(header) =>
+        val rawValue = header.value()
+        contentTypeMap.getOrElse(rawValue, tryCreateCustomContentType(rawValue))
+      case None =>
+        // It's allowed to not have a content-type: https://www.w3.org/Protocols/rfc2616/rfc2616-sec7.html#sec7.2.1
+        //
+        //  Any HTTP/1.1 message containing an entity-body SHOULD include a Content-Type header field defining
+        //  the media type of that body. If and only if the media type is not given by a Content-Type field, the
+        //  recipient MAY attempt to guess the media type via inspection of its content and/or the name
+        //  extension(s) of the URI used to identify the resource. If the media type remains unknown, the
+        //  recipient SHOULD treat it as type "application/octet-stream".
+        //
+        ContentTypes.NoContentType
+    }
+
+  /**
+   * Converts the SDK header map to Pekko HTTP headers.
+   *
+   * `Content-Length` is dropped because Pekko HTTP calculates it itself and it must not be
+   * provided in the request headers. `Content-Type` is returned separately because it is
+   * used to calculate the `ContentType` of the `HttpEntity`.
+   *
+   * @param headers header names mapped to their values; every name must have exactly one value
+   * @return the parsed `Content-Type` header (if any) and all other parsed headers, in the
+   *         iteration order of `headers`
+   * @throws IllegalArgumentException if a header does not have exactly one value or cannot be parsed
+   */
+  private[awsspi] def convertHeaders(
+      headers: java.util.Map[String, java.util.List[String]]): (Option[HttpHeader], immutable.Seq[HttpHeader]) = {
+    var contentTypeHeader = Option.empty[HttpHeader]
+    // A builder gives constant-time appends; `list :+ element` would copy the list on every header.
+    val otherHeaders = List.newBuilder[HttpHeader]
+
+    // Iterate the Java map directly: no intermediate copy, and the caller's header order is kept.
+    headers.forEach { (headerName, headerValue) =>
+      if (headerValue.size() != 1) {
+        val values = List.newBuilder[String]
+        headerValue.forEach(v => values += v)
+        throw new IllegalArgumentException(s"Found invalid header: key: $headerName, Value: ${values.result()}.")
+      }
+      // skip content-length as it will be calculated by pekko-http itself
+      if (`Content-Length`.lowercaseName != headerName.toLowerCase) {
+        HttpHeader.parse(headerName, headerValue.get(0)) match {
+          case ok: Ok =>
+            if (ok.header.lowercaseName() == `Content-Type`.lowercaseName) contentTypeHeader = Some(ok.header)
+            else otherHeaders += ok.header
+          case error: ParsingResult.Error =>
+            throw new IllegalArgumentException(s"Found invalid header: ${error.errors}.")
+        }
+      }
+    }
+
+    (contentTypeHeader, otherHeaders.result())
+  }
+
+  /**
+   * Creates a custom binary, compressible content type from a `main/sub` string.
+   *
+   * @throws RuntimeException if splitting on '/' does not yield exactly two parts
+   */
+  private[awsspi] def tryCreateCustomContentType(contentTypeStr: String): ContentType = {
+    logger.debug(s"Try to parse content type from $contentTypeStr")
+    contentTypeStr.split('/') match {
+      case Array(mainType, subType) =>
+        ContentType(MediaType.customBinary(mainType, subType, Compressible))
+      case _ =>
+        throw new RuntimeException(s"Could not parse custom content type '$contentTypeStr'.")
+    }
+  }
+
+  /** Starts a new client builder with no actor system, execution context or pool settings configured. */
+  def builder() = PekkoHttpClientBuilder()
+
+  /**
+   * Immutable builder for [[PekkoHttpClient]]; every `with...` method returns a modified copy.
+   *
+   * @param actorSystem actor system to run on; when absent, `buildWithDefaults` creates one
+   *                    named "aws-pekko-http" and the client terminates it on close
+   * @param executionContext execution context to use; defaults to the actor system's dispatcher
+   * @param connectionPoolSettings pool settings to use; defaults to the settings derived from the actor system
+   */
+  case class PekkoHttpClientBuilder(private val actorSystem: Option[ActorSystem] = None,
+      private val executionContext: Option[ExecutionContext] = None,
+      private val connectionPoolSettings: Option[ConnectionPoolSettings] = None)
+      extends SdkAsyncHttpClient.Builder[PekkoHttpClientBuilder] {
+
+    /** Builds the client; `attributeMap` is not consulted by this implementation. */
+    def buildWithDefaults(attributeMap: AttributeMap): SdkAsyncHttpClient = {
+      implicit val as = actorSystem.getOrElse(ActorSystem("aws-pekko-http"))
+      implicit val ec = executionContext.getOrElse(as.dispatcher)
+      val materializer: Materializer = SystemMaterializer(as).materializer
+      val poolSettings = connectionPoolSettings.getOrElse(ConnectionPoolSettings(as))
+
+      // Only an actor system created above is shut down; a caller-supplied one is left untouched.
+      def shutdown(): Unit =
+        if (actorSystem.isEmpty) {
+          val terminated = Http().shutdownAllConnectionPools().flatMap(_ => as.terminate())
+          Await.result(terminated, Duration.apply(10, TimeUnit.SECONDS))
+          ()
+        }
+
+      new PekkoHttpClient(() => shutdown(), poolSettings)(as, ec, materializer)
+    }
+
+    /** Uses the given classic actor system instead of creating one. */
+    def withActorSystem(actorSystem: ActorSystem): PekkoHttpClientBuilder = {
+      val chosen = Some(actorSystem)
+      copy(actorSystem = chosen)
+    }
+
+    /** Uses the classic actor system behind the given provider instead of creating one. */
+    def withActorSystem(actorSystem: ClassicActorSystemProvider): PekkoHttpClientBuilder = {
+      val chosen = Some(actorSystem.classicSystem)
+      copy(actorSystem = chosen)
+    }
+
+    /** Uses the given execution context instead of the actor system's dispatcher. */
+    def withExecutionContext(executionContext: ExecutionContext): PekkoHttpClientBuilder = {
+      val chosen = Some(executionContext)
+      copy(executionContext = chosen)
+    }
+
+    /** Uses the given connection pool settings instead of the actor system's defaults. */
+    def withConnectionPoolSettings(connectionPoolSettings: ConnectionPoolSettings): PekkoHttpClientBuilder = {
+      val chosen = Some(connectionPoolSettings)
+      copy(connectionPoolSettings = chosen)
+    }
+  }
+
+  // Predefined content types for the AWS media types listed in `contentTypeMap` below.
+  // All but `formUrlEncoded` are custom binary, compressible media types.
+  lazy val xAmzJson = ContentType(MediaType.customBinary("application", 
"x-amz-json-1.0", Compressible))
+  lazy val xAmzJson11 = ContentType(MediaType.customBinary("application", 
"x-amz-json-1.1", Compressible))
+  lazy val xAmzCbor11 = ContentType(MediaType.customBinary("application", 
"x-amz-cbor-1.1", Compressible))
+  // Form encoding is the only entry that carries an explicit charset ("utf-8").
+  lazy val formUrlEncoded =
+    ContentType(MediaType.applicationWithOpenCharset("x-www-form-urlencoded"), 
HttpCharset.custom("utf-8"))
+  lazy val applicationXml = ContentType(MediaType.customBinary("application", 
"xml", Compressible))
+
+  // Lookup from the raw Content-Type header value to the content types above.
+  // Values that are not listed here are handled by `tryCreateCustomContentType`.
+  // NOTE(review): the fourth key reads `charset-UTF-8` (hyphen) whereas a charset
+  // parameter is normally written `charset=UTF-8` - confirm whether this key can ever match.
+  lazy val contentTypeMap: collection.immutable.Map[String, ContentType] = 
collection.immutable.Map(
+    "application/x-amz-json-1.0" -> xAmzJson,
+    "application/x-amz-json-1.1" -> xAmzJson11,
+    "application/x-amz-cbor-1.1" -> xAmzCbor11, // used by Kinesis
+    "application/x-www-form-urlencoded; charset-UTF-8" -> formUrlEncoded,
+    "application/x-www-form-urlencoded" -> formUrlEncoded,
+    "application/xml" -> applicationXml)
+}
diff --git 
a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunner.scala
 
b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunner.scala
new file mode 100644
index 000000000..9eaf3eac2
--- /dev/null
+++ 
b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunner.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi
+
+import java.util.concurrent.CompletableFuture
+import java.util.Collections
+
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.http.scaladsl.model.{ ContentTypes, HttpResponse }
+import pekko.http.scaladsl.model.headers.{ `Content-Length`, `Content-Type` }
+import pekko.stream.Materializer
+import pekko.stream.scaladsl.{ Keep, Sink }
+import pekko.util.FutureConverters
+import org.slf4j.LoggerFactory
+import software.amazon.awssdk.http.SdkHttpFullResponse
+import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler
+
+import scala.concurrent.{ ExecutionContext, Future }
+
+/**
+ * Executes a Pekko HTTP request and feeds the response into an AWS SDK
+ * [[SdkAsyncHttpResponseHandler]].
+ */
+class RequestRunner()(implicit sys: ActorSystem, ec: ExecutionContext, mat: Materializer) {
+
+  val logger = LoggerFactory.getLogger(this.getClass)
+
+  /**
+   * Runs `runRequest`, passes the response headers and body stream to `handler`, and
+   * returns a future that completes once the response body has been fully streamed.
+   * Any failure is additionally reported through `handler.onError`.
+   */
+  def run(runRequest: () => Future[HttpResponse], handler: SdkAsyncHttpResponseHandler): CompletableFuture[Void] = {
+    val bodyConsumed = runRequest().flatMap { response =>
+      handler.onHeaders(toSdkHttpFullResponse(response))
+
+      // The ignoring side sink only exists to expose a completion future for the body stream.
+      val bodyGraph = response.entity.dataBytes
+        .filter(_.nonEmpty)
+        .map(_.asByteBuffer)
+        .alsoToMat(Sink.ignore)(Keep.right)
+        .toMat(Sink.asPublisher(fanout = false))(Keep.both)
+      val (bodyDone, bodyPublisher) = bodyGraph.run()
+
+      handler.onStream(bodyPublisher)
+      bodyDone
+    }
+
+    bodyConsumed.failed.foreach(handler.onError)
+
+    val asVoid = bodyConsumed.map(_ => null: Void)
+    FutureConverters.asJava(asVoid).toCompletableFuture
+  }
+
+  /** Copies status code, reason phrase and headers of the response into an SDK response object. */
+  private[awsspi] def toSdkHttpFullResponse(response: HttpResponse): SdkHttpFullResponse = {
+    val status = response.status
+    val sdkResponse = SdkHttpFullResponse.builder()
+    sdkResponse.headers(convertToSdkResponseHeaders(response))
+    sdkResponse.statusCode(status.intValue())
+    sdkResponse.statusText(status.reason)
+    sdkResponse.build
+  }
+
+  /**
+   * Collects the response headers into the map shape the SDK expects.
+   *
+   * Content-Type and Content-Length are taken from the entity; all other headers are
+   * grouped by name so that repeated headers end up in one value list.
+   */
+  private[awsspi] def convertToSdkResponseHeaders(
+      response: HttpResponse): java.util.Map[String, java.util.List[String]] = {
+    val sdkHeaders = new java.util.HashMap[String, java.util.List[String]]
+    val entity = response.entity
+
+    if (entity.contentType != ContentTypes.NoContentType)
+      sdkHeaders.put(`Content-Type`.name, Collections.singletonList(entity.contentType.value))
+
+    entity.contentLengthOption.foreach { length =>
+      sdkHeaders.put(`Content-Length`.name, Collections.singletonList(length.toString))
+    }
+
+    response.headers.groupBy(_.name()).foreach { case (name, sameNamed) =>
+      val values = new java.util.ArrayList[String]
+      sameNamed.foreach(header => values.add(header.value()))
+      sdkHeaders.put(name, values)
+    }
+
+    sdkHeaders
+  }
+}
diff --git 
a/aws-spi-pekko-http/src/test/java/org/apache/pekko/stream/connectors/awsspi/S3Test.java
 
b/aws-spi-pekko-http/src/test/java/org/apache/pekko/stream/connectors/awsspi/S3Test.java
new file mode 100644
index 000000000..528b285dd
--- /dev/null
+++ 
b/aws-spi-pekko-http/src/test/java/org/apache/pekko/stream/connectors/awsspi/S3Test.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi.s3;
+
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.stream.connectors.awsspi.PekkoHttpAsyncHttpService;
+import org.junit.Rule;
+import org.junit.Test;
+import org.scalatestplus.junit.JUnitSuite;
+import org.testcontainers.containers.GenericContainer;
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Configuration;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.SecureRandom;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Round-trips an object through an S3 mock container using the Pekko HTTP based SDK client, once
+ * with a client-managed actor system and once with a caller-supplied one.
+ */
+public class S3Test extends JUnitSuite {
+
+  private static final String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+  private static SecureRandom rnd = new SecureRandom();
+
+  @Rule
+  @SuppressWarnings("rawtypes")
+  public GenericContainer<?> s3mock =
+      new GenericContainer("adobe/s3mock:2.17.0").withExposedPorts(9090);
+
+  @Test
+  public void testS3() throws Exception {
+    SdkAsyncHttpClient pekkoClient = null;
+    S3AsyncClient client = null;
+
+    try {
+      pekkoClient = new PekkoHttpAsyncHttpService().createAsyncHttpClientFactory().build();
+
+      client = getAsyncClient(pekkoClient);
+
+      createBucketAndAssert(client);
+    } finally {
+      // Either client may still be null if setup failed; an NPE here would hide the real failure.
+      if (pekkoClient != null) pekkoClient.close();
+      if (client != null) client.close();
+    }
+  }
+
+  @Test
+  public void testS3WithExistingActorSystem() throws Exception {
+    ActorSystem system = ActorSystem.create();
+    SdkAsyncHttpClient pekkoClient = null;
+    S3AsyncClient client = null;
+
+    try {
+      pekkoClient =
+          new PekkoHttpAsyncHttpService()
+              .createAsyncHttpClientFactory()
+              .withActorSystem(system)
+              .build();
+
+      client = getAsyncClient(pekkoClient);
+
+      createBucketAndAssert(client);
+    } finally {
+      try {
+        // Either client may still be null if setup failed; an NPE here would hide the real failure.
+        if (pekkoClient != null) pekkoClient.close();
+        if (client != null) client.close();
+      } finally {
+        // The actor system was created by this test, so it is always terminated here.
+        system.terminate();
+        system.getWhenTerminated().toCompletableFuture().get(2, TimeUnit.SECONDS);
+      }
+    }
+  }
+
+  /** Creates bucket "foo", uploads a random text file and asserts the downloaded content matches. */
+  private void createBucketAndAssert(S3AsyncClient client) throws IOException {
+    client.createBucket(CreateBucketRequest.builder().bucket("foo").build()).join();
+    File randomFile = File.createTempFile("aws1", randomString(5));
+    randomFile.deleteOnExit();
+    String fileContent = randomString(1000);
+    // try-with-resources flushes and closes the writer before the file is uploaded.
+    try (FileWriter fileWriter = new FileWriter(randomFile)) {
+      fileWriter.write(fileContent);
+    }
+    client
+        .putObject(
+            PutObjectRequest.builder()
+                .bucket("foo")
+                .key("my-file")
+                .contentType("text/plain")
+                .build(),
+            randomFile.toPath())
+        .join();
+
+    ResponseBytes<?> result =
+        client
+            .getObject(
+                GetObjectRequest.builder().bucket("foo").key("my-file").build(),
+                AsyncResponseTransformer.toBytes())
+            .join();
+
+    assertEquals(fileContent, result.asUtf8String());
+  }
+
+  /** Builds an S3 client that talks to the mock container through the given HTTP client. */
+  private S3AsyncClient getAsyncClient(SdkAsyncHttpClient pekkoClient) throws URISyntaxException {
+    return S3AsyncClient.builder()
+        .serviceConfiguration(
+            S3Configuration.builder()
+                .checksumValidationEnabled(false)
+                .pathStyleAccessEnabled(true)
+                .build())
+        .credentialsProvider(AnonymousCredentialsProvider.create())
+        .endpointOverride(new URI("http://localhost:" + s3mock.getMappedPort(9090)))
+        .region(Region.of("s3"))
+        .httpClient(pekkoClient)
+        .build();
+  }
+
+  /** Returns a random alphanumeric string of the given length. */
+  private String randomString(int len) {
+    StringBuilder sb = new StringBuilder(len);
+    for (int i = 0; i < len; i++) sb.append(AB.charAt(rnd.nextInt(AB.length())));
+    return sb.toString();
+  }
+}
diff --git a/aws-spi-pekko-http/src/test/resources/application.conf 
b/aws-spi-pekko-http/src/test/resources/application.conf
new file mode 100644
index 000000000..083becee2
--- /dev/null
+++ b/aws-spi-pekko-http/src/test/resources/application.conf
@@ -0,0 +1,3 @@
+# SPDX-License-Identifier: Apache-2.0
+
+pekko.http.client.parsing.max-content-length = 150m
diff --git a/aws-spi-pekko-http/src/test/resources/logback-test.xml 
b/aws-spi-pekko-http/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..5d1d29de5
--- /dev/null
+++ b/aws-spi-pekko-http/src/test/resources/logback-test.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!-- Test logging setup: everything goes to the console; noisy libraries are limited to errors. -->
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <!-- encoders are assigned the type
+             ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{5} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <!-- Strictly speaking, the level attribute is not necessary since -->
+    <!-- the level of the root level is set to DEBUG by default.       -->
+    <root level="debug">
+        <appender-ref ref="STDOUT" />
+    </root>
+
+    <logger name="io.netty" level="trace" additivity="false">
+        <appender-ref ref="STDOUT" />
+    </logger>
+    <logger name="com.typesafe" level="error" additivity="false">
+        <appender-ref ref="STDOUT" />
+    </logger>
+    <logger name="org.apache.pekko.http" level="error" additivity="false">
+        <appender-ref ref="STDOUT" />
+    </logger>
+    <!-- Log of http content // with pekko.http.client.log-unencrypted-network-bytes = 10000 -->
+    <logger name="org.apache.pekko.stream" level="error" additivity="false">
+        <appender-ref ref="STDOUT" />
+    </logger>
+    <logger name="org.testcontainers" level="info" additivity="false">
+        <appender-ref ref="STDOUT" />
+    </logger>
+    <logger name="com.github.dockerjava" level="info" additivity="false">
+        <appender-ref ref="STDOUT" />
+    </logger>
+    <!-- Debug output of this module's own classes; more specific than the
+         org.apache.pekko.stream logger above, so it takes precedence for them. -->
+    <logger name="org.apache.pekko.stream.connectors.awsspi" level="debug" additivity="false">
+        <appender-ref ref="STDOUT" />
+    </logger>
+    <logger name="software.amazon" level="error" additivity="false">
+        <appender-ref ref="STDOUT" />
+    </logger>
+</configuration>
diff --git 
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/BaseAwsClientTest.scala
 
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/BaseAwsClientTest.scala
new file mode 100644
index 000000000..3e558f28e
--- /dev/null
+++ 
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/BaseAwsClientTest.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi
+
+import java.net.URI
+
+import org.apache.pekko
+import 
pekko.stream.connectors.awsspi.testcontainers.LocalStackReadyLogWaitStrategy
+import com.dimafeng.testcontainers.{ ForAllTestContainer, GenericContainer }
+import org.scalatest.concurrent.{ Eventually, Futures, IntegrationPatience }
+import org.scalatest.BeforeAndAfter
+import software.amazon.awssdk.core.SdkClient
+import software.amazon.awssdk.regions.Region
+
+import scala.util.Random
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+/**
+ * Common base for client tests that run against a service inside a test container.
+ * Implementations provide the container and the port the service listens on inside it.
+ */
+trait BaseAwsClientTest[C <: SdkClient]
+    extends AnyWordSpec
+    with Matchers
+    with Futures
+    with Eventually
+    with BeforeAndAfter
+    with IntegrationPatience
+    with ForAllTestContainer {
+
+  lazy val defaultRegion: Region = Region.EU_WEST_1
+
+  /** Port the service listens on inside the container. */
+  def exposedServicePort: Int
+
+  /** Container running the service under test. */
+  def container: GenericContainer
+
+  /** Endpoint on localhost that is mapped to `exposedServicePort` of the container. */
+  def endpoint = {
+    val hostPort = container.mappedPort(exposedServicePort)
+    new URI(s"http://localhost:${hostPort}")
+  }
+
+  /** Random alphanumeric identifier with `length` characters. */
+  def randomIdentifier(length: Int): String = {
+    val chars = Random.alphanumeric.take(length)
+    chars.mkString
+  }
+}
+
+/**
+ * Base for tests that run against a `localstack/localstack` container.
+ * The container exposes port 4566 and is started with the `SERVICES` environment
+ * variable set to the value of `service`.
+ */
+trait LocalstackBaseAwsClientTest[C <: SdkClient] extends BaseAwsClientTest[C] 
{
+  /** Value passed to the container as the `SERVICES` environment variable. */
+  def service: String
+
+  lazy val exposedServicePort: Int = 4566
+
+  // Readiness is decided by LocalStackReadyLogWaitStrategy (defined outside this file).
+  override lazy val container: GenericContainer =
+    new GenericContainer(
+      dockerImage = "localstack/localstack",
+      exposedPorts = Seq(exposedServicePort),
+      env = Map("SERVICES" -> service),
+      waitStrategy = Some(LocalStackReadyLogWaitStrategy))
+}
+
+/**
+ * Base for tests that run against a `softwaremill/elasticmq-native` container
+ * exposing port 9324.
+ */
+trait ElasticMQSQSBaseAwsClientTest[C <: SdkClient] extends 
BaseAwsClientTest[C] {
+  // Not referenced inside this trait; implementations still have to define it.
+  def service: String
+
+  lazy val exposedServicePort: Int = 9324
+
+  // No environment or wait strategy is passed here, unlike the Localstack variant.
+  override lazy val container: GenericContainer =
+    new GenericContainer(
+      dockerImage = "softwaremill/elasticmq-native",
+      exposedPorts = Seq(exposedServicePort))
+}
diff --git 
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala
 
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala
new file mode 100644
index 000000000..6046aa1b6
--- /dev/null
+++ 
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi
+
+import java.util.Collections
+
+import org.apache.pekko
+import pekko.http.scaladsl.model.headers.`Content-Type`
+import pekko.http.scaladsl.model.MediaTypes
+import org.scalatest.OptionValues
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+/** Unit tests for the header and content-type helpers of [[PekkoHttpClient]]. */
+class PekkoHttpClientSpec extends AnyWordSpec with Matchers with OptionValues {
+
+  "PekkoHttpClient" should {
+
+    "parse custom content type" in {
+      val contentTypeStr = "application/xml"
+      val contentType = PekkoHttpClient.tryCreateCustomContentType(contentTypeStr)
+      contentType.mediaType should be(MediaTypes.`application/xml`)
+    }
+
+    // Content-Length is dropped and Content-Type is handed back on its own,
+    // so only the Accept header remains in the general header list.
+    "remove 'Content-Length' and return 'Content-Type' separate from sdk headers" in {
+      val headers = new java.util.HashMap[String, java.util.List[String]]
+      headers.put("Content-Type", Collections.singletonList("application/xml"))
+      headers.put("Content-Length", Collections.singletonList("123"))
+      headers.put("Accept", Collections.singletonList("*/*"))
+
+      val (contentTypeHeader, reqHeaders) = PekkoHttpClient.convertHeaders(headers)
+
+      contentTypeHeader.value.lowercaseName() shouldBe `Content-Type`.lowercaseName
+      reqHeaders should have size 1
+      reqHeaders.head.lowercaseName() shouldBe "accept"
+    }
+  }
+}
diff --git 
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunnerSpec.scala
 
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunnerSpec.scala
new file mode 100644
index 000000000..b899f3efd
--- /dev/null
+++ 
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunnerSpec.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi
+
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.http.scaladsl.model.headers.`User-Agent`
+import pekko.http.scaladsl.model.{ HttpEntity, HttpResponse }
+import org.reactivestreams.{ Publisher, Subscriber, Subscription }
+import org.scalatest.OptionValues
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+import software.amazon.awssdk.http.SdkHttpResponse
+import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler
+
+import scala.concurrent.Future
+
+/** Checks that [[RequestRunner]] forwards entity and response headers to the SDK handler. */
+class RequestRunnerSpec extends AnyWordSpec with Matchers with OptionValues {
+  "Check headers are present from response" in {
+    implicit val system = ActorSystem("test")
+    implicit val ec = system.dispatcher
+    try {
+      val response = HttpResponse(entity = HttpEntity("Ok"), headers = `User-Agent`("Mozilla") :: Nil)
+      val runner = new RequestRunner()
+      val handler = new MyHeaderHandler()
+      val resp = runner.run(() => Future.successful(response), handler)
+      resp.join()
+
+      handler.responseHeaders.headers().get("User-Agent").get(0) shouldBe "Mozilla"
+      handler.responseHeaders.headers().get("Content-Type").get(0) shouldBe "text/plain; charset=UTF-8"
+      handler.responseHeaders.headers().get("Content-Length").get(0) shouldBe "2"
+    } finally {
+      // The actor system is created by this test, so stop it even when an assertion fails.
+      scala.concurrent.Await.result(system.terminate(), scala.concurrent.duration.Duration(10, "seconds"))
+    }
+  }
+
+  /** Handler that remembers the headers it was given and drains the body without inspecting it. */
+  class MyHeaderHandler() extends SdkAsyncHttpResponseHandler {
+    private val headers = new AtomicReference[SdkHttpResponse](null)
+    def responseHeaders = headers.get()
+    override def onHeaders(headers: SdkHttpResponse): Unit = this.headers.set(headers)
+    override def onStream(stream: Publisher[ByteBuffer]): Unit = stream.subscribe(new Subscriber[ByteBuffer] {
+      // Request enough elements up front so the body stream can run to completion.
+      override def onSubscribe(s: Subscription): Unit = s.request(1000)
+      override def onNext(t: ByteBuffer): Unit = ()
+      override def onError(t: Throwable): Unit = ()
+      override def onComplete(): Unit = ()
+    })
+    override def onError(error: Throwable): Unit = ()
+  }
+}
diff --git 
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/dynamodb/TestDynamoDB.scala
 
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/dynamodb/TestDynamoDB.scala
new file mode 100644
index 000000000..42c758286
--- /dev/null
+++ 
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/dynamodb/TestDynamoDB.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi.dynamodb
+
+import org.apache.pekko
+import pekko.stream.connectors.awsspi.{ LocalstackBaseAwsClientTest, 
PekkoHttpAsyncHttpService }
+import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, 
StaticCredentialsProvider }
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
+import software.amazon.awssdk.services.dynamodb.model._
+
+/**
+ * Exercises a `DynamoDbAsyncClient` that is backed by the Pekko HTTP based SDK client, talking to the endpoint
+ * provided by `LocalstackBaseAwsClientTest`.
+ */
+class TestDynamoDB extends LocalstackBaseAwsClientTest[DynamoDbAsyncClient] {
+  "DynamoDB" should {
+    "create a table" in withClient { implicit client =>
+      val attributes = AttributeDefinition.builder.attributeName("film_id").attributeType(ScalarAttributeType.S).build()
+      val keySchema = KeySchemaElement.builder.attributeName("film_id").keyType(KeyType.HASH).build()
+
+      // the test expects to start without any table
+      val emptyTableResult = client.listTables().join()
+      emptyTableResult.tableNames() should have size 0
+
+      val result = client
+        .createTable(
+          CreateTableRequest
+            .builder()
+            .tableName("Movies")
+            .attributeDefinitions(attributes)
+            .keySchema(keySchema)
+            .provisionedThroughput(
+              ProvisionedThroughput.builder.readCapacityUnits(1000L).writeCapacityUnits(1000L).build())
+            .build())
+        .join
+
+      val desc = result.tableDescription()
+      desc.tableName() should be("Movies")
+
+      val tableResult = client.listTables().join()
+      tableResult.tableNames() should have size 1
+    }
+
+  }
+
+  /**
+   * Runs `testCode` with a freshly built DynamoDB client and releases both the SDK client and the underlying
+   * Pekko HTTP client afterwards.
+   *
+   * The SDK client is built inside the outer `try` so the HTTP client is still closed when building fails, and
+   * the SDK client is closed before the HTTP client it depends on. Each `close()` sits in its own `finally`, so a
+   * failure in one cannot skip the other.
+   */
+  def withClient(testCode: DynamoDbAsyncClient => Any): Any = {
+    val pekkoClient = new PekkoHttpAsyncHttpService().createAsyncHttpClientFactory().build()
+    try {
+      val client = DynamoDbAsyncClient
+        .builder()
+        .endpointOverride(endpoint)
+        .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
+        .httpClient(pekkoClient)
+        .region(defaultRegion)
+        .build()
+
+      try testCode(client)
+      finally client.close()
+    } finally pekkoClient.close()
+  }
+
+  // a def, like in TestSNS / TestSQS, so the value does not depend on this class' initialisation order
+  override def service: String = "dynamodb"
+
+}
diff --git 
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/s3/TestS3.scala
 
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/s3/TestS3.scala
new file mode 100644
index 000000000..94771ab47
--- /dev/null
+++ 
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/s3/TestS3.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi.s3
+
+import java.io.{ File, FileWriter }
+import org.apache.pekko
+import pekko.stream.connectors.awsspi.{ BaseAwsClientTest, 
PekkoHttpAsyncHttpService }
+import pekko.stream.connectors.awsspi.testcontainers.TimeoutWaitStrategy
+import com.dimafeng.testcontainers.GenericContainer
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider
+import software.amazon.awssdk.core.async.{ AsyncRequestBody, 
AsyncResponseTransformer }
+import software.amazon.awssdk.services.s3.{ S3AsyncClient, S3Configuration }
+import software.amazon.awssdk.services.s3.model._
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.Random
+
+/**
+ * Exercises an `S3AsyncClient` that is backed by the Pekko HTTP based SDK client against an `adobe/s3mock`
+ * container: bucket creation, simple upload/download, multipart upload and a HEAD request on an empty object.
+ */
+class TestS3 extends BaseAwsClientTest[S3AsyncClient] {
+
+  "Async S3 client" should {
+    "create bucket" in withClient { implicit client =>
+      val bucketName = createBucket()
+      val buckets = client.listBuckets().join
+      buckets.buckets() should have size 1
+      buckets.buckets().get(0).name() should be(bucketName)
+    }
+
+    "upload and download a file to a bucket" in withClient { implicit client =>
+      val bucketName = createBucket()
+      val fileContent = (0 to 1000).mkString
+
+      client
+        .putObject(PutObjectRequest.builder().bucket(bucketName).key("my-file").contentType("text/plain").build(),
+          AsyncRequestBody.fromString(fileContent))
+        .join
+
+      val result = client
+        .getObject(GetObjectRequest.builder().bucket(bucketName).key("my-file").build(),
+          AsyncResponseTransformer.toBytes[GetObjectResponse]())
+        .join
+
+      result.asUtf8String() should be(fileContent)
+      result.response().contentType() should be("text/plain")
+      result.response().contentLength() shouldEqual fileContent.getBytes().length
+    }
+
+    "multipart upload" in withClient { implicit client =>
+      val bucketName = createBucket()
+      val randomFile = File.createTempFile("aws1", Random.alphanumeric.take(5).mkString)
+      // scratch file only: make sure it does not outlive the test JVM
+      randomFile.deleteOnExit()
+      val fileContent = (0 to 1000000).mkString
+      val fileWriter = new FileWriter(randomFile)
+      // close (not merely flush) so the file handle is released before the file is uploaded
+      try fileWriter.write(fileContent)
+      finally fileWriter.close()
+
+      val createMultipartUploadResponse = client
+        .createMultipartUpload(
+          CreateMultipartUploadRequest.builder().bucket(bucketName).key("bar").contentType("text/plain").build())
+        .join()
+
+      // the same file is uploaded twice, as part 1 and as part 2
+      val p1 = client
+        .uploadPart(UploadPartRequest
+            .builder()
+            .bucket(bucketName)
+            .key("bar")
+            .partNumber(1)
+            .uploadId(createMultipartUploadResponse.uploadId())
+            .build(),
+          randomFile.toPath)
+        .join
+      val p2 = client
+        .uploadPart(UploadPartRequest
+            .builder()
+            .bucket(bucketName)
+            .key("bar")
+            .partNumber(2)
+            .uploadId(createMultipartUploadResponse.uploadId())
+            .build(),
+          randomFile.toPath)
+        .join
+
+      client
+        .completeMultipartUpload(
+          CompleteMultipartUploadRequest
+            .builder()
+            .bucket(bucketName)
+            .key("bar")
+            .multipartUpload(
+              CompletedMultipartUpload
+                .builder()
+                .parts(CompletedPart.builder().partNumber(1).eTag(p1.eTag()).build(),
+                  CompletedPart.builder().partNumber(2).eTag(p2.eTag()).build())
+                .build())
+            .uploadId(createMultipartUploadResponse.uploadId())
+            .build())
+        .join
+
+      val result = client
+        .getObject(GetObjectRequest.builder().bucket(bucketName).key("bar").build(),
+          AsyncResponseTransformer.toBytes[GetObjectResponse]())
+        .join
+      // both parts carried the same content, so the object is that content twice
+      result.asUtf8String() should be(fileContent + fileContent)
+    }
+
+    "upload and head request gzip file with 0 content-length" in withClient { implicit client =>
+      val bucketName = createBucket()
+      val fileName = "my-empty-file"
+      val contentLength = 0L
+      val contentType = "application/json"
+      val contentEncoding = "gzip"
+
+      client
+        .putObject(
+          PutObjectRequest
+            .builder()
+            .bucket(bucketName)
+            .key(fileName)
+            .contentType(contentType)
+            .contentLength(contentLength)
+            .contentEncoding(contentEncoding)
+            .build(),
+          AsyncRequestBody.fromString(""))
+        .join
+
+      val result = client
+        .headObject(HeadObjectRequest.builder().bucket(bucketName).key(fileName).build())
+        .join
+
+      result.contentLength() should be(contentLength)
+      result.contentType() should be(contentType)
+      result.contentEncoding() should be(contentEncoding)
+    }
+
+  }
+
+  /** Creates a bucket with a random 7 character lower-case alphanumeric name and returns that name. */
+  def createBucket()(implicit client: S3AsyncClient): String = {
+    val bucketName = Random.alphanumeric.take(7).map(_.toLower).mkString
+    client.createBucket(CreateBucketRequest.builder().bucket(bucketName).build()).join
+    bucketName
+  }
+
+  /**
+   * Runs `testCode` with a freshly built S3 client (path-style access, checksum validation off, anonymous
+   * credentials) and releases both the SDK client and the underlying Pekko HTTP client afterwards.
+   *
+   * The SDK client is built inside the outer `try` so the HTTP client is still closed when building fails, and
+   * the SDK client is closed before the HTTP client it depends on. Each `close()` sits in its own `finally`, so a
+   * failure in one cannot skip the other.
+   */
+  private def withClient(testCode: S3AsyncClient => Any): Any = {
+    val pekkoClient = new PekkoHttpAsyncHttpService().createAsyncHttpClientFactory().build()
+    try {
+      val client = S3AsyncClient
+        .builder()
+        .serviceConfiguration(
+          S3Configuration
+            .builder()
+            .checksumValidationEnabled(false)
+            .pathStyleAccessEnabled(true)
+            .build())
+        .credentialsProvider(AnonymousCredentialsProvider.create)
+        .endpointOverride(endpoint)
+        .httpClient(pekkoClient)
+        .region(defaultRegion)
+        .build()
+
+      try testCode(client)
+      finally client.close()
+    } finally pekkoClient.close()
+  }
+
+  override def exposedServicePort: Int = 9090
+
+  // s3mock container; readiness is a fixed 10 second pause (see TimeoutWaitStrategy)
+  override lazy val container: GenericContainer = new GenericContainer(
+    dockerImage = "adobe/s3mock:2.17.0",
+    exposedPorts = Seq(exposedServicePort),
+    waitStrategy = Some(TimeoutWaitStrategy(10 seconds)))
+}
diff --git 
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/sns/TestSNS.scala
 
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/sns/TestSNS.scala
new file mode 100644
index 000000000..d73fcb849
--- /dev/null
+++ 
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/sns/TestSNS.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi.sns
+
+import org.apache.pekko
+import pekko.stream.connectors.awsspi.{ LocalstackBaseAwsClientTest, 
PekkoHttpAsyncHttpService }
+import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, 
StaticCredentialsProvider }
+import software.amazon.awssdk.services.sns.SnsAsyncClient
+import software.amazon.awssdk.services.sns.model.{ CreateTopicRequest, 
PublishRequest }
+
+/**
+ * Exercises an `SnsAsyncClient` that is backed by the Pekko HTTP based SDK client, talking to the endpoint
+ * provided by `LocalstackBaseAwsClientTest`.
+ */
+class TestSNS extends LocalstackBaseAwsClientTest[SnsAsyncClient] {
+
+  "Async SNS client" should {
+    "publish a message to a topic" in withClient { implicit client =>
+      val arn = client.createTopic(CreateTopicRequest.builder().name("topic-example").build()).join().topicArn()
+      val result = client.publish(PublishRequest.builder().message("a message").topicArn(arn).build()).join()
+
+      result.messageId() should not be null
+    }
+  }
+
+  /**
+   * Runs `testCode` with a freshly built SNS client and releases both the SDK client and the underlying Pekko
+   * HTTP client afterwards.
+   *
+   * The SDK client is built inside the outer `try` so the HTTP client is still closed when building fails, and
+   * the SDK client is closed before the HTTP client it depends on. Each `close()` sits in its own `finally`, so a
+   * failure in one cannot skip the other.
+   */
+  def withClient(testCode: SnsAsyncClient => Any): Any = {
+    val pekkoClient = new PekkoHttpAsyncHttpService().createAsyncHttpClientFactory().build()
+    try {
+      val client = SnsAsyncClient
+        .builder()
+        .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
+        .httpClient(pekkoClient)
+        .region(defaultRegion)
+        .endpointOverride(endpoint)
+        .build()
+
+      try testCode(client)
+      finally client.close()
+    } finally pekkoClient.close()
+  }
+
+  override def service: String = "sns"
+}
diff --git 
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/sqs/TestSQS.scala
 
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/sqs/TestSQS.scala
new file mode 100644
index 000000000..ee2b92123
--- /dev/null
+++ 
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/sqs/TestSQS.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi.sqs
+
+import org.apache.pekko
+import pekko.stream.connectors.awsspi.{ ElasticMQSQSBaseAwsClientTest, 
PekkoHttpAsyncHttpService }
+import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, 
StaticCredentialsProvider }
+import software.amazon.awssdk.services.sqs.SqsAsyncClient
+import software.amazon.awssdk.services.sqs.model._
+
+/**
+ * Exercises an `SqsAsyncClient` that is backed by the Pekko HTTP based SDK client, talking to the endpoint
+ * provided by `ElasticMQSQSBaseAwsClientTest`.
+ */
+class TestSQS extends ElasticMQSQSBaseAwsClientTest[SqsAsyncClient] {
+  "Async SQS client" should {
+
+    "publish a message to a queue" in withClient { implicit client =>
+      client.createQueue(CreateQueueRequest.builder().queueName("foo").build()).join()
+      client
+        .sendMessage(SendMessageRequest.builder().queueUrl(s"$endpoint/queue/foo").messageBody("123").build())
+        .join()
+      val receivedMessage = client
+        .receiveMessage(ReceiveMessageRequest.builder().queueUrl(s"$endpoint/queue/foo").maxNumberOfMessages(1).build())
+        .join()
+      receivedMessage.messages().get(0).body() should be("123")
+    }
+
+    "delete a message" in withClient { implicit client =>
+      client.createQueue(CreateQueueRequest.builder().queueName("foo").build()).join()
+      client
+        .sendMessage(SendMessageRequest.builder().queueUrl(s"$endpoint/queue/foo").messageBody("123").build())
+        .join()
+
+      val receivedMessages = client
+        .receiveMessage(ReceiveMessageRequest.builder().queueUrl(s"$endpoint/queue/foo").maxNumberOfMessages(1).build())
+        .join
+
+      // delete the message that was just received, using its receipt handle
+      client
+        .deleteMessage(
+          DeleteMessageRequest
+            .builder()
+            .queueUrl(s"$endpoint/queue/foo")
+            .receiptHandle(receivedMessages.messages().get(0).receiptHandle())
+            .build())
+        .join()
+
+      // a second receive (waiting up to one second) is expected to come back empty
+      val receivedMessage = client
+        .receiveMessage(
+          ReceiveMessageRequest
+            .builder()
+            .queueUrl(s"$endpoint/queue/foo")
+            .maxNumberOfMessages(1)
+            .waitTimeSeconds(1)
+            .build())
+        .join()
+      receivedMessage.messages() shouldBe java.util.Collections.EMPTY_LIST
+    }
+
+  }
+
+  /**
+   * Runs `testCode` with a freshly built SQS client and releases both the SDK client and the underlying Pekko
+   * HTTP client afterwards.
+   *
+   * The SDK client is built inside the outer `try` so the HTTP client is still closed when building fails, and
+   * the SDK client is closed before the HTTP client it depends on. Each `close()` sits in its own `finally`, so a
+   * failure in one cannot skip the other.
+   */
+  def withClient(testCode: SqsAsyncClient => Any): Any = {
+    val pekkoClient = new PekkoHttpAsyncHttpService().createAsyncHttpClientFactory().build()
+    try {
+      val client = SqsAsyncClient
+        .builder()
+        .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
+        .httpClient(pekkoClient)
+        .region(defaultRegion)
+        .endpointOverride(endpoint)
+        .build()
+
+      try testCode(client)
+      finally client.close()
+    } finally pekkoClient.close()
+  }
+
+  override def service: String = "sqs"
+}
diff --git 
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/testcontainers/LocalStackReadyLogWaitStrategy.scala
 
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/testcontainers/LocalStackReadyLogWaitStrategy.scala
new file mode 100644
index 000000000..07ed0490c
--- /dev/null
+++ 
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/testcontainers/LocalStackReadyLogWaitStrategy.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi.testcontainers
+
+import java.util.concurrent.{ TimeUnit, TimeoutException }
+import java.util.function.Predicate
+
+import org.testcontainers.DockerClientFactory
+import org.testcontainers.containers.ContainerLaunchException
+import org.testcontainers.containers.output.{ OutputFrame, WaitingConsumer }
+import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy
+import org.testcontainers.utility.LogUtils
+
+/**
+ * Wait strategy for Localstack containers. It follows the container's log output and considers the container
+ * started as soon as one output frame containing "Ready." has been seen; if that does not happen within the
+ * configured startup timeout, a `ContainerLaunchException` is thrown.
+ */
+object LocalStackReadyLogWaitStrategy extends AbstractWaitStrategy {
+  override def waitUntilReady(): Unit = {
+    // attach a consumer to the log stream of the container this strategy is waiting on
+    val logConsumer = new WaitingConsumer
+    LogUtils.followOutput(DockerClientFactory.instance.client, waitStrategyTarget.getContainerId, logConsumer)
+
+    val isReadyFrame: Predicate[OutputFrame] = frame => frame.getUtf8String.contains("Ready.")
+
+    try {
+      // a single matching frame is enough
+      logConsumer.waitUntil(isReadyFrame, startupTimeout.getSeconds, TimeUnit.SECONDS, 1)
+    } catch {
+      case _: TimeoutException =>
+        throw new ContainerLaunchException("Timed out waiting for localstack")
+    }
+  }
+}
diff --git 
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/testcontainers/TimeoutWaitStrategy.scala
 
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/testcontainers/TimeoutWaitStrategy.scala
new file mode 100644
index 000000000..62f206043
--- /dev/null
+++ 
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/testcontainers/TimeoutWaitStrategy.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi.testcontainers
+
+import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy
+
+import scala.concurrent.duration.FiniteDuration
+
+/** Companion providing `TimeoutWaitStrategy(timeout)` as a shorthand for `new TimeoutWaitStrategy(timeout)`. */
+object TimeoutWaitStrategy {
+  def apply(timeout: FiniteDuration): TimeoutWaitStrategy =
+    new TimeoutWaitStrategy(timeout)
+}
+
+/**
+ * Wait strategy without any readiness probe: `waitUntilReady` simply blocks the calling thread for `timeout`.
+ */
+class TimeoutWaitStrategy(timeout: FiniteDuration) extends AbstractWaitStrategy {
+  override def waitUntilReady(): Unit = {
+    val pauseMillis = timeout.toMillis
+    Thread.sleep(pauseMillis)
+  }
+}
diff --git a/awslambda/src/test/java/docs/javadsl/Examples.java 
b/awslambda/src/test/java/docs/javadsl/Examples.java
index 5b0427b7b..5711219d8 100644
--- a/awslambda/src/test/java/docs/javadsl/Examples.java
+++ b/awslambda/src/test/java/docs/javadsl/Examples.java
@@ -22,7 +22,7 @@ import org.apache.pekko.stream.javadsl.Flow;
 import org.apache.pekko.stream.javadsl.Sink;
 import org.apache.pekko.stream.javadsl.Source;
 // #init-client
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient;
+import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 import software.amazon.awssdk.services.lambda.LambdaAsyncClient;
diff --git a/awslambda/src/test/scala/docs/scaladsl/Examples.scala 
b/awslambda/src/test/scala/docs/scaladsl/Examples.scala
index 5ae45c9eb..26dfff603 100644
--- a/awslambda/src/test/scala/docs/scaladsl/Examples.scala
+++ b/awslambda/src/test/scala/docs/scaladsl/Examples.scala
@@ -28,7 +28,7 @@ object Examples {
 
   def initClient(): Unit = {
     // #init-client
-    import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
+    import pekko.stream.connectors.awsspi.PekkoHttpClient
     import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, 
StaticCredentialsProvider }
     import software.amazon.awssdk.services.lambda.LambdaAsyncClient
 
diff --git a/build.sbt b/build.sbt
index 8986071b8..956965398 100644
--- a/build.sbt
+++ b/build.sbt
@@ -18,6 +18,7 @@ ThisBuild / reproducibleBuildsCheckResolver := 
Resolver.ApacheMavenStagingRepo
 lazy val userProjects: Seq[ProjectReference] = List[ProjectReference](
   amqp,
   avroparquet,
+  awsSpiPekkoHttp,
   awslambda,
   azureStorageQueue,
   cassandra,
@@ -128,7 +129,13 @@ lazy val amqp = pekkoConnectorProject("amqp", "amqp", 
Dependencies.Amqp)
 lazy val avroparquet =
   pekkoConnectorProject("avroparquet", "avroparquet", Dependencies.AvroParquet)
 
+lazy val awsSpiPekkoHttp =
+  pekkoConnectorProject("aws-spi-pekko-http", "aws.api.pekko.http", 
Dependencies.AwsSpiPekkoHttp)
+    .configs(IntegrationTest)
+    .settings(Defaults.itSettings)
+
 lazy val awslambda = pekkoConnectorProject("awslambda", "aws.lambda", 
Dependencies.AwsLambda)
+  .dependsOn(awsSpiPekkoHttp)
 
 lazy val azureStorageQueue = pekkoConnectorProject(
   "azure-storage-queue",
@@ -151,6 +158,7 @@ lazy val csvBench = internalProject("csv-bench")
   .enablePlugins(JmhPlugin)
 
 lazy val dynamodb = pekkoConnectorProject("dynamodb", "aws.dynamodb", 
Dependencies.DynamoDB)
+  .dependsOn(awsSpiPekkoHttp)
 
 lazy val elasticsearch = pekkoConnectorProject(
   "elasticsearch",
@@ -278,6 +286,7 @@ lazy val jms = pekkoConnectorProject("jms", "jms", 
Dependencies.Jms)
 lazy val jsonStreaming = pekkoConnectorProject("json-streaming", 
"json.streaming", Dependencies.JsonStreaming)
 
 lazy val kinesis = pekkoConnectorProject("kinesis", "aws.kinesis", 
Dependencies.Kinesis)
+  .dependsOn(awsSpiPekkoHttp)
 
 lazy val kudu = pekkoConnectorProject("kudu", "kudu", Dependencies.Kudu)
 
@@ -307,6 +316,7 @@ lazy val reference = internalProject("reference", 
Dependencies.Reference)
 
 lazy val s3 = pekkoConnectorProject("s3", "aws.s3", Dependencies.S3,
   MetaInfLicenseNoticeCopy.s3Settings)
+  .dependsOn(awsSpiPekkoHttp)
 
 lazy val pravega = pekkoConnectorProject(
   "pravega",
@@ -323,16 +333,19 @@ lazy val simpleCodecs = 
pekkoConnectorProject("simple-codecs", "simplecodecs")
 
 lazy val slick = pekkoConnectorProject("slick", "slick", Dependencies.Slick)
 
-lazy val eventbridge =
-  pekkoConnectorProject("aws-event-bridge", "aws.eventbridge", 
Dependencies.Eventbridge)
+lazy val eventbridge = pekkoConnectorProject("aws-event-bridge", 
"aws.eventbridge",
+  Dependencies.Eventbridge)
+  .dependsOn(awsSpiPekkoHttp)
 
 lazy val sns = pekkoConnectorProject("sns", "aws.sns", Dependencies.Sns)
+  .dependsOn(awsSpiPekkoHttp)
 
 // Solrj has some deprecated methods
 lazy val solr = pekkoConnectorProject("solr", "solr", Dependencies.Solr,
   fatalWarnings := false)
 
 lazy val sqs = pekkoConnectorProject("sqs", "aws.sqs", Dependencies.Sqs)
+  .dependsOn(awsSpiPekkoHttp)
 
 lazy val sse = pekkoConnectorProject("sse", "sse", Dependencies.Sse)
 
@@ -466,7 +479,7 @@ def pekkoConnectorProject(projectId: String,
       licenses := List(License.Apache2),
       AutomaticModuleName.settings(s"pekko.stream.connectors.$moduleName"),
       mimaPreviousArtifacts := {
-        if (moduleName == "slick" || moduleName == "couchbase3") {
+        if (moduleName == "slick" || moduleName == "couchbase3" || moduleName 
== "aws.api.pekko.http") {
           Set.empty
         } else {
           Set(organization.value %% name.value % mimaCompareVersion)
diff --git a/dynamodb/src/test/java/docs/javadsl/ExampleTest.java 
b/dynamodb/src/test/java/docs/javadsl/ExampleTest.java
index 261e31d73..138a376cc 100644
--- a/dynamodb/src/test/java/docs/javadsl/ExampleTest.java
+++ b/dynamodb/src/test/java/docs/javadsl/ExampleTest.java
@@ -29,7 +29,7 @@ import org.apache.pekko.stream.javadsl.SourceWithContext;
 import org.apache.pekko.testkit.javadsl.TestKit;
 import org.junit.*;
 // #init-client
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient;
+import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
 import scala.util.Try;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
diff --git a/dynamodb/src/test/java/docs/javadsl/RetryTest.java 
b/dynamodb/src/test/java/docs/javadsl/RetryTest.java
index 0a325c729..a1ee03076 100644
--- a/dynamodb/src/test/java/docs/javadsl/RetryTest.java
+++ b/dynamodb/src/test/java/docs/javadsl/RetryTest.java
@@ -14,9 +14,9 @@
 package docs.javadsl;
 
 import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
 import org.apache.pekko.stream.connectors.testkit.javadsl.LogCapturingJunit4;
 import org.apache.pekko.testkit.javadsl.TestKit;
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient;
 import org.junit.Rule;
 import org.junit.Test;
 // #clientRetryConfig
diff --git a/dynamodb/src/test/scala/docs/scaladsl/ExampleSpec.scala 
b/dynamodb/src/test/scala/docs/scaladsl/ExampleSpec.scala
index 475fc019c..ab7b2632a 100644
--- a/dynamodb/src/test/scala/docs/scaladsl/ExampleSpec.scala
+++ b/dynamodb/src/test/scala/docs/scaladsl/ExampleSpec.scala
@@ -32,7 +32,7 @@ import pekko.testkit.TestKit
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.BeforeAndAfterAll
 //#init-client
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
+import pekko.stream.connectors.awsspi.PekkoHttpClient
 import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, 
StaticCredentialsProvider }
 import software.amazon.awssdk.regions.Region
 import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
diff --git a/dynamodb/src/test/scala/docs/scaladsl/RetrySpec.scala 
b/dynamodb/src/test/scala/docs/scaladsl/RetrySpec.scala
index b0dfe3485..8766a8ece 100644
--- a/dynamodb/src/test/scala/docs/scaladsl/RetrySpec.scala
+++ b/dynamodb/src/test/scala/docs/scaladsl/RetrySpec.scala
@@ -15,9 +15,9 @@ package docs.scaladsl
 
 import org.apache.pekko
 import pekko.actor.ActorSystem
+import pekko.stream.connectors.awsspi.PekkoHttpClient
 import pekko.stream.connectors.testkit.scaladsl.LogCapturing
 import pekko.testkit.TestKit
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
 import org.scalatest.BeforeAndAfterAll
 import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, 
StaticCredentialsProvider }
 // #awsRetryConfiguration
diff --git 
a/dynamodb/src/test/scala/org/apache/pekko/stream/connectors/dynamodb/ItemSpec.scala
 
b/dynamodb/src/test/scala/org/apache/pekko/stream/connectors/dynamodb/ItemSpec.scala
index dd8241d18..26cf9b647 100644
--- 
a/dynamodb/src/test/scala/org/apache/pekko/stream/connectors/dynamodb/ItemSpec.scala
+++ 
b/dynamodb/src/test/scala/org/apache/pekko/stream/connectors/dynamodb/ItemSpec.scala
@@ -16,11 +16,11 @@ package org.apache.pekko.stream.connectors.dynamodb
 import java.net.URI
 import org.apache.pekko
 import pekko.actor.ActorSystem
+import pekko.stream.connectors.awsspi.PekkoHttpClient
 import pekko.stream.connectors.dynamodb.scaladsl._
 import pekko.stream.scaladsl.Sink
 import pekko.testkit.TestKit
 import pekko.util.ccompat.JavaConverters._
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
 import org.scalatest._
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.wordspec.AsyncWordSpecLike
diff --git 
a/dynamodb/src/test/scala/org/apache/pekko/stream/connectors/dynamodb/TableSpec.scala
 
b/dynamodb/src/test/scala/org/apache/pekko/stream/connectors/dynamodb/TableSpec.scala
index b2197d6e9..372964730 100644
--- 
a/dynamodb/src/test/scala/org/apache/pekko/stream/connectors/dynamodb/TableSpec.scala
+++ 
b/dynamodb/src/test/scala/org/apache/pekko/stream/connectors/dynamodb/TableSpec.scala
@@ -17,10 +17,10 @@ import java.net.URI
 
 import org.apache.pekko
 import pekko.actor.ActorSystem
+import pekko.stream.connectors.awsspi.PekkoHttpClient
 import pekko.stream.connectors.dynamodb.scaladsl.DynamoDb
 import pekko.testkit.TestKit
 import pekko.util.ccompat.JavaConverters._
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
 import org.scalatest.BeforeAndAfterAll
 import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, 
StaticCredentialsProvider }
 import software.amazon.awssdk.regions.Region
diff --git a/kinesis/src/test/java/docs/javadsl/KinesisFirehoseSnippets.java 
b/kinesis/src/test/java/docs/javadsl/KinesisFirehoseSnippets.java
index 322b2318d..31254890c 100644
--- a/kinesis/src/test/java/docs/javadsl/KinesisFirehoseSnippets.java
+++ b/kinesis/src/test/java/docs/javadsl/KinesisFirehoseSnippets.java
@@ -21,7 +21,7 @@ import 
org.apache.pekko.stream.connectors.kinesisfirehose.javadsl.KinesisFirehos
 import org.apache.pekko.stream.javadsl.Flow;
 import org.apache.pekko.stream.javadsl.Sink;
 // #init-client
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient;
+import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
 import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
 // #init-client
 import 
software.amazon.awssdk.services.firehose.model.PutRecordBatchResponseEntry;
diff --git a/kinesis/src/test/java/docs/javadsl/KinesisSnippets.java 
b/kinesis/src/test/java/docs/javadsl/KinesisSnippets.java
index f24d04292..d821b8b8b 100644
--- a/kinesis/src/test/java/docs/javadsl/KinesisSnippets.java
+++ b/kinesis/src/test/java/docs/javadsl/KinesisSnippets.java
@@ -15,6 +15,7 @@ package docs.javadsl;
 
 import org.apache.pekko.NotUsed;
 import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
 import org.apache.pekko.stream.connectors.kinesis.KinesisFlowSettings;
 import org.apache.pekko.stream.connectors.kinesis.ShardIterators;
 import org.apache.pekko.stream.connectors.kinesis.ShardSettings;
@@ -26,7 +27,6 @@ import org.apache.pekko.stream.javadsl.FlowWithContext;
 import org.apache.pekko.stream.javadsl.Sink;
 import org.apache.pekko.stream.javadsl.Source;
 // #init-client
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient;
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
 // #init-client
 import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
diff --git a/kinesis/src/test/scala/docs/scaladsl/KinesisFirehoseSnippets.scala b/kinesis/src/test/scala/docs/scaladsl/KinesisFirehoseSnippets.scala
index acf0851d1..df9568df6 100644
--- a/kinesis/src/test/scala/docs/scaladsl/KinesisFirehoseSnippets.scala
+++ b/kinesis/src/test/scala/docs/scaladsl/KinesisFirehoseSnippets.scala
@@ -24,7 +24,7 @@ import software.amazon.awssdk.services.firehose.model.{ PutRecordBatchResponseEn
 object KinesisFirehoseSnippets {
 
   // #init-client
-  import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
+  import pekko.stream.connectors.awsspi.PekkoHttpClient
   import software.amazon.awssdk.services.firehose.FirehoseAsyncClient
 
   implicit val system: ActorSystem = ActorSystem()
diff --git a/kinesis/src/test/scala/docs/scaladsl/KinesisSnippets.scala b/kinesis/src/test/scala/docs/scaladsl/KinesisSnippets.scala
index 33c037586..696342c44 100644
--- a/kinesis/src/test/scala/docs/scaladsl/KinesisSnippets.scala
+++ b/kinesis/src/test/scala/docs/scaladsl/KinesisSnippets.scala
@@ -29,7 +29,7 @@ import scala.concurrent.duration._
 object KinesisSnippets {
 
   // #init-client
-  import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
+  import pekko.stream.connectors.awsspi.PekkoHttpClient
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
 
   implicit val system: ActorSystem = ActorSystem()
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index b33c35ada..30b7bab3a 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -57,7 +57,7 @@ object Dependencies {
 
   val scalaTestScalaCheckArtifact = s"scalacheck-${scalaTestPlusScalaCheckVersion(scalaCheckVersion)}"
   val scalaTestScalaCheckVersion = s"$ScalaTestVersion.0"
-  val scalaTestMockitoVersion = "3.2.18.0" // https://github.com/scalatest/scalatest/issues/2311
+  val scalaTestMockitoVersion = scalaTestScalaCheckVersion
 
   val CouchbaseVersion = "2.7.23"
   val Couchbase3Version = "3.6.0"
@@ -127,11 +127,28 @@ object Dependencies {
     libraryDependencies ++= Seq(
       "com.rabbitmq" % "amqp-client" % "5.21.0") ++ Mockito)
 
+  val AwsSpiPekkoHttp = Seq(
+    libraryDependencies ++= Seq(
+      "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
+      "software.amazon.awssdk" % "http-client-spi" % AwsSdk2Version,
+      ("software.amazon.awssdk" % "dynamodb" % AwsSdk2Version % "it,test").excludeAll(
+        ExclusionRule("software.amazon.awssdk", "netty-nio-client")),
+      ("software.amazon.awssdk" % "kinesis" % AwsSdk2Version % "it,test").excludeAll(
+        ExclusionRule("software.amazon.awssdk", "netty-nio-client")),
+      ("software.amazon.awssdk" % "sns" % AwsSdk2Version % "it,test").excludeAll(
+        ExclusionRule("software.amazon.awssdk", "netty-nio-client")),
+      ("software.amazon.awssdk" % "sqs" % AwsSdk2Version % "it,test").excludeAll(
+        ExclusionRule("software.amazon.awssdk", "netty-nio-client")),
+      ("software.amazon.awssdk" % "s3" % AwsSdk2Version % "it,test").excludeAll(
+        ExclusionRule("software.amazon.awssdk", "netty-nio-client")),
+      "com.dimafeng" %% "testcontainers-scala" % TestContainersScalaTestVersion % Test,
+      "org.scalatest" %% "scalatest" % ScalaTestVersion % "it,test",
+      "org.scalatestplus" %% "junit-4-13" % scalaTestScalaCheckVersion % "it,test",
+      "ch.qos.logback" % "logback-classic" % LogbackVersion % "it,test"))
+
   val AwsLambda = Seq(
     libraryDependencies ++= Seq(
       "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
-      ("com.github.pjfanning" %% "aws-spi-pekko-http" % AwsSpiPekkoHttpVersion).excludeAll(
-        ExclusionRule(organization = "org.apache.pekko")),
       ("software.amazon.awssdk" % "lambda" % AwsSdk2Version).excludeAll(
         ExclusionRule("software.amazon.awssdk", "apache-client"),
         ExclusionRule("software.amazon.awssdk", "netty-nio-client"))) ++ Mockito)
@@ -177,8 +194,6 @@ object Dependencies {
 
   val DynamoDB = Seq(
     libraryDependencies ++= Seq(
-      ("com.github.pjfanning" %% "aws-spi-pekko-http" % AwsSpiPekkoHttpVersion).excludeAll(
-        ExclusionRule(organization = "org.apache.pekko")),
       ("software.amazon.awssdk" % "dynamodb" % AwsSdk2Version).excludeAll(
         ExclusionRule("software.amazon.awssdk", "apache-client"),
         ExclusionRule("software.amazon.awssdk", "netty-nio-client")),
@@ -363,8 +378,6 @@ object Dependencies {
   val Kinesis = Seq(
     libraryDependencies ++= Seq(
       "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
-      ("com.github.pjfanning" %% "aws-spi-pekko-http" % AwsSpiPekkoHttpVersion).excludeAll(ExclusionRule(
-        organization = "org.apache.pekko"))) ++ Seq(
       "software.amazon.awssdk" % "kinesis" % AwsSdk2Version,
       "software.amazon.awssdk" % "firehose" % AwsSdk2Version,
       "software.amazon.kinesis" % "amazon-kinesis-client" % "2.6.0").map(
@@ -452,8 +465,6 @@ object Dependencies {
 
   val Eventbridge = Seq(
     libraryDependencies ++= Seq(
-      ("com.github.pjfanning" %% "aws-spi-pekko-http" % AwsSpiPekkoHttpVersion).excludeAll(
-        ExclusionRule(organization = "org.apache.pekko")),
       ("software.amazon.awssdk" % "eventbridge" % AwsSdk2Version).excludeAll(
         ExclusionRule("software.amazon.awssdk", "apache-client"),
         ExclusionRule("software.amazon.awssdk", "netty-nio-client")),
@@ -461,8 +472,6 @@ object Dependencies {
 
   val Sns = Seq(
     libraryDependencies ++= Seq(
-      ("com.github.pjfanning" %% "aws-spi-pekko-http" % AwsSpiPekkoHttpVersion).excludeAll(
-        ExclusionRule(organization = "org.apache.pekko")),
       ("software.amazon.awssdk" % "sns" % AwsSdk2Version).excludeAll(
         ExclusionRule("software.amazon.awssdk", "apache-client"),
         ExclusionRule("software.amazon.awssdk", "netty-nio-client")),
@@ -483,8 +492,6 @@ object Dependencies {
 
   val Sqs = Seq(
     libraryDependencies ++= Seq(
-      ("com.github.pjfanning" %% "aws-spi-pekko-http" % AwsSpiPekkoHttpVersion).excludeAll(
-        ExclusionRule(organization = "org.apache.pekko")),
       ("software.amazon.awssdk" % "sqs" % AwsSdk2Version).excludeAll(
         ExclusionRule("software.amazon.awssdk", "apache-client"),
         ExclusionRule("software.amazon.awssdk", "netty-nio-client")),
diff --git a/sns/src/test/java/docs/javadsl/SnsPublisherTest.java b/sns/src/test/java/docs/javadsl/SnsPublisherTest.java
index b05d9fe46..bcf32de28 100644
--- a/sns/src/test/java/docs/javadsl/SnsPublisherTest.java
+++ b/sns/src/test/java/docs/javadsl/SnsPublisherTest.java
@@ -16,6 +16,7 @@ package docs.javadsl;
 import org.apache.pekko.Done;
 import org.apache.pekko.actor.ActorSystem;
 import org.apache.pekko.http.javadsl.Http;
+import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
 import org.apache.pekko.stream.connectors.sns.javadsl.SnsPublisher;
 import org.apache.pekko.stream.connectors.testkit.javadsl.LogCapturingJunit4;
 import org.apache.pekko.stream.javadsl.Sink;
@@ -24,7 +25,6 @@ import org.apache.pekko.testkit.javadsl.TestKit;
 
 // #init-client
 import java.net.URI;
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 import software.amazon.awssdk.regions.Region;
diff --git a/sns/src/test/scala/org/apache/pekko/stream/connectors/sns/IntegrationTestContext.scala b/sns/src/test/scala/org/apache/pekko/stream/connectors/sns/IntegrationTestContext.scala
index e478c016f..0cf134696 100644
--- a/sns/src/test/scala/org/apache/pekko/stream/connectors/sns/IntegrationTestContext.scala
+++ b/sns/src/test/scala/org/apache/pekko/stream/connectors/sns/IntegrationTestContext.scala
@@ -63,7 +63,7 @@ trait IntegrationTestContext extends BeforeAndAfterAll with ScalaFutures {
     // #init-client
     import java.net.URI
 
-    import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
+    import pekko.stream.connectors.awsspi.PekkoHttpClient
     import software.amazon.awssdk.services.sns.SnsAsyncClient
     import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
     import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
diff --git a/sqs/src/test/java/docs/javadsl/SqsSourceTest.java b/sqs/src/test/java/docs/javadsl/SqsSourceTest.java
index 5bdfee652..4b42416fb 100644
--- a/sqs/src/test/java/docs/javadsl/SqsSourceTest.java
+++ b/sqs/src/test/java/docs/javadsl/SqsSourceTest.java
@@ -14,6 +14,7 @@
 package docs.javadsl;
 
 import org.apache.pekko.Done;
+import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
 import org.apache.pekko.stream.connectors.sqs.MessageAttributeName;
 import org.apache.pekko.stream.connectors.sqs.MessageSystemAttributeName;
 import org.apache.pekko.stream.connectors.sqs.SqsPublishBatchSettings;
@@ -23,7 +24,6 @@ import org.apache.pekko.stream.connectors.sqs.javadsl.SqsPublishSink;
 import org.apache.pekko.stream.connectors.sqs.javadsl.SqsSource;
 import org.apache.pekko.stream.javadsl.Sink;
 import org.apache.pekko.stream.javadsl.Source;
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient;
 import org.junit.Test;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
diff --git a/sqs/src/test/java/org/apache/pekko/stream/connectors/sqs/javadsl/BaseSqsTest.java b/sqs/src/test/java/org/apache/pekko/stream/connectors/sqs/javadsl/BaseSqsTest.java
index 709fa5b2e..a2540de70 100644
--- a/sqs/src/test/java/org/apache/pekko/stream/connectors/sqs/javadsl/BaseSqsTest.java
+++ b/sqs/src/test/java/org/apache/pekko/stream/connectors/sqs/javadsl/BaseSqsTest.java
@@ -15,10 +15,10 @@ package org.apache.pekko.stream.connectors.sqs.javadsl;
 
 import org.apache.pekko.actor.ActorSystem;
 import org.apache.pekko.http.javadsl.Http;
+import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
 import org.apache.pekko.stream.connectors.testkit.javadsl.LogCapturingJunit4;
 import org.apache.pekko.testkit.javadsl.TestKit;
 // #init-client
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 import software.amazon.awssdk.regions.Region;
diff --git a/sqs/src/test/scala/docs/scaladsl/SqsSourceSpec.scala b/sqs/src/test/scala/docs/scaladsl/SqsSourceSpec.scala
index 6b2963dfe..699eb6d86 100644
--- a/sqs/src/test/scala/docs/scaladsl/SqsSourceSpec.scala
+++ b/sqs/src/test/scala/docs/scaladsl/SqsSourceSpec.scala
@@ -19,12 +19,12 @@ import java.util.concurrent.TimeUnit
 import org.apache.pekko
 import pekko.Done
 import pekko.stream.KillSwitches
+import pekko.stream.connectors.awsspi.PekkoHttpClient
 import pekko.stream.connectors.sqs._
 import pekko.stream.connectors.sqs.scaladsl.{ DefaultTestContext, SqsSource }
 import pekko.stream.connectors.testkit.scaladsl.LogCapturing
 import pekko.stream.scaladsl.{ Keep, Sink }
 import pekko.util.ccompat.JavaConverters._
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.matchers.should.Matchers
diff --git a/sqs/src/test/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/DefaultTestContext.scala b/sqs/src/test/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/DefaultTestContext.scala
index cb5446494..661302579 100644
--- a/sqs/src/test/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/DefaultTestContext.scala
+++ b/sqs/src/test/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/DefaultTestContext.scala
@@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit
 import org.apache.pekko
 import pekko.actor.{ ActorSystem, Terminated }
 import pekko.http.scaladsl.Http
+import pekko.stream.connectors.awsspi.PekkoHttpClient
 import pekko.stream.connectors.sqs.SqsSourceSettings
 import pekko.util.ccompat.JavaConverters._
 import org.scalatest.concurrent.ScalaFutures
@@ -27,7 +28,6 @@ import org.scalatest.{ BeforeAndAfterAll, Suite, Tag }
 
 import scala.concurrent.ExecutionContext
 //#init-client
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
 import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, StaticCredentialsProvider }
 import software.amazon.awssdk.regions.Region
 import software.amazon.awssdk.services.sqs.SqsAsyncClient


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to