Thanks for the comments Thomas and Pramod. 

Apologies for the delayed response on this thread. 

> I believe the thread implementation still adds some value over a container 
> local approach. It is more of a “thread local” equivalent which is more 
> efficient as opposed to a container local implementation. Also the number of 
> worker threads is configurable. Setting the value of 1 will let the user to 
> not do this ( although I do not see a reason for why not ). There is always 
> the over head of serialise/de-serialize cycle even for a container local 
> approach and there is the additional possibility of container local not being 
> honoured by the Resource manager based on the state of the resources. 

> Regarding the configurable key to ensure all tuples in a window are 
> processed, I am adding a switch which can let the user choose ( and javadoc 
> that clearly points out issues if not waiting for the tuples to be completely 
> processed ). There are pros and cons for this and letting the user decide 
> might be a better approach. The reason why I mention cons for waiting the 
> tuples to complete ( apart from the reason that Thomas mentioned ) is that if 
> one of the commands that the user wrote is an erroneous one, all the 
> subsequent calls to that interpreter thread cal fail. An example use case is 
> that tuple A set some value for variable x and tuple B that is coming next is 
> making use of the variable x. Syntactically expression for tuple B is valid 
> but just that it depends on variable x. Now if the variable x is not in 
> memory because tuple A is a straggler resulting in tuple B resulting in an 
> erroneous interpreter state. Hence the operator might stall definitely as end 
> window will be stalled forever resulting in killing of the operator 
> ultimately. This is also because the erroneous command corrupted the state of 
> the interpreter itself. Of course this can happen to all of the threads in 
> the interpreter worker pool resulting in this state as well. Perhaps an 
> improvement of the current implementation is to detect all such stalled 
> interpreters for more than x windows and rebuild the interpreter thread when 
> such a situation is detected. 

> Thanks for the IdleTimeoutHandler tip as this helped me to ensure that the 
> stragglers are drained out irrespective of a new tuple coming in for 
> processing. In the previous iteration, the stragglers could only be drained 
> when there is a new tuple that came in processing as delayed responses queue 
> could only be checked when there is some activity on the main thread. 

> Thanks for raising the point about the virtual environments: This is a point 
> I missed mentioning in the design description below. There is no support for 
> virtual environments yet in JEP and hence the current limitation. However the 
> work around is simple. As part of the application configuration, we need to 
> provide the JAVA_LIBRARY_PATH which contains the path to the JEP dynamic 
> libraries. If there are multiple python installs ( and hence multiple JEP 
> libraries to choose from for each of the apex applications that are being 
> deployed), setting the right path for the operator JVM will result in picking 
> the corresponding python interpreter version. This also essentially means 
> that we cannot have a thread local deployment configuration of two python 
> operators that belong to different python versions in the same JVM.  The 
> Docker approach ticket should be the right fix for virtual environments 
> issue? https://issues.apache.org/jira/browse/APEXCORE-796 
> <https://issues.apache.org/jira/browse/APEXCORE-796> ( but still might not 
> solve the thread local configuration deployment )

Regards,
Ananth


> On 21 Dec 2017, at 11:01 am, Pramod Immaneni <pra...@datatorrent.com> wrote:
> 
> On Wed, Dec 20, 2017 at 3:34 PM, Thomas Weise <t...@apache.org 
> <mailto:t...@apache.org>> wrote:
> 
>> It is exciting to see this move forward, the ability to use Python opens
>> many new possibilities.
>> 
>> Regarding use of worker threads, this is a pattern that we are using
>> elsewhere (for example in the Kafka input operator). When the operator
>> performs blocking operations and consumes little memory and/or CPU, then it
>> is more economic to first use threads to increase parallelism and
>> throughput (up to a limit), and then the more expensive containers for
>> horizontal scaling (multiple threads to make good use of container
>> resources and then scale using the usual partitioning).
>> 
> 
> I think there is a difference. In case of kafka or other input operators
> the threads are less constrained. They can operate with independence and
> can dictate the pace limited only by back pressure. In this case the
> operator is most likely going to be downsteram in the DAG and would have
> constraints for processing guarantees. For scalability, container local
> could also be used as a substitue for multiple threads without resorting to
> using separate containers. I can understand use of a separate thread to be
> able to get around problems like stalled processing but would first try to
> see if something like container local would work for scaling.
> 
> 
>> It is also correct that generally there is no ordering guarantee within a
>> streaming window, and that would be the case when multiple input ports are
>> present as well. (The platform cannot guarantee such ordering, this would
>> need to be done by the operator).
> 
> 
> 
>> Idempotency can be expensive (latency and/or space complexity), and not all
>> applications need it (like certain use cases that process record by record
>> and don't accumulate state). An example might be Python logic that is used
>> for scoring against a model that was built offline. Idempotency would
>> actually be rather difficult to implement, since the operator would need to
>> remember which tuples were emitted in a given interval and on replay block
>> until they are available (and also hold others that may be processed sooner
>> than in the original order). It may be easier to record emitted tuples to a
>> WAL instead of reprocessing.
>> 
> 
> Ordering cannot be guaranteed but the operator would need to finish the
> work it is given a window within the window boundary, otherwise there is a
> chance for data loss in recovery scenarios. You could make checkpoint the
> boundary by which all pending work is completed instead of every window
> boundary but then downstream operators cannot rely on window level
> idempotency for exactly once. Something like file output operator would
> work but not our db kind of operator. Both options could be supported in
> the operator.
> 
> 
>> Regarding not emitting stragglers until the next input arrives, can this
>> not be accomplished using IdleTimeHandler?
>> 
>> What is preventing the use of virtual environments?
>> 
>> Thanks,
>> Thomas
>> 
>> 
>> On Tue, Dec 19, 2017 at 8:19 AM, Pramod Immaneni <pra...@datatorrent.com>
>> wrote:
>> 
>>> Hi Ananth,
>>> 
>>> From your explanation, it looks like the threads overall allow you to
>>> achieve two things. Have some sort of overall timeout if by which a tuple
>>> doesn't finish processing then it is flagged as such. Second, it doesn't
>>> block processing of subsequent tuples and you can still process them
>>> meeting the SLA. By checkpoint, however, I think you should try to have a
>>> resolution one way or the other for all the tuples received within the
>>> checkpoint period or every window boundary (see idempotency below),
>>> otherwise, there is a chance of data loss in case of operator restarts.
>> If
>>> a loss is acceptable for stragglers you could let straggler processing
>>> continue beyond checkpoint boundary and let them finish when they can.
>> You
>>> could support both behaviors by use of a property. Furthermore, you may
>> not
>>> want all threads stuck with stragglers and then you are back to square
>> one
>>> so you may need to stop processing stragglers beyond a certain thread
>> usage
>>> threshold. Is there a way to interrupt the processing of the engine?
>>> 
>>> Then there is the question of idempotency. I suspect it would be
>> difficult
>>> to maintain it unless you wait for processing to finish for all tuples
>>> received during the window every window boundary. You may provide an
>> option
>>> for relaxing the strict guarantees for the stragglers like mentioned
>> above.
>>> 
>>> Pramod
>>> 
>>> On Thu, Dec 14, 2017 at 10:49 AM, Ananth G <ananthg.a...@gmail.com>
>> wrote:
>>> 
>>>> Hello Pramod,
>>>> 
>>>> Thanks for the comments. I adjusted the title of the JIRA. Here is
>> what I
>>>> was thinking for the worker pool implementation.
>>>> 
>>>> - The main reason ( which I forgot to mention in the design points
>> below
>>> )
>>>> is that the java embedded engine allows only the thread that created
>> the
>>>> instance to execute the python logic. This is more because of the JNI
>>>> specification itself. Some hints here https://stackoverflow.com/
>>>> questions/18056347/jni-calling-java-from-c-with-multiple-threads <
>>>> https://stackoverflow.com/questions/18056347/jni-
>>> calling-java-from-c-with-
>>>> multiple-threads> and here http://journals.ecs.soton.ac.
>>>> uk/java/tutorial/native1.1/implementing/sync.html <
>>>> http://journals.ecs.soton.ac.uk/java/tutorial/native1.1/
>>>> implementing/sync.html>
>>>> 
>>>> - This essentially means that the main operator thread will have to
>> call
>>>> the python code execution logic if the design were otherwise.
>>>> 
>>>> - Since the end user can choose to can write any kind of logic
>> including
>>>> blocking I/O as part of the implementation, I did not want to stall the
>>>> operator thread for any usage pattern.
>>>> 
>>>> - In fact there is only one overall interpreter in the JVM process
>> space
>>>> and the interpreter thread is just a JNI wrapper around it to account
>> for
>>>> the JNI limitations above.
>>>> 
>>>> - It is for the very same reason, there is an API in the implementation
>>> to
>>>> support for registering Shared Modules across all of the interpreter
>>>> threads. Use cases for this exist when there is a global variable
>>> provided
>>>> by the underlying Python library and loading it multiple times can
>> cause
>>>> issues. Hence the API to register a shared module which can be used by
>>> all
>>>> of the Interpreter Threads.
>>>> 
>>>> - The operator submits to a work request queue and consumes from a
>>>> response queue for each of the interpreter thread. There exists one
>>> request
>>>> and one response queue per interpreter thread.
>>>> 
>>>> - The stragglers will get drained from the response queue for a
>>> previously
>>>> submitted request queue.
>>>> 
>>>> - The other reason why I chose to implement it this ways is also for
>> some
>>>> of the use case that I foresee in the ML scoring scenarios. In fraud
>>>> systems, if I have a strict SLA to score a model, the main thread in
>> the
>>>> operator is not helping me implement this pattern at all. The caller to
>>> the
>>>> Apex application will need to proceed if the scoring gets delayed due
>> to
>>>> whatever reason. However the scoring can continue on the interpreter
>>> thread
>>>> and can be drained later ( It is just that the caller did not make use
>> of
>>>> this result but can still be persisted for operators consuming from the
>>>> straggler port.
>>>> 
>>>> - There are 3 output ports for this operator. DefaultOutputPort,
>>>> stragglersPort and an errorPort.
>>>> 
>>>> - Some libraries like Tensorflow can become really heavy. Tensorflow
>>>> models can execute a tensorflow DAG as part of a model scoring
>>>> implementation and hence I wanted to take the approach of a worker
>> pool.
>>>> Yes your point is valid if we wait for the stragglers to complete in a
>>>> given window. The current implementation does not force to wait for all
>>> of
>>>> the stragglers to complete. The stragglers are emitted only when there
>>> is a
>>>> new tuple that is being processed. i.e. when a new tuple arrives for
>>>> scoring , the straggler response queue is checked if there are any
>>> entries
>>>> and if yes, the responses are emitted into the stragglerPort. This
>>>> essentially means that there are situations when the straggler port is
>>>> emitting the result for a request submitted in the previous window.
>> This
>>>> also implies that idempotency cannot be guaranteed across runs of the
>>> same
>>>> input data. In fact all threaded implementations have this issue as
>>>> ordering of the results is not guaranteed to be unique even within a
>>> given
>>>> window ?
>>>> 
>>>> I can enforce a block/drain at the end of the window to force a
>>> completion
>>>> basing on the feedback.
>>>> 
>>>> 
>>>> Regards,
>>>> Ananth
>>>> 
>>>>> On 15 Dec 2017, at 4:21 am, Pramod Immaneni <pra...@datatorrent.com>
>>>> wrote:
>>>>> 
>>>>> Hi Anath,
>>>>> 
>>>>> Sounds interesting and looks like you have put quite a bit of work on
>>> it.
>>>>> Might I suggest changing the title of 2260 to better fit your
>> proposal
>>>> and
>>>>> implementation, mainly so that there is differentiation from 2261.
>>>>> 
>>>>> I wanted to discuss the proposal to use multiple threads in an
>> operator
>>>>> instance. Unless the execution threads are blocking for some sort of
>>> i/o
>>>>> why would it result in a noticeable performance difference compared
>> to
>>>>> processing in operator thread and running multiple partitions of the
>>>>> operator in container local. By running the processing in a separate
>>>> thread
>>>>> from the operator lifecycle thread you don't still get away from
>>> matching
>>>>> the incoming data throughput. The checkpoint will act as a time where
>>> you
>>>>> backpressure will start to materialize when the operator would have
>> to
>>>> wait
>>>>> for your background processing to complete to guarantee all data till
>>> the
>>>>> checkpoint is processed.
>>>>> 
>>>>> Thanks
>>>>> 
>>>>> 
>>>>> On Thu, Dec 14, 2017 at 2:20 AM, Ananth G <ananthg.a...@gmail.com>
>>>> wrote:
>>>>> 
>>>>>> Hello All,
>>>>>> 
>>>>>> I would like to submit the design for the Python execution operator
>>>> before
>>>>>> I raise the pull request so that I can refine the implementation
>> based
>>>> on
>>>>>> feedback. Could you please provide feedback on the design if any
>> and I
>>>> will
>>>>>> raise the PR accordingly.
>>>>>> 
>>>>>> - This operator is for the JIRA ticket raised here
>>>>>> https://issues.apache.org/jira/browse/APEXMALHAR-2260 <
>>>>>> https://issues.apache.org/jira/browse/APEXMALHAR-2260>
>>>>>> - The operator embeds a python interpreter in the operator JVM
>> process
>>>>>> space and is not external to the JVM.
>>>>>> - The implementation is proposing the use of Java Embedded Python (
>>> JEP
>>>> )
>>>>>> given here https://github.com/ninia/jep <
>> https://github.com/ninia/jep
>>>> 
>>>>>> - The JEP engine is under zlib/libpng license. Since this is an
>>> approved
>>>>>> license under https://www.apache.org/legal/resolved.html#category-a
>> <
>>>>>> https://www.apache.org/legal/resolved.html#category-a> I am
>> assuming
>>> it
>>>>>> is ok for the community to approve the inclusion of this library
>>>>>> - Python integration is a messy piece due to the nature of dynamic
>>>>>> libraries. All python libraries need to be natively installed. This
>>> also
>>>>>> means we will not be able bundle python libraries and dependencies
>> as
>>>> part
>>>>>> of the build into the target JVM container. Hence this operator has
>>> the
>>>>>> current limitation of the python binaries installed through an
>>> external
>>>>>> process on all of the YARN nodes for now.
>>>>>> - The JEP maven dependency jar in the POM is a JNI wrapper around
>> the
>>>>>> dynamic library that is installed externally to the Apex
>> installation
>>>>>> process on all of the YARN nodes.
>>>>>> - Hope to take up https://issues.apache.org/
>> jira/browse/APEXCORE-796
>>> <
>>>>>> https://issues.apache.org/jira/browse/APEXCORE-796> to solve this
>>> issue
>>>>>> in the future.
>>>>>> - The python operator implementation can be extended to py4J based
>>>>>> implementation ( as opposed to in-memory model like JEP ) in the
>>> future
>>>> if
>>>>>> required be. JEP is the implementation based on an in-memory design
>>>> pattern.
>>>>>> - The python operator allows for 4 major API patterns
>>>>>>   - Execute a method call by accepting parameters to pass to the
>>>>>> interpreter
>>>>>>   - Execute a python script as given in a file path
>>>>>>   - Evaluate an expression and allows for passing of variables
>>> between
>>>>>> the java code and the python in-memory interpreter bridge
>>>>>>   - A handy method wherein a series of instructions can be passed
>> in
>>>> one
>>>>>> single java call ( executed as a sequence of python eval
>> instructions
>>>> under
>>>>>> the hood )
>>>>>> - Automatic garbage collection of the variables that are passed from
>>>> java
>>>>>> code to the in memory python interpreter
>>>>>> - Support for all major python libraries. Tensorflow, Keras, Scikit,
>>>>>> xgboost. Preliminary tests for these libraries seem to work as per
>>> code
>>>>>> here : https://github.com/ananthc/sampleapps/tree/master/apache-
>>>>>> apex/apexjvmpython <https://github.com/ananthc/
>>>>>> sampleapps/tree/master/apache-apex/apexjvmpython>
>>>>>> - The implementation allows for SLA based execution model. i.e. the
>>>>>> operator is given a chance to execute the python code and if not
>>>> complete
>>>>>> within a time out, the operator code returns back null.
>>>>>> - A tuple that has become a straggler as per previous point will
>>>>>> automatically be drained off to a different port so that downstream
>>>>>> operators can still consume the straggler if they want to when the
>>>> results
>>>>>> arrive.
>>>>>> - Because of the nature of python being an interpreter and if a
>>> previous
>>>>>> tuple is being still processed, there is chance of a back pressure
>>>> pattern
>>>>>> building up very quickly. Hence this operator works on the concept
>> of
>>> a
>>>>>> worker pool. The Python operator uses a configurable number of
>> worker
>>>>>> thread each of which embed the Python interpreter within their
>>>> processing
>>>>>> space. i.e. it is in fact a collection of python ink memory
>>> interpreters
>>>>>> inside the Python operator implementation.
>>>>>> - The operator chooses one of the threads at runtime basing on their
>>>> busy
>>>>>> state thus allowing for back-pressure issues to be resolved
>>>> automatically.
>>>>>> - There is a first class support for Numpy in JEP. Java arrays would
>>> be
>>>>>> convertible to the Python Numpy arrays and vice versa and share the
>>> same
>>>>>> memory addresses for efficiency reasons.
>>>>>> - The base operator implements dynamic partitioning based on a
>> thread
>>>>>> starvation policy. At each checkpoint, it checks how much percentage
>>> of
>>>> the
>>>>>> requests resulted in starved threads and if the starvation exceeds a
>>>>>> configured percentage, a new instance of the operator is provisioned
>>> for
>>>>>> every such instance of the operator
>>>>>> - The operator provides the notion of a worker execution mode. There
>>> are
>>>>>> two worker modes that are passed in each of the above calls from the
>>>> user.
>>>>>> ALL or ANY.  Because python interpreter is state based engine, a
>> newly
>>>>>> dynamically partitioned operator might not be in the exact state of
>>> the
>>>>>> remaining operators. Hence the operator has this notion of worker
>>>> execution
>>>>>> mode. Any call ( any of the 4 calls mentioned above ) called with
>> ALL
>>>>>> execution mode will be executed on all the workers of the worker
>>> thread
>>>>>> pool as well as the dynamically portioned instance whenever such an
>>>>>> instance is provisioned.
>>>>>> - The base operator implementation has a method that can be
>> overridden
>>>> to
>>>>>> implement the logic that needs to be executed for each tuple. The
>> base
>>>>>> operator default implementation is a simple NO-OP.
>>>>>> - The operator automatically picks up the least busy of the thread
>>> pool
>>>>>> worker which has JEP embedded in it to execute the call.
>>>>>> - The JEP based installation will not support non Cpython modules.
>> All
>>>> of
>>>>>> the major python libraries are cpython based and hence I believe
>> this
>>>> is of
>>>>>> a lesser concern. If we hit a roadblock when a new python library
>>> being
>>>> a
>>>>>> non-Cpython based library needs to be run, then we could implement
>> the
>>>>>> ApexPythonEngine interface to something like Py4J which involves
>>>>>> interprocess communication.
>>>>>> - The python operator requires the user to set the library path
>>>>>> java.library.path for the operator to make use of the dynamic
>>> libraries
>>>> of
>>>>>> the corresponding platform. This has to be passed in as the JVM
>>> options.
>>>>>> Failing to do so will result in the operator failing to load the
>>>>>> interpreter properly.
>>>>>> - The supported python versions are 2.7, 3.3 , 3.4 , 3.5 and 3.6.
>>> Numpy
>>>>> =
>>>>>> 1.7 is supported.
>>>>>> - There is no support for virtual environments yet. In case of
>>> multiple
>>>>>> python versions on the node, to include the right python version for
>>> the
>>>>>> apex operator, ensure that the environment variables and the dynamic
>>>>>> library path are set appropriately. This is a workaround and I hope
>>>>>> APEXCORE-796 will solve this issue as well.
>>>>>> 
>>>>>> 
>>>>>> Regards,
>>>>>> Ananth

Reply via email to