This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new ed7ac3647 Move AWS SPI code into Pekko repo (#648)
ed7ac3647 is described below
commit ed7ac36475745d32204df403252ee6d576daf1bd
Author: PJ Fanning <[email protected]>
AuthorDate: Wed Jun 12 17:56:03 2024 +0100
Move AWS SPI code into Pekko repo (#648)
* move aws-spi to pekko-connectors
Co-authored-by: Matthias Lüneberg <[email protected]>
* rename packages
import order
scalafmt
Update KinesisFirehoseSnippets.java
imports
* compile warnings
* try to suppress warning
* exclude aws-spi-pekko-http from mima check
* scalafmt
* Update build.sbt
* add build job
* compile issues
* Update application.conf
* fix module name of awslambda
* rename files
---------
Co-authored-by: Matthias Lüneberg <[email protected]>
---
.github/workflows/check-build-test.yml | 4 +
NOTICE | 7 +
.../docs/javadsl/EventBridgePublisherTest.java | 2 +-
.../aws/eventbridge/IntegrationTestContext.scala | 2 +-
.../src/it/resources/application.conf | 3 +
.../pekko/stream/connectors/awsspi/TestBase.scala | 47 +++++
.../awsspi/dynamodb/DynamoDBITTest.scala | 91 ++++++++++
.../connectors/awsspi/kinesis/KinesisITTest.scala | 111 ++++++++++++
.../stream/connectors/awsspi/s3/S3ITTest.scala | 137 ++++++++++++++
.../stream/connectors/awsspi/sqs/SQSITTest.scala | 75 ++++++++
...re.amazon.awssdk.http.async.SdkAsyncHttpService | 1 +
.../awsspi/PekkoHttpAsyncHttpService.scala | 25 +++
.../stream/connectors/awsspi/PekkoHttpClient.scala | 198 +++++++++++++++++++++
.../stream/connectors/awsspi/RequestRunner.scala | 87 +++++++++
.../pekko/stream/connectors/awsspi/S3Test.java | 144 +++++++++++++++
.../src/test/resources/application.conf | 3 +
.../src/test/resources/logback-test.xml | 43 +++++
.../connectors/awsspi/BaseAwsClientTest.scala | 74 ++++++++
.../connectors/awsspi/PekkoHttpClientSpec.scala | 51 ++++++
.../connectors/awsspi/RequestRunnerSpec.scala | 63 +++++++
.../connectors/awsspi/dynamodb/TestDynamoDB.scala | 78 ++++++++
.../pekko/stream/connectors/awsspi/s3/TestS3.scala | 188 +++++++++++++++++++
.../stream/connectors/awsspi/sns/TestSNS.scala | 58 ++++++
.../stream/connectors/awsspi/sqs/TestSQS.scala | 94 ++++++++++
.../LocalStackReadyLogWaitStrategy.scala | 48 +++++
.../testcontainers/TimeoutWaitStrategy.scala | 30 ++++
awslambda/src/test/java/docs/javadsl/Examples.java | 2 +-
.../src/test/scala/docs/scaladsl/Examples.scala | 2 +-
build.sbt | 19 +-
.../src/test/java/docs/javadsl/ExampleTest.java | 2 +-
dynamodb/src/test/java/docs/javadsl/RetryTest.java | 2 +-
.../src/test/scala/docs/scaladsl/ExampleSpec.scala | 2 +-
.../src/test/scala/docs/scaladsl/RetrySpec.scala | 2 +-
.../stream/connectors/dynamodb/ItemSpec.scala | 2 +-
.../stream/connectors/dynamodb/TableSpec.scala | 2 +-
.../java/docs/javadsl/KinesisFirehoseSnippets.java | 2 +-
.../test/java/docs/javadsl/KinesisSnippets.java | 2 +-
.../docs/scaladsl/KinesisFirehoseSnippets.scala | 2 +-
.../test/scala/docs/scaladsl/KinesisSnippets.scala | 2 +-
project/Dependencies.scala | 33 ++--
.../test/java/docs/javadsl/SnsPublisherTest.java | 2 +-
.../connectors/sns/IntegrationTestContext.scala | 2 +-
sqs/src/test/java/docs/javadsl/SqsSourceTest.java | 2 +-
.../stream/connectors/sqs/javadsl/BaseSqsTest.java | 2 +-
.../test/scala/docs/scaladsl/SqsSourceSpec.scala | 2 +-
.../sqs/scaladsl/DefaultTestContext.scala | 2 +-
46 files changed, 1716 insertions(+), 36 deletions(-)
diff --git a/.github/workflows/check-build-test.yml
b/.github/workflows/check-build-test.yml
index 96d9b7bb5..5b153a643 100644
--- a/.github/workflows/check-build-test.yml
+++ b/.github/workflows/check-build-test.yml
@@ -40,6 +40,9 @@ jobs:
- name: "Code style, compile tests, MiMa. Run locally with: sbt
\"javafmtCheckAll; +Test/compile; +mimaReportBinaryIssues\""
run: sbt "javafmtCheckAll; +Test/compile; +mimaReportBinaryIssues"
+ - name: "Integration Test Compile"
+ run: sbt "+IntegrationTest/compile"
+
documentation:
name: ScalaDoc, Documentation with Paradox
runs-on: ubuntu-20.04
@@ -78,6 +81,7 @@ jobs:
include:
- { connector: amqp, pre_cmd:
'docker-compose up -d amqp' }
- { connector: avroparquet }
+ - { connector: aws-spi-pekko-http }
- { connector: awslambda }
- { connector: aws-event-bridge, pre_cmd:
'docker-compose up -d amazoneventbridge' }
- { connector: azure-storage-queue }
diff --git a/NOTICE b/NOTICE
index fe0645e02..0794f4e7c 100644
--- a/NOTICE
+++ b/NOTICE
@@ -12,6 +12,13 @@ Apache License, Version 2.0 License.
---------------
+pekko-connectors-aws-spi-pekko-http is based on aws-spi-akka-http
+<https://github.com/matsluni/aws-spi-akka-http>.
+This code was released under an Apache 2.0 license and a Software Grant
Agreement
+has been registered with the ASF secretary.
+
+---------------
+
pekko-connectors-ftp contains code from Apache Commons Net
<https://commons.apache.org/proper/commons-net/>.
This code was released under an Apache 2.0 license.
diff --git
a/aws-event-bridge/src/test/java/docs/javadsl/EventBridgePublisherTest.java
b/aws-event-bridge/src/test/java/docs/javadsl/EventBridgePublisherTest.java
index cc13e08d7..47e303b02 100644
--- a/aws-event-bridge/src/test/java/docs/javadsl/EventBridgePublisherTest.java
+++ b/aws-event-bridge/src/test/java/docs/javadsl/EventBridgePublisherTest.java
@@ -18,13 +18,13 @@ import org.apache.pekko.Done;
import org.apache.pekko.actor.ActorSystem;
// #init-system
import
org.apache.pekko.stream.connectors.aws.eventbridge.javadsl.EventBridgePublisher;
+import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.testkit.javadsl.TestKit;
// #init-client
import java.net.URI;
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
diff --git
a/aws-event-bridge/src/test/scala/org/apache/pekko/stream/connectors/aws/eventbridge/IntegrationTestContext.scala
b/aws-event-bridge/src/test/scala/org/apache/pekko/stream/connectors/aws/eventbridge/IntegrationTestContext.scala
index 43cd1ff88..338554ea2 100644
---
a/aws-event-bridge/src/test/scala/org/apache/pekko/stream/connectors/aws/eventbridge/IntegrationTestContext.scala
+++
b/aws-event-bridge/src/test/scala/org/apache/pekko/stream/connectors/aws/eventbridge/IntegrationTestContext.scala
@@ -55,7 +55,7 @@ trait IntegrationTestContext extends BeforeAndAfterAll with
ScalaFutures {
// #init-client
import java.net.URI
- import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
+ import pekko.stream.connectors.awsspi.PekkoHttpClient
import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials,
StaticCredentialsProvider }
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.eventbridge.EventBridgeAsyncClient
diff --git a/aws-spi-pekko-http/src/it/resources/application.conf
b/aws-spi-pekko-http/src/it/resources/application.conf
new file mode 100644
index 000000000..8a2cb9651
--- /dev/null
+++ b/aws-spi-pekko-http/src/it/resources/application.conf
@@ -0,0 +1,3 @@
+# SPDX-License-Identifier: Apache-2.0
+
+pekko.http.client.parsing.max-content-length = 15m
diff --git
a/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/TestBase.scala
b/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/TestBase.scala
new file mode 100644
index 000000000..d0494ad9a
--- /dev/null
+++
b/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/TestBase.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi
+
+import software.amazon.awssdk.auth.credentials.{
+ AwsCredentialsProviderChain,
+ EnvironmentVariableCredentialsProvider,
+ ProfileCredentialsProvider,
+ SystemPropertyCredentialsProvider
+}
+import software.amazon.awssdk.regions.Region
+
+import scala.util.Random
+
+trait TestBase {
+
+ private lazy val credentialProfileName = "spi-test-account"
+
+ lazy val defaultRegion = Region.EU_WEST_1
+
+ def randomIdentifier(length: Int): String =
Random.alphanumeric.take(length).mkString
+
+ lazy val credentialProviderChain =
+ AwsCredentialsProviderChain.builder
+ .credentialsProviders(
+ EnvironmentVariableCredentialsProvider.create,
+ SystemPropertyCredentialsProvider.create,
+ ProfileCredentialsProvider.builder
+ .profileName(credentialProfileName)
+ .build)
+ .build
+}
diff --git
a/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/dynamodb/DynamoDBITTest.scala
b/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/dynamodb/DynamoDBITTest.scala
new file mode 100644
index 000000000..1ab2c0b06
--- /dev/null
+++
b/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/dynamodb/DynamoDBITTest.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi.dynamodb
+
+import org.apache.pekko
+import pekko.stream.connectors.awsspi.{ PekkoHttpAsyncHttpService, TestBase }
+import pekko.util.FutureConverters
+import org.scalatest.concurrent.{ Eventually, Futures, IntegrationPatience }
+import org.scalatest.concurrent.ScalaFutures._
+import org.scalatest.wordspec.AnyWordSpec
+import org.scalatest.matchers.should.Matchers
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
+import software.amazon.awssdk.services.dynamodb.model._
+
+class DynamoDBITTest
+ extends AnyWordSpec
+ with Matchers
+ with Futures
+ with Eventually
+ with IntegrationPatience
+ with TestBase {
+
+ def withClient(testCode: DynamoDbAsyncClient => Any): Any = {
+
+ val pekkoClient = new
PekkoHttpAsyncHttpService().createAsyncHttpClientFactory().build()
+
+ val client = DynamoDbAsyncClient
+ .builder()
+ .credentialsProvider(credentialProviderChain)
+ .region(defaultRegion)
+ .httpClient(pekkoClient)
+ .build()
+
+ try
+ testCode(client)
+ finally { // clean up
+ pekkoClient.close()
+ client.close()
+ }
+ }
+
+ "DynamoDB" should {
+ "create a table" in withClient { implicit client =>
+ val tableName = s"Movies-${randomIdentifier(5)}"
+ val attributes =
AttributeDefinition.builder.attributeName("film_id").attributeType(ScalarAttributeType.S).build()
+ val keySchema =
KeySchemaElement.builder.attributeName("film_id").keyType(KeyType.HASH).build()
+
+ val result = client
+ .createTable(
+ CreateTableRequest
+ .builder()
+ .tableName(tableName)
+ .attributeDefinitions(attributes)
+ .keySchema(keySchema)
+ .provisionedThroughput(
+ ProvisionedThroughput.builder
+ .readCapacityUnits(1L)
+ .writeCapacityUnits(1L)
+ .build())
+ .build())
+ .join
+
+ val desc = result.tableDescription()
+ desc.tableName() should be(tableName)
+
+ eventually {
+ val response =
+
FutureConverters.asScala(client.describeTable(DescribeTableRequest.builder().tableName(tableName).build()))
+ response.futureValue.table().tableStatus() should
be(TableStatus.ACTIVE)
+ }
+
FutureConverters.asScala(client.deleteTable(DeleteTableRequest.builder().tableName(tableName).build()))
+
+ }
+ }
+
+}
diff --git
a/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/kinesis/KinesisITTest.scala
b/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/kinesis/KinesisITTest.scala
new file mode 100644
index 000000000..348987b62
--- /dev/null
+++
b/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/kinesis/KinesisITTest.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi.kinesis
+
+import org.apache.pekko
+import pekko.stream.connectors.awsspi.{ PekkoHttpAsyncHttpService, TestBase }
+import org.scalatest.wordspec.AnyWordSpec
+import org.scalatest.matchers.should.Matchers
+import software.amazon.awssdk.core.SdkBytes
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
+import software.amazon.awssdk.services.kinesis.model._
+
+import scala.util.Random
+
+class KinesisITTest extends AnyWordSpec with Matchers with TestBase {
+
+ def withClient(testCode: KinesisAsyncClient => Any): Any = {
+
+ val pekkoClient = new
PekkoHttpAsyncHttpService().createAsyncHttpClientFactory().build()
+
+ val client = KinesisAsyncClient
+ .builder()
+ .credentialsProvider(credentialProviderChain)
+ .region(defaultRegion)
+ .httpClient(pekkoClient)
+ .build()
+
+ try
+ testCode(client)
+ finally { // clean up
+ pekkoClient.close()
+ client.close()
+ }
+ }
+
+ "Kinesis async client" should {
+
+ "use a data stream: create + put + get + delete" in withClient { implicit
client =>
+ val streamName = "aws-spi-test-" +
Random.alphanumeric.take(10).filterNot(_.isUpper).mkString
+ val data = "123"
+
+ val createRequest = CreateStreamRequest
+ .builder()
+ .streamName(streamName)
+ .shardCount(1)
+ .build()
+
+ val _ = client.createStream(createRequest).join()
+ val describeStreamRequest =
DescribeStreamRequest.builder().streamName(streamName).build()
+
+ Thread.sleep(5000)
+ val streamArn = waitToBeCreated(client, describeStreamRequest, 15)
+
+ val putRequest = PutRecordRequest
+ .builder()
+ .streamName(streamName)
+ .partitionKey("partitionKey")
+ .data(SdkBytes.fromUtf8String(data))
+ .build()
+
+ val putResponse = client.putRecord(putRequest).join()
+
+ val getShardIterator = GetShardIteratorRequest
+ .builder()
+ .streamName(streamName)
+ .shardId(putResponse.shardId())
+ .shardIteratorType("TRIM_HORIZON")
+ .build()
+ val shardIterator = client.getShardIterator(getShardIterator).join()
+
+ val getRecordsRequest =
GetRecordsRequest.builder().shardIterator(shardIterator.shardIterator()).limit(1).build()
+
+ Thread.sleep(3000)
+
+ val res = client.getRecords(getRecordsRequest).join()
+
+ res.records().get(0).data().asUtf8String() shouldBe data
+
+
client.deleteStream(DeleteStreamRequest.builder().streamName(streamName).build()).join()
+
+ }
+ }
+
+ private def waitToBeCreated(client: KinesisAsyncClient, req:
DescribeStreamRequest, tries: Int): String =
+ if (tries == 0) "error"
+ else {
+ val r = client.describeStream(req).join()
+ if (r.streamDescription().streamStatus() == StreamStatus.ACTIVE) {
+ println(s"Current status: ${r.streamDescription().streamStatus()}")
+ r.streamDescription().streamARN()
+ } else {
+ Thread.sleep(1000)
+ waitToBeCreated(client, req, tries - 1)
+ }
+ }
+}
diff --git
a/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/s3/S3ITTest.scala
b/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/s3/S3ITTest.scala
new file mode 100644
index 000000000..b13f3a0d2
--- /dev/null
+++
b/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/s3/S3ITTest.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi.s3
+
+import java.io.{ File, FileWriter }
+import org.apache.pekko
+import pekko.stream.connectors.awsspi.{ PekkoHttpAsyncHttpService, TestBase }
+import org.scalatest.wordspec.AnyWordSpec
+import org.scalatest.matchers.should.Matchers
+import software.amazon.awssdk.core.async.{ AsyncRequestBody,
AsyncResponseTransformer }
+import software.amazon.awssdk.services.s3.{ S3AsyncClient, S3Configuration }
+import software.amazon.awssdk.services.s3.model._
+
+import scala.util.Random
+
+class S3ITTest extends AnyWordSpec with Matchers with TestBase {
+
+ def withClient(checksumEnabled: Boolean = false)(testCode: S3AsyncClient =>
Any): Any = {
+
+ val pekkoClient = new
PekkoHttpAsyncHttpService().createAsyncHttpClientFactory().build()
+
+ val client = S3AsyncClient
+ .builder()
+
.serviceConfiguration(S3Configuration.builder().checksumValidationEnabled(checksumEnabled).build())
+ .credentialsProvider(credentialProviderChain)
+ .region(defaultRegion)
+ .httpClient(pekkoClient)
+ .build()
+
+ try
+ testCode(client)
+ finally { // clean up
+ pekkoClient.close()
+ client.close()
+ }
+ }
+
+ "S3 async client" should {
+ "upload and download a file to a bucket + cleanup" in
withClient(checksumEnabled = true) { implicit client =>
+ val bucketName = "aws-spi-test-" +
Random.alphanumeric.take(10).filterNot(_.isUpper).mkString
+ createBucket(bucketName)
+ val randomFile = File.createTempFile("aws",
Random.alphanumeric.take(5).mkString)
+ val fileContent = Random.alphanumeric.take(1000).mkString
+ val fileWriter = new FileWriter(randomFile)
+ fileWriter.write(fileContent)
+ fileWriter.flush()
+
client.putObject(PutObjectRequest.builder().bucket(bucketName).key("my-file").build(),
randomFile.toPath).join
+
+ val result = client
+
.getObject(GetObjectRequest.builder().bucket(bucketName).key("my-file").build(),
+ AsyncResponseTransformer.toBytes[GetObjectResponse]())
+ .join
+ result.asUtf8String() should be(fileContent)
+
+
client.deleteObject(DeleteObjectRequest.builder().bucket(bucketName).key("my-file").build()).join()
+
+
client.deleteBucket(DeleteBucketRequest.builder().bucket(bucketName).build()).join()
+ }
+
+ "multipart upload" in withClient() { implicit client =>
+ val bucketName = "aws-spi-test-" +
Random.alphanumeric.take(5).map(_.toLower).mkString
+ createBucket(bucketName)
+ val fileContent = (0 to 1000000).mkString
+ val createMultipartUploadResponse = client
+ .createMultipartUpload(
+
CreateMultipartUploadRequest.builder().bucket(bucketName).key("bar").contentType("text/plain").build())
+ .join()
+
+ val p1 = client
+ .uploadPart(
+ UploadPartRequest
+ .builder()
+ .bucket(bucketName)
+ .key("bar")
+ .partNumber(1)
+ .uploadId(createMultipartUploadResponse.uploadId())
+ .build(),
+ AsyncRequestBody.fromString(fileContent))
+ .join
+ val p2 = client
+ .uploadPart(
+ UploadPartRequest
+ .builder()
+ .bucket(bucketName)
+ .key("bar")
+ .partNumber(2)
+ .uploadId(createMultipartUploadResponse.uploadId())
+ .build(),
+ AsyncRequestBody.fromString(fileContent))
+ .join
+
+ client
+ .completeMultipartUpload(
+ CompleteMultipartUploadRequest
+ .builder()
+ .bucket(bucketName)
+ .key("bar")
+ .uploadId(createMultipartUploadResponse.uploadId())
+ .multipartUpload(
+ CompletedMultipartUpload
+ .builder()
+
.parts(CompletedPart.builder().partNumber(1).eTag(p1.eTag()).build(),
+
CompletedPart.builder().partNumber(2).eTag(p2.eTag()).build())
+ .build())
+ .build())
+ .join
+
+ val result = client
+
.getObject(GetObjectRequest.builder().bucket(bucketName).key("bar").build(),
+ AsyncResponseTransformer.toBytes[GetObjectResponse]())
+ .join
+ result.asUtf8String() should be(fileContent + fileContent)
+
+
client.deleteObject(DeleteObjectRequest.builder().bucket(bucketName).key("bar").build()).join()
+
client.deleteBucket(DeleteBucketRequest.builder().bucket(bucketName).build()).join()
+ }
+ }
+
+ def createBucket(name: String)(implicit client: S3AsyncClient): Unit =
+
client.createBucket(CreateBucketRequest.builder().bucket(name).build()).join
+
+}
diff --git
a/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/sqs/SQSITTest.scala
b/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/sqs/SQSITTest.scala
new file mode 100644
index 000000000..4aa7f562e
--- /dev/null
+++
b/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/sqs/SQSITTest.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi.sqs
+
+import org.apache.pekko
+import pekko.stream.connectors.awsspi.{ PekkoHttpAsyncHttpService, TestBase }
+import org.scalatest.wordspec.AnyWordSpec
+import org.scalatest.matchers.should.Matchers
+import software.amazon.awssdk.services.sqs.SqsAsyncClient
+import software.amazon.awssdk.services.sqs.model.{
+ CreateQueueRequest,
+ DeleteQueueRequest,
+ ReceiveMessageRequest,
+ SendMessageRequest
+}
+
+import scala.util.Random
+
+class SQSITTest extends AnyWordSpec with Matchers with TestBase {
+
+ def withClient(testCode: SqsAsyncClient => Any): Any = {
+
+ val pekkoClient = new
PekkoHttpAsyncHttpService().createAsyncHttpClientFactory().build()
+
+ val client = SqsAsyncClient
+ .builder()
+ .credentialsProvider(credentialProviderChain)
+ .httpClient(pekkoClient)
+ .region(defaultRegion)
+ .build()
+
+ try
+ testCode(client)
+ finally { // clean up
+ pekkoClient.close()
+ client.close()
+ }
+ }
+
+ "Async SQS client" should {
+
+ "publish a message to a queue" in withClient { implicit client =>
+ val queueName = "aws-spi-test-" +
Random.alphanumeric.take(10).filterNot(_.isUpper).mkString
+ val queueResponse =
client.createQueue(CreateQueueRequest.builder().queueName(queueName).build()).join()
+ val queueUrl = queueResponse.queueUrl()
+
client.sendMessage(SendMessageRequest.builder().queueUrl(queueUrl).messageBody("123").build()).join()
+ val receivedMessage =
+
client.receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueUrl).maxNumberOfMessages(1).build()).join()
+ receivedMessage.messages().get(0).body() should be("123")
+
+ // deleteQueue
+
client.deleteQueue(DeleteQueueRequest.builder().queueUrl(queueUrl).build()).join()
+
+ val queueListing = client.listQueues().join()
+ queueListing.queueUrls() shouldBe java.util.Collections.EMPTY_LIST
+
+ }
+ }
+
+}
diff --git
a/aws-spi-pekko-http/src/main/resources/META-INF/services/software.amazon.awssdk.http.async.SdkAsyncHttpService
b/aws-spi-pekko-http/src/main/resources/META-INF/services/software.amazon.awssdk.http.async.SdkAsyncHttpService
new file mode 100644
index 000000000..6118ec68b
--- /dev/null
+++
b/aws-spi-pekko-http/src/main/resources/META-INF/services/software.amazon.awssdk.http.async.SdkAsyncHttpService
@@ -0,0 +1 @@
+org.apache.pekko.stream.connectors.awsspi.PekkoHttpAsyncHttpService
diff --git
a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpAsyncHttpService.scala
b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpAsyncHttpService.scala
new file mode 100644
index 000000000..9d0d66eef
--- /dev/null
+++
b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpAsyncHttpService.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi
+
+import PekkoHttpClient.PekkoHttpClientBuilder
+import software.amazon.awssdk.http.async.SdkAsyncHttpService
+
+class PekkoHttpAsyncHttpService extends SdkAsyncHttpService {
+ override def createAsyncHttpClientFactory(): PekkoHttpClientBuilder =
PekkoHttpClient.builder()
+}
diff --git
a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala
b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala
new file mode 100644
index 000000000..9aacfd2b5
--- /dev/null
+++
b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi
+
+import java.util.concurrent.{ CompletableFuture, TimeUnit }
+
+import org.apache.pekko
+import pekko.actor.{ ActorSystem, ClassicActorSystemProvider }
+import pekko.http.scaladsl.Http
+import pekko.http.scaladsl.model.HttpHeader.ParsingResult
+import pekko.http.scaladsl.model.HttpHeader.ParsingResult.Ok
+import pekko.http.scaladsl.model.MediaType.Compressible
+import pekko.http.scaladsl.model.RequestEntityAcceptance.Expected
+import pekko.http.scaladsl.model._
+import pekko.http.scaladsl.model.headers.{ `Content-Length`, `Content-Type` }
+import pekko.http.scaladsl.settings.ConnectionPoolSettings
+import pekko.stream.scaladsl.Source
+import pekko.stream.{ Materializer, SystemMaterializer }
+import pekko.util.ByteString
+import pekko.util.OptionConverters
+import org.slf4j.LoggerFactory
+import software.amazon.awssdk.http.async._
+import software.amazon.awssdk.http.SdkHttpRequest
+import software.amazon.awssdk.utils.AttributeMap
+
+import scala.collection.immutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.{ Await, ExecutionContext }
+
+class PekkoHttpClient(shutdownHandle: () => Unit, connectionSettings:
ConnectionPoolSettings)(implicit
+ actorSystem: ActorSystem,
+ ec: ExecutionContext,
+ mat: Materializer) extends SdkAsyncHttpClient {
+ import PekkoHttpClient._
+
+ lazy val runner = new RequestRunner()
+
+ override def execute(request: AsyncExecuteRequest): CompletableFuture[Void]
= {
+ val pekkoHttpRequest = toPekkoRequest(request.request(),
request.requestContentPublisher())
+ runner.run(
+ () => Http().singleRequest(pekkoHttpRequest, settings =
connectionSettings),
+ request.responseHandler())
+ }
+
+ override def close(): Unit =
+ shutdownHandle()
+
+ override def clientName(): String = "pekko-http"
+}
+
+object PekkoHttpClient {
+
+ val logger = LoggerFactory.getLogger(this.getClass)
+
+ private[awsspi] def toPekkoRequest(request: SdkHttpRequest,
+ contentPublisher: SdkHttpContentPublisher): HttpRequest = {
+ val (contentTypeHeader, reqheaders) = convertHeaders(request.headers())
+ val method = convertMethod(request.method().name())
+ HttpRequest(
+ method = method,
+ uri = Uri(request.getUri.toString),
+ headers = reqheaders,
+ entity =
+ entityForMethodAndContentType(method,
contentTypeHeaderToContentType(contentTypeHeader), contentPublisher),
+ protocol = HttpProtocols.`HTTP/1.1`)
+ }
+
+ private[awsspi] def entityForMethodAndContentType(method: HttpMethod,
+ contentType: ContentType,
+ contentPublisher: SdkHttpContentPublisher): RequestEntity =
+ method.requestEntityAcceptance match {
+ case Expected =>
+ OptionConverters.toScala(contentPublisher.contentLength()) match {
+ case Some(length) =>
+ HttpEntity(contentType, length,
Source.fromPublisher(contentPublisher).map(ByteString(_)))
+ case None => HttpEntity(contentType,
Source.fromPublisher(contentPublisher).map(ByteString(_)))
+ }
+ case _ => HttpEntity.Empty
+ }
+
+ private[awsspi] def convertMethod(method: String): HttpMethod =
+ HttpMethods
+ .getForKeyCaseInsensitive(method)
+ .getOrElse(throw new IllegalArgumentException(s"Method not configured:
$method"))
+
+ private[awsspi] def contentTypeHeaderToContentType(contentTypeHeader:
Option[HttpHeader]): ContentType =
+ contentTypeHeader
+ .map(_.value())
+ .map(v => contentTypeMap.getOrElse(v, tryCreateCustomContentType(v)))
+ // Its allowed to not have a content-type:
https://www.w3.org/Protocols/rfc2616/rfc2616-sec7.html#sec7.2.1
+ //
+ // Any HTTP/1.1 message containing an entity-body SHOULD include a
Content-Type header field defining the media type
+ // of that body. If and only if the media type is not given by a
Content-Type field, the recipient MAY attempt to
+ // guess the media type via inspection of its content and/or the name
extension(s) of the URI used to identify the
+ // resource. If the media type remains unknown, the recipient SHOULD
treat it as type "application/octet-stream".
+ //
+ .getOrElse(ContentTypes.NoContentType)
+
+ // This method converts the headers to Akka-http headers and drops
content-length and returns content-type separately
+ private[awsspi] def convertHeaders(
+ headers: java.util.Map[String, java.util.List[String]]):
(Option[HttpHeader], immutable.Seq[HttpHeader]) = {
+ val headersAsScala = {
+ val builder = collection.mutable.Map.newBuilder[String,
java.util.List[String]]
+ headers.forEach { case (k, v) => builder += k -> v }
+ builder.result()
+ }
+
+ headersAsScala.foldLeft((Option.empty[HttpHeader],
List.empty[HttpHeader])) { case ((ctHeader, hdrs), header) =>
+ val (headerName, headerValue) = header
+ if (headerValue.size() != 1) {
+ throw new IllegalArgumentException(
+ s"Found invalid header: key: $headerName, Value: ${val list =
List.newBuilder[String]
+ headerValue.forEach(v => list += v)
+ list.result()}.")
+ }
+ // skip content-length as it will be calculated by pekko-http itself and
must not be provided in the request headers
+ if (`Content-Length`.lowercaseName == headerName.toLowerCase) (ctHeader,
hdrs)
+ else {
+ HttpHeader.parse(headerName, headerValue.get(0)) match {
+ case ok: Ok =>
+ // return content-type separately as it will be used to calculate
ContentType, which is used on HttpEntity
+ if (ok.header.lowercaseName() == `Content-Type`.lowercaseName)
(Some(ok.header), hdrs)
+ else (ctHeader, hdrs :+ ok.header)
+ case error: ParsingResult.Error =>
+ throw new IllegalArgumentException(s"Found invalid header:
${error.errors}.")
+ }
+ }
+ }
+ }
+
+ private[awsspi] def tryCreateCustomContentType(contentTypeStr: String):
ContentType = {
+ logger.debug(s"Try to parse content type from $contentTypeStr")
+ val mainAndsubType = contentTypeStr.split('/')
+ if (mainAndsubType.length == 2)
+ ContentType(MediaType.customBinary(mainAndsubType(0), mainAndsubType(1),
Compressible))
+ else throw new RuntimeException(s"Could not parse custom content type
'$contentTypeStr'.")
+ }
+
+ def builder() = PekkoHttpClientBuilder()
+
+ case class PekkoHttpClientBuilder(private val actorSystem:
Option[ActorSystem] = None,
+ private val executionContext: Option[ExecutionContext] = None,
+ private val connectionPoolSettings: Option[ConnectionPoolSettings] =
None)
+ extends SdkAsyncHttpClient.Builder[PekkoHttpClientBuilder] {
+ def buildWithDefaults(attributeMap: AttributeMap): SdkAsyncHttpClient = {
+ implicit val as = actorSystem.getOrElse(ActorSystem("aws-pekko-http"))
+ implicit val ec = executionContext.getOrElse(as.dispatcher)
+ val mat: Materializer = SystemMaterializer(as).materializer
+
+ val cps = connectionPoolSettings.getOrElse(ConnectionPoolSettings(as))
+ val shutdownhandleF = () => {
+ if (actorSystem.isEmpty) {
+ Await.result(Http().shutdownAllConnectionPools().flatMap(_ =>
as.terminate()),
+ Duration.apply(10, TimeUnit.SECONDS))
+ }
+ ()
+ }
+ new PekkoHttpClient(shutdownhandleF, cps)(as, ec, mat)
+ }
+ def withActorSystem(actorSystem: ActorSystem): PekkoHttpClientBuilder =
copy(actorSystem = Some(actorSystem))
+ def withActorSystem(actorSystem: ClassicActorSystemProvider):
PekkoHttpClientBuilder =
+ copy(actorSystem = Some(actorSystem.classicSystem))
+ def withExecutionContext(executionContext: ExecutionContext):
PekkoHttpClientBuilder =
+ copy(executionContext = Some(executionContext))
+ def withConnectionPoolSettings(connectionPoolSettings:
ConnectionPoolSettings): PekkoHttpClientBuilder =
+ copy(connectionPoolSettings = Some(connectionPoolSettings))
+ }
+
+ lazy val xAmzJson = ContentType(MediaType.customBinary("application",
"x-amz-json-1.0", Compressible))
+ lazy val xAmzJson11 = ContentType(MediaType.customBinary("application",
"x-amz-json-1.1", Compressible))
+ lazy val xAmzCbor11 = ContentType(MediaType.customBinary("application",
"x-amz-cbor-1.1", Compressible))
+ lazy val formUrlEncoded =
+ ContentType(MediaType.applicationWithOpenCharset("x-www-form-urlencoded"),
HttpCharset.custom("utf-8"))
+ lazy val applicationXml = ContentType(MediaType.customBinary("application",
"xml", Compressible))
+
+ lazy val contentTypeMap: collection.immutable.Map[String, ContentType] =
collection.immutable.Map(
+ "application/x-amz-json-1.0" -> xAmzJson,
+ "application/x-amz-json-1.1" -> xAmzJson11,
+ "application/x-amz-cbor-1.1" -> xAmzCbor11, // used by Kinesis
+ "application/x-www-form-urlencoded; charset-UTF-8" -> formUrlEncoded,
+ "application/x-www-form-urlencoded" -> formUrlEncoded,
+ "application/xml" -> applicationXml)
+}
diff --git
a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunner.scala
b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunner.scala
new file mode 100644
index 000000000..9eaf3eac2
--- /dev/null
+++
b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunner.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi
+
+import java.util.concurrent.CompletableFuture
+import java.util.Collections
+
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.http.scaladsl.model.{ ContentTypes, HttpResponse }
+import pekko.http.scaladsl.model.headers.{ `Content-Length`, `Content-Type` }
+import pekko.stream.Materializer
+import pekko.stream.scaladsl.{ Keep, Sink }
+import pekko.util.FutureConverters
+import org.slf4j.LoggerFactory
+import software.amazon.awssdk.http.SdkHttpFullResponse
+import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler
+
+import scala.concurrent.{ ExecutionContext, Future }
+
+class RequestRunner()(implicit sys: ActorSystem, ec: ExecutionContext, mat:
Materializer) {
+
+ val logger = LoggerFactory.getLogger(this.getClass)
+
+ def run(runRequest: () => Future[HttpResponse], handler:
SdkAsyncHttpResponseHandler): CompletableFuture[Void] = {
+ val result = runRequest().flatMap { response =>
+ handler.onHeaders(toSdkHttpFullResponse(response))
+
+ val (complete, publisher) = response.entity.dataBytes
+ .filter(_.nonEmpty)
+ .map(_.asByteBuffer)
+ .alsoToMat(Sink.ignore)(Keep.right)
+ .toMat(Sink.asPublisher(fanout = false))(Keep.both)
+ .run()
+
+ handler.onStream(publisher)
+ complete
+ }
+
+ result.failed.foreach(handler.onError)
+ FutureConverters.asJava(result.map(_ => null: Void)).toCompletableFuture
+ }
+
+ private[awsspi] def toSdkHttpFullResponse(response: HttpResponse):
SdkHttpFullResponse =
+ SdkHttpFullResponse
+ .builder()
+ .headers(convertToSdkResponseHeaders(response))
+ .statusCode(response.status.intValue())
+ .statusText(response.status.reason)
+ .build
+
+ private[awsspi] def convertToSdkResponseHeaders(
+ response: HttpResponse): java.util.Map[String, java.util.List[String]] =
{
+ val responseHeaders = new java.util.HashMap[String, java.util.List[String]]
+
+ response.entity.contentType match {
+ case ContentTypes.NoContentType => ()
+ case contentType =>
responseHeaders.put(`Content-Type`.name,
Collections.singletonList(contentType.value))
+ }
+
+ response.entity.contentLengthOption.foreach(length =>
+ responseHeaders.put(`Content-Length`.name,
Collections.singletonList(length.toString)))
+
+ response.headers.groupBy(_.name()).foreach { case (k, v) =>
+ val values = new java.util.ArrayList[String]
+ v.foreach(header => values.add(header.value()))
+ responseHeaders.put(k, values)
+ }
+
+ responseHeaders
+ }
+}
diff --git
a/aws-spi-pekko-http/src/test/java/org/apache/pekko/stream/connectors/awsspi/S3Test.java
b/aws-spi-pekko-http/src/test/java/org/apache/pekko/stream/connectors/awsspi/S3Test.java
new file mode 100644
index 000000000..528b285dd
--- /dev/null
+++
b/aws-spi-pekko-http/src/test/java/org/apache/pekko/stream/connectors/awsspi/S3Test.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi.s3;
+
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.stream.connectors.awsspi.PekkoHttpAsyncHttpService;
+import org.junit.Rule;
+import org.junit.Test;
+import org.scalatestplus.junit.JUnitSuite;
+import org.testcontainers.containers.GenericContainer;
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Configuration;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.SecureRandom;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+public class S3Test extends JUnitSuite {
+
+ private static final String AB =
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+ private static SecureRandom rnd = new SecureRandom();
+
+ @Rule
+ @SuppressWarnings("rawtypes")
+ public GenericContainer<?> s3mock =
+ new GenericContainer("adobe/s3mock:2.17.0").withExposedPorts(9090);
+
+ @Test
+ public void testS3() throws Exception {
+ SdkAsyncHttpClient pekkoClient = null;
+ S3AsyncClient client = null;
+
+ try {
+ pekkoClient = new
PekkoHttpAsyncHttpService().createAsyncHttpClientFactory().build();
+
+ client = getAsyncClient(pekkoClient);
+
+ createBucketAndAssert(client);
+ } finally {
+ pekkoClient.close();
+ client.close();
+ }
+ }
+
+ @Test
+ public void testS3WithExistingActorSystem() throws Exception {
+ ActorSystem system = ActorSystem.create();
+ SdkAsyncHttpClient pekkoClient = null;
+ S3AsyncClient client = null;
+
+ try {
+ pekkoClient =
+ new PekkoHttpAsyncHttpService()
+ .createAsyncHttpClientFactory()
+ .withActorSystem(system)
+ .build();
+
+ client = getAsyncClient(pekkoClient);
+
+ createBucketAndAssert(client);
+ } finally {
+ pekkoClient.close();
+ client.close();
+ system.terminate();
+ system.getWhenTerminated().toCompletableFuture().get(2,
TimeUnit.SECONDS);
+ }
+ }
+
+ private void createBucketAndAssert(S3AsyncClient client) throws IOException {
+
client.createBucket(CreateBucketRequest.builder().bucket("foo").build()).join();
+ File randomFile = File.createTempFile("aws1", randomString(5));
+ String fileContent = randomString(1000);
+ FileWriter fileWriter = new FileWriter(randomFile);
+ fileWriter.write(fileContent);
+ fileWriter.flush();
+ client
+ .putObject(
+ PutObjectRequest.builder()
+ .bucket("foo")
+ .key("my-file")
+ .contentType("text/plain")
+ .build(),
+ randomFile.toPath())
+ .join();
+
+ ResponseBytes<?> result =
+ client
+ .getObject(
+
GetObjectRequest.builder().bucket("foo").key("my-file").build(),
+ AsyncResponseTransformer.toBytes())
+ .join();
+
+ assertEquals(fileContent, result.asUtf8String());
+ }
+
+ private S3AsyncClient getAsyncClient(SdkAsyncHttpClient pekkoClient) throws
URISyntaxException {
+ return S3AsyncClient.builder()
+ .serviceConfiguration(
+ S3Configuration.builder()
+ .checksumValidationEnabled(false)
+ .pathStyleAccessEnabled(true)
+ .build())
+ .credentialsProvider(AnonymousCredentialsProvider.create())
+ .endpointOverride(new URI("http://localhost:" +
s3mock.getMappedPort(9090)))
+ .region(Region.of("s3"))
+ .httpClient(pekkoClient)
+ .build();
+ }
+
+ private String randomString(int len) {
+ StringBuilder sb = new StringBuilder(len);
+ for (int i = 0; i < len; i++)
sb.append(AB.charAt(rnd.nextInt(AB.length())));
+ return sb.toString();
+ }
+}
diff --git a/aws-spi-pekko-http/src/test/resources/application.conf
b/aws-spi-pekko-http/src/test/resources/application.conf
new file mode 100644
index 000000000..083becee2
--- /dev/null
+++ b/aws-spi-pekko-http/src/test/resources/application.conf
@@ -0,0 +1,3 @@
+# SPDX-License-Identifier: Apache-2.0
+
+pekko.http.client.parsing.max-content-length = 150m
diff --git a/aws-spi-pekko-http/src/test/resources/logback-test.xml
b/aws-spi-pekko-http/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..5d1d29de5
--- /dev/null
+++ b/aws-spi-pekko-http/src/test/resources/logback-test.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <!-- encoders are assigned the type
+ ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{5} -
%msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <!-- Strictly speaking, the level attribute is not necessary since -->
+ <!-- the level of the root level is set to DEBUG by default. -->
+ <root level="debug">
+ <appender-ref ref="STDOUT" />
+ </root>
+
+ <logger name="io.netty" level="trace" additivity="false">
+ <appender-ref ref="STDOUT" />
+ </logger>
+ <logger name="com.typesafe" level="error" additivity="false">
+ <appender-ref ref="STDOUT" />
+ </logger>
+ <logger name="org.apache.pekko.http" level="error" additivity="false">
+ <appender-ref ref="STDOUT" />
+ </logger>
+ <!-- Log of http content // with
pekko.http.client.log-unencrypted-network-bytes = 10000 -->
+ <logger name="org.apache.pekko.stream" level="error" additivity="false">
+ <appender-ref ref="STDOUT" />
+ </logger>
+ <logger name="org.testcontainers" level="info" additivity="false">
+ <appender-ref ref="STDOUT" />
+ </logger>
+ <logger name="com.github.dockerjava" level="info" additivity="false">
+ <appender-ref ref="STDOUT" />
+ </logger>
+ <logger name="com.github.pjfanning" level="debug" additivity="false">
+ <appender-ref ref="STDOUT" />
+ </logger>
+ <logger name="software.amazon" level="error" additivity="false">
+ <appender-ref ref="STDOUT" />
+ </logger>
+</configuration>
diff --git
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/BaseAwsClientTest.scala
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/BaseAwsClientTest.scala
new file mode 100644
index 000000000..3e558f28e
--- /dev/null
+++
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/BaseAwsClientTest.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi
+
+import java.net.URI
+
+import org.apache.pekko
+import
pekko.stream.connectors.awsspi.testcontainers.LocalStackReadyLogWaitStrategy
+import com.dimafeng.testcontainers.{ ForAllTestContainer, GenericContainer }
+import org.scalatest.concurrent.{ Eventually, Futures, IntegrationPatience }
+import org.scalatest.BeforeAndAfter
+import software.amazon.awssdk.core.SdkClient
+import software.amazon.awssdk.regions.Region
+
+import scala.util.Random
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+trait BaseAwsClientTest[C <: SdkClient]
+ extends AnyWordSpec
+ with Matchers
+ with Futures
+ with Eventually
+ with BeforeAndAfter
+ with IntegrationPatience
+ with ForAllTestContainer {
+
+ lazy val defaultRegion: Region = Region.EU_WEST_1
+
+ def exposedServicePort: Int
+ def container: GenericContainer
+
+ def endpoint = new
URI(s"http://localhost:${container.mappedPort(exposedServicePort)}")
+ def randomIdentifier(length: Int): String =
Random.alphanumeric.take(length).mkString
+}
+
+trait LocalstackBaseAwsClientTest[C <: SdkClient] extends BaseAwsClientTest[C]
{
+ def service: String
+
+ lazy val exposedServicePort: Int = 4566
+
+ override lazy val container: GenericContainer =
+ new GenericContainer(
+ dockerImage = "localstack/localstack",
+ exposedPorts = Seq(exposedServicePort),
+ env = Map("SERVICES" -> service),
+ waitStrategy = Some(LocalStackReadyLogWaitStrategy))
+}
+
+trait ElasticMQSQSBaseAwsClientTest[C <: SdkClient] extends
BaseAwsClientTest[C] {
+ def service: String
+
+ lazy val exposedServicePort: Int = 9324
+
+ override lazy val container: GenericContainer =
+ new GenericContainer(
+ dockerImage = "softwaremill/elasticmq-native",
+ exposedPorts = Seq(exposedServicePort))
+}
diff --git
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala
new file mode 100644
index 000000000..6046aa1b6
--- /dev/null
+++
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi
+
+import java.util.Collections
+
+import org.apache.pekko
+import pekko.http.scaladsl.model.headers.`Content-Type`
+import pekko.http.scaladsl.model.MediaTypes
+import org.scalatest.OptionValues
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+class PekkoHttpClientSpec extends AnyWordSpec with Matchers with OptionValues {
+
+ "PekkoHttpClient" should {
+
+ "parse custom content type" in {
+ val contentTypeStr = "application/xml"
+ val contentType =
PekkoHttpClient.tryCreateCustomContentType(contentTypeStr)
+ contentType.mediaType should be(MediaTypes.`application/xml`)
+ }
+
+ "remove 'ContentType' return 'ContentLength' separate from sdk headers" in
{
+ val headers = new java.util.HashMap[String, java.util.List[String]]
+ headers.put("Content-Type", Collections.singletonList("application/xml"))
+ headers.put("Content-Length", Collections.singletonList("123"))
+ headers.put("Accept", Collections.singletonList("*/*"))
+
+ val (contentTypeHeader, reqHeaders) =
PekkoHttpClient.convertHeaders(headers)
+
+ contentTypeHeader.value.lowercaseName() shouldBe
`Content-Type`.lowercaseName
+ reqHeaders should have size 1
+ }
+ }
+}
diff --git
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunnerSpec.scala
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunnerSpec.scala
new file mode 100644
index 000000000..b899f3efd
--- /dev/null
+++
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunnerSpec.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi
+
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.http.scaladsl.model.headers.`User-Agent`
+import pekko.http.scaladsl.model.{ HttpEntity, HttpResponse }
+import org.reactivestreams.{ Publisher, Subscriber, Subscription }
+import org.scalatest.OptionValues
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+import software.amazon.awssdk.http.SdkHttpResponse
+import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler
+
+import scala.concurrent.Future
+
+class RequestRunnerSpec extends AnyWordSpec with Matchers with OptionValues {
+ "Check headers are present from response" in {
+ implicit val system = ActorSystem("test")
+ implicit val ec = system.dispatcher
+ val response = HttpResponse(entity = HttpEntity("Ok"), headers =
`User-Agent`("Mozilla") :: Nil)
+ val runner = new RequestRunner()
+ val handler = new MyHeaderHandler()
+ val resp = runner.run(() => Future.successful(response), handler)
+ resp.join()
+
+ handler.responseHeaders.headers().get("User-Agent").get(0) shouldBe
"Mozilla"
+ handler.responseHeaders.headers().get("Content-Type").get(0) shouldBe
"text/plain; charset=UTF-8"
+ handler.responseHeaders.headers().get("Content-Length").get(0) shouldBe "2"
+ }
+
+ class MyHeaderHandler() extends SdkAsyncHttpResponseHandler {
+ private val headers = new AtomicReference[SdkHttpResponse](null)
+ def responseHeaders = headers.get()
+ override def onHeaders(headers: SdkHttpResponse): Unit =
this.headers.set(headers)
+ override def onStream(stream: Publisher[ByteBuffer]): Unit =
stream.subscribe(new Subscriber[ByteBuffer] {
+ override def onSubscribe(s: Subscription): Unit = s.request(1000)
+ override def onNext(t: ByteBuffer): Unit = ()
+ override def onError(t: Throwable): Unit = ()
+ override def onComplete(): Unit = ()
+ })
+ override def onError(error: Throwable): Unit = ()
+ }
+}
diff --git
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/dynamodb/TestDynamoDB.scala
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/dynamodb/TestDynamoDB.scala
new file mode 100644
index 000000000..42c758286
--- /dev/null
+++
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/dynamodb/TestDynamoDB.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi.dynamodb
+
+import org.apache.pekko
+import pekko.stream.connectors.awsspi.{ LocalstackBaseAwsClientTest,
PekkoHttpAsyncHttpService }
+import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials,
StaticCredentialsProvider }
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
+import software.amazon.awssdk.services.dynamodb.model._
+
+class TestDynamoDB extends LocalstackBaseAwsClientTest[DynamoDbAsyncClient] {
+ "DynamoDB" should {
+ "create a table" in withClient { implicit client =>
+ val attributes =
AttributeDefinition.builder.attributeName("film_id").attributeType(ScalarAttributeType.S).build()
+ val keySchema =
KeySchemaElement.builder.attributeName("film_id").keyType(KeyType.HASH).build()
+
+ val emptyTableResult = client.listTables().join()
+ emptyTableResult.tableNames() should have size 0
+
+ val result = client
+ .createTable(
+ CreateTableRequest
+ .builder()
+ .tableName("Movies")
+ .attributeDefinitions(attributes)
+ .keySchema(keySchema)
+ .provisionedThroughput(
+
ProvisionedThroughput.builder.readCapacityUnits(1000L).writeCapacityUnits(1000L).build())
+ .build())
+ .join
+
+ val desc = result.tableDescription()
+ desc.tableName() should be("Movies")
+
+ val tableResult = client.listTables().join()
+ tableResult.tableNames() should have size 1
+ }
+
+ }
+
+ def withClient(testCode: DynamoDbAsyncClient => Any): Any = {
+
+ val pekkoClient = new
PekkoHttpAsyncHttpService().createAsyncHttpClientFactory().build()
+
+ val client = DynamoDbAsyncClient
+ .builder()
+ .endpointOverride(endpoint)
+
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("x",
"x")))
+ .httpClient(pekkoClient)
+ .region(defaultRegion)
+ .build()
+
+ try
+ testCode(client)
+ finally { // clean up
+ pekkoClient.close()
+ client.close()
+ }
+ }
+
+ override val service: String = "dynamodb"
+
+}
diff --git
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/s3/TestS3.scala
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/s3/TestS3.scala
new file mode 100644
index 000000000..94771ab47
--- /dev/null
+++
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/s3/TestS3.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi.s3
+
+import java.io.{ File, FileWriter }
+import org.apache.pekko
+import pekko.stream.connectors.awsspi.{ BaseAwsClientTest,
PekkoHttpAsyncHttpService }
+import pekko.stream.connectors.awsspi.testcontainers.TimeoutWaitStrategy
+import com.dimafeng.testcontainers.GenericContainer
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider
+import software.amazon.awssdk.core.async.{ AsyncRequestBody,
AsyncResponseTransformer }
+import software.amazon.awssdk.services.s3.{ S3AsyncClient, S3Configuration }
+import software.amazon.awssdk.services.s3.model._
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.Random
+
+class TestS3 extends BaseAwsClientTest[S3AsyncClient] {
+
+ "Async S3 client" should {
+ "create bucket" in withClient { implicit client =>
+ val bucketName = createBucket()
+ val buckets = client.listBuckets().join
+ buckets.buckets() should have size 1
+ buckets.buckets().get(0).name() should be(bucketName)
+ }
+
+ "upload and download a file to a bucket" in withClient { implicit client =>
+ val bucketName = createBucket()
+ val fileContent = 0 to 1000 mkString
+
+ client
+
.putObject(PutObjectRequest.builder().bucket(bucketName).key("my-file").contentType("text/plain").build(),
+ AsyncRequestBody.fromString(fileContent))
+ .join
+
+ val result = client
+
.getObject(GetObjectRequest.builder().bucket(bucketName).key("my-file").build(),
+ AsyncResponseTransformer.toBytes[GetObjectResponse]())
+ .join
+
+ result.asUtf8String() should be(fileContent)
+ result.response().contentType() should be("text/plain")
+ result.response().contentLength() shouldEqual
fileContent.getBytes().length
+ }
+
+ "multipart upload" in withClient { implicit client =>
+ val bucketName = createBucket()
+ val randomFile = File.createTempFile("aws1",
Random.alphanumeric.take(5).mkString)
+ val fileContent = (0 to 1000000).mkString
+ val fileWriter = new FileWriter(randomFile)
+ fileWriter.write(fileContent)
+ fileWriter.flush()
+ val createMultipartUploadResponse = client
+ .createMultipartUpload(
+
CreateMultipartUploadRequest.builder().bucket(bucketName).key("bar").contentType("text/plain").build())
+ .join()
+
+ val p1 = client
+ .uploadPart(UploadPartRequest
+ .builder()
+ .bucket(bucketName)
+ .key("bar")
+ .partNumber(1)
+ .uploadId(createMultipartUploadResponse.uploadId())
+ .build(),
+ randomFile.toPath)
+ .join
+ val p2 = client
+ .uploadPart(UploadPartRequest
+ .builder()
+ .bucket(bucketName)
+ .key("bar")
+ .partNumber(2)
+ .uploadId(createMultipartUploadResponse.uploadId())
+ .build(),
+ randomFile.toPath)
+ .join
+
+ client
+ .completeMultipartUpload(
+ CompleteMultipartUploadRequest
+ .builder()
+ .bucket(bucketName)
+ .key("bar")
+ .multipartUpload(
+ CompletedMultipartUpload
+ .builder()
+
.parts(CompletedPart.builder().partNumber(1).eTag(p1.eTag()).build(),
+
CompletedPart.builder().partNumber(2).eTag(p2.eTag()).build())
+ .build())
+ .uploadId(createMultipartUploadResponse.uploadId())
+ .build())
+ .join
+
+ val result = client
+
.getObject(GetObjectRequest.builder().bucket(bucketName).key("bar").build(),
+ AsyncResponseTransformer.toBytes[GetObjectResponse]())
+ .join
+ result.asUtf8String() should be(fileContent + fileContent)
+ }
+
+ "upload and head request gzip file with 0 content-length" in withClient {
implicit client =>
+ val bucketName = createBucket()
+ val fileName = "my-empty-file"
+ val contentLength = 0L
+ val contentType = "application/json"
+ val contentEncoding = "gzip"
+
+ client
+ .putObject(
+ PutObjectRequest
+ .builder()
+ .bucket(bucketName)
+ .key(fileName)
+ .contentType(contentType)
+ .contentLength(contentLength)
+ .contentEncoding(contentEncoding)
+ .build(),
+ AsyncRequestBody.fromString(""))
+ .join
+
+ val result = client
+
.headObject(HeadObjectRequest.builder().bucket(bucketName).key(fileName).build())
+ .join
+
+ result.contentLength() should be(contentLength)
+ result.contentType() should be(contentType)
+ result.contentEncoding() should be(contentEncoding)
+ }
+
+ }
+
+ def createBucket()(implicit client: S3AsyncClient): String = {
+ val bucketName = Random.alphanumeric.take(7).map(_.toLower).mkString
+
client.createBucket(CreateBucketRequest.builder().bucket(bucketName).build()).join
+ bucketName
+ }
+
+ private def withClient(testCode: S3AsyncClient => Any): Any = {
+
+ val pekkoClient = new
PekkoHttpAsyncHttpService().createAsyncHttpClientFactory().build()
+
+ val client = S3AsyncClient
+ .builder()
+ .serviceConfiguration(
+ S3Configuration
+ .builder()
+ .checksumValidationEnabled(false)
+ .pathStyleAccessEnabled(true)
+ .build())
+ .credentialsProvider(AnonymousCredentialsProvider.create)
+ .endpointOverride(endpoint)
+ .httpClient(pekkoClient)
+ .region(defaultRegion)
+ .build()
+
+ try
+ testCode(client)
+ finally { // clean up
+ pekkoClient.close()
+ client.close()
+ }
+ }
+
+ override def exposedServicePort: Int = 9090
+
+ override lazy val container: GenericContainer = new GenericContainer(
+ dockerImage = "adobe/s3mock:2.17.0",
+ exposedPorts = Seq(exposedServicePort),
+ waitStrategy = Some(TimeoutWaitStrategy(10 seconds)))
+}
diff --git
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/sns/TestSNS.scala
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/sns/TestSNS.scala
new file mode 100644
index 000000000..d73fcb849
--- /dev/null
+++
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/sns/TestSNS.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi.sns
+
+import org.apache.pekko
+import pekko.stream.connectors.awsspi.{ LocalstackBaseAwsClientTest,
PekkoHttpAsyncHttpService }
+import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials,
StaticCredentialsProvider }
+import software.amazon.awssdk.services.sns.SnsAsyncClient
+import software.amazon.awssdk.services.sns.model.{ CreateTopicRequest,
PublishRequest }
+
+class TestSNS extends LocalstackBaseAwsClientTest[SnsAsyncClient] {
+
+ "Async SNS client" should {
+ "publish a message to a topic" in withClient { implicit client =>
+ val arn =
client.createTopic(CreateTopicRequest.builder().name("topic-example").build()).join().topicArn()
+ val result = client.publish(PublishRequest.builder().message("a
message").topicArn(arn).build()).join()
+
+ result.messageId() should not be null
+ }
+ }
+
+ def withClient(testCode: SnsAsyncClient => Any): Any = {
+
+ val pekkoClient = new
PekkoHttpAsyncHttpService().createAsyncHttpClientFactory().build()
+
+ val client = SnsAsyncClient
+ .builder()
+
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("x",
"x")))
+ .httpClient(pekkoClient)
+ .region(defaultRegion)
+ .endpointOverride(endpoint)
+ .build()
+
+ try
+ testCode(client)
+ finally { // clean up
+ pekkoClient.close()
+ client.close()
+ }
+ }
+
+ override def service: String = "sns"
+}
diff --git
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/sqs/TestSQS.scala
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/sqs/TestSQS.scala
new file mode 100644
index 000000000..ee2b92123
--- /dev/null
+++
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/sqs/TestSQS.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi.sqs
+
+import org.apache.pekko
+import pekko.stream.connectors.awsspi.{ ElasticMQSQSBaseAwsClientTest,
PekkoHttpAsyncHttpService }
+import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials,
StaticCredentialsProvider }
+import software.amazon.awssdk.services.sqs.SqsAsyncClient
+import software.amazon.awssdk.services.sqs.model._
+
+class TestSQS extends ElasticMQSQSBaseAwsClientTest[SqsAsyncClient] {
+ "Async SQS client" should {
+
+ "publish a message to a queue" in withClient { implicit client =>
+
client.createQueue(CreateQueueRequest.builder().queueName("foo").build()).join()
+ client
+
.sendMessage(SendMessageRequest.builder().queueUrl(s"$endpoint/queue/foo").messageBody("123").build())
+ .join()
+ val receivedMessage = client
+
.receiveMessage(ReceiveMessageRequest.builder().queueUrl(s"$endpoint/queue/foo").maxNumberOfMessages(1).build())
+ .join()
+ receivedMessage.messages().get(0).body() should be("123")
+ }
+
+ "delete a message" in withClient { implicit client =>
+
client.createQueue(CreateQueueRequest.builder().queueName("foo").build()).join()
+ client
+
.sendMessage(SendMessageRequest.builder().queueUrl(s"$endpoint/queue/foo").messageBody("123").build())
+ .join()
+
+ val receivedMessages = client
+
.receiveMessage(ReceiveMessageRequest.builder().queueUrl(s"$endpoint/queue/foo").maxNumberOfMessages(1).build())
+ .join
+
+ client
+ .deleteMessage(
+ DeleteMessageRequest
+ .builder()
+ .queueUrl(s"$endpoint/queue/foo")
+ .receiptHandle(receivedMessages.messages().get(0).receiptHandle())
+ .build())
+ .join()
+
+ val receivedMessage = client
+ .receiveMessage(
+ ReceiveMessageRequest
+ .builder()
+ .queueUrl(s"$endpoint/queue/foo")
+ .maxNumberOfMessages(1)
+ .waitTimeSeconds(1)
+ .build())
+ .join()
+ receivedMessage.messages() shouldBe java.util.Collections.EMPTY_LIST
+ }
+
+ }
+
+ def withClient(testCode: SqsAsyncClient => Any): Any = {
+
+ val pekkoClient = new
PekkoHttpAsyncHttpService().createAsyncHttpClientFactory().build()
+
+ val client = SqsAsyncClient
+ .builder()
+
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("x",
"x")))
+ .httpClient(pekkoClient)
+ .region(defaultRegion)
+ .endpointOverride(endpoint)
+ .build()
+
+ try
+ testCode(client)
+ finally { // clean up
+ pekkoClient.close()
+ client.close()
+ }
+ }
+
+ override def service: String = "sqs"
+}
diff --git
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/testcontainers/LocalStackReadyLogWaitStrategy.scala
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/testcontainers/LocalStackReadyLogWaitStrategy.scala
new file mode 100644
index 000000000..07ed0490c
--- /dev/null
+++
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/testcontainers/LocalStackReadyLogWaitStrategy.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi.testcontainers
+
+import java.util.concurrent.{ TimeUnit, TimeoutException }
+import java.util.function.Predicate
+
+import org.testcontainers.DockerClientFactory
+import org.testcontainers.containers.ContainerLaunchException
+import org.testcontainers.containers.output.{ OutputFrame, WaitingConsumer }
+import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy
+import org.testcontainers.utility.LogUtils
+
+/**
+ * This strategy is based on the container log "Ready." from Localstack. Once
it's printed out, the container is good
+ * to go.
+ */
+object LocalStackReadyLogWaitStrategy extends AbstractWaitStrategy {
+ override def waitUntilReady(): Unit = {
+ val waitingConsumer = new WaitingConsumer
+ LogUtils.followOutput(DockerClientFactory.instance.client,
waitStrategyTarget.getContainerId, waitingConsumer)
+
+ val waitPredicate: Predicate[OutputFrame] = (outputFrame: OutputFrame) =>
+ outputFrame.getUtf8String.contains("Ready.")
+
+ try
+ waitingConsumer.waitUntil(waitPredicate, startupTimeout.getSeconds,
TimeUnit.SECONDS, 1)
+ catch {
+ case _: TimeoutException =>
+ throw new ContainerLaunchException("Timed out waiting for localstack")
+ }
+ }
+}
diff --git
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/testcontainers/TimeoutWaitStrategy.scala
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/testcontainers/TimeoutWaitStrategy.scala
new file mode 100644
index 000000000..62f206043
--- /dev/null
+++
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/testcontainers/TimeoutWaitStrategy.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.awsspi.testcontainers
+
+import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy
+
+import scala.concurrent.duration.FiniteDuration
+
+object TimeoutWaitStrategy {
+ def apply(timeout: FiniteDuration): TimeoutWaitStrategy = new
TimeoutWaitStrategy(timeout)
+}
+
+class TimeoutWaitStrategy(timeout: FiniteDuration) extends
AbstractWaitStrategy {
+ override def waitUntilReady(): Unit = Thread.sleep(timeout.toMillis)
+}
diff --git a/awslambda/src/test/java/docs/javadsl/Examples.java
b/awslambda/src/test/java/docs/javadsl/Examples.java
index 5b0427b7b..5711219d8 100644
--- a/awslambda/src/test/java/docs/javadsl/Examples.java
+++ b/awslambda/src/test/java/docs/javadsl/Examples.java
@@ -22,7 +22,7 @@ import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
// #init-client
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient;
+import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;
diff --git a/awslambda/src/test/scala/docs/scaladsl/Examples.scala
b/awslambda/src/test/scala/docs/scaladsl/Examples.scala
index 5ae45c9eb..26dfff603 100644
--- a/awslambda/src/test/scala/docs/scaladsl/Examples.scala
+++ b/awslambda/src/test/scala/docs/scaladsl/Examples.scala
@@ -28,7 +28,7 @@ object Examples {
def initClient(): Unit = {
// #init-client
- import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
+ import pekko.stream.connectors.awsspi.PekkoHttpClient
import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials,
StaticCredentialsProvider }
import software.amazon.awssdk.services.lambda.LambdaAsyncClient
diff --git a/build.sbt b/build.sbt
index 8986071b8..956965398 100644
--- a/build.sbt
+++ b/build.sbt
@@ -18,6 +18,7 @@ ThisBuild / reproducibleBuildsCheckResolver :=
Resolver.ApacheMavenStagingRepo
lazy val userProjects: Seq[ProjectReference] = List[ProjectReference](
amqp,
avroparquet,
+ awsSpiPekkoHttp,
awslambda,
azureStorageQueue,
cassandra,
@@ -128,7 +129,13 @@ lazy val amqp = pekkoConnectorProject("amqp", "amqp",
Dependencies.Amqp)
lazy val avroparquet =
pekkoConnectorProject("avroparquet", "avroparquet", Dependencies.AvroParquet)
+lazy val awsSpiPekkoHttp =
+ pekkoConnectorProject("aws-spi-pekko-http", "aws.api.pekko.http",
Dependencies.AwsSpiPekkoHttp)
+ .configs(IntegrationTest)
+ .settings(Defaults.itSettings)
+
lazy val awslambda = pekkoConnectorProject("awslambda", "aws.lambda",
Dependencies.AwsLambda)
+ .dependsOn(awsSpiPekkoHttp)
lazy val azureStorageQueue = pekkoConnectorProject(
"azure-storage-queue",
@@ -151,6 +158,7 @@ lazy val csvBench = internalProject("csv-bench")
.enablePlugins(JmhPlugin)
lazy val dynamodb = pekkoConnectorProject("dynamodb", "aws.dynamodb",
Dependencies.DynamoDB)
+ .dependsOn(awsSpiPekkoHttp)
lazy val elasticsearch = pekkoConnectorProject(
"elasticsearch",
@@ -278,6 +286,7 @@ lazy val jms = pekkoConnectorProject("jms", "jms",
Dependencies.Jms)
lazy val jsonStreaming = pekkoConnectorProject("json-streaming",
"json.streaming", Dependencies.JsonStreaming)
lazy val kinesis = pekkoConnectorProject("kinesis", "aws.kinesis",
Dependencies.Kinesis)
+ .dependsOn(awsSpiPekkoHttp)
lazy val kudu = pekkoConnectorProject("kudu", "kudu", Dependencies.Kudu)
@@ -307,6 +316,7 @@ lazy val reference = internalProject("reference",
Dependencies.Reference)
lazy val s3 = pekkoConnectorProject("s3", "aws.s3", Dependencies.S3,
MetaInfLicenseNoticeCopy.s3Settings)
+ .dependsOn(awsSpiPekkoHttp)
lazy val pravega = pekkoConnectorProject(
"pravega",
@@ -323,16 +333,19 @@ lazy val simpleCodecs =
pekkoConnectorProject("simple-codecs", "simplecodecs")
lazy val slick = pekkoConnectorProject("slick", "slick", Dependencies.Slick)
-lazy val eventbridge =
- pekkoConnectorProject("aws-event-bridge", "aws.eventbridge",
Dependencies.Eventbridge)
+lazy val eventbridge = pekkoConnectorProject("aws-event-bridge",
"aws.eventbridge",
+ Dependencies.Eventbridge)
+ .dependsOn(awsSpiPekkoHttp)
lazy val sns = pekkoConnectorProject("sns", "aws.sns", Dependencies.Sns)
+ .dependsOn(awsSpiPekkoHttp)
// Solrj has some deprecated methods
lazy val solr = pekkoConnectorProject("solr", "solr", Dependencies.Solr,
fatalWarnings := false)
lazy val sqs = pekkoConnectorProject("sqs", "aws.sqs", Dependencies.Sqs)
+ .dependsOn(awsSpiPekkoHttp)
lazy val sse = pekkoConnectorProject("sse", "sse", Dependencies.Sse)
@@ -466,7 +479,7 @@ def pekkoConnectorProject(projectId: String,
licenses := List(License.Apache2),
AutomaticModuleName.settings(s"pekko.stream.connectors.$moduleName"),
mimaPreviousArtifacts := {
- if (moduleName == "slick" || moduleName == "couchbase3") {
+ if (moduleName == "slick" || moduleName == "couchbase3" || moduleName
== "aws.api.pekko.http") {
Set.empty
} else {
Set(organization.value %% name.value % mimaCompareVersion)
diff --git a/dynamodb/src/test/java/docs/javadsl/ExampleTest.java
b/dynamodb/src/test/java/docs/javadsl/ExampleTest.java
index 261e31d73..138a376cc 100644
--- a/dynamodb/src/test/java/docs/javadsl/ExampleTest.java
+++ b/dynamodb/src/test/java/docs/javadsl/ExampleTest.java
@@ -29,7 +29,7 @@ import org.apache.pekko.stream.javadsl.SourceWithContext;
import org.apache.pekko.testkit.javadsl.TestKit;
import org.junit.*;
// #init-client
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient;
+import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
import scala.util.Try;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
diff --git a/dynamodb/src/test/java/docs/javadsl/RetryTest.java
b/dynamodb/src/test/java/docs/javadsl/RetryTest.java
index 0a325c729..a1ee03076 100644
--- a/dynamodb/src/test/java/docs/javadsl/RetryTest.java
+++ b/dynamodb/src/test/java/docs/javadsl/RetryTest.java
@@ -14,9 +14,9 @@
package docs.javadsl;
import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
import org.apache.pekko.stream.connectors.testkit.javadsl.LogCapturingJunit4;
import org.apache.pekko.testkit.javadsl.TestKit;
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient;
import org.junit.Rule;
import org.junit.Test;
// #clientRetryConfig
diff --git a/dynamodb/src/test/scala/docs/scaladsl/ExampleSpec.scala
b/dynamodb/src/test/scala/docs/scaladsl/ExampleSpec.scala
index 475fc019c..ab7b2632a 100644
--- a/dynamodb/src/test/scala/docs/scaladsl/ExampleSpec.scala
+++ b/dynamodb/src/test/scala/docs/scaladsl/ExampleSpec.scala
@@ -32,7 +32,7 @@ import pekko.testkit.TestKit
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.BeforeAndAfterAll
//#init-client
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
+import pekko.stream.connectors.awsspi.PekkoHttpClient
import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials,
StaticCredentialsProvider }
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
diff --git a/dynamodb/src/test/scala/docs/scaladsl/RetrySpec.scala
b/dynamodb/src/test/scala/docs/scaladsl/RetrySpec.scala
index b0dfe3485..8766a8ece 100644
--- a/dynamodb/src/test/scala/docs/scaladsl/RetrySpec.scala
+++ b/dynamodb/src/test/scala/docs/scaladsl/RetrySpec.scala
@@ -15,9 +15,9 @@ package docs.scaladsl
import org.apache.pekko
import pekko.actor.ActorSystem
+import pekko.stream.connectors.awsspi.PekkoHttpClient
import pekko.stream.connectors.testkit.scaladsl.LogCapturing
import pekko.testkit.TestKit
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
import org.scalatest.BeforeAndAfterAll
import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials,
StaticCredentialsProvider }
// #awsRetryConfiguration
diff --git
a/dynamodb/src/test/scala/org/apache/pekko/stream/connectors/dynamodb/ItemSpec.scala
b/dynamodb/src/test/scala/org/apache/pekko/stream/connectors/dynamodb/ItemSpec.scala
index dd8241d18..26cf9b647 100644
---
a/dynamodb/src/test/scala/org/apache/pekko/stream/connectors/dynamodb/ItemSpec.scala
+++
b/dynamodb/src/test/scala/org/apache/pekko/stream/connectors/dynamodb/ItemSpec.scala
@@ -16,11 +16,11 @@ package org.apache.pekko.stream.connectors.dynamodb
import java.net.URI
import org.apache.pekko
import pekko.actor.ActorSystem
+import pekko.stream.connectors.awsspi.PekkoHttpClient
import pekko.stream.connectors.dynamodb.scaladsl._
import pekko.stream.scaladsl.Sink
import pekko.testkit.TestKit
import pekko.util.ccompat.JavaConverters._
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
import org.scalatest._
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpecLike
diff --git
a/dynamodb/src/test/scala/org/apache/pekko/stream/connectors/dynamodb/TableSpec.scala
b/dynamodb/src/test/scala/org/apache/pekko/stream/connectors/dynamodb/TableSpec.scala
index b2197d6e9..372964730 100644
---
a/dynamodb/src/test/scala/org/apache/pekko/stream/connectors/dynamodb/TableSpec.scala
+++
b/dynamodb/src/test/scala/org/apache/pekko/stream/connectors/dynamodb/TableSpec.scala
@@ -17,10 +17,10 @@ import java.net.URI
import org.apache.pekko
import pekko.actor.ActorSystem
+import pekko.stream.connectors.awsspi.PekkoHttpClient
import pekko.stream.connectors.dynamodb.scaladsl.DynamoDb
import pekko.testkit.TestKit
import pekko.util.ccompat.JavaConverters._
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
import org.scalatest.BeforeAndAfterAll
import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials,
StaticCredentialsProvider }
import software.amazon.awssdk.regions.Region
diff --git a/kinesis/src/test/java/docs/javadsl/KinesisFirehoseSnippets.java
b/kinesis/src/test/java/docs/javadsl/KinesisFirehoseSnippets.java
index 322b2318d..31254890c 100644
--- a/kinesis/src/test/java/docs/javadsl/KinesisFirehoseSnippets.java
+++ b/kinesis/src/test/java/docs/javadsl/KinesisFirehoseSnippets.java
@@ -21,7 +21,7 @@ import
org.apache.pekko.stream.connectors.kinesisfirehose.javadsl.KinesisFirehos
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.Sink;
// #init-client
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient;
+import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
// #init-client
import
software.amazon.awssdk.services.firehose.model.PutRecordBatchResponseEntry;
diff --git a/kinesis/src/test/java/docs/javadsl/KinesisSnippets.java
b/kinesis/src/test/java/docs/javadsl/KinesisSnippets.java
index f24d04292..d821b8b8b 100644
--- a/kinesis/src/test/java/docs/javadsl/KinesisSnippets.java
+++ b/kinesis/src/test/java/docs/javadsl/KinesisSnippets.java
@@ -15,6 +15,7 @@ package docs.javadsl;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
import org.apache.pekko.stream.connectors.kinesis.KinesisFlowSettings;
import org.apache.pekko.stream.connectors.kinesis.ShardIterators;
import org.apache.pekko.stream.connectors.kinesis.ShardSettings;
@@ -26,7 +27,6 @@ import org.apache.pekko.stream.javadsl.FlowWithContext;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
// #init-client
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
// #init-client
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
diff --git a/kinesis/src/test/scala/docs/scaladsl/KinesisFirehoseSnippets.scala
b/kinesis/src/test/scala/docs/scaladsl/KinesisFirehoseSnippets.scala
index acf0851d1..df9568df6 100644
--- a/kinesis/src/test/scala/docs/scaladsl/KinesisFirehoseSnippets.scala
+++ b/kinesis/src/test/scala/docs/scaladsl/KinesisFirehoseSnippets.scala
@@ -24,7 +24,7 @@ import software.amazon.awssdk.services.firehose.model.{
PutRecordBatchResponseEn
object KinesisFirehoseSnippets {
// #init-client
- import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
+ import pekko.stream.connectors.awsspi.PekkoHttpClient
import software.amazon.awssdk.services.firehose.FirehoseAsyncClient
implicit val system: ActorSystem = ActorSystem()
diff --git a/kinesis/src/test/scala/docs/scaladsl/KinesisSnippets.scala
b/kinesis/src/test/scala/docs/scaladsl/KinesisSnippets.scala
index 33c037586..696342c44 100644
--- a/kinesis/src/test/scala/docs/scaladsl/KinesisSnippets.scala
+++ b/kinesis/src/test/scala/docs/scaladsl/KinesisSnippets.scala
@@ -29,7 +29,7 @@ import scala.concurrent.duration._
object KinesisSnippets {
// #init-client
- import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
+ import pekko.stream.connectors.awsspi.PekkoHttpClient
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
implicit val system: ActorSystem = ActorSystem()
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index b33c35ada..30b7bab3a 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -57,7 +57,7 @@ object Dependencies {
val scalaTestScalaCheckArtifact =
s"scalacheck-${scalaTestPlusScalaCheckVersion(scalaCheckVersion)}"
val scalaTestScalaCheckVersion = s"$ScalaTestVersion.0"
- val scalaTestMockitoVersion = "3.2.18.0" //
https://github.com/scalatest/scalatest/issues/2311
+ val scalaTestMockitoVersion = scalaTestScalaCheckVersion
val CouchbaseVersion = "2.7.23"
val Couchbase3Version = "3.6.0"
@@ -127,11 +127,28 @@ object Dependencies {
libraryDependencies ++= Seq(
"com.rabbitmq" % "amqp-client" % "5.21.0") ++ Mockito)
+ val AwsSpiPekkoHttp = Seq(
+ libraryDependencies ++= Seq(
+ "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
+ "software.amazon.awssdk" % "http-client-spi" % AwsSdk2Version,
+ ("software.amazon.awssdk" % "dynamodb" % AwsSdk2Version %
"it,test").excludeAll(
+ ExclusionRule("software.amazon.awssdk", "netty-nio-client")),
+ ("software.amazon.awssdk" % "kinesis" % AwsSdk2Version %
"it,test").excludeAll(
+ ExclusionRule("software.amazon.awssdk", "netty-nio-client")),
+ ("software.amazon.awssdk" % "sns" % AwsSdk2Version %
"it,test").excludeAll(
+ ExclusionRule("software.amazon.awssdk", "netty-nio-client")),
+ ("software.amazon.awssdk" % "sqs" % AwsSdk2Version %
"it,test").excludeAll(
+ ExclusionRule("software.amazon.awssdk", "netty-nio-client")),
+ ("software.amazon.awssdk" % "s3" % AwsSdk2Version %
"it,test").excludeAll(
+ ExclusionRule("software.amazon.awssdk", "netty-nio-client")),
+ "com.dimafeng" %% "testcontainers-scala" %
TestContainersScalaTestVersion % Test,
+ "org.scalatest" %% "scalatest" % ScalaTestVersion % "it,test",
+ "org.scalatestplus" %% "junit-4-13" % scalaTestScalaCheckVersion %
"it,test",
+ "ch.qos.logback" % "logback-classic" % LogbackVersion % "it,test"))
+
val AwsLambda = Seq(
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
- ("com.github.pjfanning" %% "aws-spi-pekko-http" %
AwsSpiPekkoHttpVersion).excludeAll(
- ExclusionRule(organization = "org.apache.pekko")),
("software.amazon.awssdk" % "lambda" % AwsSdk2Version).excludeAll(
ExclusionRule("software.amazon.awssdk", "apache-client"),
ExclusionRule("software.amazon.awssdk", "netty-nio-client"))) ++
Mockito)
@@ -177,8 +194,6 @@ object Dependencies {
val DynamoDB = Seq(
libraryDependencies ++= Seq(
- ("com.github.pjfanning" %% "aws-spi-pekko-http" %
AwsSpiPekkoHttpVersion).excludeAll(
- ExclusionRule(organization = "org.apache.pekko")),
("software.amazon.awssdk" % "dynamodb" % AwsSdk2Version).excludeAll(
ExclusionRule("software.amazon.awssdk", "apache-client"),
ExclusionRule("software.amazon.awssdk", "netty-nio-client")),
@@ -363,8 +378,6 @@ object Dependencies {
val Kinesis = Seq(
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
- ("com.github.pjfanning" %% "aws-spi-pekko-http" %
AwsSpiPekkoHttpVersion).excludeAll(ExclusionRule(
- organization = "org.apache.pekko"))) ++ Seq(
"software.amazon.awssdk" % "kinesis" % AwsSdk2Version,
"software.amazon.awssdk" % "firehose" % AwsSdk2Version,
"software.amazon.kinesis" % "amazon-kinesis-client" % "2.6.0").map(
@@ -452,8 +465,6 @@ object Dependencies {
val Eventbridge = Seq(
libraryDependencies ++= Seq(
- ("com.github.pjfanning" %% "aws-spi-pekko-http" %
AwsSpiPekkoHttpVersion).excludeAll(
- ExclusionRule(organization = "org.apache.pekko")),
("software.amazon.awssdk" % "eventbridge" % AwsSdk2Version).excludeAll(
ExclusionRule("software.amazon.awssdk", "apache-client"),
ExclusionRule("software.amazon.awssdk", "netty-nio-client")),
@@ -461,8 +472,6 @@ object Dependencies {
val Sns = Seq(
libraryDependencies ++= Seq(
- ("com.github.pjfanning" %% "aws-spi-pekko-http" %
AwsSpiPekkoHttpVersion).excludeAll(
- ExclusionRule(organization = "org.apache.pekko")),
("software.amazon.awssdk" % "sns" % AwsSdk2Version).excludeAll(
ExclusionRule("software.amazon.awssdk", "apache-client"),
ExclusionRule("software.amazon.awssdk", "netty-nio-client")),
@@ -483,8 +492,6 @@ object Dependencies {
val Sqs = Seq(
libraryDependencies ++= Seq(
- ("com.github.pjfanning" %% "aws-spi-pekko-http" %
AwsSpiPekkoHttpVersion).excludeAll(
- ExclusionRule(organization = "org.apache.pekko")),
("software.amazon.awssdk" % "sqs" % AwsSdk2Version).excludeAll(
ExclusionRule("software.amazon.awssdk", "apache-client"),
ExclusionRule("software.amazon.awssdk", "netty-nio-client")),
diff --git a/sns/src/test/java/docs/javadsl/SnsPublisherTest.java
b/sns/src/test/java/docs/javadsl/SnsPublisherTest.java
index b05d9fe46..bcf32de28 100644
--- a/sns/src/test/java/docs/javadsl/SnsPublisherTest.java
+++ b/sns/src/test/java/docs/javadsl/SnsPublisherTest.java
@@ -16,6 +16,7 @@ package docs.javadsl;
import org.apache.pekko.Done;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.http.javadsl.Http;
+import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
import org.apache.pekko.stream.connectors.sns.javadsl.SnsPublisher;
import org.apache.pekko.stream.connectors.testkit.javadsl.LogCapturingJunit4;
import org.apache.pekko.stream.javadsl.Sink;
@@ -24,7 +25,6 @@ import org.apache.pekko.testkit.javadsl.TestKit;
// #init-client
import java.net.URI;
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
diff --git
a/sns/src/test/scala/org/apache/pekko/stream/connectors/sns/IntegrationTestContext.scala
b/sns/src/test/scala/org/apache/pekko/stream/connectors/sns/IntegrationTestContext.scala
index e478c016f..0cf134696 100644
---
a/sns/src/test/scala/org/apache/pekko/stream/connectors/sns/IntegrationTestContext.scala
+++
b/sns/src/test/scala/org/apache/pekko/stream/connectors/sns/IntegrationTestContext.scala
@@ -63,7 +63,7 @@ trait IntegrationTestContext extends BeforeAndAfterAll with
ScalaFutures {
// #init-client
import java.net.URI
- import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
+ import pekko.stream.connectors.awsspi.PekkoHttpClient
import software.amazon.awssdk.services.sns.SnsAsyncClient
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
diff --git a/sqs/src/test/java/docs/javadsl/SqsSourceTest.java
b/sqs/src/test/java/docs/javadsl/SqsSourceTest.java
index 5bdfee652..4b42416fb 100644
--- a/sqs/src/test/java/docs/javadsl/SqsSourceTest.java
+++ b/sqs/src/test/java/docs/javadsl/SqsSourceTest.java
@@ -14,6 +14,7 @@
package docs.javadsl;
import org.apache.pekko.Done;
+import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
import org.apache.pekko.stream.connectors.sqs.MessageAttributeName;
import org.apache.pekko.stream.connectors.sqs.MessageSystemAttributeName;
import org.apache.pekko.stream.connectors.sqs.SqsPublishBatchSettings;
@@ -23,7 +24,6 @@ import
org.apache.pekko.stream.connectors.sqs.javadsl.SqsPublishSink;
import org.apache.pekko.stream.connectors.sqs.javadsl.SqsSource;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient;
import org.junit.Test;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
diff --git
a/sqs/src/test/java/org/apache/pekko/stream/connectors/sqs/javadsl/BaseSqsTest.java
b/sqs/src/test/java/org/apache/pekko/stream/connectors/sqs/javadsl/BaseSqsTest.java
index 709fa5b2e..a2540de70 100644
---
a/sqs/src/test/java/org/apache/pekko/stream/connectors/sqs/javadsl/BaseSqsTest.java
+++
b/sqs/src/test/java/org/apache/pekko/stream/connectors/sqs/javadsl/BaseSqsTest.java
@@ -15,10 +15,10 @@ package org.apache.pekko.stream.connectors.sqs.javadsl;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.http.javadsl.Http;
+import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient;
import org.apache.pekko.stream.connectors.testkit.javadsl.LogCapturingJunit4;
import org.apache.pekko.testkit.javadsl.TestKit;
// #init-client
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
diff --git a/sqs/src/test/scala/docs/scaladsl/SqsSourceSpec.scala
b/sqs/src/test/scala/docs/scaladsl/SqsSourceSpec.scala
index 6b2963dfe..699eb6d86 100644
--- a/sqs/src/test/scala/docs/scaladsl/SqsSourceSpec.scala
+++ b/sqs/src/test/scala/docs/scaladsl/SqsSourceSpec.scala
@@ -19,12 +19,12 @@ import java.util.concurrent.TimeUnit
import org.apache.pekko
import pekko.Done
import pekko.stream.KillSwitches
+import pekko.stream.connectors.awsspi.PekkoHttpClient
import pekko.stream.connectors.sqs._
import pekko.stream.connectors.sqs.scaladsl.{ DefaultTestContext, SqsSource }
import pekko.stream.connectors.testkit.scaladsl.LogCapturing
import pekko.stream.scaladsl.{ Keep, Sink }
import pekko.util.ccompat.JavaConverters._
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
diff --git
a/sqs/src/test/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/DefaultTestContext.scala
b/sqs/src/test/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/DefaultTestContext.scala
index cb5446494..661302579 100644
---
a/sqs/src/test/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/DefaultTestContext.scala
+++
b/sqs/src/test/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/DefaultTestContext.scala
@@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit
import org.apache.pekko
import pekko.actor.{ ActorSystem, Terminated }
import pekko.http.scaladsl.Http
+import pekko.stream.connectors.awsspi.PekkoHttpClient
import pekko.stream.connectors.sqs.SqsSourceSettings
import pekko.util.ccompat.JavaConverters._
import org.scalatest.concurrent.ScalaFutures
@@ -27,7 +28,6 @@ import org.scalatest.{ BeforeAndAfterAll, Suite, Tag }
import scala.concurrent.ExecutionContext
//#init-client
-import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials,
StaticCredentialsProvider }
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsAsyncClient
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]