That's a great idea (though I think we need to work a bit on the name). Make it fusable, and ideally it should not even have to touch the elements...
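To make the "pass everything through, only report termination" idea concrete, here is a minimal sketch of the semantics of the completionProbe proposed in the quoted message below. It is not Akka code: it models the stream as a plain Iterator, and countingSink, CompletionProbeSketch and the Iterator-based signature are hypothetical names chosen for illustration only.

```scala
import java.io.FileNotFoundException
import scala.concurrent.{Future, Promise}
import scala.util.control.NonFatal

object CompletionProbeSketch {

  // Hypothetical stand-in for the proposed combinator, modelled on a plain
  // Iterator instead of a stream stage: elements are forwarded unexamined and
  // the returned Future only reports how the upstream terminated.
  def completionProbe[A](upstream: Iterator[A]): (Iterator[A], Future[Unit]) = {
    val done = Promise[Unit]()
    val probed = new Iterator[A] {
      def hasNext: Boolean =
        try {
          val more = upstream.hasNext
          if (!more) done.trySuccess(()) // upstream completed normally
          more
        } catch {
          case NonFatal(e) => done.tryFailure(e); throw e
        }
      def next(): A =
        try upstream.next() // passed through without being inspected
        catch {
          case NonFatal(e) => done.tryFailure(e); throw e
        }
    }
    (probed, done.future)
  }

  // Hypothetical sink that, like the file sink in this thread, reports how
  // much it consumed even when the upstream fails.
  def countingSink[A](in: Iterator[A]): Long = {
    var n = 0L
    try {
      while (in.hasNext) { in.next(); n += 1 }
    } catch {
      case NonFatal(_) => () // an upstream failure is not the sink's concern
    }
    n
  }

  def main(args: Array[String]): Unit = {
    // A source that fails before producing anything, like the missing input file.
    val failing = new Iterator[Int] {
      def hasNext: Boolean = true
      def next(): Int = throw new FileNotFoundException("/path/to/non/existing/file")
    }
    val (probed, completion) = completionProbe(failing)
    val written = countingSink(probed)
    println(s"sink result: $written") // prints "sink result: 0"
    println(s"probe failed: ${completion.value.exists(_.isFailure)}") // prints "probe failed: true"
  }
}
```

The sink keeps reporting its own result (zero elements consumed), while the probe's Future carries the upstream failure, so the caller can tell the two apart whichever Source or Sink is used.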
On Fri, Sep 25, 2015 at 2:30 PM, Roland Kuhn wrote:

> The real question is whether the Sink itself should care, which it does not
> need to, as demonstrated by the current implementation. If user code cares
> about stream completion mode then there are many sinks that do not provide
> this information (because it is not their core business, it is a generic
> concern). Perhaps we should offer a combinator whose only purpose is to
> complete a Future upon termination, but otherwise it just sends all
> elements downstream, like
>
> def completionProbe[M](combine: (Mat, Future[Unit]) => M): Repr[Out, M]
>
> Then this problem could be solved independent of the chosen Source:
>
> val (completion, sinkFuture) =
>   source.completionProbe(Keep.right).toMat(sink)(Keep.both).run()
>
> For this combinator (and possibly others of its kind) I would not want to
> provide a default scheme for treating the materialized value since adding
> such a value is its only purpose.
>
> Regards,
>
> Roland
>
>
> On 25 Sep 2015, at 13:45, Viktor Klang wrote:
>
> Imagine the following scenario:
>
> def x[T, U](providedByCaller: Source[T, U]) =
>   providedByCaller.toMat(SynchronousFileSink(new File("/some/other/path")))(Keep.both)
>   // <-- how do I know if it failed if all I have is U?
>
>
> On Fri, Sep 25, 2015 at 1:01 PM, Roland Kuhn wrote:
>
>> The current behavior is the only consistent behavior I can see for the
>> given type signatures; do you have a better proposal? Should a Sink.file
>> report the reason for why it stopped writing when that reason did not lie
>> within itself? (I’m leaning towards “no”, but even if the answer is “yes”
>> then we cannot fail the Future, we still need to report the number of bytes
>> that were written prior to the failure)
>>
>> Regards,
>>
>> Roland
>>
>>
>> On 25 Sep 2015, at 12:50, Viktor Klang wrote:
>>
>> Question is whether this will be a common source of issues or not.
>>
>> --
>> Cheers,
>> √
>>
>> On 25 Sep 2015 08:24, "Roland Kuhn" wrote:
>>
>>> Hi Drew,
>>>
>>> the Future you are looking at is from the Sink, which has not failed: it
>>> has successfully written zero bytes to the output file. The Source on the
>>> other hand has failed and should give you the right exception when you look
>>> at it:
>>>
>>> val (sourceFuture, sinkFuture) = source.toMat(sink)(Keep.both).run()
>>>
>>> Upon successful completion of the stream both futures should have the
>>> same Long number.
>>>
>>> Regards,
>>>
>>> Roland
>>>
>>> On 25 Sep 2015, at 03:59, Drew Kutcharian wrote:
>>>
>>> Hi Guys,
>>>
>>> In the following simple file copy code:
>>>
>>> object TestStreams extends App {
>>>
>>>   implicit val system = ActorSystem("akka-stream")
>>>   implicit val materializer = ActorMaterializer()
>>>
>>>   val source = SynchronousFileSource(new File("/path/to/non/existing/file"))
>>>
>>>   val sink = SynchronousFileSink(new File("/some/other/path"))
>>>
>>>   val futureResult = source.runWith(sink)
>>>
>>>   val result = Await.result(futureResult, Duration.Inf)
>>>   println(result)
>>>
>>>   system.shutdown()
>>> }
>>>
>>> Why is the “futureResult” equal to Success(0) while the stream has
>>> certainly failed and the following exception has been logged?
>>>
>>> [ERROR] [09/24/2015 18:54:49.634] [akka-stream-akka.stream.default-file-io-dispatcher-7] [akka://akka-stream/user/$a/flow-1-1-synchronousFileSink] Tearing down SynchronousFileSink(/some/other/path) due to upstream error
>>> java.io.FileNotFoundException: /path/to/non/existing/file (No such file or directory)
>>>     at java.io.RandomAccessFile.open0(Native Method)
>>>     at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
>>>     at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
>>>     at akka.stream.impl.io.SynchronousFilePublisher.preStart(SynchronousFilePublisher.scala:49)
>>>     at akka.actor.Actor$class.aroundPreStart(Actor.scala:472)
>>>     at akka.stream.impl.io.SynchronousFilePublisher.akka$stream$actor$ActorPublisher$$super$aroundPreStart(SynchronousFilePublisher.scala:33)
>>>     at akka.stream.actor.ActorPublisher$class.aroundPreStart(ActorPublisher.scala:322)
>>>     at akka.stream.impl.io.SynchronousFilePublisher.aroundPreStart(SynchronousFilePublisher.scala:33)
>>>     at akka.actor.ActorCell.create(ActorCell.scala:580)
>>>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
>>>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
>>>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>> Thanks,
>>>
>>> Drew
>>>
>>> --
>>> Read the docs: http://akka.io/docs/
>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>> Search the archives: https://groups.google.com/group/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send an email to [email protected].
>>> To post to this group, send email to [email protected].
>>> Visit this group at http://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>> Dr. Roland Kuhn
>>> Akka Tech Lead
>>> Typesafe – Reactive apps on the JVM.
>>> twitter: @rolandkuhn

--
Cheers,
√
