Github user zentol commented on the pull request:
https://github.com/apache/incubator-flink/pull/202#issuecomment-63110487
ok, let's see...
A) Data Transmission
Related classes: Sender, Receiver and Streamer
Sender and Receiver are low-level classes that deal with de-/serialization
and reading/writing to/from the memory-mapped file.
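For illustration, a minimal sketch of what "reading/writing to/from the memory-mapped file" can look like, using only `java.nio`. The `send`/`receive` methods, the fixed 4096-byte buffer and the "count, then int records" layout are all hypothetical and are not the actual Sender/Receiver format. Both sides also run in one JVM here, while in practice the other side is the external process.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

public class MappedFileDemo {
    // Hypothetical fixed size for the shared buffer (holds at most 1023 int records).
    static final int BUFFER_SIZE = 4096;

    // Sender side (hypothetical): serialize a count followed by int records into the mapped file.
    static void send(Path file, int[] records) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // Mapping READ_WRITE grows the file to BUFFER_SIZE if it is smaller.
            MappedByteBuffer buf = ch.map(FileChannel.MapMode.READ_WRITE, 0, BUFFER_SIZE);
            buf.putInt(records.length);
            for (int r : records) {
                buf.putInt(r);
            }
            buf.force(); // flush the changes to the underlying file
        }
    }

    // Receiver side (hypothetical): deserialize the same layout from the mapped file.
    static List<Integer> receive(Path file) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            MappedByteBuffer buf = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
            int count = buf.getInt();
            List<Integer> records = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                records.add(buf.getInt());
            }
            return records;
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("mmap-demo", ".bin");
        file.toFile().deleteOnExit();
        send(file, new int[]{1, 2, 3});
        System.out.println(receive(file)); // [1, 2, 3]
    }
}
```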
The Streamer class sits one level above them; it is the class a function will
primarily use.
For example, a map function (implemented as a map partition) would use it
like this:
    @Override
    public void mapPartition(Iterable<IN> values, Collector<OUT> out) throws Exception {
        streamer.streamBufferWithoutGroups(values.iterator(), out);
    }
As such, the Streamer essentially decides what happens at runtime.
It communicates with the external process via UDP, exchanging signals that
indicate when to read or write buffers.
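A rough sketch of a UDP signal exchange with `java.net.DatagramSocket`. The one-byte signal codes and the names `sendSignal`/`receiveSignal` are hypothetical, because the actual signal values used by the Streamer are not described here. Both sockets live in one JVM on the loopback interface purely for demonstration.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;

public class SignalDemo {
    // Hypothetical one-byte signal codes.
    static final byte SIGNAL_BUFFER_WRITTEN = 0; // "a buffer is ready, read it"
    static final byte SIGNAL_BUFFER_READ = 1;    // "buffer consumed, you may write again"

    static void sendSignal(DatagramSocket socket, InetAddress host, int port, byte signal)
            throws IOException {
        byte[] data = {signal};
        socket.send(new DatagramPacket(data, data.length, host, port));
    }

    static byte receiveSignal(DatagramSocket socket) throws IOException {
        byte[] data = new byte[1];
        // Blocks until a datagram arrives or the socket's timeout expires.
        socket.receive(new DatagramPacket(data, data.length));
        return data[0];
    }

    public static void main(String[] args) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Both "sides" live in one JVM here; in practice one of them is the external process.
        try (DatagramSocket javaSide = new DatagramSocket(0, loopback);
             DatagramSocket externalSide = new DatagramSocket(0, loopback)) {
            javaSide.setSoTimeout(2000);
            externalSide.setSoTimeout(2000);
            sendSignal(javaSide, loopback, externalSide.getLocalPort(), SIGNAL_BUFFER_WRITTEN);
            byte received = receiveSignal(externalSide);
            sendSignal(externalSide, loopback, javaSide.getLocalPort(), SIGNAL_BUFFER_READ);
            byte ack = receiveSignal(javaSide);
            System.out.println(received + " " + ack); // 0 1
        }
    }
}
```

UDP does not guarantee delivery in general. On loopback this is rarely an issue, but the timeout keeps the sketch from hanging forever.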
B) Plan Binding
Related classes: PlanBinder, OperationInfo
The PlanBinder converts data sent from the external process into an actual
Flink plan. The sent data has to follow a very specific scheme defined within
the class.
Let's say you have a plan with a single CSV source. In this case, for the
sources you would:
1. send 1 as an integer, representing the total number of sources
2. send "CSV", denoting the type of source
3. send a unique ID integer
4. send a tuple containing
   (filePath, lineDelimiter, fieldDelimiter, type1, ..., typeX)
   The type arguments are example objects: if you want to read a
   Tuple2<Integer,Integer>, type1 and type2 would both be integers.
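The four steps above can be sketched as follows. This sketch assumes the messages can be modeled as an in-memory queue of Java objects, which is a simplification of the real serialized stream. `describeCsvSource` and `bindSources` are hypothetical names. The reader only shows that the PlanBinder side consumes the values in exactly the order they were sent.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class SourceMessages {
    // Hypothetical: the external process's messages, modeled as a list of objects.
    static List<Object> describeCsvSource(int id, String path) {
        List<Object> messages = new ArrayList<>();
        messages.add(1);     // 1. total number of sources
        messages.add("CSV"); // 2. type of source
        messages.add(id);    // 3. unique ID
        // 4. (filePath, lineDelimiter, fieldDelimiter, type1, type2): the two Integers are
        //    example objects describing a Tuple2<Integer,Integer>.
        messages.add(new Object[]{path, "\n", ",", 0, 0});
        return messages;
    }

    // Hypothetical PlanBinder-side reader: consumes the messages in the same order.
    static List<String> bindSources(Deque<Object> in) {
        List<String> bound = new ArrayList<>();
        int count = (Integer) in.poll();
        for (int i = 0; i < count; i++) {
            String type = (String) in.poll();
            int id = (Integer) in.poll();
            Object[] args = (Object[]) in.poll();
            if (!"CSV".equals(type)) {
                throw new IllegalArgumentException("only CSV is sketched here: " + type);
            }
            // Derive the field types from the example objects after the three fixed arguments.
            List<String> fieldTypes = new ArrayList<>();
            for (int f = 3; f < args.length; f++) {
                fieldTypes.add(args[f].getClass().getSimpleName());
            }
            bound.add(type + "#" + id + " " + args[0] + " " + fieldTypes);
        }
        return bound;
    }

    public static void main(String[] args) {
        Deque<Object> in = new ArrayDeque<>(describeCsvSource(42, "/tmp/in.csv"));
        System.out.println(bindSources(in)); // [CSV#42 /tmp/in.csv [Integer, Integer]]
    }
}
```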
These behaviours are completely defined for:
- parameters (DOP, local execution)
- data sources (CSV, TEXT, VALUE (fromElements()))
- data sinks (CSV, TEXT, PRINT)
- broadcast variables
Operations are a bit different, but they follow the same pattern:
1. send an integer representing the total number of operations
2. send the operation identifier
3. send all relevant arguments (this is partially defined by the implementer)
Between step 2 (exclusive) and step 3 (inclusive), the following methods are
called:
    createOperationInfo(identifier) (abstract)
    createXOperation(operationInfo)
    applyXOperation(parameter1, ..., parameterX, operationInfo) (abstract, only
    for UDF operations)
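The call order above can be sketched with an abstract class. Several assumptions apply: the names `Binder`, `DemoBinder` and the `"MAP"` identifier are hypothetical, only a map operation is handled, and `createOperationInfo` also receives the message queue so that it can read the step-3 arguments itself. The real PlanBinder signature may differ.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class OperationBindingDemo {
    // Hypothetical stand-in for OperationInfo.
    static class OperationInfo {
        final String identifier;
        final int parentId;

        OperationInfo(String identifier, int parentId) {
            this.identifier = identifier;
            this.parentId = parentId;
        }
    }

    static abstract class Binder {
        final List<String> log = new ArrayList<>();

        // Implementer-defined: read the operation's arguments (step 3) into an info object.
        protected abstract OperationInfo createOperationInfo(String identifier, Deque<Object> in);

        // Implementer-defined: actually apply the UDF operation.
        protected abstract void applyMapOperation(int parentId, OperationInfo info);

        // Generic: extract the usual parameters, then delegate to the implementer.
        void createMapOperation(OperationInfo info) {
            log.add("createMapOperation");
            applyMapOperation(info.parentId, info);
        }

        void bindOperations(Deque<Object> in) {
            int count = (Integer) in.poll();            // 1. total number of operations
            for (int i = 0; i < count; i++) {
                String identifier = (String) in.poll(); // 2. operation identifier
                OperationInfo info = createOperationInfo(identifier, in);
                if (!identifier.equals("MAP")) {
                    throw new IllegalArgumentException("unsupported operation: " + identifier);
                }
                createMapOperation(info);
            }
        }
    }

    // Hypothetical implementer that records which methods were called.
    static final class DemoBinder extends Binder {
        @Override
        protected OperationInfo createOperationInfo(String identifier, Deque<Object> in) {
            log.add("createOperationInfo(" + identifier + ")");
            return new OperationInfo(identifier, (Integer) in.poll());
        }

        @Override
        protected void applyMapOperation(int parentId, OperationInfo info) {
            log.add("applyMapOperation(" + parentId + ")");
        }
    }

    public static void main(String[] args) {
        DemoBinder binder = new DemoBinder();
        binder.bindOperations(new ArrayDeque<>(List.<Object>of(1, "MAP", 7)));
        System.out.println(binder.log);
        // [createOperationInfo(MAP), createMapOperation, applyMapOperation(7)]
    }
}
```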
createOperationInfo is an abstract method whose purpose is to create an
<? extends OperationInfo> object containing all the information necessary to
apply an operation (UDF or non-UDF).
For a Python map, for example, such an object would contain:
1. the ID of the set to apply the map on
2. the serialized operator
3. the type argument
4. some additional info :>
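A sketch of such an info object for a Python map, under stated assumptions. The class `PythonMapInfo`, its field names, the `"MAP"` identifier and the choice of an operator name as the "additional info" are all hypothetical. The constructor reads the four values from a message queue in the order listed above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

public class PythonMapInfoDemo {
    // Hypothetical base type standing in for OperationInfo.
    static abstract class OperationInfo {
        final String identifier;

        OperationInfo(String identifier) {
            this.identifier = identifier;
        }
    }

    // Hypothetical info object for a Python map; field names are illustrative.
    static final class PythonMapInfo extends OperationInfo {
        final int parentId;        // 1. ID of the set to apply the map on
        final byte[] operator;     // 2. serialized operator
        final Object typeArgument; // 3. type argument (an example object, e.g. 0 for Integer)
        final String name;         // 4. additional info, here just an operator name

        PythonMapInfo(String identifier, Deque<Object> in) {
            super(identifier);
            // Arguments are consumed in the order the external process sent them.
            this.parentId = (Integer) in.poll();
            this.operator = (byte[]) in.poll();
            this.typeArgument = in.poll();
            this.name = (String) in.poll();
        }
    }

    public static void main(String[] args) {
        Deque<Object> in = new ArrayDeque<>(List.<Object>of(3, new byte[]{42}, 0, "double_it"));
        PythonMapInfo info = new PythonMapInfo("MAP", in);
        System.out.println(info.parentId + " " + info.operator.length + " "
                + info.typeArgument.getClass().getSimpleName() + " " + info.name);
        // 3 1 Integer double_it
    }
}
```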
This object is passed to createXOperation, which extracts certain parameters
from the object based on the identifier (these are the things you would
usually use for this operation; a join, for example, would extract the
grouping keys among others) and calls applyXOperation, which actually applies
the operation. It was constructed this way so that the implementer has the
freedom to implement operations however they want, for example a map as a map
partition. I was wondering whether I should remove createXOperation.
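To illustrate the split between createXOperation and applyXOperation for a join, here is a hedged sketch. `JoinInfo`, `Binder` and `RecordingBinder` are hypothetical names. The generic method pulls out the parts every join needs (both input IDs and both key sets), and the implementer decides what to do with them. A real implementer would build a Flink join at that point instead of recording a string.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class JoinBindingDemo {
    // Hypothetical operation info for a join; fields are illustrative.
    static final class JoinInfo {
        final int parentId, otherId;
        final int[] keys1, keys2;
        final byte[] operator;

        JoinInfo(int parentId, int otherId, int[] keys1, int[] keys2, byte[] operator) {
            this.parentId = parentId;
            this.otherId = otherId;
            this.keys1 = keys1;
            this.keys2 = keys2;
            this.operator = operator;
        }
    }

    static abstract class Binder {
        // Generic part: extract what every join needs, then hand over to the implementer.
        void createJoinOperation(JoinInfo info) {
            applyJoinOperation(info.parentId, info.otherId, info.keys1, info.keys2, info);
        }

        // Implementer-defined: how the join is actually built.
        protected abstract void applyJoinOperation(int parentId, int otherId,
                int[] keys1, int[] keys2, JoinInfo info);
    }

    // Hypothetical implementer that only records what it was asked to do.
    static final class RecordingBinder extends Binder {
        final List<String> applied = new ArrayList<>();

        @Override
        protected void applyJoinOperation(int parentId, int otherId,
                int[] keys1, int[] keys2, JoinInfo info) {
            applied.add("join " + parentId + Arrays.toString(keys1)
                    + " with " + otherId + Arrays.toString(keys2));
        }
    }

    public static void main(String[] args) {
        RecordingBinder binder = new RecordingBinder();
        binder.createJoinOperation(new JoinInfo(1, 2, new int[]{0}, new int[]{1}, new byte[0]));
        System.out.println(binder.applied); // [join 1[0] with 2[1]]
    }
}
```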
To plug in Python I had to do very little on the Java side:
1. create the functions (PythonMap, PythonCoGroup etc.)
2. extend Streamer and implement setupProcess (starting the Python process)
3. extend OperationInfo (roughly 160 lines; given 16 operations, that's
   about 10 each)
4. extend PlanBinder (the abstract methods only took 100 lines)
On the external side you mostly need code to:
1. communicate with the stream (Connection.py)
2. deserialize data following 3 different protocols (Iterator.py):
   1. <type1><record1><type2><record2>...<typeX><recordX>
   2. <type><record1><record2>...<recordX>
   3. <type1><type2><record1T1><record2T2><record3T1><record4T2>
3. serialize data following 2 different protocols (Collector.py):
   1. <type1><record1><type2><record2>...<typeX><recordX>
   2. <type><record1><record2>...<recordX>
4. write the PlanBinder counterpart, which turns API calls into the
   intermediate format and sends it to Java
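The three wire layouts above can be sketched with `java.nio.ByteBuffer`. The sketch is in Java for consistency with the rest of this thread, although the external side is Python. Several assumptions apply: the one-byte type tags (1 = int, 2 = string), 4-byte big-endian ints and length-prefixed UTF-8 strings are all hypothetical, and protocol 3 is read as two type tags followed by records whose types alternate between them. The real encodings used by Iterator.py and Collector.py are not described here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class ProtocolDemo {
    // Hypothetical type tags.
    static final byte TYPE_INT = 1;
    static final byte TYPE_STRING = 2;

    static byte typeOf(Object record) {
        if (record instanceof Integer) return TYPE_INT;
        if (record instanceof String) return TYPE_STRING;
        throw new IllegalArgumentException("unsupported type: " + record.getClass());
    }

    static void writeRecord(ByteBuffer buf, Object record) {
        if (record instanceof Integer) {
            buf.putInt((Integer) record);
        } else {
            byte[] bytes = ((String) record).getBytes(StandardCharsets.UTF_8);
            buf.putInt(bytes.length); // length prefix, then the UTF-8 bytes
            buf.put(bytes);
        }
    }

    static Object readRecord(ByteBuffer buf, byte type) {
        switch (type) {
            case TYPE_INT:
                return buf.getInt();
            case TYPE_STRING:
                byte[] bytes = new byte[buf.getInt()];
                buf.get(bytes);
                return new String(bytes, StandardCharsets.UTF_8);
            default:
                throw new IllegalArgumentException("unknown type tag: " + type);
        }
    }

    // Protocol 1: <type1><record1><type2><record2>...
    static ByteBuffer serializeTagged(List<?> records) {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        for (Object r : records) {
            buf.put(typeOf(r));
            writeRecord(buf, r);
        }
        buf.flip();
        return buf;
    }

    // Protocol 2: <type><record1><record2>... (all records share one type)
    static ByteBuffer serializeUniform(List<?> records) {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        if (!records.isEmpty()) buf.put(typeOf(records.get(0)));
        for (Object r : records) writeRecord(buf, r);
        buf.flip();
        return buf;
    }

    static List<Object> deserializeTagged(ByteBuffer buf) {
        List<Object> out = new ArrayList<>();
        while (buf.hasRemaining()) out.add(readRecord(buf, buf.get()));
        return out;
    }

    static List<Object> deserializeUniform(ByteBuffer buf) {
        List<Object> out = new ArrayList<>();
        if (!buf.hasRemaining()) return out;
        byte type = buf.get();
        while (buf.hasRemaining()) out.add(readRecord(buf, type));
        return out;
    }

    // Protocol 3 (as interpreted here): <type1><type2>, then records alternating T1, T2, T1, ...
    static List<Object> deserializeAlternating(ByteBuffer buf) {
        List<Object> out = new ArrayList<>();
        if (buf.remaining() < 2) return out;
        byte[] types = {buf.get(), buf.get()};
        for (int i = 0; buf.hasRemaining(); i++) out.add(readRecord(buf, types[i % 2]));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(deserializeTagged(serializeTagged(List.<Object>of(1, "a", 2)))); // [1, a, 2]
        System.out.println(deserializeUniform(serializeUniform(List.of(3, 4, 5))));        // [3, 4, 5]
        ByteBuffer alt = ByteBuffer.allocate(64);
        alt.put(TYPE_INT).put(TYPE_STRING);
        writeRecord(alt, 7);
        writeRecord(alt, "x");
        writeRecord(alt, 8);
        alt.flip();
        System.out.println(deserializeAlternating(alt)); // [7, x, 8]
    }
}
```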
Hmm... I'll leave it at that for now. If I keep writing, too many in-depth
implementation details will slip in ^^