On Wed, Dec 20, 2017 at 3:34 PM, Thomas Weise <t...@apache.org> wrote:

> It is exciting to see this move forward, the ability to use Python opens
> many new possibilities.
>
> Regarding use of worker threads, this is a pattern that we are using
> elsewhere (for example in the Kafka input operator). When the operator
> performs blocking operations and consumes little memory and/or CPU, then it
> is more economic to first use threads to increase parallelism and
> throughput (up to a limit), and then the more expensive containers for
> horizontal scaling (multiple threads to make good use of container
> resources and then scale using the usual partitioning).
>

I think there is a difference. In case of kafka or other input operators
the threads are less constrained. They can operate with independence and
can dictate the pace limited only by back pressure. In this case the
operator is most likely going to be downsteram in the DAG and would have
constraints for processing guarantees. For scalability, container local
could also be used as a substitue for multiple threads without resorting to
using separate containers. I can understand use of a separate thread to be
able to get around problems like stalled processing but would first try to
see if something like container local would work for scaling.


> It is also correct that generally there is no ordering guarantee within a
> streaming window, and that would be the case when multiple input ports are
> present as well. (The platform cannot guarantee such ordering, this would
> need to be done by the operator).



> Idempotency can be expensive (latency and/or space complexity), and not all
> applications need it (like certain use cases that process record by record
> and don't accumulate state). An example might be Python logic that is used
> for scoring against a model that was built offline. Idempotency would
> actually be rather difficult to implement, since the operator would need to
> remember which tuples were emitted in a given interval and on replay block
> until they are available (and also hold others that may be processed sooner
> than in the original order). It may be easier to record emitted tuples to a
> WAL instead of reprocessing.
>

Ordering cannot be guaranteed but the operator would need to finish the
work it is given a window within the window boundary, otherwise there is a
chance for data loss in recovery scenarios. You could make checkpoint the
boundary by which all pending work is completed instead of every window
boundary but then downstream operators cannot rely on window level
idempotency for exactly once. Something like file output operator would
work but not our db kind of operator. Both options could be supported in
the operator.


> Regarding not emitting stragglers until the next input arrives, can this
> not be accomplished using IdleTimeHandler?
>
> What is preventing the use of virtual environments?
>
> Thanks,
> Thomas
>
>
> On Tue, Dec 19, 2017 at 8:19 AM, Pramod Immaneni <pra...@datatorrent.com>
> wrote:
>
> > Hi Ananth,
> >
> > From your explanation, it looks like the threads overall allow you to
> > achieve two things. Have some sort of overall timeout if by which a tuple
> > doesn't finish processing then it is flagged as such. Second, it doesn't
> > block processing of subsequent tuples and you can still process them
> > meeting the SLA. By checkpoint, however, I think you should try to have a
> > resolution one way or the other for all the tuples received within the
> > checkpoint period or every window boundary (see idempotency below),
> > otherwise, there is a chance of data loss in case of operator restarts.
> If
> > a loss is acceptable for stragglers you could let straggler processing
> > continue beyond checkpoint boundary and let them finish when they can.
> You
> > could support both behaviors by use of a property. Furthermore, you may
> not
> > want all threads stuck with stragglers and then you are back to square
> one
> > so you may need to stop processing stragglers beyond a certain thread
> usage
> > threshold. Is there a way to interrupt the processing of the engine?
> >
> > Then there is the question of idempotency. I suspect it would be
> difficult
> > to maintain it unless you wait for processing to finish for all tuples
> > received during the window every window boundary. You may provide an
> option
> > for relaxing the strict guarantees for the stragglers like mentioned
> above.
> >
> > Pramod
> >
> > On Thu, Dec 14, 2017 at 10:49 AM, Ananth G <ananthg.a...@gmail.com>
> wrote:
> >
> > > Hello Pramod,
> > >
> > > Thanks for the comments. I adjusted the title of the JIRA. Here is
> what I
> > > was thinking for the worker pool implementation.
> > >
> > > - The main reason ( which I forgot to mention in the design points
> below
> > )
> > > is that the java embedded engine allows only the thread that created
> the
> > > instance to execute the python logic. This is more because of the JNI
> > > specification itself. Some hints here https://stackoverflow.com/
> > > questions/18056347/jni-calling-java-from-c-with-multiple-threads <
> > > https://stackoverflow.com/questions/18056347/jni-
> > calling-java-from-c-with-
> > > multiple-threads> and here http://journals.ecs.soton.ac.
> > > uk/java/tutorial/native1.1/implementing/sync.html <
> > > http://journals.ecs.soton.ac.uk/java/tutorial/native1.1/
> > > implementing/sync.html>
> > >
> > > - This essentially means that the main operator thread will have to
> call
> > > the python code execution logic if the design were otherwise.
> > >
> > > - Since the end user can choose to can write any kind of logic
> including
> > > blocking I/O as part of the implementation, I did not want to stall the
> > > operator thread for any usage pattern.
> > >
> > > - In fact there is only one overall interpreter in the JVM process
> space
> > > and the interpreter thread is just a JNI wrapper around it to account
> for
> > > the JNI limitations above.
> > >
> > > - It is for the very same reason, there is an API in the implementation
> > to
> > > support for registering Shared Modules across all of the interpreter
> > > threads. Use cases for this exist when there is a global variable
> > provided
> > > by the underlying Python library and loading it multiple times can
> cause
> > > issues. Hence the API to register a shared module which can be used by
> > all
> > > of the Interpreter Threads.
> > >
> > > - The operator submits to a work request queue and consumes from a
> > > response queue for each of the interpreter thread. There exists one
> > request
> > > and one response queue per interpreter thread.
> > >
> > > - The stragglers will get drained from the response queue for a
> > previously
> > > submitted request queue.
> > >
> > > - The other reason why I chose to implement it this ways is also for
> some
> > > of the use case that I foresee in the ML scoring scenarios. In fraud
> > > systems, if I have a strict SLA to score a model, the main thread in
> the
> > > operator is not helping me implement this pattern at all. The caller to
> > the
> > > Apex application will need to proceed if the scoring gets delayed due
> to
> > > whatever reason. However the scoring can continue on the interpreter
> > thread
> > > and can be drained later ( It is just that the caller did not make use
> of
> > > this result but can still be persisted for operators consuming from the
> > > straggler port.
> > >
> > > - There are 3 output ports for this operator. DefaultOutputPort,
> > > stragglersPort and an errorPort.
> > >
> > > - Some libraries like Tensorflow can become really heavy. Tensorflow
> > > models can execute a tensorflow DAG as part of a model scoring
> > > implementation and hence I wanted to take the approach of a worker
> pool.
> > > Yes your point is valid if we wait for the stragglers to complete in a
> > > given window. The current implementation does not force to wait for all
> > of
> > > the stragglers to complete. The stragglers are emitted only when there
> > is a
> > > new tuple that is being processed. i.e. when a new tuple arrives for
> > > scoring , the straggler response queue is checked if there are any
> > entries
> > > and if yes, the responses are emitted into the stragglerPort. This
> > > essentially means that there are situations when the straggler port is
> > > emitting the result for a request submitted in the previous window.
> This
> > > also implies that idempotency cannot be guaranteed across runs of the
> > same
> > > input data. In fact all threaded implementations have this issue as
> > > ordering of the results is not guaranteed to be unique even within a
> > given
> > > window ?
> > >
> > > I can enforce a block/drain at the end of the window to force a
> > completion
> > > basing on the feedback.
> > >
> > >
> > > Regards,
> > > Ananth
> > >
> > > > On 15 Dec 2017, at 4:21 am, Pramod Immaneni <pra...@datatorrent.com>
> > > wrote:
> > > >
> > > > Hi Anath,
> > > >
> > > > Sounds interesting and looks like you have put quite a bit of work on
> > it.
> > > > Might I suggest changing the title of 2260 to better fit your
> proposal
> > > and
> > > > implementation, mainly so that there is differentiation from 2261.
> > > >
> > > > I wanted to discuss the proposal to use multiple threads in an
> operator
> > > > instance. Unless the execution threads are blocking for some sort of
> > i/o
> > > > why would it result in a noticeable performance difference compared
> to
> > > > processing in operator thread and running multiple partitions of the
> > > > operator in container local. By running the processing in a separate
> > > thread
> > > > from the operator lifecycle thread you don't still get away from
> > matching
> > > > the incoming data throughput. The checkpoint will act as a time where
> > you
> > > > backpressure will start to materialize when the operator would have
> to
> > > wait
> > > > for your background processing to complete to guarantee all data till
> > the
> > > > checkpoint is processed.
> > > >
> > > > Thanks
> > > >
> > > >
> > > > On Thu, Dec 14, 2017 at 2:20 AM, Ananth G <ananthg.a...@gmail.com>
> > > wrote:
> > > >
> > > >> Hello All,
> > > >>
> > > >> I would like to submit the design for the Python execution operator
> > > before
> > > >> I raise the pull request so that I can refine the implementation
> based
> > > on
> > > >> feedback. Could you please provide feedback on the design if any
> and I
> > > will
> > > >> raise the PR accordingly.
> > > >>
> > > >> - This operator is for the JIRA ticket raised here
> > > >> https://issues.apache.org/jira/browse/APEXMALHAR-2260 <
> > > >> https://issues.apache.org/jira/browse/APEXMALHAR-2260>
> > > >> - The operator embeds a python interpreter in the operator JVM
> process
> > > >> space and is not external to the JVM.
> > > >> - The implementation is proposing the use of Java Embedded Python (
> > JEP
> > > )
> > > >> given here https://github.com/ninia/jep <
> https://github.com/ninia/jep
> > >
> > > >> - The JEP engine is under zlib/libpng license. Since this is an
> > approved
> > > >> license under https://www.apache.org/legal/resolved.html#category-a
> <
> > > >> https://www.apache.org/legal/resolved.html#category-a> I am
> assuming
> > it
> > > >> is ok for the community to approve the inclusion of this library
> > > >> - Python integration is a messy piece due to the nature of dynamic
> > > >> libraries. All python libraries need to be natively installed. This
> > also
> > > >> means we will not be able bundle python libraries and dependencies
> as
> > > part
> > > >> of the build into the target JVM container. Hence this operator has
> > the
> > > >> current limitation of the python binaries installed through an
> > external
> > > >> process on all of the YARN nodes for now.
> > > >> - The JEP maven dependency jar in the POM is a JNI wrapper around
> the
> > > >> dynamic library that is installed externally to the Apex
> installation
> > > >> process on all of the YARN nodes.
> > > >> - Hope to take up https://issues.apache.org/
> jira/browse/APEXCORE-796
> > <
> > > >> https://issues.apache.org/jira/browse/APEXCORE-796> to solve this
> > issue
> > > >> in the future.
> > > >> - The python operator implementation can be extended to py4J based
> > > >> implementation ( as opposed to in-memory model like JEP ) in the
> > future
> > > if
> > > >> required be. JEP is the implementation based on an in-memory design
> > > pattern.
> > > >> - The python operator allows for 4 major API patterns
> > > >>    - Execute a method call by accepting parameters to pass to the
> > > >> interpreter
> > > >>    - Execute a python script as given in a file path
> > > >>    - Evaluate an expression and allows for passing of variables
> > between
> > > >> the java code and the python in-memory interpreter bridge
> > > >>    - A handy method wherein a series of instructions can be passed
> in
> > > one
> > > >> single java call ( executed as a sequence of python eval
> instructions
> > > under
> > > >> the hood )
> > > >> - Automatic garbage collection of the variables that are passed from
> > > java
> > > >> code to the in memory python interpreter
> > > >> - Support for all major python libraries. Tensorflow, Keras, Scikit,
> > > >> xgboost. Preliminary tests for these libraries seem to work as per
> > code
> > > >> here : https://github.com/ananthc/sampleapps/tree/master/apache-
> > > >> apex/apexjvmpython <https://github.com/ananthc/
> > > >> sampleapps/tree/master/apache-apex/apexjvmpython>
> > > >> - The implementation allows for SLA based execution model. i.e. the
> > > >> operator is given a chance to execute the python code and if not
> > > complete
> > > >> within a time out, the operator code returns back null.
> > > >> - A tuple that has become a straggler as per previous point will
> > > >> automatically be drained off to a different port so that downstream
> > > >> operators can still consume the straggler if they want to when the
> > > results
> > > >> arrive.
> > > >> - Because of the nature of python being an interpreter and if a
> > previous
> > > >> tuple is being still processed, there is chance of a back pressure
> > > pattern
> > > >> building up very quickly. Hence this operator works on the concept
> of
> > a
> > > >> worker pool. The Python operator uses a configurable number of
> worker
> > > >> thread each of which embed the Python interpreter within their
> > > processing
> > > >> space. i.e. it is in fact a collection of python ink memory
> > interpreters
> > > >> inside the Python operator implementation.
> > > >> - The operator chooses one of the threads at runtime basing on their
> > > busy
> > > >> state thus allowing for back-pressure issues to be resolved
> > > automatically.
> > > >> - There is a first class support for Numpy in JEP. Java arrays would
> > be
> > > >> convertible to the Python Numpy arrays and vice versa and share the
> > same
> > > >> memory addresses for efficiency reasons.
> > > >> - The base operator implements dynamic partitioning based on a
> thread
> > > >> starvation policy. At each checkpoint, it checks how much percentage
> > of
> > > the
> > > >> requests resulted in starved threads and if the starvation exceeds a
> > > >> configured percentage, a new instance of the operator is provisioned
> > for
> > > >> every such instance of the operator
> > > >> - The operator provides the notion of a worker execution mode. There
> > are
> > > >> two worker modes that are passed in each of the above calls from the
> > > user.
> > > >> ALL or ANY.  Because python interpreter is state based engine, a
> newly
> > > >> dynamically partitioned operator might not be in the exact state of
> > the
> > > >> remaining operators. Hence the operator has this notion of worker
> > > execution
> > > >> mode. Any call ( any of the 4 calls mentioned above ) called with
> ALL
> > > >> execution mode will be executed on all the workers of the worker
> > thread
> > > >> pool as well as the dynamically portioned instance whenever such an
> > > >> instance is provisioned.
> > > >> - The base operator implementation has a method that can be
> overridden
> > > to
> > > >> implement the logic that needs to be executed for each tuple. The
> base
> > > >> operator default implementation is a simple NO-OP.
> > > >> - The operator automatically picks up the least busy of the thread
> > pool
> > > >> worker which has JEP embedded in it to execute the call.
> > > >> - The JEP based installation will not support non Cpython modules.
> All
> > > of
> > > >> the major python libraries are cpython based and hence I believe
> this
> > > is of
> > > >> a lesser concern. If we hit a roadblock when a new python library
> > being
> > > a
> > > >> non-Cpython based library needs to be run, then we could implement
> the
> > > >> ApexPythonEngine interface to something like Py4J which involves
> > > >> interprocess communication.
> > > >> - The python operator requires the user to set the library path
> > > >> java.library.path for the operator to make use of the dynamic
> > libraries
> > > of
> > > >> the corresponding platform. This has to be passed in as the JVM
> > options.
> > > >> Failing to do so will result in the operator failing to load the
> > > >> interpreter properly.
> > > >> - The supported python versions are 2.7, 3.3 , 3.4 , 3.5 and 3.6.
> > Numpy
> > > >=
> > > >> 1.7 is supported.
> > > >> - There is no support for virtual environments yet. In case of
> > multiple
> > > >> python versions on the node, to include the right python version for
> > the
> > > >> apex operator, ensure that the environment variables and the dynamic
> > > >> library path are set appropriately. This is a workaround and I hope
> > > >> APEXCORE-796 will solve this issue as well.
> > > >>
> > > >>
> > > >> Regards,
> > > >> Ananth
> > > >>
> > > >>
> > >
> > >
> >
>

Reply via email to