Serialization and resource manager are not involved in CONTAINER_LOCAL.

However, CONTAINER_LOCAL is probably not what you want, since it would
force all partitions to run in the same JVM.

Thomas


On Fri, Dec 22, 2017 at 1:19 PM, Ananth G <ananthg.a...@gmail.com> wrote:

> I guess my comment below regarding overhead of serialisation in container
> local is wrong ? Nevertheless having a local thread implementation gives
> some benefits . For example I am using to whether sleep if there is no
> request in the queue or spin checking for request presence in the request
> queue etc to take care of no delays in the request queue processing itself.
>
> Regards,
> Ananth
>
> > On 23 Dec 2017, at 6:50 am, Ananth G <ananthg.a...@gmail.com> wrote:
> >
> >
> > Thanks for the comments Thomas and Pramod.
> >
> > Apologies for the delayed response on this thread.
> >
> > > I believe the thread implementation still adds some value over a
> container local approach. It is more of a “thread local” equivalent which
> is more efficient as opposed to a container local implementation. Also the
> number of worker threads is configurable. Setting the value of 1 will let
> the user to not do this ( although I do not see a reason for why not ).
> There is always the over head of serialise/de-serialize cycle even for a
> container local approach and there is the additional possibility of
> container local not being honoured by the Resource manager based on the
> state of the resources.
> >
> > > Regarding the configurable key to ensure all tuples in a window are
> processed, I am adding a switch which can let the user choose ( and javadoc
> that clearly points out issues if not waiting for the tuples to be
> completely processed ). There are pros and cons for this and letting the
> user decide might be a better approach. The reason why I mention cons for
> waiting the tuples to complete ( apart from the reason that Thomas
> mentioned ) is that if one of the commands that the user wrote is an
> erroneous one, all the subsequent calls to that interpreter thread cal
> fail. An example use case is that tuple A set some value for variable x and
> tuple B that is coming next is making use of the variable x. Syntactically
> expression for tuple B is valid but just that it depends on variable x. Now
> if the variable x is not in memory because tuple A is a straggler resulting
> in tuple B resulting in an erroneous interpreter state. Hence the operator
> might stall definitely as end window will be stalled forever resulting in
> killing of the operator ultimately. This is also because the erroneous
> command corrupted the state of the interpreter itself. Of course this can
> happen to all of the threads in the interpreter worker pool resulting in
> this state as well. Perhaps an improvement of the current implementation is
> to detect all such stalled interpreters for more than x windows and rebuild
> the interpreter thread when such a situation is detected.
> >
> > > Thanks for the IdleTimeoutHandler tip as this helped me to ensure that
> the stragglers are drained out irrespective of a new tuple coming in for
> processing. In the previous iteration, the stragglers could only be drained
> when there is a new tuple that came in processing as delayed responses
> queue could only be checked when there is some activity on the main thread.
> >
> > > Thanks for raising the point about the virtual environments: This is a
> point I missed mentioning in the design description below. There is no
> support for virtual environments yet in JEP and hence the current
> limitation. However the work around is simple. As part of the application
> configuration, we need to provide the JAVA_LIBRARY_PATH which contains the
> path to the JEP dynamic libraries. If there are multiple python installs (
> and hence multiple JEP libraries to choose from for each of the apex
> applications that are being deployed), setting the right path for the
> operator JVM will result in picking the corresponding python interpreter
> version. This also essentially means that we cannot have a thread local
> deployment configuration of two python operators that belong to different
> python versions in the same JVM.  The Docker approach ticket should be the
> right fix for virtual environments issue? https://issues.apache.org/
> jira/browse/APEXCORE-796 <https://issues.apache.org/
> jira/browse/APEXCORE-796> ( but still might not solve the thread local
> configuration deployment )
> >
> > Regards,
> > Ananth
> >
> >
> >> On 21 Dec 2017, at 11:01 am, Pramod Immaneni <pra...@datatorrent.com
> <mailto:pra...@datatorrent.com>> wrote:
> >>
> >> On Wed, Dec 20, 2017 at 3:34 PM, Thomas Weise <t...@apache.org <mailto:
> t...@apache.org>> wrote:
> >>
> >>> It is exciting to see this move forward, the ability to use Python
> opens
> >>> many new possibilities.
> >>>
> >>> Regarding use of worker threads, this is a pattern that we are using
> >>> elsewhere (for example in the Kafka input operator). When the operator
> >>> performs blocking operations and consumes little memory and/or CPU,
> then it
> >>> is more economic to first use threads to increase parallelism and
> >>> throughput (up to a limit), and then the more expensive containers for
> >>> horizontal scaling (multiple threads to make good use of container
> >>> resources and then scale using the usual partitioning).
> >>>
> >>
> >> I think there is a difference. In case of kafka or other input operators
> >> the threads are less constrained. They can operate with independence and
> >> can dictate the pace limited only by back pressure. In this case the
> >> operator is most likely going to be downsteram in the DAG and would have
> >> constraints for processing guarantees. For scalability, container local
> >> could also be used as a substitue for multiple threads without
> resorting to
> >> using separate containers. I can understand use of a separate thread to
> be
> >> able to get around problems like stalled processing but would first try
> to
> >> see if something like container local would work for scaling.
> >>
> >>
> >>> It is also correct that generally there is no ordering guarantee
> within a
> >>> streaming window, and that would be the case when multiple input ports
> are
> >>> present as well. (The platform cannot guarantee such ordering, this
> would
> >>> need to be done by the operator).
> >>
> >>
> >>
> >>> Idempotency can be expensive (latency and/or space complexity), and
> not all
> >>> applications need it (like certain use cases that process record by
> record
> >>> and don't accumulate state). An example might be Python logic that is
> used
> >>> for scoring against a model that was built offline. Idempotency would
> >>> actually be rather difficult to implement, since the operator would
> need to
> >>> remember which tuples were emitted in a given interval and on replay
> block
> >>> until they are available (and also hold others that may be processed
> sooner
> >>> than in the original order). It may be easier to record emitted tuples
> to a
> >>> WAL instead of reprocessing.
> >>>
> >>
> >> Ordering cannot be guaranteed but the operator would need to finish the
> >> work it is given a window within the window boundary, otherwise there
> is a
> >> chance for data loss in recovery scenarios. You could make checkpoint
> the
> >> boundary by which all pending work is completed instead of every window
> >> boundary but then downstream operators cannot rely on window level
> >> idempotency for exactly once. Something like file output operator would
> >> work but not our db kind of operator. Both options could be supported in
> >> the operator.
> >>
> >>
> >>> Regarding not emitting stragglers until the next input arrives, can
> this
> >>> not be accomplished using IdleTimeHandler?
> >>>
> >>> What is preventing the use of virtual environments?
> >>>
> >>> Thanks,
> >>> Thomas
> >>>
> >>>
> >>> On Tue, Dec 19, 2017 at 8:19 AM, Pramod Immaneni <
> pra...@datatorrent.com <mailto:pra...@datatorrent.com>>
> >>> wrote:
> >>>
> >>>> Hi Ananth,
> >>>>
> >>>> From your explanation, it looks like the threads overall allow you to
> >>>> achieve two things. Have some sort of overall timeout if by which a
> tuple
> >>>> doesn't finish processing then it is flagged as such. Second, it
> doesn't
> >>>> block processing of subsequent tuples and you can still process them
> >>>> meeting the SLA. By checkpoint, however, I think you should try to
> have a
> >>>> resolution one way or the other for all the tuples received within the
> >>>> checkpoint period or every window boundary (see idempotency below),
> >>>> otherwise, there is a chance of data loss in case of operator
> restarts.
> >>> If
> >>>> a loss is acceptable for stragglers you could let straggler processing
> >>>> continue beyond checkpoint boundary and let them finish when they can.
> >>> You
> >>>> could support both behaviors by use of a property. Furthermore, you
> may
> >>> not
> >>>> want all threads stuck with stragglers and then you are back to square
> >>> one
> >>>> so you may need to stop processing stragglers beyond a certain thread
> >>> usage
> >>>> threshold. Is there a way to interrupt the processing of the engine?
> >>>>
> >>>> Then there is the question of idempotency. I suspect it would be
> >>> difficult
> >>>> to maintain it unless you wait for processing to finish for all tuples
> >>>> received during the window every window boundary. You may provide an
> >>> option
> >>>> for relaxing the strict guarantees for the stragglers like mentioned
> >>> above.
> >>>>
> >>>> Pramod
> >>>>
> >>>> On Thu, Dec 14, 2017 at 10:49 AM, Ananth G <ananthg.a...@gmail.com
> <mailto:ananthg.a...@gmail.com>>
> >>> wrote:
> >>>>
> >>>>> Hello Pramod,
> >>>>>
> >>>>> Thanks for the comments. I adjusted the title of the JIRA. Here is
> >>> what I
> >>>>> was thinking for the worker pool implementation.
> >>>>>
> >>>>> - The main reason ( which I forgot to mention in the design points
> >>> below
> >>>> )
> >>>>> is that the java embedded engine allows only the thread that created
> >>> the
> >>>>> instance to execute the python logic. This is more because of the JNI
> >>>>> specification itself. Some hints here https://stackoverflow.com/ <
> https://stackoverflow.com/>
> >>>>> questions/18056347/jni-calling-java-from-c-with-multiple-threads <
> >>>>> https://stackoverflow.com/questions/18056347/jni- <
> https://stackoverflow.com/questions/18056347/jni->
> >>>> calling-java-from-c-with-
> >>>>> multiple-threads> and here http://journals.ecs.soton.ac <
> http://journals.ecs.soton.ac/>.
> >>>>> uk/java/tutorial/native1.1/implementing/sync.html <
> >>>>> http://journals.ecs.soton.ac.uk/java/tutorial/native1.1/ <
> http://journals.ecs.soton.ac.uk/java/tutorial/native1.1/>
> >>>>> implementing/sync.html>
> >>>>>
> >>>>> - This essentially means that the main operator thread will have to
> >>> call
> >>>>> the python code execution logic if the design were otherwise.
> >>>>>
> >>>>> - Since the end user can choose to can write any kind of logic
> >>> including
> >>>>> blocking I/O as part of the implementation, I did not want to stall
> the
> >>>>> operator thread for any usage pattern.
> >>>>>
> >>>>> - In fact there is only one overall interpreter in the JVM process
> >>> space
> >>>>> and the interpreter thread is just a JNI wrapper around it to account
> >>> for
> >>>>> the JNI limitations above.
> >>>>>
> >>>>> - It is for the very same reason, there is an API in the
> implementation
> >>>> to
> >>>>> support for registering Shared Modules across all of the interpreter
> >>>>> threads. Use cases for this exist when there is a global variable
> >>>> provided
> >>>>> by the underlying Python library and loading it multiple times can
> >>> cause
> >>>>> issues. Hence the API to register a shared module which can be used
> by
> >>>> all
> >>>>> of the Interpreter Threads.
> >>>>>
> >>>>> - The operator submits to a work request queue and consumes from a
> >>>>> response queue for each of the interpreter thread. There exists one
> >>>> request
> >>>>> and one response queue per interpreter thread.
> >>>>>
> >>>>> - The stragglers will get drained from the response queue for a
> >>>> previously
> >>>>> submitted request queue.
> >>>>>
> >>>>> - The other reason why I chose to implement it this ways is also for
> >>> some
> >>>>> of the use case that I foresee in the ML scoring scenarios. In fraud
> >>>>> systems, if I have a strict SLA to score a model, the main thread in
> >>> the
> >>>>> operator is not helping me implement this pattern at all. The caller
> to
> >>>> the
> >>>>> Apex application will need to proceed if the scoring gets delayed due
> >>> to
> >>>>> whatever reason. However the scoring can continue on the interpreter
> >>>> thread
> >>>>> and can be drained later ( It is just that the caller did not make
> use
> >>> of
> >>>>> this result but can still be persisted for operators consuming from
> the
> >>>>> straggler port.
> >>>>>
> >>>>> - There are 3 output ports for this operator. DefaultOutputPort,
> >>>>> stragglersPort and an errorPort.
> >>>>>
> >>>>> - Some libraries like Tensorflow can become really heavy. Tensorflow
> >>>>> models can execute a tensorflow DAG as part of a model scoring
> >>>>> implementation and hence I wanted to take the approach of a worker
> >>> pool.
> >>>>> Yes your point is valid if we wait for the stragglers to complete in
> a
> >>>>> given window. The current implementation does not force to wait for
> all
> >>>> of
> >>>>> the stragglers to complete. The stragglers are emitted only when
> there
> >>>> is a
> >>>>> new tuple that is being processed. i.e. when a new tuple arrives for
> >>>>> scoring , the straggler response queue is checked if there are any
> >>>> entries
> >>>>> and if yes, the responses are emitted into the stragglerPort. This
> >>>>> essentially means that there are situations when the straggler port
> is
> >>>>> emitting the result for a request submitted in the previous window.
> >>> This
> >>>>> also implies that idempotency cannot be guaranteed across runs of the
> >>>> same
> >>>>> input data. In fact all threaded implementations have this issue as
> >>>>> ordering of the results is not guaranteed to be unique even within a
> >>>> given
> >>>>> window ?
> >>>>>
> >>>>> I can enforce a block/drain at the end of the window to force a
> >>>> completion
> >>>>> basing on the feedback.
> >>>>>
> >>>>>
> >>>>> Regards,
> >>>>> Ananth
> >>>>>
> >>>>>> On 15 Dec 2017, at 4:21 am, Pramod Immaneni <pra...@datatorrent.com
> <mailto:pra...@datatorrent.com>>
> >>>>> wrote:
> >>>>>>
> >>>>>> Hi Anath,
> >>>>>>
> >>>>>> Sounds interesting and looks like you have put quite a bit of work
> on
> >>>> it.
> >>>>>> Might I suggest changing the title of 2260 to better fit your
> >>> proposal
> >>>>> and
> >>>>>> implementation, mainly so that there is differentiation from 2261.
> >>>>>>
> >>>>>> I wanted to discuss the proposal to use multiple threads in an
> >>> operator
> >>>>>> instance. Unless the execution threads are blocking for some sort of
> >>>> i/o
> >>>>>> why would it result in a noticeable performance difference compared
> >>> to
> >>>>>> processing in operator thread and running multiple partitions of the
> >>>>>> operator in container local. By running the processing in a separate
> >>>>> thread
> >>>>>> from the operator lifecycle thread you don't still get away from
> >>>> matching
> >>>>>> the incoming data throughput. The checkpoint will act as a time
> where
> >>>> you
> >>>>>> backpressure will start to materialize when the operator would have
> >>> to
> >>>>> wait
> >>>>>> for your background processing to complete to guarantee all data
> till
> >>>> the
> >>>>>> checkpoint is processed.
> >>>>>>
> >>>>>> Thanks
> >>>>>>
> >>>>>>
> >>>>>> On Thu, Dec 14, 2017 at 2:20 AM, Ananth G <ananthg.a...@gmail.com
> <mailto:ananthg.a...@gmail.com>>
> >>>>> wrote:
> >>>>>>
> >>>>>>> Hello All,
> >>>>>>>
> >>>>>>> I would like to submit the design for the Python execution operator
> >>>>> before
> >>>>>>> I raise the pull request so that I can refine the implementation
> >>> based
> >>>>> on
> >>>>>>> feedback. Could you please provide feedback on the design if any
> >>> and I
> >>>>> will
> >>>>>>> raise the PR accordingly.
> >>>>>>>
> >>>>>>> - This operator is for the JIRA ticket raised here
> >>>>>>> https://issues.apache.org/jira/browse/APEXMALHAR-2260 <
> https://issues.apache.org/jira/browse/APEXMALHAR-2260> <
> >>>>>>> https://issues.apache.org/jira/browse/APEXMALHAR-2260 <
> https://issues.apache.org/jira/browse/APEXMALHAR-2260>>
> >>>>>>> - The operator embeds a python interpreter in the operator JVM
> >>> process
> >>>>>>> space and is not external to the JVM.
> >>>>>>> - The implementation is proposing the use of Java Embedded Python (
> >>>> JEP
> >>>>> )
> >>>>>>> given here https://github.com/ninia/jep <
> https://github.com/ninia/jep> <
> >>> https://github.com/ninia/jep <https://github.com/ninia/jep>
> >>>>>
> >>>>>>> - The JEP engine is under zlib/libpng license. Since this is an
> >>>> approved
> >>>>>>> license under https://www.apache.org/legal/
> resolved.html#category-a
> >>> <
> >>>>>>> https://www.apache.org/legal/resolved.html#category-a> I am
> >>> assuming
> >>>> it
> >>>>>>> is ok for the community to approve the inclusion of this library
> >>>>>>> - Python integration is a messy piece due to the nature of dynamic
> >>>>>>> libraries. All python libraries need to be natively installed. This
> >>>> also
> >>>>>>> means we will not be able bundle python libraries and dependencies
> >>> as
> >>>>> part
> >>>>>>> of the build into the target JVM container. Hence this operator has
> >>>> the
> >>>>>>> current limitation of the python binaries installed through an
> >>>> external
> >>>>>>> process on all of the YARN nodes for now.
> >>>>>>> - The JEP maven dependency jar in the POM is a JNI wrapper around
> >>> the
> >>>>>>> dynamic library that is installed externally to the Apex
> >>> installation
> >>>>>>> process on all of the YARN nodes.
> >>>>>>> - Hope to take up https://issues.apache.org/
> >>> jira/browse/APEXCORE-796
> >>>> <
> >>>>>>> https://issues.apache.org/jira/browse/APEXCORE-796> to solve this
> >>>> issue
> >>>>>>> in the future.
> >>>>>>> - The python operator implementation can be extended to py4J based
> >>>>>>> implementation ( as opposed to in-memory model like JEP ) in the
> >>>> future
> >>>>> if
> >>>>>>> required be. JEP is the implementation based on an in-memory design
> >>>>> pattern.
> >>>>>>> - The python operator allows for 4 major API patterns
> >>>>>>>   - Execute a method call by accepting parameters to pass to the
> >>>>>>> interpreter
> >>>>>>>   - Execute a python script as given in a file path
> >>>>>>>   - Evaluate an expression and allows for passing of variables
> >>>> between
> >>>>>>> the java code and the python in-memory interpreter bridge
> >>>>>>>   - A handy method wherein a series of instructions can be passed
> >>> in
> >>>>> one
> >>>>>>> single java call ( executed as a sequence of python eval
> >>> instructions
> >>>>> under
> >>>>>>> the hood )
> >>>>>>> - Automatic garbage collection of the variables that are passed
> from
> >>>>> java
> >>>>>>> code to the in memory python interpreter
> >>>>>>> - Support for all major python libraries. Tensorflow, Keras,
> Scikit,
> >>>>>>> xgboost. Preliminary tests for these libraries seem to work as per
> >>>> code
> >>>>>>> here : https://github.com/ananthc/sampleapps/tree/master/apache-
> >>>>>>> apex/apexjvmpython <https://github.com/ananthc/
> >>>>>>> sampleapps/tree/master/apache-apex/apexjvmpython>
> >>>>>>> - The implementation allows for SLA based execution model. i.e. the
> >>>>>>> operator is given a chance to execute the python code and if not
> >>>>> complete
> >>>>>>> within a time out, the operator code returns back null.
> >>>>>>> - A tuple that has become a straggler as per previous point will
> >>>>>>> automatically be drained off to a different port so that downstream
> >>>>>>> operators can still consume the straggler if they want to when the
> >>>>> results
> >>>>>>> arrive.
> >>>>>>> - Because of the nature of python being an interpreter and if a
> >>>> previous
> >>>>>>> tuple is being still processed, there is chance of a back pressure
> >>>>> pattern
> >>>>>>> building up very quickly. Hence this operator works on the concept
> >>> of
> >>>> a
> >>>>>>> worker pool. The Python operator uses a configurable number of
> >>> worker
> >>>>>>> thread each of which embed the Python interpreter within their
> >>>>> processing
> >>>>>>> space. i.e. it is in fact a collection of python ink memory
> >>>> interpreters
> >>>>>>> inside the Python operator implementation.
> >>>>>>> - The operator chooses one of the threads at runtime basing on
> their
> >>>>> busy
> >>>>>>> state thus allowing for back-pressure issues to be resolved
> >>>>> automatically.
> >>>>>>> - There is a first class support for Numpy in JEP. Java arrays
> would
> >>>> be
> >>>>>>> convertible to the Python Numpy arrays and vice versa and share the
> >>>> same
> >>>>>>> memory addresses for efficiency reasons.
> >>>>>>> - The base operator implements dynamic partitioning based on a
> >>> thread
> >>>>>>> starvation policy. At each checkpoint, it checks how much
> percentage
> >>>> of
> >>>>> the
> >>>>>>> requests resulted in starved threads and if the starvation exceeds
> a
> >>>>>>> configured percentage, a new instance of the operator is
> provisioned
> >>>> for
> >>>>>>> every such instance of the operator
> >>>>>>> - The operator provides the notion of a worker execution mode.
> There
> >>>> are
> >>>>>>> two worker modes that are passed in each of the above calls from
> the
> >>>>> user.
> >>>>>>> ALL or ANY.  Because python interpreter is state based engine, a
> >>> newly
> >>>>>>> dynamically partitioned operator might not be in the exact state of
> >>>> the
> >>>>>>> remaining operators. Hence the operator has this notion of worker
> >>>>> execution
> >>>>>>> mode. Any call ( any of the 4 calls mentioned above ) called with
> >>> ALL
> >>>>>>> execution mode will be executed on all the workers of the worker
> >>>> thread
> >>>>>>> pool as well as the dynamically portioned instance whenever such an
> >>>>>>> instance is provisioned.
> >>>>>>> - The base operator implementation has a method that can be
> >>> overridden
> >>>>> to
> >>>>>>> implement the logic that needs to be executed for each tuple. The
> >>> base
> >>>>>>> operator default implementation is a simple NO-OP.
> >>>>>>> - The operator automatically picks up the least busy of the thread
> >>>> pool
> >>>>>>> worker which has JEP embedded in it to execute the call.
> >>>>>>> - The JEP based installation will not support non Cpython modules.
> >>> All
> >>>>> of
> >>>>>>> the major python libraries are cpython based and hence I believe
> >>> this
> >>>>> is of
> >>>>>>> a lesser concern. If we hit a roadblock when a new python library
> >>>> being
> >>>>> a
> >>>>>>> non-Cpython based library needs to be run, then we could implement
> >>> the
> >>>>>>> ApexPythonEngine interface to something like Py4J which involves
> >>>>>>> interprocess communication.
> >>>>>>> - The python operator requires the user to set the library path
> >>>>>>> java.library.path for the operator to make use of the dynamic
> >>>> libraries
> >>>>> of
> >>>>>>> the corresponding platform. This has to be passed in as the JVM
> >>>> options.
> >>>>>>> Failing to do so will result in the operator failing to load the
> >>>>>>> interpreter properly.
> >>>>>>> - The supported python versions are 2.7, 3.3 , 3.4 , 3.5 and 3.6.
> >>>> Numpy
> >>>>>> =
> >>>>>>> 1.7 is supported.
> >>>>>>> - There is no support for virtual environments yet. In case of
> >>>> multiple
> >>>>>>> python versions on the node, to include the right python version
> for
> >>>> the
> >>>>>>> apex operator, ensure that the environment variables and the
> dynamic
> >>>>>>> library path are set appropriately. This is a workaround and I hope
> >>>>>>> APEXCORE-796 will solve this issue as well.
> >>>>>>>
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Ananth
> >
>
>

Reply via email to