This is an automated email from the ASF dual-hosted git repository.
mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new bd2860a9b Remove usages of Stream
bd2860a9b is described below
commit bd2860a9bc7b7ff00f93478f94bb89e58ee07ed6
Author: Matthew de Detrich <[email protected]>
AuthorDate: Tue Mar 21 12:09:35 2023 +0100
Remove usages of Stream
---
file/src/test/scala/docs/scaladsl/ExecutableUtils.scala | 4 +---
.../org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala | 7 ++-----
2 files changed, 3 insertions(+), 8 deletions(-)
diff --git a/file/src/test/scala/docs/scaladsl/ExecutableUtils.scala
b/file/src/test/scala/docs/scaladsl/ExecutableUtils.scala
index 8baf7fec8..f27949dfa 100644
--- a/file/src/test/scala/docs/scaladsl/ExecutableUtils.scala
+++ b/file/src/test/scala/docs/scaladsl/ExecutableUtils.scala
@@ -18,7 +18,6 @@ import java.nio.file.{ Files, Path, Paths }
import org.apache.pekko.util.ByteString
-import scala.annotation.nowarn
import scala.concurrent.Future
import scala.sys.process.{ BasicIO, Process }
@@ -58,10 +57,9 @@ object ExecutableUtils {
finally stream.close()
}
- @nowarn("msg=deprecated")
private def readStream(stream: InputStream): ByteString = {
val reader = new BufferedInputStream(stream)
- try ByteString(Stream.continually(reader.read).takeWhile(_ !=
-1).map(_.toByte).toArray)
+ try ByteString(Iterator.continually(reader.read).takeWhile(_ !=
-1).map(_.toByte).toArray)
finally reader.close()
}
diff --git
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
index 715ae3067..524422f7f 100644
---
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
+++
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
@@ -35,7 +35,6 @@ import pekko.util.ByteString
import pekko.{ Done, NotUsed }
import software.amazon.awssdk.regions.Region
-import scala.annotation.nowarn
import scala.collection.immutable
import scala.concurrent.{ Future, Promise }
import scala.util.{ Failure, Success, Try }
@@ -1025,14 +1024,13 @@ import scala.util.{ Failure, Success, Try }
/**
* Initiates a multipart upload. Returns a source of the initiated upload
with upload part indicess
*/
- @nowarn("msg=deprecated")
private def initiateUpload(s3Location: S3Location,
contentType: ContentType,
s3Headers: immutable.Seq[HttpHeader]): Source[(MultipartUpload, Int),
NotUsed] =
Source
.single(s3Location)
.flatMapConcat(initiateMultipartUpload(_, contentType, s3Headers))
- .mapConcat(r => Stream.continually(r))
+ .flatMapConcat(r => Source.repeat(r))
.zip(Source.fromIterator(() => Iterator.from(1)))
private def poolSettings(implicit settings: S3Settings, system: ActorSystem)
=
@@ -1273,7 +1271,6 @@ import scala.util.{ Failure, Success, Try }
.mapMaterializedValue(_ => NotUsed)
}
- @nowarn("msg=deprecated")
private def requestInfoOrUploadState(s3Location: S3Location,
contentType: ContentType,
s3Headers: S3Headers,
@@ -1285,7 +1282,7 @@ import scala.util.{ Failure, Success, Try }
Source
.single(s3Location)
.flatMapConcat(_ => Source.single(MultipartUpload(s3Location.bucket,
s3Location.key, uploadId)))
- .mapConcat(r => Stream.continually(r))
+ .flatMapConcat(r => Source.repeat(r))
.zip(Source.fromIterator(() => Iterator.from(initialIndex)))
case None =>
// First step of the multi part upload process is made.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]