Thanks for the comments Thomas and Pramod. Apologies for the delayed response on this thread.
> I believe the thread implementation still adds some value over a container local approach. It is more of a “thread local” equivalent, which is more efficient than a container local implementation. Also, the number of worker threads is configurable: setting it to 1 lets the user opt out of this (although I do not see a reason to). There is always the overhead of a serialise/deserialise cycle, even with a container local approach, and there is the additional possibility of container local not being honoured by the Resource Manager, depending on the state of the resources.
>
> Regarding the configurable key to ensure all tuples in a window are processed, I am adding a switch that lets the user choose (with javadoc that clearly points out the issues of not waiting for the tuples to be completely processed). There are pros and cons to this, and letting the user decide might be the better approach. The reason I mention cons of waiting for the tuples to complete (apart from the reason Thomas mentioned) is that if one of the commands the user wrote is erroneous, all subsequent calls to that interpreter thread can fail. For example, tuple A sets some value for variable x, and tuple B, which comes next, uses variable x. The expression for tuple B is syntactically valid; it just depends on variable x. If x is not in memory because tuple A is a straggler, tuple B leaves the interpreter in an erroneous state. The operator might then stall indefinitely, as end window will block forever, ultimately resulting in the operator being killed. This happens because the erroneous command corrupts the state of the interpreter itself. Of course, every thread in the interpreter worker pool can end up in this state as well.
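The switch Ananth describes, together with the stall risk, can be sketched in plain Java. This is a minimal sketch under stated assumptions, not the operator's code: `WindowDrainSketch`, `waitForCompletion` and `drainTimeoutMillis` are hypothetical names, and an `ExecutorService` stands in for the interpreter worker pool. The bounded wait is the point of the example: without a deadline, one corrupted interpreter would block the end of the window forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: all names are illustrative, not the operator's API.
class WindowDrainSketch {
  private final ExecutorService workers;   // stands in for the interpreter worker pool
  private final boolean waitForCompletion; // the user-facing switch (assumed name)
  private final long drainTimeoutMillis;   // upper bound so a stuck interpreter cannot block forever
  private final List<Future<String>> pendingInWindow = new ArrayList<>();
  final List<Future<String>> stragglers = new ArrayList<>(); // handed to the straggler path later

  WindowDrainSketch(ExecutorService workers, boolean waitForCompletion, long drainTimeoutMillis) {
    this.workers = workers;
    this.waitForCompletion = waitForCompletion;
    this.drainTimeoutMillis = drainTimeoutMillis;
  }

  void process(Callable<String> pythonCall) {
    pendingInWindow.add(workers.submit(pythonCall));
  }

  /** Returns how many calls from this window were still unfinished when the window closed. */
  int endWindow() throws InterruptedException {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(drainTimeoutMillis);
    int unfinished = 0;
    for (Future<String> f : pendingInWindow) {
      if (waitForCompletion) {
        long remaining = Math.max(0L, deadline - System.nanoTime());
        try {
          f.get(remaining, TimeUnit.NANOSECONDS);
          continue; // finished successfully within the window
        } catch (ExecutionException e) {
          continue; // finished with an error; the design routes this to an error port
        } catch (TimeoutException e) {
          // still running after the bounded wait: counted below
        }
      } else if (f.isDone()) {
        continue; // not waiting: only what already completed counts as done
      }
      unfinished++;
      stragglers.add(f);
    }
    pendingInWindow.clear();
    return unfinished;
  }
}
```

Counting timed-out calls per window would also give a natural signal for detecting interpreters that stay stalled across several windows.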
> Perhaps one improvement to the current implementation would be to detect interpreters that have been stalled for more than x windows and rebuild the interpreter thread when such a situation is detected.
>
> Thanks for the IdleTimeHandler tip; it helped me ensure that stragglers are drained regardless of whether a new tuple comes in for processing. In the previous iteration, stragglers could only be drained when a new tuple came in, because the delayed responses queue was only checked when there was some activity on the main thread.
>
> Thanks for raising the point about virtual environments: this is something I missed mentioning in the design description below. JEP does not support virtual environments yet, hence the current limitation. However, the workaround is simple. As part of the application configuration, we need to provide the JAVA_LIBRARY_PATH, which contains the path to the JEP dynamic libraries. If there are multiple Python installs (and hence multiple JEP libraries to choose from for each of the Apex applications being deployed), setting the right path for the operator JVM results in picking the corresponding Python interpreter version. This also means that we cannot have a thread local deployment of two Python operators that use different Python versions in the same JVM. Would the Docker approach ticket, https://issues.apache.org/jira/browse/APEXCORE-796, be the right fix for the virtual environments issue? (It still might not solve the thread local deployment configuration.)
>
> Regards,
> Ananth
>
> On 21 Dec 2017, at 11:01 am, Pramod Immaneni <pra...@datatorrent.com> wrote:
>
> On Wed, Dec 20, 2017 at 3:34 PM, Thomas Weise <t...@apache.org> wrote:
>
>> It is exciting to see this move forward; the ability to use Python opens many new possibilities.
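The idle-time draining Ananth describes can be illustrated with a small, self-contained sketch. Apex's `Operator.IdleTimeHandler` exposes a `handleIdleTime()` callback that the engine invokes when there is no input to process; to keep the example compilable without Apex on the classpath, it declares a local look-alike interface instead. `IdleTimeHandlerLike`, `StragglerDrainer`, the queue and the `Consumer` standing in for the straggler port are all hypothetical.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Local stand-in mirroring the shape of Apex's Operator.IdleTimeHandler (declared here so the sketch compiles alone).
interface IdleTimeHandlerLike {
  void handleIdleTime();
}

// Hypothetical operator fragment: worker threads enqueue late results, the operator thread drains them.
class StragglerDrainer implements IdleTimeHandlerLike {
  final Queue<String> delayedResponses = new ConcurrentLinkedQueue<>(); // written by worker threads
  private final Consumer<String> stragglersPort; // stands in for the straggler output port
  private final long idleSleepMillis;            // back-off so an idle operator does not busy-spin

  StragglerDrainer(Consumer<String> stragglersPort, long idleSleepMillis) {
    this.stragglersPort = stragglersPort;
    this.idleSleepMillis = idleSleepMillis;
  }

  /** Emits every queued late result; can also be called from the per-tuple path. */
  int drain() {
    int emitted = 0;
    String result;
    while ((result = delayedResponses.poll()) != null) {
      stragglersPort.accept(result);
      emitted++;
    }
    return emitted;
  }

  @Override
  public void handleIdleTime() {
    // Stragglers no longer have to wait for the next input tuple to be emitted.
    if (drain() == 0 && idleSleepMillis > 0) {
      try {
        Thread.sleep(idleSleepMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}
```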
>>
>> Regarding use of worker threads, this is a pattern that we are using elsewhere (for example in the Kafka input operator). When the operator performs blocking operations and consumes little memory and/or CPU, it is more economical to first use threads to increase parallelism and throughput (up to a limit), and then the more expensive containers for horizontal scaling (multiple threads to make good use of container resources, then scale using the usual partitioning).
>>
>
> I think there is a difference. In the case of Kafka or other input operators, the threads are less constrained. They can operate independently and can dictate the pace, limited only by back pressure. In this case the operator is most likely going to be downstream in the DAG and would have constraints for processing guarantees. For scalability, container local could also be used as a substitute for multiple threads without resorting to separate containers. I can understand the use of a separate thread to get around problems like stalled processing, but I would first try to see if something like container local would work for scaling.
>
>> It is also correct that generally there is no ordering guarantee within a streaming window, and that would be the case when multiple input ports are present as well. (The platform cannot guarantee such ordering; this would need to be done by the operator.)
>>
>> Idempotency can be expensive (latency and/or space complexity), and not all applications need it (like certain use cases that process record by record and don't accumulate state). An example might be Python logic that is used for scoring against a model that was built offline.
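Thomas's point that ordering would have to be done by the operator can be made concrete with a reorder buffer. This is a hypothetical sketch, not part of the proposal: each input is assumed to be tagged with a sequence number when it is submitted to a worker, and a result is only emitted once all earlier results have been. It also shows the cost: one straggler holds back everything behind it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical reorder buffer: results may finish in any order on the worker threads,
// but are emitted strictly in the order their inputs were submitted.
class ReorderBuffer<T> {
  private final Map<Long, T> waiting = new HashMap<>(); // finished, but blocked behind an earlier tuple
  private final Consumer<T> out;
  private long nextToEmit = 0; // sequence number of the next result allowed out

  ReorderBuffer(Consumer<T> out) {
    this.out = out;
  }

  /** Records the (non-null) result for sequence number seq and emits any contiguous run now ready. */
  void complete(long seq, T result) {
    waiting.put(seq, result);
    T next;
    while ((next = waiting.remove(nextToEmit)) != null) {
      out.accept(next);
      nextToEmit++;
    }
  }

  /** Results held back because an earlier tuple is still running: the latency/space cost of ordering. */
  int buffered() {
    return waiting.size();
  }
}
```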
>> Idempotency would actually be rather difficult to implement, since the operator would need to remember which tuples were emitted in a given interval and, on replay, block until they are available (and also hold others that may be processed sooner than in the original order). It may be easier to record emitted tuples to a WAL instead of reprocessing.
>>
>
> Ordering cannot be guaranteed, but the operator would need to finish the work it is given in a window within the window boundary; otherwise there is a chance of data loss in recovery scenarios. You could make the checkpoint the boundary by which all pending work is completed instead of every window boundary, but then downstream operators cannot rely on window level idempotency for exactly once. Something like the file output operator would work, but not our DB kind of operators. Both options could be supported in the operator.
>
>> Regarding not emitting stragglers until the next input arrives, can this not be accomplished using IdleTimeHandler?
>>
>> What is preventing the use of virtual environments?
>>
>> Thanks,
>> Thomas
>>
>> On Tue, Dec 19, 2017 at 8:19 AM, Pramod Immaneni <pra...@datatorrent.com> wrote:
>>
>>> Hi Ananth,
>>>
>>> From your explanation, it looks like the threads overall allow you to achieve two things. First, there is an overall timeout by which, if a tuple has not finished processing, it is flagged as such. Second, it doesn't block processing of subsequent tuples, and you can still process them while meeting the SLA. By checkpoint, however, I think you should try to have a resolution one way or the other for all the tuples received within the checkpoint period, or at every window boundary (see idempotency below); otherwise there is a chance of data loss in case of operator restarts. If a loss is acceptable for stragglers, you could let straggler processing continue beyond the checkpoint boundary and let them finish when they can.
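Thomas's suggestion to record emitted tuples instead of reprocessing can be sketched as a per-window log. This is a minimal sketch, assuming that the platform replays windows by id after a restart; `EmittedTupleLog` is hypothetical and keeps its log in memory purely for illustration, whereas a real write-ahead log would be persisted and purged once windows are committed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch: per-window log of emitted results, replayed instead of re-running Python.
class EmittedTupleLog {
  private final Map<Long, List<String>> log = new HashMap<>(); // in memory only, for illustration
  private long currentWindow;
  private List<String> currentOutputs; // null while a logged window is being replayed

  void beginWindow(long windowId, Consumer<String> out) {
    currentWindow = windowId;
    List<String> recorded = log.get(windowId);
    if (recorded != null) {
      recorded.forEach(out); // replay: emit exactly what was emitted the first time
      currentOutputs = null;
    } else {
      currentOutputs = new ArrayList<>();
    }
  }

  void process(String tuple, Function<String, String> pythonCall, Consumer<String> out) {
    if (currentOutputs == null) {
      return; // replayed window: inputs are ignored, outputs already came from the log
    }
    String result = pythonCall.apply(tuple);
    currentOutputs.add(result);
    out.accept(result);
  }

  void endWindow() {
    if (currentOutputs != null) {
      // Only complete windows are logged; a window cut short by a failure is simply reprocessed.
      log.put(currentWindow, currentOutputs);
    }
  }
}
```

Because the replayed window never calls into Python, the output stays identical even if the Python logic is non-deterministic or finishes in a different order the second time.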
>>> You could support both behaviors by use of a property. Furthermore, you may not want all threads stuck with stragglers, since then you are back to square one, so you may need to stop processing stragglers beyond a certain thread usage threshold. Is there a way to interrupt the processing of the engine?
>>>
>>> Then there is the question of idempotency. I suspect it would be difficult to maintain unless you wait for processing to finish for all tuples received during the window at every window boundary. You may provide an option for relaxing the strict guarantees for the stragglers, as mentioned above.
>>>
>>> Pramod
>>>
>>> On Thu, Dec 14, 2017 at 10:49 AM, Ananth G <ananthg.a...@gmail.com> wrote:
>>>
>>>> Hello Pramod,
>>>>
>>>> Thanks for the comments. I adjusted the title of the JIRA. Here is what I was thinking for the worker pool implementation.
>>>>
>>>> - The main reason (which I forgot to mention in the design points below) is that the Java embedded engine allows only the thread that created the instance to execute the Python logic. This is largely because of the JNI specification itself. Some hints here https://stackoverflow.com/questions/18056347/jni-calling-java-from-c-with-multiple-threads and here http://journals.ecs.soton.ac.uk/java/tutorial/native1.1/implementing/sync.html
>>>>
>>>> - This essentially means that, if the design were otherwise, the main operator thread would have to call the Python code execution logic.
>>>>
>>>> - Since the end user can choose to write any kind of logic, including blocking I/O, as part of the implementation, I did not want to stall the operator thread for any usage pattern.
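The thread-affinity constraint Ananth describes is commonly handled by giving each interpreter its own dedicated thread and funnelling every call through it. The sketch below assumes nothing about JEP's API: `ThreadBoundInterpreter` is a hypothetical stand-in that simply refuses calls from any thread other than its creator, which is the behaviour the design has to work around, and `InterpreterWorker` is likewise illustrative.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Stand-in for a JNI-backed interpreter that may only be used by the thread that created it.
class ThreadBoundInterpreter {
  private final Thread owner = Thread.currentThread();

  String eval(String expression) {
    if (Thread.currentThread() != owner) {
      throw new IllegalStateException("interpreter used from a thread that did not create it");
    }
    return "evaluated:" + expression; // placeholder for the real Python evaluation
  }
}

// Hypothetical worker: one dedicated thread creates the interpreter and runs every call on it.
class InterpreterWorker implements AutoCloseable {
  private final ExecutorService thread = Executors.newSingleThreadExecutor();
  private final Future<ThreadBoundInterpreter> interpreter;

  InterpreterWorker() {
    Callable<ThreadBoundInterpreter> create = ThreadBoundInterpreter::new;
    interpreter = thread.submit(create); // constructed on the worker thread, not the operator thread
  }

  /** Called from the operator thread; the work itself always runs on the owning worker thread. */
  Future<String> submit(String expression) {
    // The creation task was queued first on this single thread, so get() returns immediately here.
    Callable<String> call = () -> interpreter.get().eval(expression);
    return thread.submit(call);
  }

  @Override
  public void close() {
    thread.shutdownNow();
  }
}
```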
>>>>
>>>> - In fact there is only one overall interpreter in the JVM process space, and the interpreter thread is just a JNI wrapper around it to account for the JNI limitations above.
>>>>
>>>> - It is for the very same reason that there is an API in the implementation to support registering shared modules across all of the interpreter threads. Use cases for this exist when there is a global variable provided by the underlying Python library and loading it multiple times can cause issues. Hence the API to register a shared module that can be used by all of the interpreter threads.
>>>>
>>>> - The operator submits to a work request queue and consumes from a response queue for each of the interpreter threads. There is one request queue and one response queue per interpreter thread.
>>>>
>>>> - The stragglers will get drained from the response queue for a previously submitted request.
>>>>
>>>> - The other reason I chose to implement it this way is some of the use cases that I foresee in ML scoring scenarios. In fraud systems, if I have a strict SLA to score a model, the main thread in the operator does not help me implement this pattern at all. The caller of the Apex application will need to proceed if the scoring gets delayed for whatever reason. However, the scoring can continue on the interpreter thread and can be drained later (it is just that the caller did not make use of this result, but it can still be persisted for operators consuming from the straggler port).
>>>>
>>>> - There are 3 output ports for this operator: DefaultOutputPort, stragglersPort and an errorPort.
>>>>
>>>> - Some libraries like Tensorflow can become really heavy. Tensorflow models can execute a Tensorflow DAG as part of a model scoring implementation, and hence I wanted to take the approach of a worker pool.
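The request/response queue arrangement and the SLA behaviour described above can be sketched with one `BlockingQueue` pair per worker. This is an illustration under assumptions, not the proposal's code: `QueuedWorker` and its methods are hypothetical, a `Function<String, String>` stands in for the embedded Python call, and a `List` stands in for the stragglers port. A call that misses its SLA returns `null`, and its result is routed to the straggler list when it finally arrives.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical sketch of one interpreter thread with its own request and response queue.
class QueuedWorker implements AutoCloseable {
  private static final class Request {
    final long id;
    final String payload;
    Request(long id, String payload) { this.id = id; this.payload = payload; }
  }

  private static final class Response {
    final long id;
    final String result;
    Response(long id, String result) { this.id = id; this.result = result; }
  }

  private final BlockingQueue<Request> requests = new LinkedBlockingQueue<>();
  private final BlockingQueue<Response> responses = new LinkedBlockingQueue<>();
  private final Thread thread;
  private long nextId = 0; // only touched by the operator thread

  QueuedWorker(Function<String, String> pythonLogic) {
    thread = new Thread(() -> {
      try {
        while (true) {
          Request r = requests.take();
          // Exceptions are not handled here; the proposal routes failures to an error port.
          responses.put(new Response(r.id, pythonLogic.apply(r.payload)));
        }
      } catch (InterruptedException e) {
        // close() was called: let the thread end
      }
    });
    thread.setDaemon(true);
    thread.start();
  }

  /** Waits up to slaMillis for this payload's result; returns null if the SLA is missed. */
  String process(String payload, long slaMillis, List<String> stragglersPort) throws InterruptedException {
    long id = nextId++;
    requests.put(new Request(id, payload));
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(slaMillis);
    while (true) {
      long remaining = Math.max(0L, deadline - System.nanoTime());
      Response resp = responses.poll(remaining, TimeUnit.NANOSECONDS);
      if (resp == null) {
        return null; // SLA missed: the result will later surface as a straggler
      }
      if (resp.id == id) {
        return resp.result; // would go to the default output port
      }
      stragglersPort.add(resp.result); // late answer to an earlier request
    }
  }

  @Override
  public void close() {
    thread.interrupt();
  }
}
```

With a single worker, a slow request also delays the requests queued behind it, which is the back-pressure the worker pool is meant to spread out.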
>>>>
>>>> Yes, your point is valid if we wait for the stragglers to complete in a given window. The current implementation does not force a wait for all of the stragglers to complete. The stragglers are emitted only when a new tuple is being processed, i.e. when a new tuple arrives for scoring, the straggler response queue is checked for entries and, if there are any, the responses are emitted into the stragglerPort. This essentially means that there are situations when the straggler port emits the result for a request submitted in a previous window. This also implies that idempotency cannot be guaranteed across runs of the same input data. In fact, don't all threaded implementations have this issue, as the ordering of the results is not guaranteed to be unique even within a given window?
>>>>
>>>> I can enforce a block/drain at the end of the window to force completion, based on the feedback.
>>>>
>>>> Regards,
>>>> Ananth
>>>>
>>>>> On 15 Dec 2017, at 4:21 am, Pramod Immaneni <pra...@datatorrent.com> wrote:
>>>>>
>>>>> Hi Ananth,
>>>>>
>>>>> Sounds interesting, and it looks like you have put quite a bit of work into it. Might I suggest changing the title of 2260 to better fit your proposal and implementation, mainly so that there is differentiation from 2261.
>>>>>
>>>>> I wanted to discuss the proposal to use multiple threads in an operator instance. Unless the execution threads are blocking for some sort of I/O, why would it result in a noticeable performance difference compared to processing in the operator thread and running multiple partitions of the operator in container local? By running the processing in a separate thread from the operator lifecycle thread, you still don't get away from matching the incoming data throughput.
>>>>> The checkpoint is where backpressure will start to materialize, since the operator would have to wait for your background processing to complete to guarantee that all data up to the checkpoint is processed.
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Thu, Dec 14, 2017 at 2:20 AM, Ananth G <ananthg.a...@gmail.com> wrote:
>>>>>
>>>>>> Hello All,
>>>>>>
>>>>>> I would like to submit the design for the Python execution operator before I raise the pull request, so that I can refine the implementation based on feedback. Could you please provide feedback on the design, if any, and I will raise the PR accordingly.
>>>>>>
>>>>>> - This operator is for the JIRA ticket raised here https://issues.apache.org/jira/browse/APEXMALHAR-2260
>>>>>> - The operator embeds a Python interpreter in the operator JVM process space and is not external to the JVM.
>>>>>> - The implementation proposes the use of Java Embedded Python (JEP), given here https://github.com/ninia/jep
>>>>>> - The JEP engine is under the zlib/libpng license. Since this is an approved license under https://www.apache.org/legal/resolved.html#category-a, I am assuming it is OK for the community to approve the inclusion of this library.
>>>>>> - Python integration is a messy piece due to the nature of dynamic libraries. All Python libraries need to be natively installed. This also means we will not be able to bundle Python libraries and dependencies as part of the build into the target JVM container. Hence this operator currently has the limitation that the Python binaries must be installed through an external process on all of the YARN nodes for now.
>>>>>> - The JEP Maven dependency jar in the POM is a JNI wrapper around the dynamic library that is installed externally to the Apex installation process on all of the YARN nodes.
>>>>>> - I hope to take up https://issues.apache.org/jira/browse/APEXCORE-796 to solve this issue in the future.
>>>>>> - The Python operator implementation can be extended to a Py4J based implementation (as opposed to an in-memory model like JEP) in the future if required. JEP is the implementation based on an in-memory design pattern.
>>>>>> - The Python operator allows for 4 major API patterns:
>>>>>>   - Execute a method call by accepting parameters to pass to the interpreter
>>>>>>   - Execute a Python script as given in a file path
>>>>>>   - Evaluate an expression, allowing variables to be passed between the Java code and the Python in-memory interpreter bridge
>>>>>>   - A handy method wherein a series of instructions can be passed in one single Java call (executed as a sequence of Python eval instructions under the hood)
>>>>>> - Automatic garbage collection of the variables that are passed from Java code to the in-memory Python interpreter
>>>>>> - Support for all major Python libraries: Tensorflow, Keras, Scikit, xgboost. Preliminary tests for these libraries seem to work, as per the code here: https://github.com/ananthc/sampleapps/tree/master/apache-apex/apexjvmpython
>>>>>> - The implementation allows for an SLA based execution model, i.e. the operator is given a chance to execute the Python code and, if it does not complete within a timeout, the operator code returns null.
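The fourth API pattern (a series of instructions in one Java call) and the automatic clean-up of Java-supplied variables can be pictured together. The sketch below is hypothetical throughout: `InterpreterLike` is an invented interface, not JEP's, `RecordingInterpreter` is a toy that records statements without executing them, and treating "garbage collection" as deleting the pushed variables in a `finally` block is an assumption about what the design means.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal view of an embedded interpreter; not JEP's actual API.
interface InterpreterLike {
  void set(String name, Object value);
  void eval(String statement);
  Object get(String name);
  void delete(String name);
}

final class CommandSequence {
  private CommandSequence() {}

  /**
   * Pushes Java inputs into the interpreter, runs the statements one eval at a time,
   * reads back resultVar, and always removes the pushed variables afterwards.
   */
  static Object run(InterpreterLike py, Map<String, Object> inputs, List<String> statements, String resultVar) {
    try {
      inputs.forEach(py::set);
      for (String statement : statements) {
        py.eval(statement);
      }
      return py.get(resultVar);
    } finally {
      // One reading of "automatic garbage collection": Java-supplied variables do not outlive the call.
      inputs.keySet().forEach(py::delete);
    }
  }
}

// Toy interpreter for the example: stores variables and records statements without executing them.
class RecordingInterpreter implements InterpreterLike {
  final Map<String, Object> variables = new HashMap<>();
  final List<String> evaluated = new ArrayList<>();

  public void set(String name, Object value) { variables.put(name, value); }
  public void eval(String statement) { evaluated.add(statement); }
  public Object get(String name) { return variables.get(name); }
  public void delete(String name) { variables.remove(name); }
}
```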
>>>>>> - A tuple that has become a straggler as per the previous point will automatically be drained off to a different port, so that downstream operators can still consume the straggler, if they want to, when the results arrive.
>>>>>> - Because Python is an interpreter, if a previous tuple is still being processed there is a chance of a back pressure pattern building up very quickly. Hence this operator works on the concept of a worker pool. The Python operator uses a configurable number of worker threads, each of which embeds the Python interpreter within its processing space, i.e. it is in fact a collection of Python in-memory interpreters inside the Python operator implementation.
>>>>>> - The operator chooses one of the threads at runtime based on their busy state, thus allowing back-pressure issues to be resolved automatically.
>>>>>> - There is first class support for Numpy in JEP. Java arrays are convertible to Python Numpy arrays and vice versa, and share the same memory addresses for efficiency reasons.
>>>>>> - The base operator implements dynamic partitioning based on a thread starvation policy. At each checkpoint, it checks what percentage of the requests resulted in starved threads, and if the starvation exceeds a configured percentage, a new instance of the operator is provisioned for every such instance of the operator.
>>>>>> - The operator provides the notion of a worker execution mode. There are two worker modes that are passed in each of the above calls from the user: ALL or ANY. Because the Python interpreter is a state based engine, a newly dynamically partitioned operator might not be in the exact state of the remaining operators.
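The starvation-based scaling rule reduces to a ratio checked at each checkpoint. A hypothetical sketch follows: `StarvationMonitor` and its threshold semantics (strictly greater than the configured percentage) are assumptions, and wiring the result into the platform's partitioning hooks is not shown.

```java
// Hypothetical sketch of the checkpoint-time starvation check; not the operator's actual code.
class StarvationMonitor {
  private final double maxStarvedPercent; // configured threshold, e.g. 20.0
  private long requests;
  private long starved;

  StarvationMonitor(double maxStarvedPercent) {
    this.maxStarvedPercent = maxStarvedPercent;
  }

  /** Called per request; a request is "starved" when no worker thread was idle to take it. */
  void record(boolean idleWorkerAvailable) {
    requests++;
    if (!idleWorkerAvailable) {
      starved++;
    }
  }

  /**
   * Called at each checkpoint: returns the desired partition count (one more per starving
   * instance, as described above) and starts a new measurement interval.
   */
  int partitionsAfterCheckpoint(int currentPartitions) {
    double starvedPercent = requests == 0 ? 0.0 : 100.0 * starved / requests;
    requests = 0;
    starved = 0;
    return starvedPercent > maxStarvedPercent ? currentPartitions + 1 : currentPartitions;
  }
}
```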
>>>>>>   Hence the operator has this notion of worker execution mode. Any call (any of the 4 calls mentioned above) made with ALL execution mode will be executed on all the workers of the worker thread pool, as well as on the dynamically partitioned instance whenever such an instance is provisioned.
>>>>>> - The base operator implementation has a method that can be overridden to implement the logic that needs to be executed for each tuple. The base operator default implementation is a simple NO-OP.
>>>>>> - The operator automatically picks the least busy of the thread pool workers, each of which has JEP embedded in it, to execute the call.
>>>>>> - The JEP based installation will not support non-CPython modules. All of the major Python libraries are CPython based, and hence I believe this is a lesser concern. If we hit a roadblock when a new non-CPython based library needs to be run, we could implement the ApexPythonEngine interface with something like Py4J, which involves interprocess communication.
>>>>>> - The Python operator requires the user to set the library path java.library.path so that the operator can use the dynamic libraries of the corresponding platform. This has to be passed in as a JVM option. Failing to do so will result in the operator failing to load the interpreter properly.
>>>>>> - The supported Python versions are 2.7, 3.3, 3.4, 3.5 and 3.6. Numpy >= 1.7 is supported.
>>>>>> - There is no support for virtual environments yet. In case of multiple Python versions on the node, to pick the right Python version for the Apex operator, ensure that the environment variables and the dynamic library path are set appropriately. This is a workaround, and I hope APEXCORE-796 will solve this issue as well.
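The ALL/ANY execution modes and least-busy selection can be sketched as below. Everything here is hypothetical (`WorkerMode`, `PoolWorker`, `ModeDispatcher`); in particular, replaying a history of ALL-mode calls is only one assumed way a newly partitioned instance could be brought to the same interpreter state.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

enum WorkerMode { ALL, ANY }

// Hypothetical worker handle: tracks calls in flight and records what it was asked to run.
class PoolWorker {
  final AtomicInteger inFlight = new AtomicInteger(); // a real worker would update this around each async call
  final List<String> executed = Collections.synchronizedList(new ArrayList<>());

  void run(String command) {
    executed.add(command); // placeholder for handing the command to the embedded interpreter
  }
}

class ModeDispatcher {
  private final List<PoolWorker> workers;
  private final List<String> allModeHistory = new ArrayList<>(); // state-setting calls to replay on new partitions

  ModeDispatcher(List<PoolWorker> workers) {
    this.workers = workers;
  }

  void dispatch(String command, WorkerMode mode) {
    if (mode == WorkerMode.ALL) {
      allModeHistory.add(command); // remembered so a new instance can reach the same state
      for (PoolWorker w : workers) {
        w.run(command);            // every interpreter sees state-changing calls
      }
    } else {
      leastBusy().run(command);    // per-tuple work goes to whichever worker is least loaded
    }
  }

  PoolWorker leastBusy() {
    return Collections.min(workers, Comparator.comparingInt((PoolWorker w) -> w.inFlight.get()));
  }

  /** Commands a freshly provisioned partition would replay before taking traffic. */
  List<String> bootstrapCommands() {
    return Collections.unmodifiableList(allModeHistory);
  }
}
```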
>>>>>>
>>>>>> Regards,
>>>>>> Ananth