Thank you,
Vlad

On 4/10/17 08:13, Thomas Weise wrote:
In this example join/writer produces the data and reader/topN consumes it. You cannot deallocate the producer before all data has been drained. When using files, join/writer can be deallocated once all data has been flushed to the files, and allocation of the consumer can wait until that has occurred if there isn't enough space to have both active at the same time. Overall, it seems this is a matter of activating/deactivating operators rather than streams.

Thomas

On Mon, Apr 10, 2017 at 8:05 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

With additional file readers/writers, the pipeline of a single stage becomes the 3-operator use case I described. With the ability to open/close ports, the platform can optimize it by re-allocating resources from readers to writers.

Thank you,
Vlad

On 4/10/17 07:44, Thomas Weise wrote:

In streaming there is a stream (surprise); in a space-constrained batch case, we can have additional file writers/readers between the operators.

Modules can in fact be used to support pipeline reuse, but they must be added/removed dynamically to support stages with on-demand resource allocation.

Thomas

On Mon, Apr 10, 2017 at 7:37 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

Do you suggest that in a streaming use case the join operator also passes data downstream using files, or that there are two different join operators, one for streaming and one for batch? If not, it means that the join operator needs to emit data to a separate file output operator, so it still needs to read data from a temporary space before emitting; why not emit directly to topN in this case?

Isn't pipeline reuse already supported by Apex modules?

Thank you,
Vlad

On 4/10/17 06:59, Thomas Weise wrote:

I don't think this fully covers the scenario of limited resources. You describe a case of 3 operators, but when you consider just 2 operators that both have to hold a large data set in memory, the suggested approach won't work. Let's say the first operator is outer join and the second operator topN.
Both are blocking and cannot emit before all input is seen. To deallocate the outer join, all results need to be drained. It's a resource swap and you need a temporary space to hold the data. Also, if the requirement is to be able to recover and retry from the results of stage one, then you need a fault-tolerant swap space. If the cluster does not have enough memory, then disk is a good option (SLA vs. memory tradeoff).

I would also suggest thinking beyond the single DAG scenario. Often users need to define pipelines that are composed of multiple smaller flows (which they may also want to reuse in multiple pipelines). APEXCORE-408 gives you an option to compose such flows within a single Apex application, in addition to covering the simplified use case that we discuss there.

Thomas

On Thu, Apr 6, 2017 at 5:52 PM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

It is exactly the same use case, with the exception that it is not necessary to write data to files. Consider 3 operators: an input operator, an aggregate operator and an output operator. When the application starts, the output port of the aggregate operator should be in the closed state, the stream between the second and the third operators would be inactive, and the output operator does not need to be allocated. After the input operator processes all data, it can close its output port and the input operator may be de-allocated. Once the aggregator receives EOS on its input port, it should open the output port and start writing to it. At this point, the output operator needs to be deployed and the stream between the last two operators (aggregator and output) becomes active.

In a real batch use case, it is preferable to have the full application DAG statically defined and to delegate activation/de-activation of stages to the platform. It is also preferable not to write intermediate files to disk/HDFS, but instead to pass data in memory.
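
The three-operator sequence described above can be sketched as a small simulation that records which operators are deployed after each step. This is only an illustration under stated assumptions: the class `StagedBatchSketch`, the method `run`, and the operator names are hypothetical and are not part of the Apex API.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical simulation of the three-operator batch example; not Apex code. */
class StagedBatchSketch {

    /** Returns the set of deployed operators after each step of the run. */
    static List<Set<String>> run() {
        List<Set<String>> timeline = new ArrayList<>();
        Set<String> deployed = new LinkedHashSet<>();

        // Start: the aggregator's output port is closed, so the stream to the
        // output operator is inactive and the output operator is not allocated.
        deployed.add("input");
        deployed.add("aggregate");
        timeline.add(new LinkedHashSet<>(deployed));

        // The input operator has processed all data: it closes its output
        // port and may be de-allocated.
        deployed.remove("input");
        timeline.add(new LinkedHashSet<>(deployed));

        // The aggregator receives EOS and opens its output port: the output
        // operator is deployed and the last stream becomes active.
        deployed.add("output");
        timeline.add(new LinkedHashSet<>(deployed));

        return timeline;
    }

    public static void main(String[] args) {
        for (Set<String> step : run()) {
            System.out.println(step);
        }
    }
}
```

In this sketch at most two of the three operators are deployed at any step, which is the resource saving the example is after.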
Thank you,
Vlad

On 4/6/17 09:37, Thomas Weise wrote:

You would need to provide more specifics of the use case you are thinking to address to make this a meaningful discussion.

An example for APEXCORE-408 (based on a real batch use case): I have two stages; the first stage produces a set of files that the second stage needs as input. Stage 1 operators are to be released and stage 2 operators deployed when stage 2 starts. These can be independent operators; they don't need to be connected through a stream.

Thomas

On Thu, Apr 6, 2017 at 9:21 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

It is not about a use case difference. My proposal and APEXCORE-408 address the same use case: how to re-allocate resources for batch applications or applications where processing happens in stages. The difference between APEXCORE-408 and the proposal is a shift in complexity from application logic to the platform. IMO, supporting batch applications using APEXCORE-408 will require more coding on the application side.

Thank you,
Vlad

On 4/5/17 21:57, Thomas Weise wrote:

I think this needs more input on a use case level. The ability to dynamically alter the DAG internally will also address the resource allocation for operators:

https://issues.apache.org/jira/browse/APEXCORE-408

It can be used to implement stages of a batch pipeline and is very flexible in general. Considering the likely implementation complexity of the proposed feature, I would like to understand what benefits it provides to the user (use cases that cannot be addressed otherwise).

Thanks,
Thomas

On Sat, Apr 1, 2017 at 12:23 PM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

Correct, a stateful downstream operator can only be undeployed at a checkpoint window after it consumes all data emitted by the upstream operator on the closed port.

It will be necessary to distinguish between a closed port and an inactive stream. After a port is closed, the stream may still be active, and after a port is opened, the stream may still be inactive (not yet ready).
The more contributors participate in the discussion and implementation, the more solid the feature will be.

Thank you,
Vlad

Sent from iPhone

On Apr 1, 2017, at 11:03, Pramod Immaneni <pra...@datatorrent.com> wrote:

Generally a good idea. Care should be taken around fault tolerance and idempotency. Closing a stream would need to stop accepting new data, but we still can't actually close all the streams and un-deploy operators until committed. Idempotency might require the close to take effect at the end of the window. What would it then mean to re-open streams within a window? Also, this looks like a larger undertaking; as Ram suggested, it would be good to understand the use cases, and I also suggest that multiple folks participate in the implementation effort to ensure that we are able to address all the scenarios and minimize the chances of regression in existing behavior.

Thanks

On Sat, Apr 1, 2017 at 8:12 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

All,

Currently Apex assumes that an operator can emit on any defined output port and that all streams defined by a DAG are active. I'd like to propose an ability for an operator to open and close output ports. By default all ports defined by an operator will be open. If an operator for any reason decides that it will not emit tuples on an output port, it may close it. This will make the stream inactive, and the application master may undeploy the downstream (for that input stream) operators. If this leads to containers that don't have any active operators, those containers may be undeployed as well, leading to better cluster resource utilization and better Apex elasticity. Later, the operator may be in a state where it needs to emit tuples on the closed port. In this case, it needs to re-open the port and wait until the stream becomes active again before emitting tuples on that port.
Making an inactive stream active again requires the application master to re-allocate containers and re-deploy the downstream operators.

It should also be possible for an application designer to mark streams as inactive when an application starts. This will allow the application master to avoid reserving all containers when the application starts. Later, the port can be opened and the inactive stream becomes active.

Thank you,
Vlad
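
The port and stream states discussed in this thread can be sketched as a small state model. It is a minimal illustration, not Apex code: every name here (`PortLifecycleSketch`, `OutputPort`, `downstreamUndeployed`, `downstreamDeployed`, and so on) is hypothetical, and the sketch assumes that the application master is what flips the stream between active and inactive. It keeps port state and stream state separate, since a closed port may still have an active stream and a re-opened port may still have an inactive one.

```java
/** Hypothetical model of the proposed port/stream states; not part of the Apex API. */
class PortLifecycleSketch {
    enum PortState { OPEN, CLOSED }
    enum StreamState { ACTIVE, INACTIVE }

    static final class OutputPort {
        private PortState port;
        private StreamState stream;
        private int emitted = 0;

        private OutputPort(PortState port, StreamState stream) {
            this.port = port;
            this.stream = stream;
        }

        /** Default from the proposal: ports are open and streams are active. */
        static OutputPort openByDefault() {
            return new OutputPort(PortState.OPEN, StreamState.ACTIVE);
        }

        /** Stream marked inactive by the application designer at start. */
        static OutputPort inactiveAtStart() {
            return new OutputPort(PortState.CLOSED, StreamState.INACTIVE);
        }

        /** The operator decides not to emit; the stream may still be active. */
        void close() { port = PortState.CLOSED; }

        /** The operator wants to emit again; the stream may not be ready yet. */
        void open() { port = PortState.OPEN; }

        /** Assumed to be called once downstream has consumed all data and is undeployed. */
        void downstreamUndeployed() {
            if (port == PortState.OPEN) {
                throw new IllegalStateException("cannot undeploy downstream of an open port");
            }
            stream = StreamState.INACTIVE;
        }

        /** Assumed to be called once containers are re-allocated and downstream is redeployed. */
        void downstreamDeployed() { stream = StreamState.ACTIVE; }

        boolean canEmit() {
            return port == PortState.OPEN && stream == StreamState.ACTIVE;
        }

        void emit(String tuple) {
            if (!canEmit()) {
                throw new IllegalStateException("port closed or stream inactive");
            }
            emitted++;
        }

        int emittedCount() { return emitted; }
        PortState portState() { return port; }
        StreamState streamState() { return stream; }
    }

    public static void main(String[] args) {
        OutputPort p = OutputPort.openByDefault();
        p.emit("a");
        p.close();                 // port closed, stream still active
        p.downstreamUndeployed();  // stream becomes inactive
        p.open();                  // port open, stream not yet ready
        System.out.println(p.portState() + " / " + p.streamState());
        p.downstreamDeployed();    // stream active again, emitting is allowed
        p.emit("b");
        System.out.println(p.emittedCount());
    }
}
```

The sketch leaves out windows, checkpoints and commit semantics, which the replies above identify as the hard part of the proposal.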