[ https://issues.apache.org/jira/browse/FLINK-1025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14060858#comment-14060858 ]
Stephan Ewen commented on FLINK-1025:
-------------------------------------
The current BLOB service also does not seem to work properly:
{code}
java.io.IOException: Detected data corruption during transfer
    at org.apache.flink.runtime.blobservice.ProxyImpl.fetchFromServer(ProxyImpl.java:124)
    at org.apache.flink.runtime.blobservice.ProxyImpl.getURL(ProxyImpl.java:151)
    at org.apache.flink.runtime.blobservice.BlobService.getURL(BlobService.java:531)
    at org.apache.flink.runtime.execution.librarycache.LibraryCacheManager.registerInternal(LibraryCacheManager.java:169)
    at org.apache.flink.runtime.execution.librarycache.LibraryCacheManager.register(LibraryCacheManager.java:131)
    at org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.read(TaskDeploymentDescriptor.java:260)
    at org.apache.flink.runtime.util.SerializableArrayList.read(SerializableArrayList.java:106)
    at org.apache.flink.runtime.ipc.RPC$Invocation.read(RPC.java:141)
    at org.apache.flink.runtime.ipc.Server$Connection.processData(Server.java:904)
    at org.apache.flink.runtime.ipc.Server$Connection.readAndProcess(Server.java:865)
    at org.apache.flink.runtime.ipc.Server$Listener.doRead(Server.java:457)
    at org.apache.flink.runtime.ipc.Server$Listener.run(Server.java:360)
{code}
> Improve BLOB Service
> --------------------
>
> Key: FLINK-1025
> URL: https://issues.apache.org/jira/browse/FLINK-1025
> Project: Flink
> Issue Type: Improvement
> Components: JobManager
> Affects Versions: 0.6-incubating
> Reporter: Stephan Ewen
> Fix For: 0.6-incubating
>
>
> I like the idea of making it transparent where the blob service runs, so the
> code on the server/client side is agnostic to that.
> The current merged code is in
> https://github.com/StephanEwen/incubator-flink/commits/blobservice
> Local tests pass; I am running distributed tests now.
> There are a few suggestions for improvements:
> - Since all the resources are bound to a job or session, it makes sense to
> make all puts/gets relative to a jobId (which becomes the session id) and to
> have a cleanup hook that deletes all resources associated with that job (see
> the job-scoped API sketch below).
> - The BLOB service is hardwired to compute a message digest of the contents
> and to use that as the key. While that makes sense for jar files (cached
> libraries), in many future cases it will be unnecessary and only impose
> overhead. I would vote to make this optional and to allow plain UUIDs as
> keys (see the key sketch below). An example is the TaskManager putting a
> part of an intermediate result into the blob store for the client to pick up.
> - In most places we have started moving away from configured ports, because
> of configuration overhead and collisions in setups where multiple instances
> end up on one machine. The latter actually happens frequently with YARN. I
> would suggest having the JM open a port dynamically for the BlobService
> (similar to TaskManager#getAvailablePort(); see the port sketch below). RPC
> calls to figure out this configuration need to happen only once between
> client/JM and TM/JM. We can stomach that overhead ;-)
> - The write method does not write the length a single time, but "per
> buffer". Why is it done that way? The array-based methods know the length up
> front, and when the content comes from an input stream, I think we know the
> length as well (for files: the file size; for network transfers: it is sent
> up front). See the length-prefix sketch below.
> - I am personally in favor of moving away from static singleton registries.
> They tend to cause trouble during testing and in pseudo-cluster modes
> (multiple workers within one JVM). How hard would it be to have a
> BlobService instance at the TaskManager / JobManager that we can pass as a
> reference to the points where it is needed? (See the constructor-injection
> sketch below.)
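To make the job-scoped API suggestion concrete, here is a minimal sketch of what such an interface could look like. All names and types (JobScopedBlobService, JobId, BlobKey, deleteAll) are placeholders for illustration, not the actual Flink classes:
{code}
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch of a job-scoped BLOB service API; all names and types
// here are placeholders, not the actual Flink interfaces.
public interface JobScopedBlobService {

    /** Stores the given bytes under the given job/session id and returns a key. */
    BlobKey put(JobId jobId, byte[] value) throws IOException;

    /** Retrieves the BLOB stored under (jobId, key). */
    InputStream get(JobId jobId, BlobKey key) throws IOException;

    /** Cleanup hook: deletes every BLOB registered for this job/session. */
    void deleteAll(JobId jobId) throws IOException;

    /** Placeholder identifier types, just to keep the sketch self-contained. */
    final class JobId { /* e.g. a wrapped UUID */ }
    final class BlobKey { /* e.g. a message digest or a UUID */ }
}
{code}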
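For the key handling, a sketch of the two key flavors: a content-addressed key (a message digest of the contents) and a plain random UUID that skips the hashing overhead. The class and method names are illustrative only:
{code}
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;

// Sketch only: two ways to derive a BLOB key. The content-addressed variant
// hashes the data (useful for cached jar files and deduplication); the random
// variant is just a UUID and avoids hashing the contents entirely.
public final class BlobKeys {

    /** Content-addressed key: a message digest of the BLOB contents. */
    public static byte[] contentAddressedKey(byte[] contents) throws NoSuchAlgorithmException {
        return MessageDigest.getInstance("SHA-1").digest(contents);
    }

    /** Random key: a UUID, independent of the contents, no hashing overhead. */
    public static UUID randomKey() {
        return UUID.randomUUID();
    }

    private BlobKeys() {}
}
{code}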
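For the dynamically allocated port, a minimal sketch of the usual approach: bind to port 0 so the operating system picks a free port, then announce the chosen port over RPC. This is only an illustration, not the actual TaskManager#getAvailablePort() code:
{code}
import java.io.IOException;
import java.net.ServerSocket;

// Sketch: let the operating system assign a free port by binding to port 0.
// The JobManager could open the BlobService socket this way and announce the
// chosen port via a single RPC call, instead of relying on a configured port.
public final class DynamicPort {

    public static ServerSocket openOnFreePort() throws IOException {
        ServerSocket socket = new ServerSocket(0);   // 0 = "any free port"
        int chosenPort = socket.getLocalPort();      // the port to announce over RPC
        System.out.println("BLOB server bound to port " + chosenPort);
        return socket;
    }

    private DynamicPort() {}
}
{code}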
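For the length handling, a sketch of writing the length exactly once up front and then streaming the contents without a per-buffer length. Class and method names are made up for the example:
{code}
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Sketch: write the total length a single time, then stream the contents in
// fixed-size buffers without repeating the length for every buffer.
public final class LengthPrefixedWrite {

    public static void write(OutputStream rawOut, InputStream contents, long length)
            throws IOException {
        DataOutputStream out = new DataOutputStream(rawOut);
        out.writeLong(length);                      // length written a single time

        byte[] buffer = new byte[4096];
        long remaining = length;
        while (remaining > 0) {
            int read = contents.read(buffer, 0, (int) Math.min(buffer.length, remaining));
            if (read < 0) {
                throw new IOException("Stream ended before " + length + " bytes were read");
            }
            out.write(buffer, 0, read);             // raw data, no per-buffer length
            remaining -= read;
        }
        out.flush();
    }

    private LengthPrefixedWrite() {}
}
{code}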
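And for the constructor-injection point, a sketch of passing the BLOB service as a constructor argument instead of looking it up in a static registry. The class names are placeholders; JobScopedBlobService is the hypothetical interface from the job-scoped API sketch:
{code}
// Sketch: the TaskManager receives its BlobService as a constructor argument,
// so tests and pseudo-cluster setups can create several independent instances
// within one JVM instead of sharing a static singleton.
public final class TaskManagerSketch {

    private final JobScopedBlobService blobService; // hypothetical interface from above

    public TaskManagerSketch(JobScopedBlobService blobService) {
        this.blobService = blobService;
    }

    public JobScopedBlobService getBlobService() {
        return blobService;
    }
}
{code}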