Hey! The UDP version is 25x slower? That's massive. Are you sending the records through that as well, or just the coordination?
Regarding busy waiting loops: there has to be a better way to do that. It
will behave utterly unpredictably. Once the Python side does I/O, has a
separate process or thread, or goes asynchronously into a library
(scikit-learn, numpy), the loop cannot be expected to stay at 5%. You have
tested that with a job where both the Java and the Python side have some
work to do. In the case of a job where one side waits for the other, the
waiting side will burn cycles like crazy. Then run it in parallel (#cores)
and you may get executions where little more happens than the busy waiting
loop burning cycles.

Stephan

On Mon, Sep 8, 2014 at 4:15 PM, Chesnay Schepler <[email protected]> wrote:

> sorry for the late answer.
>
> today i did a quick hack to replace the synchronization completely with
> udp. its still synchronous and record-based, but 25x slower.
> regarding busy-loops i would propose the following:
>
> 1. leave the python side as it is. its doing most of the heavy lifting
>    anyway and will run at 100% regardless of the loops. (the loops only
>    take up 5% of the total runtime)
> 2. once we exchange buffers instead of single records, the IO operations
>    and synchronization will take a fairly constant time. we could then
>    put the java process to sleep manually for that time instead of
>    waiting. it may not be as good as a blocking operation, but it
>    should keep the cpu consumption down to some extent.
>
> On 1.9.2014 22:50, Ufuk Celebi wrote:
>
>> Hey Chesnay,
>>
>> any progress on this today? Are you going for the UDP buffer
>> availability notifications Stephan proposed instead of the busy loop?
>>
>> Ufuk
>>
>> On Thu, Aug 28, 2014 at 1:06 AM, Chesnay Schepler
>> <[email protected]> wrote:
>>
>>> the performance differences occur on the same system (16GB, 4 cores +
>>> HyperThreading) with a DOP of 1 for a plan consisting of a single
>>> operator.
>>> plenty of resources :/
>>>
>>> On 28.8.2014 0:50, Stephan Ewen wrote:
>>>
>>>> Hey Chesnay!
>>>>
>>>> Here are some thoughts:
>>>>
>>>> - The repeated checking for 1 or 0 is indeed a busy loop. These may
>>>>   behave very differently in different settings. If you run the code
>>>>   isolated, you have a spare core for the thread and it barely hurts.
>>>>   Run multiple parallel instances in a larger framework, and it eats
>>>>   away CPU cycles from the threads that do the work - it starts
>>>>   hurting badly.
>>>>
>>>> - You may get around a copy into the shared memory (ByteBuffer into
>>>>   MemoryMappedFile) by creating an according DataOutputView - that
>>>>   saves one more data copy. That's the next step, though; first solve
>>>>   the other issue.
>>>>
>>>> The last time I implemented such an inter-process data pipe between
>>>> languages, I had a similar issue: no support for system-wide
>>>> semaphores (or something similar) on both sides.
>>>>
>>>> I used shared memory for the buffers, and a local network socket
>>>> (UDP, but I guess TCP would be fine as well) for notifications when
>>>> buffers were available. That worked pretty well and yielded high
>>>> throughput, because the big buffers were not copied (unlike with
>>>> streams), and the UDP notifications were very fast (fire-and-forget
>>>> datagrams).
>>>>
>>>> Stephan
>>>>
>>>> On Wed, Aug 27, 2014 at 10:48 PM, Chesnay Schepler
>>>> <[email protected]> wrote:
>>>>
>>>>> Hey Stephan,
>>>>>
>>>>> I'd like to point out right away that the code related to your
>>>>> questions is shared by both programs.
>>>>>
>>>>> regarding your first point: i have a byte[] into which i serialize
>>>>> the data first using a ByteBuffer, and then write that data to a
>>>>> MappedByteBuffer.
>>>>>
>>>>> regarding synchronization: i couldn't find a way to use elaborate
>>>>> things like semaphores or similar that work between python and java
>>>>> alike.
>>>>>
>>>>> the data exchange is currently completely synchronous.
>>>>> java writes a record, sets an "isWritten" bit, and then repeatedly
>>>>> checks whether this bit is 0. python repeatedly checks whether this
>>>>> bit is 1. once that happens, it reads the record and sets the bit to
>>>>> 0, which tells java that it has read the record and can write the
>>>>> next one. this scheme works the same way in the other direction.
>>>>>
>>>>> *NOW,* this may seem ... inefficient, to put it slightly. it is (or
>>>>> rather should be...) way faster (5x) than what we had so far, though
>>>>> (asynchronous pipes).
>>>>> (i also tried different schemes that all had no effect, so i decided
>>>>> to stick with the easiest one)
>>>>>
>>>>> on to your last point: I'm gonna check for that tomorrow.
>>>>>
>>>>> On 27.8.2014 20:45, Stephan Ewen wrote:
>>>>>
>>>>>> Hi Chesnay!
>>>>>>
>>>>>> That is an interesting problem, though hard to judge with the
>>>>>> information we have.
>>>>>>
>>>>>> Can you elaborate a bit on the following points:
>>>>>>
>>>>>> - When putting the objects from the Java Flink side into the shared
>>>>>>   memory, you need to serialize them. How do you do that? Into a
>>>>>>   buffer, then copy that into the shared memory ByteBuffer?
>>>>>>   Directly?
>>>>>>
>>>>>> - Shared memory access has to be somehow controlled. The pipes give
>>>>>>   you flow control for free (blocking write calls when the stream
>>>>>>   consumer is busy). What do you do for the shared memory? Usually,
>>>>>>   one uses semaphores or, in Java, File(Range)Locks to coordinate
>>>>>>   access and block until memory regions are made available. Can you
>>>>>>   check if there are some busy waiting parts in your code?
>>>>>>
>>>>>> - More generally: the code is slower, but does it burn CPU cycles
>>>>>>   in its slowness, or is it waiting for locks / monitors /
>>>>>>   conditions?
>>>>>>
>>>>>> Stephan
>>>>>>
>>>>>> On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler
>>>>>> <[email protected]> wrote:
>>>>>>
>>>>>>> Hello everyone,
>>>>>>>
>>>>>>> This will be some kind of brainstorming question.
>>>>>>>
>>>>>>> As some of you may know, I am currently working on the Python API.
>>>>>>> The most crucial part here is how the data is exchanged between
>>>>>>> Java and Python. Up to this point we used pipes for this, but
>>>>>>> recently switched to memory-mapped files in hopes of increasing
>>>>>>> the (lacking) performance.
>>>>>>>
>>>>>>> Early (simplified) prototypes (outside of Flink) showed that this
>>>>>>> would yield a significant increase. yet when i added the code to
>>>>>>> flink and ran a job, there was no effect. like, at all. two
>>>>>>> radically different schemes ran in /exactly/ the same time.
>>>>>>>
>>>>>>> my conclusion was that code already in place (and not part of the
>>>>>>> prototypes) is responsible for this. so i went ahead and modified
>>>>>>> the prototypes to use all relevant code from the Python API in
>>>>>>> order to narrow down the culprit. but this time, the performance
>>>>>>> increase was there.
>>>>>>>
>>>>>>> Now here's the question: how can the /very same code/ perform so
>>>>>>> much worse when integrated into flink? if the code is not the
>>>>>>> problem, what could it be?
>>>>>>>
>>>>>>> i spent a lot of time looking for that one line of code that
>>>>>>> cripples the performance, but I'm pretty much out of places to
>>>>>>> look.
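For reference, the "isWritten" bit handshake Chesnay describes in the thread can be sketched in Python. This is a minimal single-process illustration, assuming two threads sharing one anonymous mmap; in the actual setup the buffer is a memory-mapped file shared between the Java and Python processes, and the layout used here (flag byte, length field, payload) is made up for illustration:

```python
import mmap
import struct
import threading

# Hypothetical layout: byte 0 = "isWritten" flag, bytes 1-4 = record
# length (big-endian int), bytes 5.. = serialized record.
buf = mmap.mmap(-1, 4096)

def write_record(payload: bytes) -> None:
    """Busy-wait until the previous record was consumed, then publish."""
    while buf[0] == 1:      # busy loop: reader has not cleared the flag yet
        pass
    buf[1:5] = struct.pack(">i", len(payload))
    buf[5:5 + len(payload)] = payload
    buf[0] = 1              # signal: record available

def read_record() -> bytes:
    """Busy-wait until a record is available, then consume it."""
    while buf[0] == 0:      # busy loop: writer has not published yet
        pass
    (length,) = struct.unpack(">i", buf[1:5])
    payload = bytes(buf[5:5 + length])
    buf[0] = 0              # signal: record consumed, next write may proceed
    return payload

records = [b"first", b"second", b"third"]
received = []

def reader_loop() -> None:
    for _ in records:
        received.append(read_record())

reader = threading.Thread(target=reader_loop)
reader.start()
for r in records:
    write_record(r)
reader.join()
print(received)  # [b'first', b'second', b'third']
```

The two `while` loops are exactly the busy waiting Stephan warns about: whichever side arrives first spins at full speed until the other flips the flag, which is harmless with a spare core and costly under parallel load.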

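Stephan's alternative, shared memory for the data plus a small loopback datagram as the "buffer available" notification, can be sketched like this; the consumer blocks in `recvfrom()` instead of spinning. The port choice and the 4-byte message format are assumptions for illustration, and the producer here stands in for the Java process:

```python
import socket
import struct

# The data itself would stay in shared memory; only a tiny "buffer N is
# ready" datagram crosses a loopback UDP socket, so the consumer blocks
# in recvfrom() instead of busy waiting.
consumer = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
consumer.bind(("127.0.0.1", 0))   # let the OS pick a free port
consumer.settimeout(5.0)          # don't hang forever if a datagram is lost
port = consumer.getsockname()[1]

producer = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

def notify_buffer_ready(buffer_index: int) -> None:
    """Fire-and-forget datagram: 'buffer <index> is ready to be read'."""
    producer.sendto(struct.pack(">i", buffer_index), ("127.0.0.1", port))

def wait_for_buffer() -> int:
    """Blocks without burning CPU until a notification arrives."""
    data, _ = consumer.recvfrom(4)
    return struct.unpack(">i", data)[0]

notify_buffer_ready(7)
received_index = wait_for_buffer()
print(received_index)  # 7

producer.close()
consumer.close()
```

Because the datagram carries only a buffer index, the large buffers never cross the socket; they stay in shared memory, which is what made this scheme fast in Stephan's earlier experience.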