This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git
commit 8c5d65051cfeb3df4a0eb87039c33131b6b21ccc Author: PJ Fanning <[email protected]> AuthorDate: Fri Aug 18 20:06:49 2023 +0100 add scala3 support for s3 (#167) * add futiles license * some more compile issues * more compile issues * Update S3.scala * try .toInt * remove ASF header from futiles code * Final touches to get S3 working for Scala 3 --------- Co-authored-by: Matthew de Detrich <[email protected]> --- LICENSE | 15 +++- legal/S3License.txt | 7 ++ project/Dependencies.scala | 4 +- .../pekko/stream/connectors/s3/impl/S3Stream.scala | 33 ++++++--- .../connectors/s3/impl/auth/CanonicalRequest.scala | 4 +- .../pekko/stream/connectors/s3/javadsl/S3.scala | 1 + .../pekko/stream/connectors/s3/TestUtils.scala | 6 +- .../connectors/s3/impl/SplitAfterSizeSpec.scala | 3 +- .../s3/impl/auth/CanonicalRequestSpec.scala | 2 +- .../impl/auth/SplitAfterSizeWithContextSpec.scala | 3 +- .../stream/connectors/s3/impl/retry/Retry.scala | 86 ++++++++++++++++++++++ .../stream/connectors/s3/impl/retry/Timeouts.scala | 45 +++++++++++ .../connectors/s3/scaladsl/S3IntegrationSpec.scala | 6 +- .../connectors/s3/scaladsl/S3WireMockBase.scala | 3 +- 14 files changed, 188 insertions(+), 30 deletions(-) diff --git a/LICENSE b/LICENSE index e8e5b8a10..0889548ba 100644 --- a/LICENSE +++ b/LICENSE @@ -202,7 +202,14 @@ --------------- -pekko-mqtt-streaming contains code from paho.mqtt.java <https://github.com/eclipse/paho.mqtt.java>. +pekko-connectors-google-common contains `org.apache.pekko.stream.connectors.google.jwt.JwtSprayJson.scala` +which is copied from jwt-scala <https://github.com/jwt-scala/jwt-scala>. +The original code was released under the Apache 2.0 license. +Copyright 2021 JWT-Scala Contributors. + +--------------- + +pekko-connectors-mqtt-streaming contains code from paho.mqtt.java <https://github.com/eclipse/paho.mqtt.java>. This code was released under a dual license: Eclipse Public License version 2.0 and Eclipse Distribution License. We choose to use the code under the Eclipse Distribution License. @@ -235,7 +242,7 @@ Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. --------------- -pekko-connectors-google-common contains `org.apache.pekko.stream.connectors.google.jwt.JwtSprayJson.scala` -which is copied from jwt-scala <https://github.com/jwt-scala/jwt-scala>. +pekko-connectors-s3 contains test code in `org.apache.pekko.stream.connectors.s3.impl.retry` package +which is copied from futiles <https://github.com/johanandren/futiles>. The original code was released under the Apache 2.0 license. -Copyright 2021 JWT-Scala Contributors. +Copyright 2015 Johan Andrén. diff --git a/legal/S3License.txt b/legal/S3License.txt index e64484eab..5ec7aa63f 100644 --- a/legal/S3License.txt +++ b/legal/S3License.txt @@ -209,3 +209,10 @@ This code was released under an Apache 2.0 license. AWS SDK for Java Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +--------------- + +pekko-connectors-s3 contains test code in `org.apache.pekko.stream.connectors.s3.impl.retry` package +which is copied from futiles <https://github.com/johanandren/futiles>. +The original code was released under the Apache 2.0 license. +Copyright 2015 Johan Andrén. diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 0708a1c6f..2effbd4fc 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -364,7 +364,6 @@ object Dependencies { )) val S3 = Seq( - crossScalaVersions -= Scala3, libraryDependencies ++= Seq( "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion, "org.apache.pekko" %% "pekko-http-xml" % PekkoHttpVersion, @@ -373,8 +372,7 @@ object Dependencies { "com.google.jimfs" % "jimfs" % "1.2" % Test, "com.github.tomakehurst" % "wiremock-jre8" % "2.32.0" % Test, "org.scalacheck" %% "scalacheck" % scalaCheckVersion % Test, - "org.scalatestplus" %% scalaTestScalaCheckArtifact % scalaTestScalaCheckVersion % Test, - "com.markatta" %% "futiles" % "2.0.2" % Test)) + "org.scalatestplus" %% scalaTestScalaCheckArtifact % scalaTestScalaCheckVersion % Test)) val SpringWeb = { val SpringVersion = "5.1.17.RELEASE" diff --git a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala index 3ba8f8cf2..78c47caa6 100644 --- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala +++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala @@ -29,7 +29,7 @@ import pekko.http.scaladsl.{ ClientTransport, Http } import pekko.stream.connectors.s3.BucketAccess.{ AccessDenied, AccessGranted, NotExists } import pekko.stream.connectors.s3._ import pekko.stream.connectors.s3.impl.auth.{ CredentialScope, Signer, SigningKey } -import pekko.stream.scaladsl.{ Flow, Keep, RetryFlow, RunnableGraph, Sink, Source, Tcp } +import pekko.stream.scaladsl.{ Flow, Keep, RetryFlow, RunnableGraph, Sink, Source, SubFlow, Tcp } import pekko.stream.{ Attributes, Materializer } import pekko.util.ByteString import pekko.{ Done, NotUsed } @@ -1177,11 +1177,15 @@ import scala.util.{ Failure, Success, Try } import conf.multipartUploadSettings.retrySettings._ - SplitAfterSize(chunkSize, chunkBufferSize)(atLeastOneByteString) - .via(getChunkBuffer(chunkSize, chunkBufferSize, maxRetries)) // creates the chunks - .mergeSubstreamsWithParallelism(parallelism) + val source1: SubFlow[Chunk, NotUsed, Flow[ByteString, ByteString, NotUsed]#Repr, Sink[ByteString, NotUsed]] = + SplitAfterSize(chunkSize, chunkBufferSize)(atLeastOneByteString) + .via(getChunkBuffer(chunkSize, chunkBufferSize, maxRetries)) // creates the chunks + + val source2 = source1.mergeSubstreamsWithParallelism(parallelism) .filter(_.size > 0) .via(atLeastOne) + + source2 .zip(requestInfoOrUploadState(s3Location, contentType, s3Headers, initialUploadState)) .groupBy(parallelism, { case (_, (_, chunkIndex)) => chunkIndex % parallelism }) // Allow requests that fail with transient errors to be retried, using the already buffered chunk. @@ -1278,11 +1282,18 @@ import scala.util.{ Failure, Success, Try } Flow[(ByteString, C)].orElse( Source.single((ByteString.empty, null.asInstanceOf[C]))) - SplitAfterSizeWithContext(chunkSize)(atLeastOneByteStringAndEmptyContext) - .via(getChunk(chunkBufferSize)) - .mergeSubstreamsWithParallelism(parallelism) - .filter { case (chunk, _) => chunk.size > 0 } - .via(atLeastOne) + val source1: SubFlow[(Chunk, immutable.Iterable[C]), NotUsed, Flow[(ByteString, C), (ByteString, C), + NotUsed]#Repr, Sink[(ByteString, C), NotUsed]] = + SplitAfterSizeWithContext(chunkSize)(atLeastOneByteStringAndEmptyContext) + .via(getChunk(chunkBufferSize)) + + val source2: Flow[(ByteString, C), (Chunk, immutable.Iterable[C]), NotUsed] = + source1 + .mergeSubstreamsWithParallelism(parallelism) + .filter { case (chunk, _) => chunk.size > 0 } + .via(atLeastOne) + + source2 .zip(requestInfoOrUploadState(s3Location, contentType, s3Headers, initialUploadState)) .groupBy(parallelism, { case (_, (_, chunkIndex)) => chunkIndex % parallelism }) .map { @@ -1379,9 +1390,9 @@ import scala.util.{ Failure, Success, Try } import mat.executionContext Sink .seq[UploadPartResponse] - .mapMaterializedValue { responseFuture: Future[immutable.Seq[UploadPartResponse]] => + .mapMaterializedValue { (responseFuture: Future[immutable.Seq[UploadPartResponse]]) => responseFuture - .flatMap { responses: immutable.Seq[UploadPartResponse] => + .flatMap { (responses: immutable.Seq[UploadPartResponse]) => val successes = responses.collect { case r: SuccessfulUploadPart => r } val failures = responses.collect { case r: FailedUploadPart => r } if (responses.isEmpty) { diff --git a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/auth/CanonicalRequest.scala b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/auth/CanonicalRequest.scala index 87617f42d..f8270026f 100644 --- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/auth/CanonicalRequest.scala +++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/auth/CanonicalRequest.scala @@ -69,7 +69,7 @@ import pekko.http.scaladsl.model.{ HttpHeader, HttpRequest } def canonicalQueryString(query: Query): String = { def uriEncode(s: String): String = s.flatMap { case c if isUnreservedCharacter(c) => c.toString - case c => "%" + c.toHexString.toUpperCase + case c => "%" + Integer.toHexString(c).toUpperCase } query @@ -99,7 +99,7 @@ import pekko.http.scaladsl.model.{ HttpHeader, HttpRequest } if (path.isEmpty) "/" else { path.toString.flatMap { - case c if isReservedCharacter(c) => "%" + c.toHexString.toUpperCase + case c if isReservedCharacter(c) => "%" + Integer.toHexString(c).toUpperCase case c => c.toString } } diff --git a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/javadsl/S3.scala b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/javadsl/S3.scala index b7392a073..41078d8f0 100644 --- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/javadsl/S3.scala +++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/javadsl/S3.scala @@ -29,6 +29,7 @@ import pekko.stream.connectors.s3.headers.{ CannedAcl, ServerSideEncryption } import pekko.stream.connectors.s3._ import pekko.stream.connectors.s3.impl._ import pekko.stream.javadsl.{ RunnableGraph, Sink, Source } +import pekko.stream.scaladsl.SourceToCompletionStage import pekko.util.ccompat.JavaConverters._ import pekko.util.ByteString import pekko.util.OptionConverters._ diff --git a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/TestUtils.scala b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/TestUtils.scala index 85fb79a05..a16a28cff 100644 --- a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/TestUtils.scala +++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/TestUtils.scala @@ -17,11 +17,11 @@ package org.apache.pekko.stream.connectors.s3 -import markatta.futiles.Retry import org.apache.commons.lang3.StringUtils import org.apache.pekko -import org.apache.pekko.stream.connectors.s3.scaladsl.S3 -import org.apache.pekko.stream.scaladsl.Sink +import pekko.stream.connectors.s3.impl.retry.Retry +import pekko.stream.connectors.s3.scaladsl.S3 +import pekko.stream.scaladsl.Sink import org.scalacheck.Gen import pekko.actor.ActorSystem import pekko.stream.Attributes diff --git a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/SplitAfterSizeSpec.scala b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/SplitAfterSizeSpec.scala index 5733b28f9..2871e6537 100644 --- a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/SplitAfterSizeSpec.scala +++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/SplitAfterSizeSpec.scala @@ -121,6 +121,7 @@ class SplitAfterSizeSpec(_system: ActorSystem) Seq(ByteString(16), ByteString(17, 18)))) } - def bytes(start: Byte, end: Byte): Array[Byte] = (start to end).map(_.toByte).toArray[Byte] + // https://github.com/lampepfl/dotty/issues/18068 + def bytes(start: Byte, end: Byte): Array[Byte] = (start.toInt to end).map(_.toByte).toArray[Byte] } diff --git a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/auth/CanonicalRequestSpec.scala b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/auth/CanonicalRequestSpec.scala index 9658056da..f17877cd0 100644 --- a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/auth/CanonicalRequestSpec.scala +++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/auth/CanonicalRequestSpec.scala @@ -128,7 +128,7 @@ class CanonicalRequestSpec extends AnyFlatSpec with Matchers { val reservedCharacters = ":?#[]@!$&'()*+,;=" reservedCharacters.foreach { char => withClue(s"failed for path containing reserved character [$char]:") { - val expectedCharEncoding = "%" + char.toHexString.toUpperCase + val expectedCharEncoding = "%" + Integer.toHexString(char).toUpperCase val request = HttpRequest( diff --git a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/auth/SplitAfterSizeWithContextSpec.scala b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/auth/SplitAfterSizeWithContextSpec.scala index 3f18c84e1..740062365 100644 --- a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/auth/SplitAfterSizeWithContextSpec.scala +++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/auth/SplitAfterSizeWithContextSpec.scala @@ -79,6 +79,7 @@ class SplitAfterSizeWithContextSpec(_system: ActorSystem) Seq((ByteString(17, 18), 2)))) } - def bytes(start: Byte, end: Byte): Array[Byte] = (start to end).map(_.toByte).toArray[Byte] + // https://github.com/lampepfl/dotty/issues/18068 + def bytes(start: Byte, end: Byte): Array[Byte] = (start.toInt to end).map(_.toByte).toArray[Byte] } diff --git a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/retry/Retry.scala b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/retry/Retry.scala new file mode 100644 index 000000000..bed649e60 --- /dev/null +++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/retry/Retry.scala @@ -0,0 +1,86 @@ +/* + * Copyright 2015 Johan Andrén + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.connectors.s3.impl.retry + +import java.util.concurrent.{ ThreadLocalRandom, TimeUnit } +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ ExecutionContext, Future } +import scala.util.Random + +// copied from https://github.com/johanandren/futiles/blob/18868f252bbf5dd71d2cd0fc67e7eb39863b686a/src/main/scala/markatta/futiles/Retry.scala +object Retry { + + private val alwaysRetry: Throwable => Boolean = _ => true + + /** + * Evaluate a block that creates a future up to a specific number of times, if the future fails, decide about + * retrying using a predicate, if it should retry an exponential back off is applied so that the retry waits longer + * and longer for every retry it makes. A jitter is also added so that the exact timing of the retry isn't exactly + * the same for all calls with the same backOffUnit + * + * Any exception in the block creating the future will also be returned as a failed future Default is to retry for + * all throwables. + * + * Based on this wikipedia article: http://en.wikipedia.org/wiki/Truncated_binary_exponential_backoff + */ + def retryWithBackOff[A]( + times: Int, + backOffUnit: FiniteDuration, + shouldRetry: Throwable => Boolean = alwaysRetry)(fBlock: => Future[A])(implicit ec: ExecutionContext): Future[A] = + try + if (times <= 1) fBlock + else retryWithBackOffLoop(times, 1, backOffUnit, shouldRetry)(fBlock) + catch { + // failure to actually create the future + case x: Throwable => Future.failed(x) + } + + private def retryWithBackOffLoop[A]( + totalTimes: Int, + timesTried: Int, + backOffUnit: FiniteDuration, + shouldRetry: Throwable => Boolean)(fBlock: => Future[A])(implicit ec: ExecutionContext): Future[A] = + if (totalTimes <= timesTried) fBlock + else + fBlock.recoverWith { + case ex: Throwable if shouldRetry(ex) => + val timesTriedNow = timesTried + 1 + val backOff = nextBackOff(timesTriedNow, backOffUnit) + Timeouts + .timeout(backOff)(()) + .flatMap(_ => + retryWithBackOffLoop( + totalTimes, + timesTriedNow, + backOffUnit, + shouldRetry)(fBlock)) + } + + private def nextBackOff( + tries: Int, + backOffUnit: FiniteDuration): FiniteDuration = { + require(tries > 0, "tries should start from 1") + val rng = new Random(ThreadLocalRandom.current()) + // jitter between 0.5 and 1.5 + val jitter = 0.5 + rng.nextDouble() + val factor = math.pow(2, tries) * jitter + FiniteDuration( + (backOffUnit.toMillis * factor).toLong, + TimeUnit.MILLISECONDS) + } + +} diff --git a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/retry/Timeouts.scala b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/retry/Timeouts.scala new file mode 100644 index 000000000..f8be639d4 --- /dev/null +++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/retry/Timeouts.scala @@ -0,0 +1,45 @@ +/* + * Copyright 2015 Johan Andrén + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.connectors.s3.impl.retry + +import java.util.{ Timer, TimerTask } +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ ExecutionContext, Future, Promise } +import scala.util.Try + +// copied from https://github.com/johanandren/futiles/blob/18868f252bbf5dd71d2cd0fc67e7eb39863b686a/src/main/scala/markatta/futiles/Timeouts.scala +object Timeouts { + + private val timer = new Timer() + + /** + * When ```waitFor``` has passed, evaluate ```what``` on the given execution context and complete the future + */ + def timeout[A](waitFor: FiniteDuration)(what: => A)(implicit ec: ExecutionContext): Future[A] = { + val promise = Promise[A]() + timer.schedule(new TimerTask { + override def run(): Unit = + // make sure we do not block the timer thread + Future { + promise.complete(Try(what)) + } + }, + waitFor.toMillis) + + promise.future + } +} diff --git a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala index 9989117c9..e13410191 100644 --- a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala +++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala @@ -1293,8 +1293,8 @@ class AWSS3IntegrationSpec extends TestKit(ActorSystem("AWSS3IntegrationSpec")) }.orElse(Some(1.minute)) // Since S3 accounts share global state, we should randomly generate bucket names so concurrent tests - // against an S3 account don't conflict with eachother - override lazy val randomlyGenerateBucketNames: Boolean = + // against an S3 account don't conflict with each other + override val randomlyGenerateBucketNames: Boolean = sys.props.get("pekko.stream.connectors.s3.scaladsl.AWSS3IntegrationSpec.randomlyGenerateBucketNames") .map(_.toBoolean).getOrElse(true) } @@ -1313,7 +1313,7 @@ class MinioS3IntegrationSpec // Since a unique new Minio container is started with each test run there is no point in making random // bucket names - override lazy val randomlyGenerateBucketNames: Boolean = false + override val randomlyGenerateBucketNames: Boolean = false override lazy val defaultS3Settings: S3Settings = s3Settings .withS3RegionProvider( diff --git a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3WireMockBase.scala b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3WireMockBase.scala index 522ed3a96..7a6bb91f1 100644 --- a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3WireMockBase.scala +++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3WireMockBase.scala @@ -33,7 +33,8 @@ import software.amazon.awssdk.regions.Region abstract class S3WireMockBase(_system: ActorSystem, val _wireMockServer: WireMockServer) extends TestKit(_system) { private def this(mock: WireMockServer) = - this(ActorSystem(getCallerName(getClass), config(mock.port()).withFallback(ConfigFactory.load())), mock) + this(ActorSystem(getCallerName(classOf[S3WireMockBase]), config(mock.port()).withFallback(ConfigFactory.load())), + mock) def this() = { this(initServer()) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
