Hey Chesnay,

Any progress on this today? Are you going for the UDP buffer-availability
notifications Stephan proposed instead of the busy loop?

Ufuk


On Thu, Aug 28, 2014 at 1:06 AM, Chesnay Schepler <
[email protected]> wrote:

> The performance differences occur on the same system (16 GB, 4 cores +
> HyperThreading) with a DOP of 1, for a plan consisting of a single
> operator. Plenty of resources :/
>
>
> On 28.8.2014 0:50, Stephan Ewen wrote:
>
>> Hey Chesnay!
>>
>> Here are some thoughts:
>>
>>   - The repeated checking for 1 or 0 is indeed a busy loop. These may
>> behave very differently in different settings. If you run the code in
>> isolation, you have a spare core for the thread and it barely hurts. Run
>> multiple parallel instances in a larger framework, and it eats away CPU
>> cycles from the threads that do the work - it starts hurting badly.
>>
>>   - You may get around a copy into the shared memory (ByteBuffer into
>> memory-mapped file) by creating a corresponding DataOutputView - that
>> saves one more data copy. That's the next step, though; first solve the
>> other issue.
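>>
>> For illustration only (file path and buffer size are made up): since
>> DataOutputView extends java.io.DataOutput, such a view can be as simple
>> as an OutputStream that writes straight into the mapped buffer, wrapped
>> in a DataOutputStream:
>>
>> import java.io.DataOutputStream;
>> import java.io.IOException;
>> import java.io.OutputStream;
>> import java.io.RandomAccessFile;
>> import java.nio.MappedByteBuffer;
>> import java.nio.channels.FileChannel;
>>
>> public class MappedBufferOutput {
>>
>>     /** OutputStream writing directly into a MappedByteBuffer. */
>>     static final class MappedBufferOutputStream extends OutputStream {
>>         private final MappedByteBuffer buffer;
>>
>>         MappedBufferOutputStream(MappedByteBuffer buffer) {
>>             this.buffer = buffer;
>>         }
>>
>>         @Override
>>         public void write(int b) {
>>             buffer.put((byte) b);
>>         }
>>
>>         @Override
>>         public void write(byte[] b, int off, int len) {
>>             buffer.put(b, off, len); // bulk write, no intermediate byte[]
>>         }
>>     }
>>
>>     public static void main(String[] args) throws IOException {
>>         try (RandomAccessFile file = new RandomAccessFile("/tmp/flink-pipe", "rw")) {
>>             MappedByteBuffer mapped =
>>                 file.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 1 << 20);
>>             // records get serialized straight into the mapped region
>>             DataOutputStream out =
>>                 new DataOutputStream(new MappedBufferOutputStream(mapped));
>>             out.writeInt(42);
>>             out.writeUTF("some record");
>>         }
>>     }
>> }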
>>
>> The last time I implemented such an inter-process data pipe between
>> languages, I had a similar issue: no support for system-wide semaphores
>> (or something similar) on both sides.
>>
>> I used shared memory for the buffers, and a local network socket (UDP, but
>> I guess TCP would be fine as well) for notifications when buffers are
>> available. That worked pretty well and yielded high throughput, because
>> the big buffers were not copied (unlike with streams) and the UDP
>> notifications were very fast (fire-and-forget datagrams).
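>>
>> A rough sketch of the writing side of that pattern (port number, file
>> path, and the one-byte framing are made up, not what I actually used):
>>
>> import java.io.RandomAccessFile;
>> import java.net.DatagramPacket;
>> import java.net.DatagramSocket;
>> import java.net.InetAddress;
>> import java.nio.MappedByteBuffer;
>> import java.nio.channels.FileChannel;
>>
>> public class SharedMemoryWriter {
>>     public static void main(String[] args) throws Exception {
>>         try (RandomAccessFile file = new RandomAccessFile("/tmp/flink-pipe", "rw");
>>              DatagramSocket socket = new DatagramSocket()) {
>>
>>             MappedByteBuffer buffer =
>>                 file.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 1 << 20);
>>
>>             byte[] record = "some serialized record".getBytes("UTF-8");
>>             buffer.putInt(record.length); // length header
>>             buffer.put(record);           // payload, written in place
>>
>>             // fire-and-forget notification: "a buffer is ready".
>>             // The reader blocks on socket.receive() instead of spinning.
>>             byte[] signal = { 1 };
>>             socket.send(new DatagramPacket(signal, signal.length,
>>                     InetAddress.getLoopbackAddress(), 25000));
>>         }
>>     }
>> }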
>>
>> Stephan
>>
>>
>>
>> On Wed, Aug 27, 2014 at 10:48 PM, Chesnay Schepler <
>> [email protected]> wrote:
>>
>>> Hey Stephan,
>>>
>>> I'd like to point out right away that the code related to your questions
>>> is shared by both programs.
>>>
>>> Regarding your first point: I have a byte[] into which I serialize the
>>> data first, using a ByteBuffer, and then write that data to a
>>> MappedByteBuffer.
>>>
>>> Regarding synchronization: I couldn't find a way to use elaborate
>>> constructs like semaphores (or similar) that work across Python and Java
>>> alike.
>>>
>>> The data exchange is currently completely synchronous. Java writes a
>>> record, sets an "isWritten" bit, and then repeatedly checks whether this
>>> bit is 0 again. Python repeatedly checks whether the bit is 1; once it
>>> is, it reads the record and sets the bit back to 0, which tells Java
>>> that the record has been read and the next one can be written. The same
>>> scheme works in the other direction.
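>>>
>>> To make that concrete, the Java side looks roughly like this (a
>>> simplified sketch, not the actual code; offsets and sizes are made up,
>>> and memory-ordering details are glossed over):
>>>
>>> import java.io.RandomAccessFile;
>>> import java.nio.MappedByteBuffer;
>>> import java.nio.channels.FileChannel;
>>>
>>> public class FlagProtocolWriter {
>>>     private static final int FLAG = 0; // "isWritten" bit lives at byte 0
>>>     private static final int DATA = 1; // record data starts right after
>>>
>>>     public static void main(String[] args) throws Exception {
>>>         try (RandomAccessFile file = new RandomAccessFile("/tmp/flink-pipe", "rw")) {
>>>             MappedByteBuffer buffer =
>>>                 file.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 4096);
>>>
>>>             byte[] record = "some serialized record".getBytes("UTF-8");
>>>             buffer.position(DATA);
>>>             buffer.putInt(record.length);
>>>             buffer.put(record);
>>>
>>>             buffer.put(FLAG, (byte) 1); // tell Python: record is ready
>>>
>>>             // busy loop: wait until Python sets the flag back to 0
>>>             while (buffer.get(FLAG) == 1) {
>>>                 // spin
>>>             }
>>>             // flag is 0 again -> safe to write the next record
>>>         }
>>>     }
>>> }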
>>>
>>> *NOW,* this may seem ... inefficient, to put it mildly. It is (or rather
>>> should be...) about 5x faster than what we had so far, though
>>> (asynchronous pipes).
>>> (I also tried different schemes that all had no effect, so I decided to
>>> stick with the simplest one.)
>>>
>>> On to your last point: I'm going to check that tomorrow.
>>>
>>>
>>>
>>>
>>> On 27.8.2014 20:45, Stephan Ewen wrote:
>>>
>>>> Hi Chesnay!
>>>>
>>>> That is an interesting problem, though hard to judge with the
>>>> information
>>>> we have.
>>>>
>>>> Can you elaborate a bit on the following points:
>>>>
>>>>    - When putting the objects from the Java Flink side into the shared
>>>> memory, you need to serialize them. How do you do that? Into a buffer,
>>>> then
>>>> copy that into the shared memory ByteBuffer? Directly?
>>>>
>>>>    - Shared memory access has to be somehow controlled. The pipes give
>>>> you flow control for free (blocking write calls when the stream consumer
>>>> is busy). What do you do for the shared memory? Usually one uses
>>>> semaphores or, in Java, File(Range)Locks to coordinate access and block
>>>> until memory regions are made available (see the sketch after this
>>>> list). Can you check if there are some busy-waiting parts in your code?
>>>>
>>>>    - More generally: the code is slower, but does it burn CPU cycles in
>>>> its slowness, or is it waiting for locks / monitors / conditions?
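>>>>
>>>> For illustration, the File(Range)Lock variant could look roughly like
>>>> this on the Java side (file path and region layout are made up; on
>>>> POSIX systems, Python could presumably participate via fcntl.lockf on
>>>> the same file):
>>>>
>>>> import java.io.RandomAccessFile;
>>>> import java.nio.channels.FileChannel;
>>>> import java.nio.channels.FileLock;
>>>>
>>>> public class RangeLockWriter {
>>>>     public static void main(String[] args) throws Exception {
>>>>         try (RandomAccessFile file = new RandomAccessFile("/tmp/flink-pipe", "rw")) {
>>>>             FileChannel channel = file.getChannel();
>>>>
>>>>             // Blocks until bytes [0, 4096) of the shared file are free;
>>>>             // the other process holds this lock while reading the region,
>>>>             // so the OS provides flow control instead of a busy loop.
>>>>             FileLock lock = channel.lock(0, 4096, false);
>>>>             try {
>>>>                 // ... write the next record into the locked region ...
>>>>             } finally {
>>>>                 lock.release();
>>>>             }
>>>>         }
>>>>     }
>>>> }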
>>>>
>>>> Stephan
>>>>
>>>>
>>>>
>>>> On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler <
>>>> [email protected]> wrote:
>>>>
>>>>> Hello everyone,
>>>>>
>>>>> This will be some kind of brainstorming question.
>>>>>
>>>>> As some of you may know, I am currently working on the Python API. The
>>>>> most crucial part there is how data is exchanged between Java and
>>>>> Python. Up to this point we used pipes for this, but recently switched
>>>>> to memory-mapped files in the hope of improving the (lacking)
>>>>> performance.
>>>>>
>>>>> Early (simplified) prototypes (outside of Flink) showed that this
>>>>> would yield a significant increase. Yet when I added the code to Flink
>>>>> and ran a job, there was no effect. Like, at all. Two radically
>>>>> different schemes ran in /exactly/ the same time.
>>>>>
>>>>> My conclusion was that code already in place (and not part of the
>>>>> prototypes) was responsible for this. So I went ahead and modified the
>>>>> prototypes to use all relevant code from the Python API in order to
>>>>> narrow down the culprit. But this time, the performance increase was
>>>>> there.
>>>>>
>>>>> Now here's the question: how can the /very same code/ perform so much
>>>>> worse when integrated into Flink? And if the code is not the problem,
>>>>> what could it be?
>>>>>
>>>>> I spent a lot of time looking for that one line of code that cripples
>>>>> the performance, but I'm pretty much out of places to look.
>>>>>
>>>>>
>>>>>
>>>>>
>
