This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-http.git
The following commit(s) were added to refs/heads/main by this push:
new e8fbec6e5 fix some compiler warnings (#651)
e8fbec6e5 is described below
commit e8fbec6e5f425face842fb8aa26df9c09407efac
Author: navid <[email protected]>
AuthorDate: Wed Jan 29 20:45:06 2025 +0100
fix some compiler warnings (#651)
---
.../impl/engine/client/HttpsProxyGraphStage.scala | 4 +--
.../client/OutgoingConnectionBlueprint.scala | 2 +-
.../engine/client/pool/NewHostConnectionPool.scala | 8 +++---
.../http/impl/engine/http2/Http2Multiplexer.scala | 4 +--
.../impl/engine/http2/Http2StreamHandling.scala | 2 +-
.../http/impl/engine/http2/ProtocolSwitch.scala | 6 ++--
.../engine/http2/client/PersistentConnection.scala | 8 +++---
.../rendering/HttpRequestRendererFactory.scala | 2 +-
.../rendering/HttpResponseRendererFactory.scala | 2 +-
.../impl/engine/server/HttpServerBluePrint.scala | 32 +++++++++++-----------
.../pekko/http/impl/util/One2OneBidiFlow.scala | 4 +--
.../apache/pekko/http/impl/util/StreamUtils.scala | 4 +--
.../pekko/http/javadsl/WSEchoTestClientApp.java | 2 +-
.../engine/client/HostConnectionPoolSpec.scala | 2 +-
.../impl/engine/client/NewConnectionPoolSpec.scala | 2 +-
.../impl/engine/parsing/RequestParserSpec.scala | 2 +-
.../pekko/http/scaladsl/model/HttpEntitySpec.scala | 2 +-
.../http/scaladsl/server/EntityStreamingSpec.scala | 2 +-
18 files changed, 45 insertions(+), 45 deletions(-)
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/HttpsProxyGraphStage.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/HttpsProxyGraphStage.scala
index c9b8006b5..458e5a2c4 100644
---
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/HttpsProxyGraphStage.scala
+++
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/HttpsProxyGraphStage.scala
@@ -189,7 +189,7 @@ private final class HttpsProxyGraphStage(
}
}
- override def onDownstreamFinish(): Unit = cancel(sslIn)
+ override def onDownstreamFinish(cause: Throwable): Unit =
cancel(sslIn)
})
@@ -199,7 +199,7 @@ private final class HttpsProxyGraphStage(
pull(bytesIn)
}
- override def onDownstreamFinish(): Unit = cancel(bytesIn)
+ override def onDownstreamFinish(cause: Throwable): Unit =
cancel(bytesIn)
})
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/OutgoingConnectionBlueprint.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/OutgoingConnectionBlueprint.scala
index 5516ff801..61cc26a97 100644
---
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/OutgoingConnectionBlueprint.scala
+++
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/OutgoingConnectionBlueprint.scala
@@ -212,7 +212,7 @@ private[http] object OutgoingConnectionBlueprint {
if (!entitySubstreamStarted) pull(responseOutputIn)
}
- override def onDownstreamFinish(): Unit = {
+ override def onDownstreamFinish(cause: Throwable): Unit = {
// if downstream cancels while streaming entity,
// make sure we also cancel the entity source, but
// after being done with streaming the entity
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/pool/NewHostConnectionPool.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/pool/NewHostConnectionPool.scala
index ebc787449..6327bf1a4 100644
---
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/pool/NewHostConnectionPool.scala
+++
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/pool/NewHostConnectionPool.scala
@@ -551,7 +551,7 @@ private[client] object NewHostConnectionPool {
def onPull(): Unit = () // emitRequests makes sure not to push too
early
- override def onDownstreamFinish(): Unit =
+ override def onDownstreamFinish(cause: Throwable): Unit =
withSlot { slot =>
slot.debug("Connection cancelled")
// Let's use StreamTcpException for now.
@@ -574,7 +574,7 @@ private[client] object NewHostConnectionPool {
requestOut.setHandler(connection)
}
- override def onDownstreamFinish(): Unit =
connection.onDownstreamFinish()
+ override def onDownstreamFinish(cause: Throwable): Unit =
connection.onDownstreamFinish(cause)
})
}
def openConnection(slot: Slot): SlotConnection = {
@@ -622,9 +622,9 @@ private[client] object NewHostConnectionPool {
log.debug("Pool upstream failed with {}", ex)
super.onUpstreamFailure(ex)
}
- override def onDownstreamFinish(): Unit = {
+ override def onDownstreamFinish(cause: Throwable): Unit = {
log.debug("Pool downstream cancelled")
- super.onDownstreamFinish()
+ super.onDownstreamFinish(cause)
}
override def postStop(): Unit = {
slots.foreach(_.shutdown())
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Multiplexer.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Multiplexer.scala
index 808f7ef29..633d26b66 100644
---
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Multiplexer.scala
+++
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Multiplexer.scala
@@ -133,9 +133,9 @@ private[http2] trait Http2MultiplexerSupport { logic:
GraphStageLogic with Stage
/** Network pulls in new frames */
def onPull(): Unit = updateState(_.onPull())
- override def onDownstreamFinish(): Unit = {
+ override def onDownstreamFinish(cause: Throwable): Unit = {
frameOutFinished()
- super.onDownstreamFinish()
+ super.onDownstreamFinish(cause)
}
private var _state: MultiplexerState = Idle
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala
index 2969be8a7..bbd1d1540 100644
---
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala
+++
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala
@@ -598,7 +598,7 @@ private[http2] trait Http2StreamHandling extends
GraphStageLogic with LogHelper
outlet.setHandler(this)
def onPull(): Unit = incomingStreamPulled(streamId)
- override def onDownstreamFinish(): Unit = {
+ override def onDownstreamFinish(cause: Throwable): Unit = {
debug(s"Incoming side of stream [$streamId]: cancelling because
downstream finished")
multiplexer.pushControlFrame(RstStreamFrame(streamId, ErrorCode.CANCEL))
// FIXME: go through state machine and don't manipulate vars directly
here
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/ProtocolSwitch.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/ProtocolSwitch.scala
index d19865763..fa54b75a9 100644
---
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/ProtocolSwitch.scala
+++
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/ProtocolSwitch.scala
@@ -106,7 +106,7 @@ private[http] object ProtocolSwitch {
new OutHandler {
override def onPull(): Unit = pull(in)
- override def onDownstreamFinish(): Unit = cancel(in)
+ override def onDownstreamFinish(cause: Throwable): Unit =
cancel(in)
}
val firstHandler =
@@ -151,9 +151,9 @@ private[http] object ProtocolSwitch {
val outHandler = new OutHandler {
override def onPull(): Unit = in.pull()
- override def onDownstreamFinish(): Unit = {
+ override def onDownstreamFinish(cause: Throwable): Unit = {
in.cancel()
- super.onDownstreamFinish()
+ super.onDownstreamFinish(cause)
}
}
in.setHandler(handler)
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/client/PersistentConnection.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/client/PersistentConnection.scala
index 4215e68fd..6df999080 100644
---
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/client/PersistentConnection.scala
+++
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/client/PersistentConnection.scala
@@ -123,7 +123,7 @@ private[http2] object PersistentConnection {
requestOut.setHandler(new OutHandler {
override def onPull(): Unit =
requestOutPulled = true
- override def onDownstreamFinish(): Unit = ()
+ override def onDownstreamFinish(cause: Throwable): Unit = ()
})
responseIn.setHandler(new InHandler {
override def onPush(): Unit = throw new IllegalStateException("no
response push expected while connecting")
@@ -196,7 +196,7 @@ private[http2] object PersistentConnection {
if (!isAvailable(requestIn)) pull(requestIn)
else dispatchRequest(grab(requestIn))
- override def onDownstreamFinish(): Unit = onDisconnected()
+ override def onDownstreamFinish(cause: Throwable): Unit =
onDisconnected()
})
responseIn.setHandler(new InHandler {
override def onPush(): Unit = {
@@ -253,10 +253,10 @@ private[http2] object PersistentConnection {
responseIn.cancel()
failStage(ex)
}
- override def onDownstreamFinish(): Unit = {
+ override def onDownstreamFinish(cause: Throwable): Unit = {
requestOut.complete()
responseIn.cancel()
- super.onDownstreamFinish()
+ super.onDownstreamFinish(cause)
}
}
}
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpRequestRendererFactory.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpRequestRendererFactory.scala
index a7a924ec0..7bd8e908e 100644
---
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpRequestRendererFactory.scala
+++
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpRequestRendererFactory.scala
@@ -136,7 +136,7 @@ private[http] class HttpRequestRendererFactory(
val stream = ctx.sendEntityTrigger match {
case None => headerPart ++ body
case Some(future) =>
- val barrier =
Source.fromFuture(future).drop(1).asInstanceOf[Source[ByteString, Any]]
+ val barrier =
Source.future(future).drop(1).asInstanceOf[Source[ByteString, Any]]
(headerPart ++ barrier ++ body).recoverWithRetries(-1,
{ case HttpResponseParser.OneHundredContinueError => Source.empty
})
}
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpResponseRendererFactory.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpResponseRendererFactory.scala
index 690f42786..5b8fce6cd 100644
---
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpResponseRendererFactory.scala
+++
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpResponseRendererFactory.scala
@@ -129,7 +129,7 @@ private[http] class HttpResponseRendererFactory(
override def onPull(): Unit =
if (!headersSent) sendHeaders()
else sinkIn.pull()
- override def onDownstreamFinish(): Unit = {
+ override def onDownstreamFinish(cause: Throwable): Unit = {
completeStage()
stopTransfer()
}
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala
index 71ee7de31..6b49a2e29 100644
---
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala
+++
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala
@@ -135,7 +135,7 @@ private[http] object HttpServerBluePrint {
}
// optimization: this callback is used to handle entity substream
cancellation to avoid allocating a dedicated handler
- override def onDownstreamFinish(): Unit = {
+ override def onDownstreamFinish(cause: Throwable): Unit = {
if (entitySource ne null) {
// application layer has cancelled or only partially consumed
response entity:
// connection will be closed
@@ -235,7 +235,7 @@ private[http] object HttpServerBluePrint {
// so can pull downstream then
downstreamPullWaiting = true
}
- override def onDownstreamFinish(): Unit = {
+ override def onDownstreamFinish(cause: Throwable): Unit = {
// downstream signalled not wanting any more requests
// we should keep processing the entity stream and then
// when it completes complete the stage
@@ -320,14 +320,14 @@ private[http] object HttpServerBluePrint {
openTimeouts = openTimeouts.enqueue(access)
push(requestOut,
request.addHeader(`Timeout-Access`(access)).withEntity(entity))
}
- override def onUpstreamFinish() = complete(requestOut)
- override def onUpstreamFailure(ex: Throwable) = fail(requestOut, ex)
+ override def onUpstreamFinish(): Unit = complete(requestOut)
+ override def onUpstreamFailure(ex: Throwable): Unit =
fail(requestOut, ex)
})
// TODO: provide and use default impl for simply connecting an input and
an output port as we do here
setHandler(requestOut,
new OutHandler {
def onPull(): Unit = pull(requestIn)
- override def onDownstreamFinish() = cancel(requestIn)
+ override def onDownstreamFinish(cause: Throwable): Unit =
cancel(requestIn)
})
setHandler(responseIn,
new InHandler {
@@ -336,13 +336,13 @@ private[http] object HttpServerBluePrint {
openTimeouts = openTimeouts.tail
push(responseOut, grab(responseIn))
}
- override def onUpstreamFinish() = complete(responseOut)
- override def onUpstreamFailure(ex: Throwable) = fail(responseOut, ex)
+ override def onUpstreamFinish(): Unit = complete(responseOut)
+ override def onUpstreamFailure(ex: Throwable): Unit =
fail(responseOut, ex)
})
setHandler(responseOut,
new OutHandler {
def onPull(): Unit = pull(responseIn)
- override def onDownstreamFinish() = cancel(responseIn)
+ override def onDownstreamFinish(cause: Throwable): Unit =
cancel(responseIn)
})
}
}
@@ -481,7 +481,7 @@ private[http] object HttpServerBluePrint {
def onPull(): Unit =
if (oneHundredContinueResponsePending) pullSuppressed = true
else if (!hasBeenPulled(requestParsingIn)) pull(requestParsingIn)
- override def onDownstreamFinish(): Unit =
+ override def onDownstreamFinish(cause: Throwable): Unit =
if (openRequests.isEmpty) completeStage()
else failStage(
new IllegalStateException("User handler flow was cancelled
with ongoing request") with NoStackTrace)
@@ -705,7 +705,7 @@ private[http] object HttpServerBluePrint {
setHandler(toNet,
new OutHandler {
override def onPull(): Unit = pull(fromHttp)
- override def onDownstreamFinish(): Unit = completeStage()
+ override def onDownstreamFinish(cause: Throwable): Unit =
completeStage()
})
setHandler(fromNet,
@@ -717,7 +717,7 @@ private[http] object HttpServerBluePrint {
setHandler(toHttp,
new OutHandler {
override def onPull(): Unit = pull(fromNet)
- override def onDownstreamFinish(): Unit = cancel(fromNet)
+ override def onDownstreamFinish(cause: Throwable): Unit =
cancel(fromNet)
})
private var activeTimers = 0
@@ -753,7 +753,7 @@ private[http] object HttpServerBluePrint {
setHandler(toNet,
new OutHandler {
override def onPull(): Unit = sinkIn.pull()
- override def onDownstreamFinish(): Unit = {
+ override def onDownstreamFinish(cause: Throwable): Unit = {
completeStage()
sinkIn.cancel()
}
@@ -771,7 +771,7 @@ private[http] object HttpServerBluePrint {
setHandler(toNet,
new OutHandler {
override def onPull(): Unit = sinkIn.pull()
- override def onDownstreamFinish(): Unit = {
+ override def onDownstreamFinish(cause: Throwable): Unit = {
completeStage()
sinkIn.cancel()
sourceOut.complete()
@@ -801,10 +801,10 @@ private[http] object HttpServerBluePrint {
sourceOut.setHandler(new OutHandler {
override def onPull(): Unit = if (!hasBeenPulled(fromNet))
pull(fromNet)
- override def onDownstreamFinish(): Unit = cancel(fromNet)
+ override def onDownstreamFinish(cause: Throwable): Unit =
cancel(fromNet)
})
}
- override def onDownstreamFinish(): Unit = cancel(fromNet)
+ override def onDownstreamFinish(cause: Throwable): Unit =
cancel(fromNet)
})
// disable the old handlers, at this point we might still get
something due to cancellation delay which we need to ignore
@@ -814,7 +814,7 @@ private[http] object HttpServerBluePrint {
override def onPull(): Unit = ()
override def onUpstreamFinish(): Unit = ()
override def onUpstreamFailure(ex: Throwable): Unit = ()
- override def onDownstreamFinish(): Unit = ()
+ override def onDownstreamFinish(cause: Throwable): Unit = ()
})
newFlow.runWith(sourceOut.source, sinkIn.sink)(subFusingMaterializer)
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/util/One2OneBidiFlow.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/util/One2OneBidiFlow.scala
index f0c951374..db3872e67 100644
---
a/http-core/src/main/scala/org/apache/pekko/http/impl/util/One2OneBidiFlow.scala
+++
b/http-core/src/main/scala/org/apache/pekko/http/impl/util/One2OneBidiFlow.scala
@@ -92,7 +92,7 @@ private[http] object One2OneBidiFlow {
override def onPull(): Unit =
if (insideWrappedFlow < maxPending || maxPending == -1) pull(in)
else pullSuppressed = true
- override def onDownstreamFinish(): Unit = cancel(in)
+ override def onDownstreamFinish(cause: Throwable): Unit = cancel(in)
})
setHandler(fromWrapped,
@@ -117,7 +117,7 @@ private[http] object One2OneBidiFlow {
setHandler(out,
new OutHandler {
override def onPull(): Unit = pull(fromWrapped)
- override def onDownstreamFinish(): Unit = cancel(fromWrapped)
+ override def onDownstreamFinish(cause: Throwable): Unit =
cancel(fromWrapped)
})
}
}
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/util/StreamUtils.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/util/StreamUtils.scala
index cf5822c54..013b08e65 100644
--- a/http-core/src/main/scala/org/apache/pekko/http/impl/util/StreamUtils.scala
+++ b/http-core/src/main/scala/org/apache/pekko/http/impl/util/StreamUtils.scala
@@ -202,7 +202,7 @@ private[http] object StreamUtils {
object OneTimeValve {
def apply(): OneTimeValve = new OneTimeValve {
val promise = Promise[Unit]()
- val _source = Source.fromFuture(promise.future).drop(1) // we are only
interested in the completion event
+ val _source = Source.future(promise.future).drop(1) // we are only
interested in the completion event
def source[T]: Source[T, NotUsed] = _source.asInstanceOf[Source[T,
NotUsed]] // safe, because source won't generate any elements
def open(): Unit = promise.success(())
@@ -226,7 +226,7 @@ private[http] object StreamUtils {
var timeout: OptionVal[Cancellable] = OptionVal.None
- override def onDownstreamFinish(): Unit = {
+ override def onDownstreamFinish(cause: Throwable): Unit = {
cancelAfter match {
case finite: FiniteDuration =>
log.debug(s"Delaying cancellation for $finite")
diff --git
a/http-core/src/test/java/org/apache/pekko/http/javadsl/WSEchoTestClientApp.java
b/http-core/src/test/java/org/apache/pekko/http/javadsl/WSEchoTestClientApp.java
index 9e6efbbbe..b8faa49c9 100644
---
a/http-core/src/test/java/org/apache/pekko/http/javadsl/WSEchoTestClientApp.java
+++
b/http-core/src/test/java/org/apache/pekko/http/javadsl/WSEchoTestClientApp.java
@@ -67,7 +67,7 @@ public class WSEchoTestClientApp {
TextMessage.create("abc"),
TextMessage.create("def"),
TextMessage.create("ghi")))
- .concat(Source.fromFuture(delayedCompletion).drop(1));
+ .concat(Source.future(delayedCompletion).drop(1));
Sink<Message, CompletionStage<List<String>>> echoSink =
Flow.of(Message.class)
diff --git
a/http-core/src/test/scala/org/apache/pekko/http/impl/engine/client/HostConnectionPoolSpec.scala
b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/client/HostConnectionPoolSpec.scala
index 4f2342c57..9266b28cd 100644
---
a/http-core/src/test/scala/org/apache/pekko/http/impl/engine/client/HostConnectionPoolSpec.scala
+++
b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/client/HostConnectionPoolSpec.scala
@@ -803,7 +803,7 @@ class HostConnectionPoolSpec extends
PekkoSpecWithMaterializer(
super.onUpstreamFailure(ex)
}
- override def onDownstreamFinish(): Unit = failStage(new
RuntimeException("was cancelled"))
+ override def onDownstreamFinish(cause: Throwable): Unit =
failStage(new RuntimeException("was cancelled"))
}
setHandlers(reqIn, reqOut, new MonitorMessage(reqIn, reqOut))
setHandlers(resIn, resOut, new MonitorMessage(resIn, resOut))
diff --git
a/http-core/src/test/scala/org/apache/pekko/http/impl/engine/client/NewConnectionPoolSpec.scala
b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/client/NewConnectionPoolSpec.scala
index d1e3b4112..9ce4f32c4 100644
---
a/http-core/src/test/scala/org/apache/pekko/http/impl/engine/client/NewConnectionPoolSpec.scala
+++
b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/client/NewConnectionPoolSpec.scala
@@ -247,7 +247,7 @@ class NewConnectionPoolSpec extends
PekkoSpecWithMaterializer("""
val crashingEntity =
Source.fromIterator(() => Iterator.fill(10)(ByteString("abc")))
- .concat(Source.fromFuture(errorOnConnection1.future))
+ .concat(Source.future(errorOnConnection1.future))
.log("response-entity-stream")
.addAttributes(Attributes.logLevels(Logging.InfoLevel,
Logging.InfoLevel, Logging.InfoLevel))
diff --git
a/http-core/src/test/scala/org/apache/pekko/http/impl/engine/parsing/RequestParserSpec.scala
b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/parsing/RequestParserSpec.scala
index 96c092764..82db1d8c4 100644
---
a/http-core/src/test/scala/org/apache/pekko/http/impl/engine/parsing/RequestParserSpec.scala
+++
b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/parsing/RequestParserSpec.scala
@@ -810,7 +810,7 @@ abstract class RequestParserSpec(mode: String, newLine:
String) extends AnyFreeS
}
.concatSubstreams
.flatMapConcat { x =>
- Source.fromFuture {
+ Source.future {
x match {
case Right(request) => compactEntity(request.entity).fast.map(x
=> Right(request.withEntity(x)))
case Left(error) => FastFuture.successful(Left(error))
diff --git
a/http-core/src/test/scala/org/apache/pekko/http/scaladsl/model/HttpEntitySpec.scala
b/http-core/src/test/scala/org/apache/pekko/http/scaladsl/model/HttpEntitySpec.scala
index ceffbd1bc..db20148b0 100755
---
a/http-core/src/test/scala/org/apache/pekko/http/scaladsl/model/HttpEntitySpec.scala
+++
b/http-core/src/test/scala/org/apache/pekko/http/scaladsl/model/HttpEntitySpec.scala
@@ -92,7 +92,7 @@ class HttpEntitySpec extends PekkoSpecWithMaterializer {
"Infinite data stream" in {
val neverCompleted = Promise[ByteString]()
intercept[TimeoutException] {
- Await.result(Default(tpe, 42,
Source.fromFuture(neverCompleted.future)).toStrict(100.millis), awaitAtMost)
+ Await.result(Default(tpe, 42,
Source.future(neverCompleted.future)).toStrict(100.millis), awaitAtMost)
}.getMessage should be(
"HttpEntity.toStrict timed out after 100 milliseconds while still
waiting for outstanding data")
}
diff --git
a/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/server/EntityStreamingSpec.scala
b/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/server/EntityStreamingSpec.scala
index 166eb1b27..ca3039acf 100755
---
a/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/server/EntityStreamingSpec.scala
+++
b/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/server/EntityStreamingSpec.scala
@@ -131,7 +131,7 @@ class EntityStreamingSpec extends RoutingSpec with
ScalaFutures {
// flatten the Future[Source[]] into a Source[]:
val source: Source[Tweet, Future[NotUsed]] =
- Source.fromFutureSource(unmarshalled)
+ Source.futureSource(unmarshalled)
// #json-streaming-client-example
// tests ------------------------------------------------------------
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]