[ https://issues.apache.org/jira/browse/FLINK-377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14212616#comment-14212616 ]
ASF GitHub Bot commented on FLINK-377:
--------------------------------------
Github user zentol commented on the pull request:
https://github.com/apache/incubator-flink/pull/202#issuecomment-63110487
ok, let's see...
A) data transmission.
Related classes: Sender, Receiver and Streamer
Sender and Receiver are low-level classes that deal with de-/serialization
and with reading from and writing to the memory-mapped file.
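(For reference, memory-mapped I/O on the JVM side typically goes through
java.nio; the following is a generic sketch under that assumption, with a
made-up file path, and is not the actual Sender/Receiver code:

import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

// Generic java.nio sketch of mapping a file into memory; the path and
// buffer size are hypothetical, and the real Sender/Receiver logic differs.
public class MappedFileSketch {
    public static void main(String[] args) throws Exception {
        try (RandomAccessFile file = new RandomAccessFile("/tmp/flink_buffer", "rw");
             FileChannel channel = file.getChannel()) {
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
            buffer.putInt(42);            // a Sender-style write
            buffer.flip();                // prepare the buffer for reading
            int value = buffer.getInt();  // a Receiver-style read of the same value
        }
    }
}
)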
The Streamer class sits one level above them; it is the class a function
primarily uses.
For example, a map function (implemented as a map partition) would use it
like this:
@Override
public void mapPartition(Iterable<IN> values, Collector<OUT> out) throws Exception {
    streamer.streamBufferWithoutGroups(values.iterator(), out);
}
As such, the Streamer essentially decides what happens at runtime.
It communicates with the external process via UDP, exchanging signals that
tell each side when to read and write buffers.
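To make the signaling concrete, here is a minimal sketch of such an exchange
over UDP; the class name, signal byte and port handling are assumptions for
illustration, not the actual Streamer code:

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;

// Hypothetical sketch of the signal exchange: the Java side tells the
// external process that a buffer is ready, then blocks until the reply
// arrives. Names and the one-byte signal format are illustrative.
public class SignalChannel {
    private final DatagramSocket socket;
    private final InetAddress host;
    private final int port;

    public SignalChannel(int port) throws Exception {
        this.socket = new DatagramSocket();
        this.host = InetAddress.getByName("localhost");
        this.port = port;
    }

    // signal "buffer written, you may read"
    public void signalBufferReady() throws Exception {
        byte[] signal = new byte[]{1};
        socket.send(new DatagramPacket(signal, signal.length, host, port));
    }

    // block until the external process confirms it has consumed the buffer
    public void awaitBufferConsumed() throws Exception {
        byte[] reply = new byte[1];
        socket.receive(new DatagramPacket(reply, reply.length));
    }
}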
B) Plan Binding
Related classes: PlanBinder, OperationInfo
The PlanBinder converts data sent from the external process into an actual
Flink plan. The sent data has to follow a very specific scheme defined within
the class.
Let's say you have a plan with a single CSV source. In this case, for the
sources you would
1. send 1 as an integer, representing the total number of sources
2. send "CSV", denoting the type of source
3. send a unique ID integer
4. send a tuple containing
(filePath, lineDelimiter, fieldDelimiter, type1, ..., typeX)
The type arguments are exemplary objects, meaning that if you want to
read Tuple2<Integer, Integer>, type1 and type2 would be Integers.
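To illustrate, a hypothetical Java-side consumer of this scheme could look as
follows; the receiver methods used here (readInt, readString, readTuple) are
made-up stand-ins, not the real low-level API:

import java.util.ArrayList;
import java.util.List;

// Made-up stand-in for the low-level reading API.
interface SketchReceiver {
    int readInt();
    String readString();
    Object[] readTuple();
}

class SourceParsingSketch {
    static List<Object[]> readCsvSources(SketchReceiver receiver) {
        List<Object[]> sources = new ArrayList<>();
        int count = receiver.readInt();            // 1. total number of sources
        for (int i = 0; i < count; i++) {
            String kind = receiver.readString();   // 2. source type, e.g. "CSV"
            int id = receiver.readInt();           // 3. unique ID for this source
            Object[] args = receiver.readTuple();  // 4. (filePath, lineDelimiter,
                                                   //     fieldDelimiter, type1, ..., typeX)
            sources.add(args);
        }
        return sources;
    }
}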
These behaviours are completely defined for
- parameters (DOP, local execution)
- data sources (CSV, TEXT, VALUE (fromElements()))
- data sinks (CSV, TEXT, PRINT)
- broadcast variables
Operations are a bit different, but they follow the same pattern:
1. send an integer representing the total number of operations
2. send the operation identifier
3. send all relevant arguments (this is partially implementer-defined)
Between step 2 (exclusive) and step 3 (inclusive), the following methods are
called:
- createOperationInfo(identifier) (abstract)
- createXOperation(operationInfo)
- applyXOperation(parameter1, ..., parameterX, operationInfo) (abstract, only
for UDF operations)
createOperationInfo is an abstract method whose purpose is to create an
<? extends OperationInfo> object
containing all information necessary to apply a (UDF or non-UDF) operation.
For a Python map, for example, such an object would contain
1. the ID of the set to apply the map on
2. the serialized operator
3. a type argument
4. some additional info :>
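As a made-up illustration (field names and types are mine, not the real
class), such an object might look like this:

class PythonMapOperationInfo /* extends OperationInfo */ {
    int parentID;            // 1. ID of the set to apply the map on
    byte[] serializedUdf;    // 2. the serialized operator
    Object typeInformation;  // 3. an exemplary type argument for the output
    // 4. some additional info, omitted here
}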
This object is passed to createXOperation, which extracts certain parameters
from the object based on the identifier (these would be the things you usually
need for this operation; a join, for example, would extract grouping keys
among others) and calls applyXOperation, which actually applies the operation.
It was constructed this way so that the implementer has the freedom to
implement operations however they want, e.g. a map as a map partition. I was
wondering whether I should remove createXOperation.
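A rough skeleton of that call sequence, using map as the example operation;
the class and method bodies here are illustrative, not the exact PlanBinder
code:

// Illustrative skeleton of the createOperationInfo -> createXOperation ->
// applyXOperation sequence; not the actual PlanBinder API.
abstract class BinderSketch {
    static class OperationInfo {
        int parentID;  // a common parameter every operation carries
    }

    void processOperation(String identifier) {
        OperationInfo info = createOperationInfo(identifier);  // abstract: binding-specific
        createMapOperation(info);                              // framework-side dispatch
    }

    // abstract: build the object holding everything needed to apply the operation
    abstract OperationInfo createOperationInfo(String identifier);

    // framework side: extract the parameters a map usually needs, then delegate
    void createMapOperation(OperationInfo info) {
        applyMapOperation(info.parentID, info);
    }

    // abstract, UDF operations only: the implementer decides how to apply it,
    // e.g. a map as a map partition
    abstract void applyMapOperation(int parentID, OperationInfo info);
}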
To plug in Python I had to do very little on the Java side:
1. create functions (PythonMap, PythonCoGroup etc.)
2. extend Streamer and implement setupProcess (starting the Python process)
3. extend OperationInfo (roughly 160 lines; given 16 operations, that's
about 10 each)
4. extend PlanBinder (the abstract methods only took 100 lines)
On the external side you mostly need code to
1. communicate with the stream (Connection.py)
2. deserialize data following 3 different protocols (Iterator.py; a sketch of
the second protocol follows this list)
1. <type1><record1><type2><record2>...<typeX><recordX>
2. <type><record1><record2>...<recordX>
3. <type1><type2><record1T1><record2T2><record3T1><record4T2>
3. serialize data following 2 different protocols (Collector.py)
1. <type1><record1><type2><record2>...<typeX><recordX>
2. <type><record1><record2>...<recordX>
4. act as the PlanBinder counterpart: turn API calls into the intermediate
format and send it to Java
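For instance, the second protocol could be consumed like this (shown in Java
for consistency with the snippets above; the real Iterator.py is Python, and
the type codes here are made up):

import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Sketch of the second deserialization protocol: the type is sent once,
// followed by homogeneous records. Type codes are invented for illustration.
class ProtocolTwoReader {
    static final byte TYPE_INT = 0;
    static final byte TYPE_STRING = 1;

    static List<Object> readAll(DataInputStream in, int recordCount) throws IOException {
        byte type = in.readByte();  // single leading type marker
        List<Object> records = new ArrayList<>();
        for (int i = 0; i < recordCount; i++) {
            switch (type) {
                case TYPE_INT:    records.add(in.readInt()); break;
                case TYPE_STRING: records.add(in.readUTF()); break;
                default: throw new IOException("unknown type code: " + type);
            }
        }
        return records;
    }
}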
Hmm... I'll leave it at that for now. If I keep writing, too many in-depth
implementation details will slip in ^^
> Create a general purpose framework for language bindings
> --------------------------------------------------------
>
> Key: FLINK-377
> URL: https://issues.apache.org/jira/browse/FLINK-377
> Project: Flink
> Issue Type: Improvement
> Reporter: GitHub Import
> Labels: github-import
> Fix For: pre-apache
>
>
> A general-purpose API to run operators with arbitrary binaries.
> This will allow running Stratosphere programs written in Python, JavaScript,
> Ruby, Go or whatever you like.
> We suggest using Google Protocol Buffers for data serialization. This is the
> list of languages that currently support ProtoBuf:
> https://code.google.com/p/protobuf/wiki/ThirdPartyAddOns
> Very early prototype with python:
> https://github.com/rmetzger/scratch/tree/learn-protobuf (basically testing
> protobuf)
> For Ruby: https://github.com/infochimps-labs/wukong
> Two new students working at Stratosphere (@skunert and @filiphaase) are
> working on this.
> The reference language binding will be Python, but other bindings are
> very welcome.
> The best name for this so far is "stratosphere-lang-bindings".
> I created this issue to track the progress (and give everybody a chance to
> comment on this).
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/stratosphere/stratosphere/issues/377
> Created by: [rmetzger|https://github.com/rmetzger]
> Labels: enhancement,
> Assignee: [filiphaase|https://github.com/filiphaase]
> Created at: Tue Jan 07 19:47:20 CET 2014
> State: open