This is an automated email from the ASF dual-hosted git repository.

mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new bd2860a9b Remove usages of Stream
bd2860a9b is described below

commit bd2860a9bc7b7ff00f93478f94bb89e58ee07ed6
Author: Matthew de Detrich <[email protected]>
AuthorDate: Tue Mar 21 12:09:35 2023 +0100

    Remove usages of Stream
---
 file/src/test/scala/docs/scaladsl/ExecutableUtils.scala            | 4 +---
 .../org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala      | 7 ++-----
 2 files changed, 3 insertions(+), 8 deletions(-)

diff --git a/file/src/test/scala/docs/scaladsl/ExecutableUtils.scala b/file/src/test/scala/docs/scaladsl/ExecutableUtils.scala
index 8baf7fec8..f27949dfa 100644
--- a/file/src/test/scala/docs/scaladsl/ExecutableUtils.scala
+++ b/file/src/test/scala/docs/scaladsl/ExecutableUtils.scala
@@ -18,7 +18,6 @@ import java.nio.file.{ Files, Path, Paths }
 
 import org.apache.pekko.util.ByteString
 
-import scala.annotation.nowarn
 import scala.concurrent.Future
 import scala.sys.process.{ BasicIO, Process }
 
@@ -58,10 +57,9 @@ object ExecutableUtils {
     finally stream.close()
   }
 
-  @nowarn("msg=deprecated")
   private def readStream(stream: InputStream): ByteString = {
     val reader = new BufferedInputStream(stream)
-    try ByteString(Stream.continually(reader.read).takeWhile(_ != -1).map(_.toByte).toArray)
+    try ByteString(Iterator.continually(reader.read).takeWhile(_ != -1).map(_.toByte).toArray)
     finally reader.close()
   }
 
diff --git a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
index 715ae3067..524422f7f 100644
--- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
+++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
@@ -35,7 +35,6 @@ import pekko.util.ByteString
 import pekko.{ Done, NotUsed }
 import software.amazon.awssdk.regions.Region
 
-import scala.annotation.nowarn
 import scala.collection.immutable
 import scala.concurrent.{ Future, Promise }
 import scala.util.{ Failure, Success, Try }
@@ -1025,14 +1024,13 @@ import scala.util.{ Failure, Success, Try }
   /**
    * Initiates a multipart upload. Returns a source of the initiated upload with upload part indicess
    */
-  @nowarn("msg=deprecated")
   private def initiateUpload(s3Location: S3Location,
       contentType: ContentType,
       s3Headers: immutable.Seq[HttpHeader]): Source[(MultipartUpload, Int), NotUsed] =
     Source
       .single(s3Location)
       .flatMapConcat(initiateMultipartUpload(_, contentType, s3Headers))
-      .mapConcat(r => Stream.continually(r))
+      .flatMapConcat(r => Source.repeat(r))
       .zip(Source.fromIterator(() => Iterator.from(1)))
 
   private def poolSettings(implicit settings: S3Settings, system: ActorSystem) =
@@ -1273,7 +1271,6 @@ import scala.util.{ Failure, Success, Try }
       .mapMaterializedValue(_ => NotUsed)
   }
 
-  @nowarn("msg=deprecated")
   private def requestInfoOrUploadState(s3Location: S3Location,
       contentType: ContentType,
       s3Headers: S3Headers,
@@ -1285,7 +1282,7 @@ import scala.util.{ Failure, Success, Try }
         Source
           .single(s3Location)
           .flatMapConcat(_ => Source.single(MultipartUpload(s3Location.bucket, s3Location.key, uploadId)))
-          .mapConcat(r => Stream.continually(r))
+          .flatMapConcat(r => Source.repeat(r))
           .zip(Source.fromIterator(() => Iterator.from(initialIndex)))
       case None =>
         // First step of the multi part upload process is made.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to