the performance differences occur on the same system (16GB, 4 cores +
HyperThreading) with a DOP of 1 for a plan consisting of a single
operator. plenty of resources :/
On 28.8.2014 0:50, Stephan Ewen wrote:
Hey Chesnay!
Here are some thoughts:
- The repeated checking for 1 or 0 is indeed a busy loop. These may behave
very different in different settings. If you run the code isolated, you
have a spare core for the thread and it barely hurts. Run multiple parallel
instances in a larger framework, and it eats away CPU cycles from the
threads that do the work - it starts hurting badly.
- You may get around a copy into the shared memory (ByteBuffer into
MemoryMappedFile) by creating an according DataOutputView - save one more
data copy. That's the next step, though, first solve the other issue.
The last time I implemented such an inter-process data pipe between
languages, I had a similar issue: No support for system wide semaphores (or
something similar) on both sides.
I used Shared memory for the buffers, and a local network socket (UDP, but
I guess TCP would be fine as well) for notifications when buffers are
available. That worked pretty well, yielded high throughput, because the
big buffers were not copied (unlike in streams), and the UDP notifications
were very fast (fire and forget datagrams).
Stephan
On Wed, Aug 27, 2014 at 10:48 PM, Chesnay Schepler <
[email protected]> wrote:
Hey Stephan,
I'd like to point out right away that the code related to your questions
is shared by both programs.
regarding your first point: i have a byte[] into which i serialize the
data first using a ByteBuffer, and then write that data to a
MappedByteBuffer.
regarding synchronization: i couldn't find a way to use elaborate things
like semaphores or similar that work between python and java alike.
the data exchange is currently completely synchronous. java writes a
record, sets an "isWritten" bit and then repeatedly checks this bit whether
it is 0. python repeatedly checks this bit whether it is 1. once that
happens, it reads the record, sets the bit to 0 which tells java that it
has read the record and can write the next one. this scheme works the same
way the other way around.
*NOW,* this may seem ... inefficient, to put it slightly. it is (or rather
should be...) way faster (5x) that what we had so far though (asynchronous
pipes).
(i also tried different schemes that all had no effect, so i decided to
stick with the easiest one)
on to your last point: I'm gonna check for that tomorrow.
On 27.8.2014 20:45, Stephan Ewen wrote:
Hi Chesnay!
That is an interesting problem, though hard to judge with the information
we have.
Can you elaborate a bit on the following points:
- When putting the objects from the Java Flink side into the shared
memory, you need to serialize them. How do you do that? Into a buffer,
then
copy that into the shared memory ByteBuffer? Directly?
- Shared memory access has to be somehow controlled. The pipes give you
flow control for free (blocking write calls when the stream consumer is
busy). What do you do for the shared memory? Usually, one uses semaphores,
or, in java File(Range)Locks to coordinate access and block until memory
regions are made available. Can you check if there are some busy waiting
parts in you code?
- More general: The code is slower, but does it burn CPU cycles in its
slowness or is it waiting for locks / monitors / conditions ?
Stephan
On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler <
[email protected]> wrote:
Hello everyone,
This will be some kind of brainstorming question.
As some of you may know I am currently working on the Python API. The
most
crucial part here is how the data is exchanged between Java and Python.
Up to this point we used pipes for this, but switched recently to memory
mapped files in hopes of increasing the (lacking) performance.
Early (simplified) prototypes (outside of Flink) showed that this would
yield a significant increase. yet when i added the code to flink and ran
a
job, there was
no effect. like at all. two radically different schemes ran in /exactly/
the same time.
my conclusion was that code already in place (and not part of the
prototypes) is responsible for this.
so i went ahead and modified the prototypes to use all relevant code from
the Python API in order to narrow down the culprit. but this time, the
performance increase was there.
Now here's the question: How can the /very same code/ perform so much
worse when integrated into flink? if the code is not the problem, what
could be it?
i spent a lot of time looking for that one line of code that cripples the
performance, but I'm pretty much out of places to look.