[ https://issues.apache.org/jira/browse/FLINK-1025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14061134#comment-14061134 ]

Daniel Warneke commented on FLINK-1025:
---------------------------------------

I’m not emotionally attached to the current design, so just let me know what I 
should change. However, this is the rationale behind the current version:

1. The hash sum computation is indeed primarily useful for distributing jar 
files. Since we cannot expect developers to change the names of their jar files 
with every code change, computing a hash of the contents is crucial to 
guarantee that the correct version of the user code is deployed. Without the 
hash sum computation, we essentially could not cache files locally on the 
TaskManagers anymore.
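For illustration, here is a minimal sketch of the content-addressable keying 
idea (the class and helper names are hypothetical, not the actual BLOB service 
API):

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    /** Sketch: the key is a digest of the file contents, so an unchanged
     *  jar maps to the same key regardless of its name and can be served
     *  from the local cache on the TaskManager. */
    public final class ContentKey {

        public static byte[] computeKey(String path)
                throws IOException, NoSuchAlgorithmException {
            MessageDigest md = MessageDigest.getInstance("SHA-1");
            byte[] buf = new byte[4096];
            InputStream in = new FileInputStream(path);
            try {
                int read;
                while ((read = in.read(buf)) != -1) {
                    md.update(buf, 0, read); // hash the contents, not the name
                }
            } finally {
                in.close();
            }
            return md.digest(); // identical contents -> identical key -> cache hit
        }
    }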

I guess we could debate the overhead of the hash sum computation for hours if 
we wanted to. I didn’t know you planned to stress the BLOB service so heavily 
that it could become a potential bottleneck. If the service is critical to the 
performance of the system, it could in fact be improved quite a bit. If you 
only plan to push a few MB around, I argue that the overhead is negligible.

2. The decision not to delete BLOBs after job completion is a consequence of 
the content-addressable design. Different jobs might refer to the same file, 
so it is not safe to delete a file after a job finishes unless you start 
counting references. For now, the BLOB storage is wiped when the 
JobManager/TaskManager is shut down; I thought that was good enough to start 
with.
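If we ever want to delete BLOBs eagerly, a reference-counting scheme roughly 
along these lines could make it safe (a sketch only, with hypothetical names; 
this is not the current implementation):

    import java.util.HashMap;
    import java.util.Map;

    /** Sketch: count how many jobs reference a content-addressed BLOB,
     *  and only allow deletion once the count drops to zero. */
    public final class BlobRefCounter {

        private final Map<String, Integer> refCounts = new HashMap<String, Integer>();

        /** Called when a job starts using the BLOB identified by key. */
        public synchronized void register(String key) {
            Integer count = refCounts.get(key);
            refCounts.put(key, count == null ? 1 : count + 1);
        }

        /** Called when a job finishes; returns true if the BLOB may be deleted. */
        public synchronized boolean release(String key) {
            Integer count = refCounts.get(key);
            if (count == null || count <= 1) {
                refCounts.remove(key);
                return true; // no job references this file anymore
            }
            refCounts.put(key, count - 1);
            return false;
        }
    }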

3. I’m perfectly fine with letting the OS choose free ports for the BLOB 
service.
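For reference, binding to port 0 is all it takes; the OS then assigns a free 
ephemeral port, similar to what TaskManager#getAvailablePort() achieves:

    import java.io.IOException;
    import java.net.ServerSocket;

    public final class EphemeralPortExample {
        public static void main(String[] args) throws IOException {
            // Port 0 asks the OS to pick any free port.
            ServerSocket server = new ServerSocket(0);
            System.out.println("BLOB server bound to port " + server.getLocalPort());
            server.close();
        }
    }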

4. For a general InputStream, you cannot assume that the length of the 
incoming content is known up front. Typically, you just read the stream until 
you get a -1. Unfortunately, for the remote side to read a -1, the local side 
has to close the stream. For a socket, however, this operation closes the 
entire TCP connection, so it is no longer possible for the remote side to 
respond to the received data. Therefore, the current version transmits the 
length of the data that follows per buffer.
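A sketch of the per-buffer length framing (the method name is illustrative, 
not the exact wire format):

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.InputStream;

    /** Sketch: each buffer is preceded by its length, and a length of -1
     *  marks the end of the transfer without closing the socket, so the
     *  remote side can still send a response over the same connection. */
    public final class FramedWriter {

        public static void copyFramed(InputStream in, DataOutputStream out)
                throws IOException {
            byte[] buf = new byte[4096];
            int read;
            while ((read = in.read(buf)) != -1) {
                out.writeInt(read);      // announce the length of this buffer
                out.write(buf, 0, read); // then send its contents
            }
            out.writeInt(-1);            // end-of-stream marker, socket stays open
            out.flush();
        }
    }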

5. The singleton implementation is mostly for convenience. There are 
essentially two places in the code where you would otherwise have to pass a 
reference to the BLOB service in a very cumbersome manner (the JobGraph and 
the LibraryCacheManager). The current implementation should work in pseudo 
clusters as long as the JobManager is instantiated before the TaskManager, but 
I’d be open to changing the design.
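Moving away from the singleton would roughly mean constructor injection along 
these lines (interface and names are hypothetical, just to show the shape):

    /** Sketch: the owning component (JobManager/TaskManager) passes its own
     *  BLOB service instance explicitly, so multiple instances can coexist
     *  within one JVM, e.g. in pseudo-cluster mode or in tests. */
    public interface BlobService {
        byte[] get(String key) throws java.io.IOException;
    }

    class LibraryCacheManagerSketch {

        private final BlobService blobService;

        LibraryCacheManagerSketch(BlobService blobService) {
            this.blobService = blobService; // injected, not looked up statically
        }
    }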


> Improve BLOB Service
> --------------------
>
>                 Key: FLINK-1025
>                 URL: https://issues.apache.org/jira/browse/FLINK-1025
>             Project: Flink
>          Issue Type: Improvement
>          Components: JobManager
>    Affects Versions: 0.6-incubating
>            Reporter: Stephan Ewen
>             Fix For: 0.6-incubating
>
>
> I like the idea of making it transparent where the blob service runs, so the 
> code on the server/client side is agnostic to that.
> The current merged code is in 
> https://github.com/StephanEwen/incubator-flink/commits/blobservice
> Local tests pass, I am trying distributed tests now.
> There are a few suggestions for improvements:
>  - Since all the resources are bound to a job or session, it makes sense 
> to make all puts/gets relative to a jobId (becoming a session id) and to 
> have a cleanup hook that deletes all resources associated with that job.
>  - The BLOB service is hardwired to compute a message digest for the 
> contents and to use that as the key. While this may make sense for jar files 
> (cached libraries), for many cases in the future it will be unnecessary and 
> only impose overhead. I would vote to make this optional and allow plain 
> UUIDs as keys. An example is for the taskmanager to put a part of an 
> intermediate result into the blob store, for the client to pick it up.
>  - At most points, we have started moving away from configured ports 
> because of configuration overhead and collisions in setups where multiple 
> instances end up on one machine. The latter actually happens frequently with 
> YARN. I would suggest having the JM open a port dynamically for the 
> BlobService (similar to TaskManager#getAvailablePort()). RPC calls to figure 
> out this configuration need to happen only once between client/JM and TM/JM. 
> We can stomach that overhead ;-)
>  - The write method does not write the length a single time, but "per 
> buffer". Why is it done that way? The array-based methods know the length up 
> front, and when the contents come from an input stream, I think we know the 
> length as well (for files: the file size; for network: sent up front).
>  - I am personally in favor of moving away from static singleton 
> registries. They tend to cause trouble during testing and in pseudo-cluster 
> modes (multiple workers within one JVM). How hard would it be to have a 
> BlobService at the TaskManager / JobManager that we can pass as a reference 
> to the points where it is needed?



--
This message was sent by Atlassian JIRA
(v6.2#6252)
