Serialization and resource manager are not involved in CONTAINER_LOCAL. However, CONTAINER_LOCAL is probably not what you want, since it would force all partitions to run in the same JVM.
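For readers following along, the "thread local" worker pattern debated in this thread hinges on a JEP/JNI restriction mentioned further down: only the thread that created an interpreter instance may call into it. A minimal sketch of that confinement, using a single-thread executor in place of a real JEP interpreter (`fakeEval` and all names here are illustrative stand-ins, not the Malhar API):

```java
import java.util.concurrent.*;

// Sketch only: confines all "interpreter" calls to the one thread that
// created it, mirroring the JEP/JNI restriction discussed in this thread.
// fakeEval is a stand-in for a real jep.eval()/getValue() call.
public class ThreadConfinedInterpreter implements AutoCloseable {
    private final ExecutorService owner = Executors.newSingleThreadExecutor();
    private long creatorThreadId;

    public ThreadConfinedInterpreter() throws Exception {
        // "Create" the interpreter on the owning thread and remember it.
        owner.submit(() -> creatorThreadId = Thread.currentThread().getId()).get();
    }

    private String fakeEval(String expr) {
        // A real JNI-backed interpreter would fail on a foreign thread;
        // we simulate that check here.
        if (Thread.currentThread().getId() != creatorThreadId) {
            throw new IllegalStateException("wrong thread for interpreter call");
        }
        return "result:" + expr;
    }

    public Future<String> eval(String expr) {
        // All calls are routed through the single owning thread.
        return owner.submit(() -> fakeEval(expr));
    }

    @Override
    public void close() { owner.shutdown(); }
}
```

One such confined worker per pool slot is what makes a multi-threaded operator possible despite the JNI constraint.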
Thomas

On Fri, Dec 22, 2017 at 1:19 PM, Ananth G <ananthg.a...@gmail.com> wrote:

> I guess my comment below regarding the overhead of serialisation in
> container local is wrong? Nevertheless, having a local thread
> implementation gives some benefits. For example, I am using it to decide
> whether to sleep if there is no request in the queue or to spin checking
> for request presence in the request queue, to avoid delays in the
> request queue processing itself.
>
> Regards,
> Ananth
>
> On 23 Dec 2017, at 6:50 am, Ananth G <ananthg.a...@gmail.com> wrote:
>
> > Thanks for the comments Thomas and Pramod.
> >
> > Apologies for the delayed response on this thread.
> >
> > I believe the thread implementation still adds some value over a
> > container-local approach. It is more of a "thread local" equivalent,
> > which is more efficient than a container-local implementation. Also,
> > the number of worker threads is configurable; setting the value to 1
> > lets the user opt out of this (although I do not see a reason why).
> > There is always the overhead of a serialise/de-serialize cycle even
> > for a container-local approach, and there is the additional
> > possibility of container local not being honoured by the resource
> > manager based on the state of the resources.
> >
> > Regarding the configurable key to ensure all tuples in a window are
> > processed, I am adding a switch which lets the user choose (and
> > javadoc that clearly points out the issues if not waiting for the
> > tuples to be completely processed). There are pros and cons either
> > way, and letting the user decide might be a better approach. The
> > reason I mention cons for waiting for the tuples to complete (apart
> > from the reason that Thomas mentioned) is that if one of the commands
> > the user wrote is erroneous, all subsequent calls to that interpreter
> > thread can fail.
> > An example use case: tuple A sets some value for variable x, and
> > tuple B, which comes next, makes use of variable x. The expression
> > for tuple B is syntactically valid; it just depends on variable x.
> > Now if variable x is not in memory because tuple A is a straggler,
> > tuple B ends up in an erroneous interpreter state. Hence the operator
> > might stall indefinitely, as the end window will be stalled forever,
> > ultimately resulting in the operator being killed. This is also
> > because the erroneous command corrupted the state of the interpreter
> > itself. Of course, this can happen to all of the threads in the
> > interpreter worker pool, resulting in this state as well. Perhaps an
> > improvement on the current implementation is to detect all
> > interpreters stalled for more than x windows and rebuild the
> > interpreter thread when such a situation is detected.
> >
> > Thanks for the IdleTimeoutHandler tip, as this helped me ensure that
> > the stragglers are drained out irrespective of a new tuple coming in
> > for processing. In the previous iteration, the stragglers could only
> > be drained when a new tuple came in for processing, as the delayed
> > responses queue could only be checked when there was some activity on
> > the main thread.
> >
> > Thanks for raising the point about virtual environments: this is a
> > point I missed mentioning in the design description below. There is
> > no support for virtual environments yet in JEP, hence the current
> > limitation. However, the workaround is simple. As part of the
> > application configuration, we need to provide the JAVA_LIBRARY_PATH
> > which contains the path to the JEP dynamic libraries. If there are
> > multiple python installs (and hence multiple JEP libraries to choose
> > from for each of the apex applications being deployed), setting the
> > right path for the operator JVM will result in picking the
> > corresponding python interpreter version.
> > This also essentially means that we cannot have a thread-local
> > deployment configuration of two python operators that belong to
> > different python versions in the same JVM. The Docker approach ticket
> > should be the right fix for the virtual environments issue
> > (https://issues.apache.org/jira/browse/APEXCORE-796), but it still
> > might not solve the thread-local configuration deployment.
> >
> > Regards,
> > Ananth
> >
> > On 21 Dec 2017, at 11:01 am, Pramod Immaneni
> > <pra...@datatorrent.com> wrote:
> >
> >> On Wed, Dec 20, 2017 at 3:34 PM, Thomas Weise <t...@apache.org>
> >> wrote:
> >>
> >>> It is exciting to see this move forward; the ability to use Python
> >>> opens many new possibilities.
> >>>
> >>> Regarding the use of worker threads, this is a pattern that we are
> >>> using elsewhere (for example in the Kafka input operator). When the
> >>> operator performs blocking operations and consumes little memory
> >>> and/or CPU, then it is more economic to first use threads to
> >>> increase parallelism and throughput (up to a limit), and then the
> >>> more expensive containers for horizontal scaling (multiple threads
> >>> to make good use of container resources, and then scale using the
> >>> usual partitioning).
> >>
> >> I think there is a difference. In the case of Kafka or other input
> >> operators, the threads are less constrained. They can operate with
> >> independence and can dictate the pace, limited only by back
> >> pressure. In this case the operator is most likely going to be
> >> downstream in the DAG and would have constraints for processing
> >> guarantees. For scalability, container local could also be used as
> >> a substitute for multiple threads without resorting to using
> >> separate containers.
> >> I can understand the use of a separate thread to be able to get
> >> around problems like stalled processing, but I would first try to
> >> see if something like container local would work for scaling.
> >>
> >>> It is also correct that generally there is no ordering guarantee
> >>> within a streaming window, and that would be the case when
> >>> multiple input ports are present as well. (The platform cannot
> >>> guarantee such ordering; this would need to be done by the
> >>> operator.)
> >>>
> >>> Idempotency can be expensive (latency and/or space complexity),
> >>> and not all applications need it (like certain use cases that
> >>> process record by record and don't accumulate state). An example
> >>> might be Python logic that is used for scoring against a model
> >>> that was built offline. Idempotency would actually be rather
> >>> difficult to implement, since the operator would need to remember
> >>> which tuples were emitted in a given interval and on replay block
> >>> until they are available (and also hold others that may be
> >>> processed sooner than in the original order). It may be easier to
> >>> record emitted tuples to a WAL instead of reprocessing.
> >>
> >> Ordering cannot be guaranteed, but the operator would need to
> >> finish the work it is given in a window within the window boundary;
> >> otherwise there is a chance of data loss in recovery scenarios. You
> >> could make the checkpoint the boundary by which all pending work is
> >> completed, instead of every window boundary, but then downstream
> >> operators cannot rely on window-level idempotency for exactly once.
> >> Something like the file output operator would work, but not our db
> >> kind of operator. Both options could be supported in the operator.
> >>
> >>> Regarding not emitting stragglers until the next input arrives,
> >>> can this not be accomplished using IdleTimeHandler?
> >>>
> >>> What is preventing the use of virtual environments?
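Thomas's IdleTimeHandler suggestion can be sketched roughly as follows. `handleIdleTime` mirrors the callback name of Apex's `Operator.IdleTimeHandler` interface; the queue and port fields are illustrative stand-ins, not the Malhar operator:

```java
import java.util.*;
import java.util.concurrent.*;

// Sketch of the straggler-draining idea under discussion: delayed results
// land on a response queue and are drained both when a new tuple arrives
// and from an idle-time callback, so stragglers no longer wait for input.
public class StragglerDrain {
    final BlockingQueue<String> delayedResponses = new LinkedBlockingQueue<>();
    final List<String> stragglerPort = new ArrayList<>();

    private void drain() {
        String r;
        while ((r = delayedResponses.poll()) != null) {
            stragglerPort.add(r); // emit on the straggler port
        }
    }

    // Called by the engine when a new tuple arrives (main operator thread).
    public void process(String tuple) {
        drain(); // opportunistic drain, as in the first iteration
        // ... submit tuple to a worker here (omitted) ...
    }

    // Analogous to Operator.IdleTimeHandler.handleIdleTime(): invoked when
    // the operator has no input, so stragglers drain without a new tuple.
    public void handleIdleTime() {
        drain();
    }
}
```

The point of the second callback is exactly what Ananth describes above: in the first iteration the drain only ran inside `process`, so results could sit in the queue indefinitely on a quiet stream.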
> >>> Thanks,
> >>> Thomas
> >>>
> >>> On Tue, Dec 19, 2017 at 8:19 AM, Pramod Immaneni
> >>> <pra...@datatorrent.com> wrote:
> >>>
> >>>> Hi Ananth,
> >>>>
> >>>> From your explanation, it looks like the threads overall allow
> >>>> you to achieve two things. First, have some sort of overall
> >>>> timeout: if a tuple doesn't finish processing by then, it is
> >>>> flagged as such. Second, it doesn't block processing of
> >>>> subsequent tuples and you can still process them meeting the SLA.
> >>>> By checkpoint, however, I think you should try to have a
> >>>> resolution one way or the other for all the tuples received
> >>>> within the checkpoint period or every window boundary (see
> >>>> idempotency below); otherwise, there is a chance of data loss in
> >>>> case of operator restarts. If a loss is acceptable for
> >>>> stragglers, you could let straggler processing continue beyond
> >>>> the checkpoint boundary and let them finish when they can. You
> >>>> could support both behaviors by use of a property. Furthermore,
> >>>> you may not want all threads stuck with stragglers and then you
> >>>> are back to square one, so you may need to stop processing
> >>>> stragglers beyond a certain thread usage threshold. Is there a
> >>>> way to interrupt the processing of the engine?
> >>>>
> >>>> Then there is the question of idempotency. I suspect it would be
> >>>> difficult to maintain unless you wait, at every window boundary,
> >>>> for processing to finish for all tuples received during the
> >>>> window. You may provide an option for relaxing the strict
> >>>> guarantees for the stragglers, as mentioned above.
> >>>>
> >>>> Pramod
> >>>>
> >>>> On Thu, Dec 14, 2017 at 10:49 AM, Ananth G
> >>>> <ananthg.a...@gmail.com> wrote:
> >>>>
> >>>>> Hello Pramod,
> >>>>>
> >>>>> Thanks for the comments. I adjusted the title of the JIRA.
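The "resolve everything by the window or checkpoint boundary" option Pramod describes could look roughly like this. The lifecycle method names echo Apex operators, but the class is a self-contained illustration under assumed names, not the actual implementation:

```java
import java.util.*;
import java.util.concurrent.*;

// Sketch of the blocking option: pending futures are tracked and
// endWindow() blocks until every tuple submitted in the window has a
// result, trading latency for window-level idempotency downstream.
public class WindowDrainingOperator {
    private final ExecutorService worker = Executors.newFixedThreadPool(4);
    private final List<Future<String>> inFlight = new ArrayList<>();
    final List<String> emitted = new ArrayList<>();

    public void process(String tuple) {
        // Stand-in work; a real operator would call into the interpreter.
        inFlight.add(worker.submit(() -> tuple + ":scored"));
    }

    // Blocking here is what lets downstream operators rely on all of a
    // window's results being emitted within that window.
    public void endWindow() throws Exception {
        for (Future<String> f : inFlight) {
            emitted.add(f.get()); // waits for stragglers too
        }
        inFlight.clear();
    }

    public void teardown() { worker.shutdown(); }
}
```

The non-blocking alternative (letting stragglers run past the boundary) is the mode discussed elsewhere in this thread, selectable by a property.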
> >>>>> Here is what I was thinking for the worker pool implementation.
> >>>>>
> >>>>> - The main reason (which I forgot to mention in the design
> >>>>> points below) is that the Java embedded engine allows only the
> >>>>> thread that created the instance to execute the python logic.
> >>>>> This is more because of the JNI specification itself. Some hints
> >>>>> here:
> >>>>> https://stackoverflow.com/questions/18056347/jni-calling-java-from-c-with-multiple-threads
> >>>>> and here:
> >>>>> http://journals.ecs.soton.ac.uk/java/tutorial/native1.1/implementing/sync.html
> >>>>>
> >>>>> - This essentially means that the main operator thread would
> >>>>> have to call the python code execution logic if the design were
> >>>>> otherwise.
> >>>>>
> >>>>> - Since the end user can choose to write any kind of logic,
> >>>>> including blocking I/O, as part of the implementation, I did not
> >>>>> want to stall the operator thread for any usage pattern.
> >>>>>
> >>>>> - In fact there is only one overall interpreter in the JVM
> >>>>> process space, and the interpreter thread is just a JNI wrapper
> >>>>> around it to account for the JNI limitations above.
> >>>>>
> >>>>> - It is for the very same reason that there is an API in the
> >>>>> implementation to support registering shared modules across all
> >>>>> of the interpreter threads. Use cases for this exist when there
> >>>>> is a global variable provided by the underlying Python library
> >>>>> and loading it multiple times can cause issues.
> >>>>> Hence the API to register a shared module which can be used by
> >>>>> all of the interpreter threads.
> >>>>>
> >>>>> - The operator submits to a work request queue and consumes from
> >>>>> a response queue for each of the interpreter threads. There
> >>>>> exists one request and one response queue per interpreter
> >>>>> thread.
> >>>>>
> >>>>> - The stragglers will get drained from the response queue for a
> >>>>> previously submitted request.
> >>>>>
> >>>>> - The other reason why I chose to implement it this way is for
> >>>>> some of the use cases that I foresee in ML scoring scenarios. In
> >>>>> fraud systems, if I have a strict SLA to score a model, the main
> >>>>> thread in the operator is not helping me implement this pattern
> >>>>> at all. The caller to the Apex application will need to proceed
> >>>>> if the scoring gets delayed for whatever reason. However, the
> >>>>> scoring can continue on the interpreter thread and can be
> >>>>> drained later. (It is just that the caller did not make use of
> >>>>> this result, but it can still be persisted by operators
> >>>>> consuming from the straggler port.)
> >>>>>
> >>>>> - There are 3 output ports for this operator: a
> >>>>> DefaultOutputPort, a stragglersPort and an errorPort.
> >>>>>
> >>>>> - Some libraries like Tensorflow can become really heavy.
> >>>>> Tensorflow models can execute a tensorflow DAG as part of a
> >>>>> model scoring implementation, and hence I wanted to take the
> >>>>> approach of a worker pool. Yes, your point is valid if we wait
> >>>>> for the stragglers to complete in a given window. The current
> >>>>> implementation does not force a wait for all of the stragglers
> >>>>> to complete. The stragglers are emitted only when there is a new
> >>>>> tuple being processed, i.e.
> >>>>> when a new tuple arrives for scoring, the straggler response
> >>>>> queue is checked for entries and, if there are any, the
> >>>>> responses are emitted on the stragglerPort. This essentially
> >>>>> means that there are situations where the straggler port is
> >>>>> emitting the result for a request submitted in a previous
> >>>>> window. This also implies that idempotency cannot be guaranteed
> >>>>> across runs of the same input data. In fact, all threaded
> >>>>> implementations have this issue, as the ordering of the results
> >>>>> is not guaranteed to be unique even within a given window.
> >>>>>
> >>>>> I can enforce a block/drain at the end of the window to force a
> >>>>> completion based on the feedback.
> >>>>>
> >>>>> Regards,
> >>>>> Ananth
> >>>>>
> >>>>> On 15 Dec 2017, at 4:21 am, Pramod Immaneni
> >>>>> <pra...@datatorrent.com> wrote:
> >>>>>
> >>>>>> Hi Ananth,
> >>>>>>
> >>>>>> Sounds interesting, and it looks like you have put quite a bit
> >>>>>> of work into it. Might I suggest changing the title of 2260 to
> >>>>>> better fit your proposal and implementation, mainly so that
> >>>>>> there is differentiation from 2261.
> >>>>>>
> >>>>>> I wanted to discuss the proposal to use multiple threads in an
> >>>>>> operator instance. Unless the execution threads are blocking
> >>>>>> for some sort of i/o, why would it result in a noticeable
> >>>>>> performance difference compared to processing in the operator
> >>>>>> thread and running multiple partitions of the operator in
> >>>>>> container local? By running the processing in a separate thread
> >>>>>> from the operator lifecycle thread, you still don't get away
> >>>>>> from matching the incoming data throughput.
> >>>>>> The checkpoint will act as a time where your backpressure will
> >>>>>> start to materialize, when the operator would have to wait for
> >>>>>> your background processing to complete to guarantee that all
> >>>>>> data till the checkpoint is processed.
> >>>>>>
> >>>>>> Thanks
> >>>>>>
> >>>>>> On Thu, Dec 14, 2017 at 2:20 AM, Ananth G
> >>>>>> <ananthg.a...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Hello All,
> >>>>>>>
> >>>>>>> I would like to submit the design for the Python execution
> >>>>>>> operator before I raise the pull request, so that I can refine
> >>>>>>> the implementation based on feedback. Could you please provide
> >>>>>>> feedback on the design, if any, and I will raise the PR
> >>>>>>> accordingly.
> >>>>>>>
> >>>>>>> - This operator is for the JIRA ticket raised here:
> >>>>>>> https://issues.apache.org/jira/browse/APEXMALHAR-2260
> >>>>>>> - The operator embeds a python interpreter in the operator JVM
> >>>>>>> process space and is not external to the JVM.
> >>>>>>> - The implementation proposes the use of Java Embedded Python
> >>>>>>> (JEP), given here: https://github.com/ninia/jep
> >>>>>>> - The JEP engine is under the zlib/libpng license. Since this
> >>>>>>> is an approved license under
> >>>>>>> https://www.apache.org/legal/resolved.html#category-a, I am
> >>>>>>> assuming it is ok for the community to approve the inclusion
> >>>>>>> of this library.
> >>>>>>> - Python integration is a messy piece due to the nature of
> >>>>>>> dynamic libraries.
> >>>>>>> All python libraries need to be natively installed. This also
> >>>>>>> means we will not be able to bundle python libraries and
> >>>>>>> dependencies as part of the build into the target JVM
> >>>>>>> container. Hence this operator has the current limitation that
> >>>>>>> the python binaries must be installed through an external
> >>>>>>> process on all of the YARN nodes for now.
> >>>>>>> - The JEP maven dependency jar in the POM is a JNI wrapper
> >>>>>>> around the dynamic library that is installed externally to the
> >>>>>>> Apex installation process on all of the YARN nodes.
> >>>>>>> - I hope to take up
> >>>>>>> https://issues.apache.org/jira/browse/APEXCORE-796 to solve
> >>>>>>> this issue in the future.
> >>>>>>> - The python operator implementation can be extended to a
> >>>>>>> py4J-based implementation (as opposed to an in-memory model
> >>>>>>> like JEP) in the future if required. JEP is the implementation
> >>>>>>> based on an in-memory design pattern.
> >>>>>>> - The python operator allows for 4 major API patterns:
> >>>>>>>   - Execute a method call by accepting parameters to pass to
> >>>>>>>     the interpreter
> >>>>>>>   - Execute a python script as given in a file path
> >>>>>>>   - Evaluate an expression, allowing for passing of variables
> >>>>>>>     between the java code and the python in-memory interpreter
> >>>>>>>     bridge
> >>>>>>>   - A handy method wherein a series of instructions can be
> >>>>>>>     passed in one single java call (executed as a sequence of
> >>>>>>>     python eval instructions under the hood)
> >>>>>>> - Automatic garbage collection of the variables that are
> >>>>>>> passed from java code to the in-memory python interpreter
> >>>>>>> - Support for all major python libraries: Tensorflow, Keras,
> >>>>>>> Scikit, xgboost.
> >>>>>>> Preliminary tests for these libraries seem to work, as per the
> >>>>>>> code here:
> >>>>>>> https://github.com/ananthc/sampleapps/tree/master/apache-apex/apexjvmpython
> >>>>>>> - The implementation allows for an SLA-based execution model,
> >>>>>>> i.e. the operator is given a chance to execute the python code
> >>>>>>> and, if not complete within a timeout, the operator code
> >>>>>>> returns null.
> >>>>>>> - A tuple that has become a straggler as per the previous
> >>>>>>> point will automatically be drained off to a different port,
> >>>>>>> so that downstream operators can still consume the straggler
> >>>>>>> if they want to when the results arrive.
> >>>>>>> - Because of the nature of python being an interpreter, if a
> >>>>>>> previous tuple is still being processed, there is a chance of
> >>>>>>> a back pressure pattern building up very quickly. Hence this
> >>>>>>> operator works on the concept of a worker pool. The Python
> >>>>>>> operator uses a configurable number of worker threads, each of
> >>>>>>> which embeds the Python interpreter within its processing
> >>>>>>> space; i.e. it is in fact a collection of python in-memory
> >>>>>>> interpreters inside the Python operator implementation.
> >>>>>>> - The operator chooses one of the threads at runtime based on
> >>>>>>> their busy state, thus allowing back-pressure issues to be
> >>>>>>> resolved automatically.
> >>>>>>> - There is first-class support for Numpy in JEP. Java arrays
> >>>>>>> are convertible to Python Numpy arrays and vice versa, and
> >>>>>>> share the same memory addresses for efficiency reasons.
> >>>>>>> - The base operator implements dynamic partitioning based on a
> >>>>>>> thread starvation policy.
> >>>>>>> At each checkpoint, it checks what percentage of the requests
> >>>>>>> resulted in starved threads, and if the starvation exceeds a
> >>>>>>> configured percentage, a new instance of the operator is
> >>>>>>> provisioned for every such instance of the operator.
> >>>>>>> - The operator provides the notion of a worker execution mode.
> >>>>>>> There are two worker modes that are passed in each of the
> >>>>>>> above calls from the user: ALL or ANY. Because the python
> >>>>>>> interpreter is a state-based engine, a newly dynamically
> >>>>>>> partitioned operator might not be in the exact state of the
> >>>>>>> remaining operators. Hence the operator has this notion of a
> >>>>>>> worker execution mode. Any call (any of the 4 calls mentioned
> >>>>>>> above) made with the ALL execution mode will be executed on
> >>>>>>> all the workers of the worker thread pool, as well as on the
> >>>>>>> dynamically partitioned instance whenever such an instance is
> >>>>>>> provisioned.
> >>>>>>> - The base operator implementation has a method that can be
> >>>>>>> overridden to implement the logic that needs to be executed
> >>>>>>> for each tuple. The base operator default implementation is a
> >>>>>>> simple NO-OP.
> >>>>>>> - The operator automatically picks the least busy of the
> >>>>>>> thread pool workers, which has JEP embedded in it, to execute
> >>>>>>> the call.
> >>>>>>> - The JEP-based installation will not support non-CPython
> >>>>>>> modules. All of the major python libraries are CPython based,
> >>>>>>> and hence I believe this is of lesser concern. If we hit a
> >>>>>>> roadblock when a new python library that is not CPython based
> >>>>>>> needs to be run, then we could implement the ApexPythonEngine
> >>>>>>> interface with something like Py4J, which involves
> >>>>>>> interprocess communication.
> >>>>>>> - The python operator requires the user to set the library
> >>>>>>> path java.library.path for the operator to make use of the
> >>>>>>> dynamic libraries of the corresponding platform. This has to
> >>>>>>> be passed in as JVM options. Failing to do so will result in
> >>>>>>> the operator failing to load the interpreter properly.
> >>>>>>> - The supported python versions are 2.7, 3.3, 3.4, 3.5 and
> >>>>>>> 3.6. Numpy >= 1.7 is supported.
> >>>>>>> - There is no support for virtual environments yet. In case of
> >>>>>>> multiple python versions on the node, to include the right
> >>>>>>> python version for the apex operator, ensure that the
> >>>>>>> environment variables and the dynamic library path are set
> >>>>>>> appropriately. This is a workaround, and I hope APEXCORE-796
> >>>>>>> will solve this issue as well.
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Ananth
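To summarize the design above, here is a rough sketch of the SLA-based worker-pool model: least-busy worker selection, null returned on an SLA miss, and the late result drained afterwards as a straggler. Every worker is a single-thread executor, standing in for one thread-confined interpreter. All names and the `toUpperCase()` stand-in for python execution are illustrative assumptions, not the Malhar operator's API:

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

// Sketch of the SLA-based worker pool described in the design notes.
public class SlaWorkerPool {
    private final List<ExecutorService> workers = new ArrayList<>();
    private final List<AtomicInteger> pending = new ArrayList<>();
    private final BlockingQueue<String> stragglers = new LinkedBlockingQueue<>();

    public SlaWorkerPool(int numWorkers) {
        for (int i = 0; i < numWorkers; i++) {
            workers.add(Executors.newSingleThreadExecutor()); // one "interpreter" each
            pending.add(new AtomicInteger());
        }
    }

    private int leastBusy() {
        int best = 0;
        for (int i = 1; i < pending.size(); i++) {
            if (pending.get(i).get() < pending.get(best).get()) best = i;
        }
        return best;
    }

    private String interpret(String payload) throws InterruptedException {
        if (payload.startsWith("slow:")) Thread.sleep(200); // simulate a straggler
        return payload.toUpperCase(); // stand-in for the interpreter call
    }

    /** Returns the result, or null if the SLA elapses (result drains later). */
    public String execute(String payload, long slaMillis) throws Exception {
        int w = leastBusy();
        pending.get(w).incrementAndGet();
        Future<String> f = workers.get(w).submit(() -> {
            try { return interpret(payload); }
            finally { pending.get(w).decrementAndGet(); }
        });
        try {
            return f.get(slaMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Do not cancel: the worker finishes the call and the result
            // surfaces on the straggler queue (drain task queues behind it).
            workers.get(w).submit(() -> stragglers.offer(f.get()));
            return null;
        }
    }

    public List<String> drainStragglers() {
        List<String> out = new ArrayList<>();
        stragglers.drainTo(out);
        return out;
    }

    public void shutdown() { workers.forEach(ExecutorService::shutdown); }
}
```

This captures the caller-side contract from the fraud-scoring example: the caller moves on when the SLA is missed, while the score is still computed and can be emitted on a straggler port later.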