This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-http.git


The following commit(s) were added to refs/heads/main by this push:
     new e8fbec6e5 fix some compiler warnings (#651)
e8fbec6e5 is described below

commit e8fbec6e5f425face842fb8aa26df9c09407efac
Author: navid <[email protected]>
AuthorDate: Wed Jan 29 20:45:06 2025 +0100

    fix some compiler warnings (#651)
---
 .../impl/engine/client/HttpsProxyGraphStage.scala  |  4 +--
 .../client/OutgoingConnectionBlueprint.scala       |  2 +-
 .../engine/client/pool/NewHostConnectionPool.scala |  8 +++---
 .../http/impl/engine/http2/Http2Multiplexer.scala  |  4 +--
 .../impl/engine/http2/Http2StreamHandling.scala    |  2 +-
 .../http/impl/engine/http2/ProtocolSwitch.scala    |  6 ++--
 .../engine/http2/client/PersistentConnection.scala |  8 +++---
 .../rendering/HttpRequestRendererFactory.scala     |  2 +-
 .../rendering/HttpResponseRendererFactory.scala    |  2 +-
 .../impl/engine/server/HttpServerBluePrint.scala   | 32 +++++++++++-----------
 .../pekko/http/impl/util/One2OneBidiFlow.scala     |  4 +--
 .../apache/pekko/http/impl/util/StreamUtils.scala  |  4 +--
 .../pekko/http/javadsl/WSEchoTestClientApp.java    |  2 +-
 .../engine/client/HostConnectionPoolSpec.scala     |  2 +-
 .../impl/engine/client/NewConnectionPoolSpec.scala |  2 +-
 .../impl/engine/parsing/RequestParserSpec.scala    |  2 +-
 .../pekko/http/scaladsl/model/HttpEntitySpec.scala |  2 +-
 .../http/scaladsl/server/EntityStreamingSpec.scala |  2 +-
 18 files changed, 45 insertions(+), 45 deletions(-)

diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/HttpsProxyGraphStage.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/HttpsProxyGraphStage.scala
index c9b8006b5..458e5a2c4 100644
--- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/HttpsProxyGraphStage.scala
+++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/HttpsProxyGraphStage.scala
@@ -189,7 +189,7 @@ private final class HttpsProxyGraphStage(
             }
           }
 
-          override def onDownstreamFinish(): Unit = cancel(sslIn)
+          override def onDownstreamFinish(cause: Throwable): Unit = cancel(sslIn)
 
         })
 
@@ -199,7 +199,7 @@ private final class HttpsProxyGraphStage(
             pull(bytesIn)
           }
 
-          override def onDownstreamFinish(): Unit = cancel(bytesIn)
+          override def onDownstreamFinish(cause: Throwable): Unit = cancel(bytesIn)
 
         })
 
diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/OutgoingConnectionBlueprint.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/OutgoingConnectionBlueprint.scala
index 5516ff801..61cc26a97 100644
--- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/OutgoingConnectionBlueprint.scala
+++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/OutgoingConnectionBlueprint.scala
@@ -212,7 +212,7 @@ private[http] object OutgoingConnectionBlueprint {
           if (!entitySubstreamStarted) pull(responseOutputIn)
         }
 
-        override def onDownstreamFinish(): Unit = {
+        override def onDownstreamFinish(cause: Throwable): Unit = {
           // if downstream cancels while streaming entity,
           // make sure we also cancel the entity source, but
           // after being done with streaming the entity
diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/pool/NewHostConnectionPool.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/pool/NewHostConnectionPool.scala
index ebc787449..6327bf1a4 100644
--- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/pool/NewHostConnectionPool.scala
+++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/pool/NewHostConnectionPool.scala
@@ -551,7 +551,7 @@ private[client] object NewHostConnectionPool {
 
           def onPull(): Unit = () // emitRequests makes sure not to push too early
 
-          override def onDownstreamFinish(): Unit =
+          override def onDownstreamFinish(cause: Throwable): Unit =
             withSlot { slot =>
               slot.debug("Connection cancelled")
               // Let's use StreamTcpException for now.
@@ -574,7 +574,7 @@ private[client] object NewHostConnectionPool {
                   requestOut.setHandler(connection)
                 }
 
-                override def onDownstreamFinish(): Unit = connection.onDownstreamFinish()
+                override def onDownstreamFinish(cause: Throwable): Unit = connection.onDownstreamFinish(cause)
               })
         }
         def openConnection(slot: Slot): SlotConnection = {
@@ -622,9 +622,9 @@ private[client] object NewHostConnectionPool {
           log.debug("Pool upstream failed with {}", ex)
           super.onUpstreamFailure(ex)
         }
-        override def onDownstreamFinish(): Unit = {
+        override def onDownstreamFinish(cause: Throwable): Unit = {
           log.debug("Pool downstream cancelled")
-          super.onDownstreamFinish()
+          super.onDownstreamFinish(cause)
         }
         override def postStop(): Unit = {
           slots.foreach(_.shutdown())
diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Multiplexer.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Multiplexer.scala
index 808f7ef29..633d26b66 100644
--- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Multiplexer.scala
+++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Multiplexer.scala
@@ -133,9 +133,9 @@ private[http2] trait Http2MultiplexerSupport { logic: GraphStageLogic with Stage
       /** Network pulls in new frames */
       def onPull(): Unit = updateState(_.onPull())
 
-      override def onDownstreamFinish(): Unit = {
+      override def onDownstreamFinish(cause: Throwable): Unit = {
         frameOutFinished()
-        super.onDownstreamFinish()
+        super.onDownstreamFinish(cause)
       }
 
       private var _state: MultiplexerState = Idle
diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala
index 2969be8a7..bbd1d1540 100644
--- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala
+++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala
@@ -598,7 +598,7 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper
     outlet.setHandler(this)
 
     def onPull(): Unit = incomingStreamPulled(streamId)
-    override def onDownstreamFinish(): Unit = {
+    override def onDownstreamFinish(cause: Throwable): Unit = {
       debug(s"Incoming side of stream [$streamId]: cancelling because downstream finished")
       multiplexer.pushControlFrame(RstStreamFrame(streamId, ErrorCode.CANCEL))
       // FIXME: go through state machine and don't manipulate vars directly here
diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/ProtocolSwitch.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/ProtocolSwitch.scala
index d19865763..fa54b75a9 100644
--- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/ProtocolSwitch.scala
+++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/ProtocolSwitch.scala
@@ -106,7 +106,7 @@ private[http] object ProtocolSwitch {
                 new OutHandler {
                   override def onPull(): Unit = pull(in)
 
-                  override def onDownstreamFinish(): Unit = cancel(in)
+                  override def onDownstreamFinish(cause: Throwable): Unit = cancel(in)
                 }
 
               val firstHandler =
@@ -151,9 +151,9 @@ private[http] object ProtocolSwitch {
               val outHandler = new OutHandler {
                 override def onPull(): Unit = in.pull()
 
-                override def onDownstreamFinish(): Unit = {
+                override def onDownstreamFinish(cause: Throwable): Unit = {
                   in.cancel()
-                  super.onDownstreamFinish()
+                  super.onDownstreamFinish(cause)
                 }
               }
               in.setHandler(handler)
diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/client/PersistentConnection.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/client/PersistentConnection.scala
index 4215e68fd..6df999080 100644
--- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/client/PersistentConnection.scala
+++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/client/PersistentConnection.scala
@@ -123,7 +123,7 @@ private[http2] object PersistentConnection {
           requestOut.setHandler(new OutHandler {
             override def onPull(): Unit =
               requestOutPulled = true
-            override def onDownstreamFinish(): Unit = ()
+            override def onDownstreamFinish(cause: Throwable): Unit = ()
           })
           responseIn.setHandler(new InHandler {
             override def onPush(): Unit = throw new IllegalStateException("no response push expected while connecting")
@@ -196,7 +196,7 @@ private[http2] object PersistentConnection {
               if (!isAvailable(requestIn)) pull(requestIn)
               else dispatchRequest(grab(requestIn))
 
-            override def onDownstreamFinish(): Unit = onDisconnected()
+            override def onDownstreamFinish(cause: Throwable): Unit = onDisconnected()
           })
           responseIn.setHandler(new InHandler {
             override def onPush(): Unit = {
@@ -253,10 +253,10 @@ private[http2] object PersistentConnection {
             responseIn.cancel()
             failStage(ex)
           }
-          override def onDownstreamFinish(): Unit = {
+          override def onDownstreamFinish(cause: Throwable): Unit = {
             requestOut.complete()
             responseIn.cancel()
-            super.onDownstreamFinish()
+            super.onDownstreamFinish(cause)
           }
         }
       }
diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpRequestRendererFactory.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpRequestRendererFactory.scala
index a7a924ec0..7bd8e908e 100644
--- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpRequestRendererFactory.scala
+++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpRequestRendererFactory.scala
@@ -136,7 +136,7 @@ private[http] class HttpRequestRendererFactory(
       val stream = ctx.sendEntityTrigger match {
         case None => headerPart ++ body
         case Some(future) =>
-          val barrier = Source.fromFuture(future).drop(1).asInstanceOf[Source[ByteString, Any]]
+          val barrier = Source.future(future).drop(1).asInstanceOf[Source[ByteString, Any]]
           (headerPart ++ barrier ++ body).recoverWithRetries(-1,
             { case HttpResponseParser.OneHundredContinueError => Source.empty })
       }
diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpResponseRendererFactory.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpResponseRendererFactory.scala
index 690f42786..5b8fce6cd 100644
--- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpResponseRendererFactory.scala
+++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpResponseRendererFactory.scala
@@ -129,7 +129,7 @@ private[http] class HttpResponseRendererFactory(
               override def onPull(): Unit =
                 if (!headersSent) sendHeaders()
                 else sinkIn.pull()
-              override def onDownstreamFinish(): Unit = {
+              override def onDownstreamFinish(cause: Throwable): Unit = {
                 completeStage()
                 stopTransfer()
               }
diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala
index 71ee7de31..6b49a2e29 100644
--- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala
+++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala
@@ -135,7 +135,7 @@ private[http] object HttpServerBluePrint {
         }
 
         // optimization: this callback is used to handle entity substream cancellation to avoid allocating a dedicated handler
-        override def onDownstreamFinish(): Unit = {
+        override def onDownstreamFinish(cause: Throwable): Unit = {
           if (entitySource ne null) {
             // application layer has cancelled or only partially consumed response entity:
             // connection will be closed
@@ -235,7 +235,7 @@ private[http] object HttpServerBluePrint {
               // so can pull downstream then
               downstreamPullWaiting = true
             }
-            override def onDownstreamFinish(): Unit = {
+            override def onDownstreamFinish(cause: Throwable): Unit = {
               // downstream signalled not wanting any more requests
               // we should keep processing the entity stream and then
               // when it completes complete the stage
@@ -320,14 +320,14 @@ private[http] object HttpServerBluePrint {
             openTimeouts = openTimeouts.enqueue(access)
             push(requestOut, request.addHeader(`Timeout-Access`(access)).withEntity(entity))
           }
-          override def onUpstreamFinish() = complete(requestOut)
-          override def onUpstreamFailure(ex: Throwable) = fail(requestOut, ex)
+          override def onUpstreamFinish(): Unit = complete(requestOut)
+          override def onUpstreamFailure(ex: Throwable): Unit = fail(requestOut, ex)
         })
       // TODO: provide and use default impl for simply connecting an input and an output port as we do here
       setHandler(requestOut,
         new OutHandler {
           def onPull(): Unit = pull(requestIn)
-          override def onDownstreamFinish() = cancel(requestIn)
+          override def onDownstreamFinish(cause: Throwable): Unit = cancel(requestIn)
         })
       setHandler(responseIn,
         new InHandler {
@@ -336,13 +336,13 @@ private[http] object HttpServerBluePrint {
             openTimeouts = openTimeouts.tail
             push(responseOut, grab(responseIn))
           }
-          override def onUpstreamFinish() = complete(responseOut)
-          override def onUpstreamFailure(ex: Throwable) = fail(responseOut, ex)
+          override def onUpstreamFinish(): Unit = complete(responseOut)
+          override def onUpstreamFailure(ex: Throwable): Unit = fail(responseOut, ex)
         })
       setHandler(responseOut,
         new OutHandler {
           def onPull(): Unit = pull(responseIn)
-          override def onDownstreamFinish() = cancel(responseIn)
+          override def onDownstreamFinish(cause: Throwable): Unit = cancel(responseIn)
         })
     }
   }
@@ -481,7 +481,7 @@ private[http] object HttpServerBluePrint {
             def onPull(): Unit =
               if (oneHundredContinueResponsePending) pullSuppressed = true
               else if (!hasBeenPulled(requestParsingIn)) pull(requestParsingIn)
-            override def onDownstreamFinish(): Unit =
+            override def onDownstreamFinish(cause: Throwable): Unit =
               if (openRequests.isEmpty) completeStage()
               else failStage(
                 new IllegalStateException("User handler flow was cancelled with ongoing request") with NoStackTrace)
@@ -705,7 +705,7 @@ private[http] object HttpServerBluePrint {
       setHandler(toNet,
         new OutHandler {
           override def onPull(): Unit = pull(fromHttp)
-          override def onDownstreamFinish(): Unit = completeStage()
+          override def onDownstreamFinish(cause: Throwable): Unit = completeStage()
         })
 
       setHandler(fromNet,
@@ -717,7 +717,7 @@ private[http] object HttpServerBluePrint {
       setHandler(toHttp,
         new OutHandler {
           override def onPull(): Unit = pull(fromNet)
-          override def onDownstreamFinish(): Unit = cancel(fromNet)
+          override def onDownstreamFinish(cause: Throwable): Unit = cancel(fromNet)
         })
 
       private var activeTimers = 0
@@ -753,7 +753,7 @@ private[http] object HttpServerBluePrint {
           setHandler(toNet,
             new OutHandler {
               override def onPull(): Unit = sinkIn.pull()
-              override def onDownstreamFinish(): Unit = {
+              override def onDownstreamFinish(cause: Throwable): Unit = {
                 completeStage()
                 sinkIn.cancel()
               }
@@ -771,7 +771,7 @@ private[http] object HttpServerBluePrint {
           setHandler(toNet,
             new OutHandler {
               override def onPull(): Unit = sinkIn.pull()
-              override def onDownstreamFinish(): Unit = {
+              override def onDownstreamFinish(cause: Throwable): Unit = {
                 completeStage()
                 sinkIn.cancel()
                 sourceOut.complete()
@@ -801,10 +801,10 @@ private[http] object HttpServerBluePrint {
 
               sourceOut.setHandler(new OutHandler {
                 override def onPull(): Unit = if (!hasBeenPulled(fromNet)) pull(fromNet)
-                override def onDownstreamFinish(): Unit = cancel(fromNet)
+                override def onDownstreamFinish(cause: Throwable): Unit = cancel(fromNet)
               })
             }
-            override def onDownstreamFinish(): Unit = cancel(fromNet)
+            override def onDownstreamFinish(cause: Throwable): Unit = cancel(fromNet)
           })
 
           // disable the old handlers, at this point we might still get something due to cancellation delay which we need to ignore
@@ -814,7 +814,7 @@ private[http] object HttpServerBluePrint {
               override def onPull(): Unit = ()
               override def onUpstreamFinish(): Unit = ()
               override def onUpstreamFailure(ex: Throwable): Unit = ()
-              override def onDownstreamFinish(): Unit = ()
+              override def onDownstreamFinish(cause: Throwable): Unit = ()
             })
 
           newFlow.runWith(sourceOut.source, sinkIn.sink)(subFusingMaterializer)
diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/util/One2OneBidiFlow.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/util/One2OneBidiFlow.scala
index f0c951374..db3872e67 100644
--- a/http-core/src/main/scala/org/apache/pekko/http/impl/util/One2OneBidiFlow.scala
+++ b/http-core/src/main/scala/org/apache/pekko/http/impl/util/One2OneBidiFlow.scala
@@ -92,7 +92,7 @@ private[http] object One2OneBidiFlow {
           override def onPull(): Unit =
             if (insideWrappedFlow < maxPending || maxPending == -1) pull(in)
             else pullSuppressed = true
-          override def onDownstreamFinish(): Unit = cancel(in)
+          override def onDownstreamFinish(cause: Throwable): Unit = cancel(in)
         })
 
       setHandler(fromWrapped,
@@ -117,7 +117,7 @@ private[http] object One2OneBidiFlow {
       setHandler(out,
         new OutHandler {
           override def onPull(): Unit = pull(fromWrapped)
-          override def onDownstreamFinish(): Unit = cancel(fromWrapped)
+          override def onDownstreamFinish(cause: Throwable): Unit = cancel(fromWrapped)
         })
     }
   }
diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/util/StreamUtils.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/util/StreamUtils.scala
index cf5822c54..013b08e65 100644
--- a/http-core/src/main/scala/org/apache/pekko/http/impl/util/StreamUtils.scala
+++ b/http-core/src/main/scala/org/apache/pekko/http/impl/util/StreamUtils.scala
@@ -202,7 +202,7 @@ private[http] object StreamUtils {
   object OneTimeValve {
     def apply(): OneTimeValve = new OneTimeValve {
       val promise = Promise[Unit]()
-      val _source = Source.fromFuture(promise.future).drop(1) // we are only interested in the completion event
+      val _source = Source.future(promise.future).drop(1) // we are only interested in the completion event
 
       def source[T]: Source[T, NotUsed] = _source.asInstanceOf[Source[T, NotUsed]] // safe, because source won't generate any elements
       def open(): Unit = promise.success(())
@@ -226,7 +226,7 @@ private[http] object StreamUtils {
 
         var timeout: OptionVal[Cancellable] = OptionVal.None
 
-        override def onDownstreamFinish(): Unit = {
+        override def onDownstreamFinish(cause: Throwable): Unit = {
           cancelAfter match {
             case finite: FiniteDuration =>
               log.debug(s"Delaying cancellation for $finite")
diff --git a/http-core/src/test/java/org/apache/pekko/http/javadsl/WSEchoTestClientApp.java b/http-core/src/test/java/org/apache/pekko/http/javadsl/WSEchoTestClientApp.java
index 9e6efbbbe..b8faa49c9 100644
--- a/http-core/src/test/java/org/apache/pekko/http/javadsl/WSEchoTestClientApp.java
+++ b/http-core/src/test/java/org/apache/pekko/http/javadsl/WSEchoTestClientApp.java
@@ -67,7 +67,7 @@ public class WSEchoTestClientApp {
                       TextMessage.create("abc"),
                       TextMessage.create("def"),
                       TextMessage.create("ghi")))
-              .concat(Source.fromFuture(delayedCompletion).drop(1));
+              .concat(Source.future(delayedCompletion).drop(1));
 
       Sink<Message, CompletionStage<List<String>>> echoSink =
           Flow.of(Message.class)
diff --git a/http-core/src/test/scala/org/apache/pekko/http/impl/engine/client/HostConnectionPoolSpec.scala b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/client/HostConnectionPoolSpec.scala
index 4f2342c57..9266b28cd 100644
--- a/http-core/src/test/scala/org/apache/pekko/http/impl/engine/client/HostConnectionPoolSpec.scala
+++ b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/client/HostConnectionPoolSpec.scala
@@ -803,7 +803,7 @@ class HostConnectionPoolSpec extends PekkoSpecWithMaterializer(
             super.onUpstreamFailure(ex)
           }
 
-          override def onDownstreamFinish(): Unit = failStage(new RuntimeException("was cancelled"))
+          override def onDownstreamFinish(cause: Throwable): Unit = failStage(new RuntimeException("was cancelled"))
         }
         setHandlers(reqIn, reqOut, new MonitorMessage(reqIn, reqOut))
         setHandlers(resIn, resOut, new MonitorMessage(resIn, resOut))
diff --git a/http-core/src/test/scala/org/apache/pekko/http/impl/engine/client/NewConnectionPoolSpec.scala b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/client/NewConnectionPoolSpec.scala
index d1e3b4112..9ce4f32c4 100644
--- a/http-core/src/test/scala/org/apache/pekko/http/impl/engine/client/NewConnectionPoolSpec.scala
+++ b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/client/NewConnectionPoolSpec.scala
@@ -247,7 +247,7 @@ class NewConnectionPoolSpec extends PekkoSpecWithMaterializer("""
 
       val crashingEntity =
         Source.fromIterator(() => Iterator.fill(10)(ByteString("abc")))
-          .concat(Source.fromFuture(errorOnConnection1.future))
+          .concat(Source.future(errorOnConnection1.future))
           .log("response-entity-stream")
           .addAttributes(Attributes.logLevels(Logging.InfoLevel, Logging.InfoLevel, Logging.InfoLevel))
 
diff --git a/http-core/src/test/scala/org/apache/pekko/http/impl/engine/parsing/RequestParserSpec.scala b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/parsing/RequestParserSpec.scala
index 96c092764..82db1d8c4 100644
--- a/http-core/src/test/scala/org/apache/pekko/http/impl/engine/parsing/RequestParserSpec.scala
+++ b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/parsing/RequestParserSpec.scala
@@ -810,7 +810,7 @@ abstract class RequestParserSpec(mode: String, newLine: String) extends AnyFreeS
         }
         .concatSubstreams
         .flatMapConcat { x =>
-          Source.fromFuture {
+          Source.future {
             x match {
               case Right(request) => compactEntity(request.entity).fast.map(x => Right(request.withEntity(x)))
               case Left(error)    => FastFuture.successful(Left(error))
diff --git a/http-core/src/test/scala/org/apache/pekko/http/scaladsl/model/HttpEntitySpec.scala b/http-core/src/test/scala/org/apache/pekko/http/scaladsl/model/HttpEntitySpec.scala
index ceffbd1bc..db20148b0 100755
--- a/http-core/src/test/scala/org/apache/pekko/http/scaladsl/model/HttpEntitySpec.scala
+++ b/http-core/src/test/scala/org/apache/pekko/http/scaladsl/model/HttpEntitySpec.scala
@@ -92,7 +92,7 @@ class HttpEntitySpec extends PekkoSpecWithMaterializer {
       "Infinite data stream" in {
         val neverCompleted = Promise[ByteString]()
         intercept[TimeoutException] {
-          Await.result(Default(tpe, 42, Source.fromFuture(neverCompleted.future)).toStrict(100.millis), awaitAtMost)
+          Await.result(Default(tpe, 42, Source.future(neverCompleted.future)).toStrict(100.millis), awaitAtMost)
         }.getMessage should be(
           "HttpEntity.toStrict timed out after 100 milliseconds while still waiting for outstanding data")
       }
diff --git a/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/server/EntityStreamingSpec.scala b/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/server/EntityStreamingSpec.scala
index 166eb1b27..ca3039acf 100755
--- a/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/server/EntityStreamingSpec.scala
+++ b/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/server/EntityStreamingSpec.scala
@@ -131,7 +131,7 @@ class EntityStreamingSpec extends RoutingSpec with ScalaFutures {
 
     // flatten the Future[Source[]] into a Source[]:
     val source: Source[Tweet, Future[NotUsed]] =
-      Source.fromFutureSource(unmarshalled)
+      Source.futureSource(unmarshalled)
 
     // #json-streaming-client-example
     // tests ------------------------------------------------------------


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to