[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16507950#comment-16507950
 ] 

ASF GitHub Bot commented on FLINK-9280:
---------------------------------------

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/6147

    [FLINK-9280][rest] Rework JobSubmitHandler to accept jar/artifact files

    ## What is the purpose of the change
    
    This PR reworks the `JobSubmitHandler` to also accept jar/artifact files. 
Previously these files had to be uploaded preemptively to the blob-service by 
the client. With this change the entire job submission goes through REST.
    
    This PR addresses 3 JIRAs in total:
    
    **FLINK-9382**
    Directories given to the blob-service (primarily a use-case for the 
distributed cache) are currently silently zipped, and later unzipped by the 
`FileCache`. This tightly couples the zipping logic in the blob-service to the 
unzipping logic of the `FileCache`. The blob-service neither unzips the 
directory when the blob is requested, nor provides any means of doing so 
manually, nor informs the user whether the requested blob is a zip or not.
    
    My conclusion in `FLINK-9382` is that the blob-service should not support 
directories _for now_, and that instead directories for the `distributed cache` 
should be explicitly zipped beforehand, given that this is the only use-case we 
have at the moment.
    
    This JIRA is related to FLINK-9280 as the zipping logic was necessary for 
the upload of directories from the client via REST. Since the server thus 
receives all artifacts already in zipped form, we can pass them in zipped form 
to the blob-service, making the blob-service support for directories obsolete.
    
    The zipping is now done in `JobGraph#uploadUserArtifacts` with utilities 
provided by the `FileCache` class. 
    The unzipping is still done by the `FileCache`. Furthermore, we no longer 
delete the zip after processing, as this file is managed by the blob-service.
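    The following is a minimal sketch of explicitly zipping a directory before 
it is handed to the blob-service, and of unzipping it again on the receiving 
side, using only `java.util.zip`. `DirectoryZipSketch` and its methods are 
hypothetical illustrations, not the `FileCache` utilities used in this PR. The 
sketch only stores regular files (empty directories are dropped) and rejects 
zip entries that would escape the target directory.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

/** Hypothetical helper illustrating explicit directory zipping; not Flink's FileCache API. */
final class DirectoryZipSketch {
    private DirectoryZipSketch() {}

    /** Zips all regular files below {@code dir} into {@code target}; entry names are relative to {@code dir}. */
    static Path zipDirectory(Path dir, Path target) throws IOException {
        List<Path> files;
        try (Stream<Path> walk = Files.walk(dir)) {
            files = walk.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
        try (OutputStream out = Files.newOutputStream(target);
             ZipOutputStream zip = new ZipOutputStream(out)) {
            for (Path file : files) {
                // Zip entry names use '/' as separator on every platform.
                String name = dir.relativize(file).toString().replace(File.separatorChar, '/');
                zip.putNextEntry(new ZipEntry(name));
                Files.copy(file, zip);
                zip.closeEntry();
            }
        }
        return target;
    }

    /** Extracts {@code zipFile} into {@code targetDir}, rejecting entries that would escape it. */
    static void unzip(Path zipFile, Path targetDir) throws IOException {
        Path root = targetDir.toAbsolutePath().normalize();
        try (InputStream in = Files.newInputStream(zipFile);
             ZipInputStream zip = new ZipInputStream(in)) {
            ZipEntry entry;
            while ((entry = zip.getNextEntry()) != null) {
                Path out = root.resolve(entry.getName()).normalize();
                if (!out.startsWith(root)) {
                    throw new IOException("Zip entry outside target directory: " + entry.getName());
                }
                if (entry.isDirectory()) {
                    Files.createDirectories(out);
                } else {
                    Files.createDirectories(out.getParent());
                    // Copies only the current entry: the stream reports EOF at the entry's end.
                    Files.copy(zip, out);
                }
                zip.closeEntry();
            }
        }
    }
}
```

    Keeping both directions in one place mirrors the point of FLINK-9382: the 
code that produces the zip and the code that consumes it are no longer spread 
across unrelated components.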
    
    **FLINK-9500**
    In some cases (I don't know exactly when) it can happen that an empty 
`LastHttpContent` is sent at the end of a `FileUpload`. This currently leads to 
an exception in the `FileUploadHandler` when calling 
`currentHttpPostRequestDecoder.hasNext()`.
    
    The empty `LastHttpContent` message is fortunately a singleton, which allows 
us to easily check for it in the `FileUploadHandler`. If it is detected, we 
skip the payload processing.
    Note that we still `offer` this content to the decoder, as this part is 
still handled without exception and appears to follow the expected life-cycle.
    
    This issue was also triggered by FLINK-9280, which now serves as 
verification for the fix.
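    To illustrate why the singleton matters, here is a plain-Java model of the 
check with no Netty dependency. In Netty the singleton is 
`LastHttpContent.EMPTY_LAST_CONTENT`; the `Chunk`, `Decoder` and `handle` names 
below are hypothetical stand-ins for the Netty types and the 
`FileUploadHandler` logic. Every chunk is still offered to the decoder, but 
payload processing is skipped for the empty last chunk, which is detected by 
reference equality.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical plain-Java model of the FLINK-9500 check; not Netty or Flink API. */
final class EmptyLastContentSketch {
    private EmptyLastContentSketch() {}

    /** Stand-in for an HTTP content chunk. */
    static final class Chunk {
        final String payload;
        final boolean last;

        Chunk(String payload, boolean last) {
            this.payload = payload;
            this.last = last;
        }
    }

    /** Singleton marking an empty final chunk; compared by identity, not by content. */
    static final Chunk EMPTY_LAST = new Chunk("", true);

    /** Stand-in for the multipart decoder: records every chunk it is offered. */
    static final class Decoder {
        final List<Chunk> offered = new ArrayList<>();

        void offer(Chunk chunk) {
            offered.add(chunk);
        }
    }

    /** Offers every chunk to the decoder and returns the payloads that were processed. */
    static List<String> handle(List<Chunk> chunks, Decoder decoder) {
        List<String> processed = new ArrayList<>();
        for (Chunk chunk : chunks) {
            // The chunk is always offered, matching the life-cycle described above.
            decoder.offer(chunk);
            if (chunk == EMPTY_LAST) {
                // Skip payload processing; in the real handler this is where hasNext() failed.
                continue;
            }
            processed.add(chunk.payload);
        }
        return processed;
    }
}
```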
    
    **FLINK-9280**
    
    This issue is addressed in 5 commits that must be squashed before a merge.
    
    The commit `Move channel setup into utility method` is a simple refactoring 
to allow re-using code.
    The commit `Remove BlobServer port handler` removes various classes related 
to requesting the blobserver port via REST, which is now obsolete.
    The commit `add new constructor for DCEntry` adds another constructor to 
the `DistributedCacheEntry` class for setting the `isZipped` flag on the 
client-side. The documentation was also extended to cover the life-cycle of 
entries for directories.
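    A minimal sketch of the client-side rewrite, assuming a simplified entry 
type: the hypothetical `CacheEntry` below mirrors only the file path, the 
executable flag and the `isZipped` flag of a distributed-cache entry, and the 
zipping itself is delegated to a hypothetical `Zipper` callback. Entries that 
point to a local directory are replaced by entries that point to the zip with 
`isZipped` set; plain files are left untouched.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical model of the client-side distributed-cache rewrite; not Flink API. */
final class CacheEntrySketch {
    private CacheEntrySketch() {}

    /** Simplified stand-in for a distributed-cache entry. */
    static final class CacheEntry {
        final String filePath;
        final boolean isExecutable;
        final boolean isZipped;

        /** Entry as registered by the user; never zipped. */
        CacheEntry(String filePath, boolean isExecutable) {
            this(filePath, isExecutable, false);
        }

        /** Additional constructor that lets the client mark an entry as zipped. */
        CacheEntry(String filePath, boolean isExecutable, boolean isZipped) {
            this.filePath = filePath;
            this.isExecutable = isExecutable;
            this.isZipped = isZipped;
        }
    }

    /** Hypothetical hook that zips a directory and returns the path of the zip. */
    interface Zipper {
        Path zip(Path directory) throws IOException;
    }

    /** Replaces every entry pointing to a local directory; assumes entries hold local paths. */
    static Map<String, CacheEntry> zipDirectories(Map<String, CacheEntry> entries, Zipper zipper)
            throws IOException {
        Map<String, CacheEntry> result = new LinkedHashMap<>();
        for (Map.Entry<String, CacheEntry> e : entries.entrySet()) {
            CacheEntry entry = e.getValue();
            Path path = Paths.get(entry.filePath);
            if (Files.isDirectory(path)) {
                Path zipped = zipper.zip(path);
                result.put(e.getKey(), new CacheEntry(zipped.toString(), entry.isExecutable, true));
            } else {
                result.put(e.getKey(), entry);
            }
        }
        return result;
    }
}
```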
    
    The last 2 commits contain the actual implementation and are separated by 
client/server.
    
    The following is an outline of the events after 
`RestClusterClient#submitJob` has been called:
    * directories registered for the distributed cache are zipped, and the 
distributed-cache entries are updated using the newly added constructor
    * the jobgraph, jars and **local** artifacts (dc files) are sent to the 
Dispatcher by the `RestClient` as a multi-part request
      * the jobgraph is contained in a `JobSubmitRequestBody` and stored as an 
Attribute
      * each jar/artifact is stored as a separate `FileUpload`
    * the `FileUploadHandler` receives the request and stores the received 
parts in a `JobSubmitRequestBodyBuffer`. Once the request is fully read, the 
buffer is converted into a proper `JobSubmitRequestBody` and passed to the rest 
of the pipeline as an attribute. In other words, we inject the paths to the 
uploaded jars/artifacts into the submitted `JobSubmitRequestBody`. 
Unfortunately, the original JSON payload is also parsed here, which for 
consistency should ideally be done by the handler.
    * the modified `JobSubmitRequestBody` is read in 
`AbstractHandler#respondAsLeader`, cast, and passed on in place of the original 
request
    * The `JobSubmitHandler` modifies the JobGraph to no longer refer to 
client-local jars/artifacts, in preparation for the job-submission. Jar entries 
are categorically overridden as jars are always uploaded. Artifacts are only 
overridden if an uploaded file exists, identified by the file name.
    * jars/artifacts are uploaded to the BlobService; this was previously done 
in the ClusterClient
    * job is submitted, as before
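    The override rule of the `JobSubmitHandler` step can be sketched as 
follows. This is a hypothetical, simplified model (artifacts as a 
name-to-path map, uploads as a list of server-side paths), not Flink's actual 
`JobGraph` API: jars are replaced wholesale by the uploaded jars, while an 
artifact is only redirected when an uploaded file with the same file name 
exists.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the JobSubmitHandler override rules; not Flink API. */
final class ArtifactOverrideSketch {
    private ArtifactOverrideSketch() {}

    /** Jars are always uploaded, so the client-local jar list is replaced wholesale. */
    static List<Path> overrideJars(List<Path> uploadedJars) {
        return new ArrayList<>(uploadedJars);
    }

    /**
     * Returns a copy of {@code artifacts} (name -> path) in which each path is replaced
     * by the uploaded file with the same file name; artifacts without a match are kept.
     */
    static Map<String, String> overrideArtifacts(Map<String, String> artifacts, List<Path> uploadedFiles) {
        Map<String, Path> uploadedByName = new HashMap<>();
        for (Path uploaded : uploadedFiles) {
            uploadedByName.put(uploaded.getFileName().toString(), uploaded);
        }
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> artifact : artifacts.entrySet()) {
            Path uploaded = uploadedByName.get(fileName(artifact.getValue()));
            result.put(artifact.getKey(), uploaded != null ? uploaded.toString() : artifact.getValue());
        }
        return result;
    }

    /** Last path segment, accepting both '/' and backslash as separators. */
    static String fileName(String path) {
        int slash = Math.max(path.lastIndexOf('/'), path.lastIndexOf('\\'));
        return path.substring(slash + 1);
    }
}
```

    Because matching is by file name only, two artifacts that share a file name 
but live in different client directories would both resolve to the same 
uploaded file in this sketch.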
    
    ## Verifying this change
    
    FLINK-9382 is covered by added tests (see the relevant commit) and the 
existing distributed-cache and python E2E tests.
    
    FLINK-9500 is implicitly tested by FLINK-9280.
    
    FLINK-9280:
    * job-submission as a whole is tested by existing E2E tests and 
`RestClusterClientTest`.
    * changes to the `JobSubmitHandler` are covered in `JobSubmitHandlerTest`.
    
    ## Brief change log
    * extend `RestClient` to support sending jobgraph, jars and artifacts as 
multipart http request
    * modify `FileUploadHandler` to handle job-submission specific multipart 
requests
    * modify `JobSubmitHandler` to override jar/artifact entries pointing to 
client-local files to instead point to uploaded files
    * move jar/artifact blob-service upload logic from `RestClusterClient` to 
`JobSubmitHandler`
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 9280

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6147.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6147
    
----
commit 45bb07aac047b52675857ab0f18848aa3c20d42d
Author: zentol <chesnay@...>
Date:   2018-06-04T11:50:44Z

    [FLINK-9382][dc] Consolidate zipping logic in FileCache

commit 9fdc0d9c47abff314068a8f7c04952ff6775dea9
Author: zentol <chesnay@...>
Date:   2018-06-04T12:13:35Z

    [FLINK-9500][rest] Properly handle EmptyLastHttpContent

commit 8cb0de747931c10c60bc756925c697c222201e5b
Author: zentol <chesnay@...>
Date:   2018-06-04T12:39:53Z

    [FLINK-9280][rest] Move channel setup into untility method

commit 3418eb6aa65249fed6d1530b6a9699b68da1a7f7
Author: zentol <chesnay@...>
Date:   2018-06-11T09:45:12Z

    [FLINK-9280][rest] Remove BlobServer port handler

commit 8917004634f074e48af3024633ab6c5e3a294e4f
Author: zentol <chesnay@...>
Date:   2018-06-04T12:00:21Z

    [FLINK-9280][rest] add new constructor for DCEntry

commit 1bea8e6dfee1586c9dcc4e1442e8389359eb075a
Author: zentol <chesnay@...>
Date:   2018-06-04T12:55:19Z

    [FLINK-9280][rest] client modifications

commit 42db1bffeb7d8d49f0a0c0280f9c3110b8587dc1
Author: zentol <chesnay@...>
Date:   2018-06-04T13:43:11Z

    [FLINK-9280][rest] server modifications

----


> Extend JobSubmitHandler to accept jar files
> -------------------------------------------
>
>                 Key: FLINK-9280
>                 URL: https://issues.apache.org/jira/browse/FLINK-9280
>             Project: Flink
>          Issue Type: New Feature
>          Components: Job-Submission, REST
>    Affects Versions: 1.5.0
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>            Priority: Critical
>             Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all required jars to the blob 
> server, sets the blob keys in the jobgraph, and then uploads this graph to 
> the {{JobSubmitHandler}} which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the 
> blobserver before submitting the job graph, which does not happen via REST.
> I propose an extension to the {{JobSubmitHandler}} to also accept an 
> optional list of jar files that were previously uploaded through the 
> {{JarUploadHandler}}. If present, the handler would upload these jars to the 
> blobserver and set the blob keys.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
