Hey Chesnay, any progress on this today? Are you going for the UDP buffer availability notifications Stephan proposed instead of the busy loop?
Ufuk

On Thu, Aug 28, 2014 at 1:06 AM, Chesnay Schepler <[email protected]> wrote:

> the performance differences occur on the same system (16GB, 4 cores +
> HyperThreading) with a DOP of 1 for a plan consisting of a single operator.
> plenty of resources :/
>
>
> On 28.8.2014 0:50, Stephan Ewen wrote:
>
>> Hey Chesnay!
>>
>> Here are some thoughts:
>>
>> - The repeated checking for 1 or 0 is indeed a busy loop. These may behave
>> very differently in different settings. If you run the code isolated, you
>> have a spare core for the thread and it barely hurts. Run multiple parallel
>> instances in a larger framework, and it eats away CPU cycles from the
>> threads that do the work - it starts hurting badly.
>>
>> - You may get around a copy into the shared memory (ByteBuffer into
>> MemoryMappedFile) by creating a corresponding DataOutputView - save one
>> more data copy. That's the next step, though, first solve the other issue.
>>
>> The last time I implemented such an inter-process data pipe between
>> languages, I had a similar issue: No support for system-wide semaphores
>> (or something similar) on both sides.
>>
>> I used shared memory for the buffers, and a local network socket (UDP, but
>> I guess TCP would be fine as well) for notifications when buffers are
>> available. That worked pretty well, yielded high throughput, because the
>> big buffers were not copied (unlike in streams), and the UDP notifications
>> were very fast (fire and forget datagrams).
>>
>> Stephan
>>
>>
>> On Wed, Aug 27, 2014 at 10:48 PM, Chesnay Schepler <[email protected]> wrote:
>>
>>> Hey Stephan,
>>>
>>> I'd like to point out right away that the code related to your questions
>>> is shared by both programs.
>>>
>>> regarding your first point: i have a byte[] into which i serialize the
>>> data first using a ByteBuffer, and then write that data to a
>>> MappedByteBuffer.
>>>
>>> regarding synchronization: i couldn't find a way to use elaborate things
>>> like semaphores or similar that work between python and java alike.
>>>
>>> the data exchange is currently completely synchronous. java writes a
>>> record, sets an "isWritten" bit and then repeatedly checks whether this
>>> bit is 0. python repeatedly checks whether this bit is 1. once that
>>> happens, it reads the record and sets the bit to 0, which tells java that
>>> it has read the record and can write the next one. this scheme works the
>>> same way the other way around.
>>>
>>> *NOW,* this may seem ... inefficient, to put it mildly. it is (or rather
>>> should be...) way faster (5x) than what we had so far though
>>> (asynchronous pipes).
>>> (i also tried different schemes that all had no effect, so i decided to
>>> stick with the easiest one)
>>>
>>> on to your last point: I'm gonna check for that tomorrow.
>>>
>>>
>>> On 27.8.2014 20:45, Stephan Ewen wrote:
>>>
>>>> Hi Chesnay!
>>>>
>>>> That is an interesting problem, though hard to judge with the
>>>> information we have.
>>>>
>>>> Can you elaborate a bit on the following points:
>>>>
>>>> - When putting the objects from the Java Flink side into the shared
>>>> memory, you need to serialize them. How do you do that? Into a buffer,
>>>> then copy that into the shared memory ByteBuffer? Directly?
>>>>
>>>> - Shared memory access has to be somehow controlled. The pipes give you
>>>> flow control for free (blocking write calls when the stream consumer is
>>>> busy). What do you do for the shared memory? Usually, one uses
>>>> semaphores, or, in java File(Range)Locks to coordinate access and block
>>>> until memory regions are made available. Can you check if there are some
>>>> busy waiting parts in your code?
>>>>
>>>> - More general: The code is slower, but does it burn CPU cycles in its
>>>> slowness or is it waiting for locks / monitors / conditions?
>>>>
>>>> Stephan
>>>>
>>>>
>>>> On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler <[email protected]> wrote:
>>>>
>>>>> Hello everyone,
>>>>>
>>>>> This will be some kind of brainstorming question.
>>>>>
>>>>> As some of you may know I am currently working on the Python API. The
>>>>> most crucial part here is how the data is exchanged between Java and
>>>>> Python. Up to this point we used pipes for this, but switched recently
>>>>> to memory mapped files in hopes of increasing the (lacking) performance.
>>>>>
>>>>> Early (simplified) prototypes (outside of Flink) showed that this would
>>>>> yield a significant increase. yet when i added the code to flink and
>>>>> ran a job, there was no effect. like at all. two radically different
>>>>> schemes ran in /exactly/ the same time.
>>>>>
>>>>> my conclusion was that code already in place (and not part of the
>>>>> prototypes) is responsible for this.
>>>>> so i went ahead and modified the prototypes to use all relevant code
>>>>> from the Python API in order to narrow down the culprit. but this time,
>>>>> the performance increase was there.
>>>>>
>>>>> Now here's the question: How can the /very same code/ perform so much
>>>>> worse when integrated into flink? if the code is not the problem, what
>>>>> could it be?
>>>>>
>>>>> i spent a lot of time looking for that one line of code that cripples
>>>>> the performance, but I'm pretty much out of places to look.
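
For reference, the scheme Stephan describes in the thread (shared memory for the buffers, a local UDP socket for "buffer available" notifications instead of a busy loop) could look roughly like the sketch below. This is only an illustration under several assumptions, not the code from the Python API: both ends run as threads in one Python process so the example is self-contained (in the thread they are a Java and a Python process), there is a single buffer holding one record at a time, and the names `BUF_SIZE`, `END_OF_STREAM`, `writer`, `reader` and `run_demo` as well as the 4-byte length datagram are made up for this example.

```python
import mmap
import socket
import struct
import tempfile
import threading

BUF_SIZE = 4096              # hypothetical size of the shared buffer
END_OF_STREAM = 0xFFFFFFFF   # hypothetical "no more records" marker


def make_udp_socket():
    """UDP socket on an ephemeral localhost port, with a safety timeout."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("127.0.0.1", 0))
    sock.settimeout(5.0)
    return sock


def writer(buf, sock, reader_addr, records):
    """Put each record into shared memory, then notify the reader."""
    for rec in records:
        buf.seek(0)
        buf.write(rec)  # the payload only travels through shared memory
        # Notification datagram: just the record length, not the data.
        sock.sendto(struct.pack("!I", len(rec)), reader_addr)
        sock.recvfrom(16)  # blocks (no spinning) until the reader acks
    sock.sendto(struct.pack("!I", END_OF_STREAM), reader_addr)


def reader(buf, sock, out):
    """Block on the socket, read the announced bytes, acknowledge."""
    while True:
        msg, writer_addr = sock.recvfrom(16)  # sleeps until notified
        (length,) = struct.unpack("!I", msg)
        if length == END_OF_STREAM:
            break
        out.append(buf[:length])
        sock.sendto(b"\x01", writer_addr)  # buffer may be reused now


def run_demo(records):
    """Send records from a writer to a reader thread; return what arrived."""
    received = []
    with tempfile.TemporaryFile() as f:
        f.truncate(BUF_SIZE)
        # Two mappings of the same file stand in for the two processes.
        w_buf = mmap.mmap(f.fileno(), BUF_SIZE)
        r_buf = mmap.mmap(f.fileno(), BUF_SIZE)
        w_sock, r_sock = make_udp_socket(), make_udp_socket()
        try:
            t = threading.Thread(target=reader,
                                 args=(r_buf, r_sock, received), daemon=True)
            t.start()
            writer(w_buf, w_sock, r_sock.getsockname(), records)
            t.join()
        finally:
            for res in (w_sock, r_sock, w_buf, r_buf):
                res.close()
    return received
```

Calling `run_demo([b"hello", b"flink"])` pushes both records through the mapped file while each side sleeps in `recvfrom` between records, so neither thread spins on a flag. This is still the one-record-at-a-time handshake from the thread; batching several records per notification would be a separate change.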
