Maybe there is some quirk in the way you use the datagrams. Have you tried it through TCP sockets?
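For reference, a minimal sketch of the kind of TCP-based notification being suggested (illustrative only, not Flink code: the host, port, and one-byte "buffer ready" message are assumptions, and a thread stands in for the second process):

```python
import socket
import threading

# One local TCP connection standing in for the Java<->Python notification
# channel. Binding to port 0 lets the OS pick a free port.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
server.listen(1)
port = server.getsockname()[1]

received = []

def consumer():
    # Block until the "buffer ready" byte arrives -- no busy waiting.
    conn, _ = server.accept()
    with conn:
        received.append(conn.recv(1))

t = threading.Thread(target=consumer)
t.start()

# Producer side: connect and send a one-byte notification.
with socket.create_connection(("127.0.0.1", port)) as client:
    client.sendall(b"\x01")

t.join()
server.close()
print(received)  # [b'\x01']
```

Unlike a busy loop, the consumer here sleeps inside `accept()`/`recv()` until the kernel wakes it, so idle coordination costs no CPU cycles.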
On Wed, Sep 10, 2014 at 2:30 PM, Chesnay Schepler <[email protected]> wrote:

> only the coordination is done via UDP.
>
> I agree with what you say about the loops; currently looking into using FileLocks.
>
> On 9.9.2014 11:33, Stephan Ewen wrote:
>
>> Hey!
>>
>> The UDP version is 25x slower? That's massive. Are you sending the records through that as well, or just the coordination?
>>
>> Regarding busy waiting loops: there has to be a better way to do that. It will behave utterly unpredictably. Once the python side does I/O, has a separate process or thread, or goes asynchronously into a library (scikit-learn, numpy), the loop cannot be expected to stay at 5%.
>>
>> You have tested that with a job where both the java and python side have some work to do. In the case of a job where one side waits for the other, the waiting side will burn cycles like crazy. Then run it in parallel (#cores) and you may get executions where little more happens than the busy waiting loop burning cycles.
>>
>> Stephan
>>
>> On Mon, Sep 8, 2014 at 4:15 PM, Chesnay Schepler <[email protected]> wrote:
>>
>>> sorry for the late answer.
>>>
>>> today I did a quick hack to replace the synchronization completely with UDP. It's still synchronous and record-based, but 25x slower.
>>>
>>> regarding busy-loops I would propose the following:
>>>
>>> 1. leave the python side as it is. It's doing most of the heavy lifting anyway and will run at 100% regardless of the loops. (the loops only take up 5% of the total runtime)
>>> 2. once we exchange buffers instead of single records, the IO operations and synchronization will take a fairly constant time. We could then put the java process to sleep manually for that time instead of waiting. It may not be as good as a blocking operation, but it should keep the cpu consumption down to some extent.
>>>
>>> On 1.9.2014 22:50, Ufuk Celebi wrote:
>>>
>>>> Hey Chesnay,
>>>>
>>>> any progress on this today? Are you going for the UDP buffer availability notifications Stephan proposed instead of the busy loop?
>>>>
>>>> Ufuk
>>>>
>>>> On Thu, Aug 28, 2014 at 1:06 AM, Chesnay Schepler <[email protected]> wrote:
>>>>
>>>>> the performance differences occur on the same system (16GB, 4 cores + HyperThreading) with a DOP of 1 for a plan consisting of a single operator. Plenty of resources :/
>>>>>
>>>>> On 28.8.2014 0:50, Stephan Ewen wrote:
>>>>>
>>>>>> Hey Chesnay!
>>>>>>
>>>>>> Here are some thoughts:
>>>>>>
>>>>>> - The repeated checking for 1 or 0 is indeed a busy loop. These may behave very differently in different settings. If you run the code isolated, you have a spare core for the thread and it barely hurts. Run multiple parallel instances in a larger framework, and it eats away CPU cycles from the threads that do the work - it starts hurting badly.
>>>>>>
>>>>>> - You may get around a copy into the shared memory (ByteBuffer into MemoryMappedFile) by creating an according DataOutputView - saving one more data copy. That's the next step, though; first solve the other issue.
>>>>>>
>>>>>> The last time I implemented such an inter-process data pipe between languages, I had a similar issue: no support for system-wide semaphores (or something similar) on both sides.
>>>>>>
>>>>>> I used shared memory for the buffers, and a local network socket (UDP, but I guess TCP would be fine as well) for notifications when buffers are available. That worked pretty well and yielded high throughput, because the big buffers were not copied (unlike in streams), and the UDP notifications were very fast (fire-and-forget datagrams).
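The shared-memory-plus-UDP design described above can be sketched roughly as follows (a hedged, single-process illustration: the file layout, buffer size, and the one-byte "buffer index" notification format are all assumptions, and a memory-mapped temp file stands in for the shared segment):

```python
import mmap
import os
import socket
import tempfile

BUF_SIZE = 4096  # illustrative buffer size

# Shared memory: a memory-mapped temp file both processes would open.
fd, path = tempfile.mkstemp()
os.ftruncate(fd, BUF_SIZE)
shared = mmap.mmap(fd, BUF_SIZE)

# UDP socket for the "buffer available" notifications.
recv_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
recv_sock.bind(("127.0.0.1", 0))
addr = recv_sock.getsockname()
send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# Writer: put the payload into shared memory, then fire a tiny datagram
# that only says "buffer 0 is ready" -- the bulk data never crosses the
# socket, which is where the throughput win comes from.
payload = b"record data"
shared.seek(0)
shared.write(payload)
send_sock.sendto(b"\x00", addr)  # fire-and-forget notification

# Reader: block on the datagram, then read straight from shared memory.
note, _ = recv_sock.recvfrom(1)
shared.seek(0)
data = shared.read(len(payload))
print(data)  # b'record data'

shared.close()
os.close(fd)
os.unlink(path)
recv_sock.close()
send_sock.close()
```

The datagram carries only a buffer index; the big buffers stay in place, so the per-record cost is one tiny send/receive instead of a full copy through a stream.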
>>>>>> Stephan
>>>>>>
>>>>>> On Wed, Aug 27, 2014 at 10:48 PM, Chesnay Schepler <[email protected]> wrote:
>>>>>>
>>>>>>> Hey Stephan,
>>>>>>>
>>>>>>> I'd like to point out right away that the code related to your questions is shared by both programs.
>>>>>>>
>>>>>>> regarding your first point: I have a byte[] into which I serialize the data first using a ByteBuffer, and then write that data to a MappedByteBuffer.
>>>>>>>
>>>>>>> regarding synchronization: I couldn't find a way to use elaborate things like semaphores or similar that work between python and java alike.
>>>>>>>
>>>>>>> the data exchange is currently completely synchronous. Java writes a record, sets an "isWritten" bit, and then repeatedly checks whether this bit is 0. Python repeatedly checks whether this bit is 1; once that happens, it reads the record and sets the bit to 0, which tells java that it has read the record and can write the next one. This scheme works the same way in the other direction.
>>>>>>>
>>>>>>> *NOW,* this may seem ... inefficient, to put it slightly. It is (or rather should be...) way faster (5x) than what we had so far, though (asynchronous pipes). (I also tried different schemes that all had no effect, so I decided to stick with the easiest one.)
>>>>>>>
>>>>>>> on to your last point: I'm gonna check for that tomorrow.
>>>>>>>
>>>>>>> On 27.8.2014 20:45, Stephan Ewen wrote:
>>>>>>>
>>>>>>>> Hi Chesnay!
>>>>>>>>
>>>>>>>> That is an interesting problem, though hard to judge with the information we have.
>>>>>>>>
>>>>>>>> Can you elaborate a bit on the following points:
>>>>>>>>
>>>>>>>> - When putting the objects from the Java Flink side into the shared memory, you need to serialize them. How do you do that? Into a buffer, then copy that into the shared memory ByteBuffer? Directly?
>>>>>>>>
>>>>>>>> - Shared memory access has to be somehow controlled. The pipes give you flow control for free (blocking write calls when the stream consumer is busy). What do you do for the shared memory? Usually one uses semaphores or, in java, File(Range)Locks to coordinate access and block until memory regions are made available. Can you check if there are some busy waiting parts in your code?
>>>>>>>>
>>>>>>>> - More generally: the code is slower, but does it burn CPU cycles in its slowness, or is it waiting for locks / monitors / conditions?
>>>>>>>>
>>>>>>>> Stephan
>>>>>>>>
>>>>>>>> On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hello everyone,
>>>>>>>>>
>>>>>>>>> This will be some kind of brainstorming question.
>>>>>>>>>
>>>>>>>>> As some of you may know, I am currently working on the Python API. The most crucial part here is how the data is exchanged between Java and Python. Up to this point we used pipes for this, but switched recently to memory-mapped files in hopes of increasing the (lacking) performance.
>>>>>>>>>
>>>>>>>>> Early (simplified) prototypes (outside of Flink) showed that this would yield a significant increase. Yet when I added the code to flink and ran a job, there was no effect. Like, at all. Two radically different schemes ran in /exactly/ the same time.
>>>>>>>>>
>>>>>>>>> my conclusion was that code already in place (and not part of the prototypes) is responsible for this. So I went ahead and modified the prototypes to use all relevant code from the Python API in order to narrow down the culprit. But this time, the performance increase was there.
>>>>>>>>>
>>>>>>>>> Now here's the question: how can the /very same code/ perform so much worse when integrated into flink? If the code is not the problem, what could it be?
>>>>>>>>>
>>>>>>>>> I spent a lot of time looking for that one line of code that cripples the performance, but I'm pretty much out of places to look.
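The "isWritten"-bit handshake discussed in the thread can be sketched like this (a hedged, in-process illustration: threads stand in for the Java and Python processes, and the byte layout, flag position, and record format are made-up stand-ins for whatever the actual code uses):

```python
import mmap
import struct
import threading

# Assumed layout of the shared region:
#   byte 0     -> "isWritten" flag (0 = free, 1 = record ready)
#   bytes 1-4  -> record length (big-endian int)
#   bytes 5... -> record payload
buf = mmap.mmap(-1, 4096)  # anonymous mapping; zero-filled, so flag starts at 0

def writer(records):
    for rec in records:
        while buf[0] == 1:          # busy-wait until the reader consumed the last record
            pass
        buf[1:5] = struct.pack(">i", len(rec))
        buf[5:5 + len(rec)] = rec
        buf[0] = 1                  # flip the flag: record is ready

received = []

def reader(n):
    for _ in range(n):
        while buf[0] == 0:          # busy-wait until a record is ready
            pass
        (length,) = struct.unpack(">i", buf[1:5])
        received.append(bytes(buf[5:5 + length]))
        buf[0] = 0                  # hand the buffer back to the writer

records = [b"alpha", b"beta", b"gamma"]
t = threading.Thread(target=reader, args=(len(records),))
t.start()
writer(records)
t.join()
print(received)  # [b'alpha', b'beta', b'gamma']
```

Both `while` loops are exactly the busy waiting criticized above: whichever side is ahead spins at full speed until the other flips the flag, which is cheap in isolation but burns a core per waiting task once many instances run in parallel.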
