GitHub user zentol opened a pull request:
https://github.com/apache/flink/pull/6147
[FLINK-9280][rest] Rework JobSubmitHandler to accept jar/artifact files
## What is the purpose of the change
This PR reworks the `JobSubmitHandler` to also accept jar/artifact files.
Previously these files had to be uploaded preemptively to the blob-service by
the client. With this change the entire job submission goes through REST.
This PR addresses 3 JIRAs in total:
**FLINK-9382**
Directories given to the blob-service (primarily a use-case for the
distributed cache) are currently zipped silently, and later unzipped by the
`FileCache`. This tightly couples the zipping logic in the blob-service to the
unzipping logic of the `FileCache`. The blob-service neither unzips the
directory when the blob is requested, nor provides any means of doing so
manually, nor informs the user whether the requested blob is a zip.
My conclusion in `FLINK-9382` is that the blob-service should not support
directories _for now_, and that instead directories for the `distributed cache`
should be explicitly zipped beforehand, given that this is the only use-case we
have at the moment.
This JIRA is related to FLINK-9280, as the zipping logic was necessary for
uploading directories from the client via REST. Since the server thus
receives all artifacts in zipped form, we can pass them to the blob-service
as-is, making blob-service support for directories obsolete.
The zipping is now done in `JobGraph#uploadUserArtifacts` with utilities
provided by the `FileCache` class.
The unzipping is still done by the `FileCache`. Furthermore, we now no
longer delete the zip after processing, as this file is managed by the
blob-service.
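For illustration, the kind of client-side directory zipping described above can be sketched with nothing but `java.util.zip`. This is a minimal sketch, not the `FileCache` utilities the PR actually uses; the class `DirectoryZipper` and its method are hypothetical names. Every regular file below the directory is stored under a path relative to that directory, so unzipping restores the original layout (empty subdirectories are not preserved).

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

/** Hypothetical helper; not Flink's FileCache API. */
final class DirectoryZipper {
    private DirectoryZipper() {}

    /** Zips all regular files below {@code dir} into {@code target}; entry names are relative to {@code dir}. */
    static Path zipDirectory(Path dir, Path target) throws IOException {
        List<Path> files;
        // Collect the files first so the directory stream is closed before writing.
        try (Stream<Path> walk = Files.walk(dir)) {
            files = walk.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
        try (ZipOutputStream zip = new ZipOutputStream(Files.newOutputStream(target))) {
            for (Path file : files) {
                // Zip entry names use '/' as the separator on every platform.
                String name = dir.relativize(file).toString().replace('\\', '/');
                zip.putNextEntry(new ZipEntry(name));
                Files.copy(file, zip);
                zip.closeEntry();
            }
        }
        return target;
    }
}
```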
**FLINK-9500**
In some cases (I don't know exactly when) an empty
`LastHttpContent` is sent at the end of a `FileUpload`. This currently leads to
an exception in the `FileUploadHandler` when calling
`currentHttpPostRequestDecoder.hasNext()`.
The empty `LastHttpContent` message is fortunately a singleton
(`LastHttpContent.EMPTY_LAST_CONTENT`), which allows us to detect it in the
`FileUploadHandler` with a simple reference check. If it is detected, we skip
the payload processing.
Note that we still `offer` this content to the decoder, as this part is
still handled without exception and appears to follow the expected life-cycle.
This issue was also triggered by FLINK-9280, which now serves as
verification for the fix.
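A minimal sketch of the check, assuming simplified stand-in types: `Chunk` mimics Netty's `HttpContent`, `Chunk.EMPTY_LAST` mimics the `LastHttpContent.EMPTY_LAST_CONTENT` singleton, and `UploadHandlerSketch` is a hypothetical reduction of the `FileUploadHandler` logic. It is not Flink's code; it only shows why a reference comparison suffices once the terminator is a singleton, and that the chunk is still offered before the skip.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a Netty HttpContent chunk. */
final class Chunk {
    /** Mirrors LastHttpContent.EMPTY_LAST_CONTENT: one shared, payload-free terminator. */
    static final Chunk EMPTY_LAST = new Chunk("", true);

    final String payload;
    final boolean last;

    Chunk(String payload, boolean last) {
        this.payload = payload;
        this.last = last;
    }
}

/** Hypothetical reduction of the per-chunk logic in the FileUploadHandler. */
final class UploadHandlerSketch {
    final List<Chunk> offeredToDecoder = new ArrayList<>();
    final List<String> processedPayloads = new ArrayList<>();

    void onChunk(Chunk chunk) {
        // Always offer the chunk, so the decoder's life-cycle completes normally.
        offeredToDecoder.add(chunk);
        // Identity check: only the shared singleton is skipped, not every empty chunk.
        if (chunk == Chunk.EMPTY_LAST) {
            return;
        }
        processedPayloads.add(chunk.payload);
    }
}
```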
**FLINK-9280**
This issue is addressed in 5 commits that must be squashed before a merge.
The commit `Move channel setup into utility method` is a simple refactoring
to allow re-using code.
The commit `Remove BlobServer port handler` removes various classes related
to requesting the blobserver port via REST, which is now obsolete.
The commit `add new constructor for DCEntry` adds another constructor to
the `DistributedCacheEntry` class for setting the `isZipped` flag on the
client-side. The documentation was also extended to cover the life-cycle of
entries for directories.
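To make the life-cycle concrete, here is a sketch of a cache entry with an `isZipped` flag and a client-side step that swaps directory entries for zipped ones. `CacheEntry`, `ClientSidePreparation` and their fields are hypothetical simplifications, not the actual `DistributedCacheEntry` API. The directory check and the zipping are passed in as functions so the sketch stays free of I/O.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

/** Hypothetical, simplified distributed-cache entry. */
final class CacheEntry {
    final String filePath;
    final boolean isExecutable;
    final boolean isZipped;

    /** Pre-existing style of constructor: the entry is not zipped. */
    CacheEntry(String filePath, boolean isExecutable) {
        this(filePath, isExecutable, false);
    }

    /** Additional constructor, used on the client once a directory has been zipped. */
    CacheEntry(String filePath, boolean isExecutable, boolean isZipped) {
        this.filePath = filePath;
        this.isExecutable = isExecutable;
        this.isZipped = isZipped;
    }
}

final class ClientSidePreparation {
    private ClientSidePreparation() {}

    /** Returns a copy of {@code entries} in which each directory entry points to its zip and is flagged as zipped. */
    static Map<String, CacheEntry> zipDirectories(
            Map<String, CacheEntry> entries,
            Predicate<String> isDirectory,
            UnaryOperator<String> zipDirectory) {
        Map<String, CacheEntry> result = new LinkedHashMap<>();
        for (Map.Entry<String, CacheEntry> e : entries.entrySet()) {
            CacheEntry entry = e.getValue();
            if (isDirectory.test(entry.filePath)) {
                String zipPath = zipDirectory.apply(entry.filePath);
                result.put(e.getKey(), new CacheEntry(zipPath, entry.isExecutable, true));
            } else {
                // Plain files are shipped unchanged.
                result.put(e.getKey(), entry);
            }
        }
        return result;
    }
}
```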
The last 2 commits contain the actual implementation, split into client-side
and server-side changes.
The following is an outline of the events after
`RestClusterClient#submitJob` has been called:
* directories registered for the distributed cache are zipped, and the
distributed-cache entries are updated using the newly added constructor
* the jobgraph, jars and **local** artifacts (dc files) are sent to the
Dispatcher by the `RestClient` as a multipart request
  * the jobgraph is contained in a `JobSubmitRequestBody` and stored as an
  Attribute
  * each jar/artifact is stored as a separate `FileUpload`
* the `FileUploadHandler` receives the request and stores the received
parts in a `JobSubmitRequestBodyBuffer`. Once the request is fully read, the
buffer is converted into a proper `JobSubmitRequestBody` and passed to the rest
of the pipeline as an attribute. In other words, we inject the paths of the
uploaded jars/artifacts into the submitted `JobSubmitRequestBody`.
Unfortunately, this means the original JSON payload is also parsed here, which
for consistency should ideally be done by the handler.
* the modified `JobSubmitRequestBody` is read in
`AbstractHandler#respondAsLeader`, cast, and passed on in place of the original
request
* The `JobSubmitHandler` modifies the JobGraph to no longer refer to
client-local jars/artifacts, in preparation for the job-submission. Jar entries
are categorically overridden as jars are always uploaded. Artifacts are only
overridden if an uploaded file exists, identified by the file name.
* jars/artifacts are uploaded to the BlobService; this was previously done
in the ClusterClient
* job is submitted, as before
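The conditional part of the override rule applied by the `JobSubmitHandler` (artifacts are replaced only when an uploaded file with the same file name exists; jars are simply replaced wholesale) can be sketched as follows. `SubmissionRewriter` and the use of plain string paths are assumptions made for illustration; the real handler operates on the `JobGraph`.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper illustrating the artifact override rule. */
final class SubmissionRewriter {
    private SubmissionRewriter() {}

    /** Last path segment, used as the key to match artifacts with uploaded files. */
    static String fileName(String path) {
        int slash = path.lastIndexOf('/');
        return slash < 0 ? path : path.substring(slash + 1);
    }

    /** Artifacts are replaced only if an uploaded file with the same file name exists. */
    static Map<String, String> overrideArtifacts(Map<String, String> artifacts, List<String> uploadedFiles) {
        Map<String, String> uploadedByName = new HashMap<>();
        for (String uploaded : uploadedFiles) {
            uploadedByName.put(fileName(uploaded), uploaded);
        }
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> artifact : artifacts.entrySet()) {
            String uploaded = uploadedByName.get(fileName(artifact.getValue()));
            // No matching upload (e.g. a remote artifact): keep the original location.
            result.put(artifact.getKey(), uploaded != null ? uploaded : artifact.getValue());
        }
        return result;
    }
}
```

One consequence of matching by file name is that two local artifacts with the same name in different directories cannot be told apart; the sketch keeps whichever upload was seen last.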
## Verifying this change
FLINK-9382 is covered by added tests (see the relevant commit) and the
existing distributed-cache and Python E2E tests.
FLINK-9500 is implicitly tested by FLINK-9280.
FLINK-9280:
* job-submission as a whole is tested by existing E2E tests and
`RestClusterClientTest`.
* changes to the `JobSubmitHandler` are covered in `JobSubmitHandlerTest`.
## Brief change log
* extend `RestClient` to support sending the jobgraph, jars and artifacts as a
multipart HTTP request
* modify `FileUploadHandler` to handle the job-submission-specific multipart
request
* modify `JobSubmitHandler` to override jar/artifact entries pointing to
client-local files to instead point to uploaded files
* move jar/artifact blob-service upload logic from `RestClusterClient` to
`JobSubmitHandler`
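For readers unfamiliar with the wire format, a multipart request of the shape described above (one JSON part followed by one part per jar/artifact) can be assembled with the JDK alone. The part names `request` and `file`, the content types, and the class `MultipartSketch` are hypothetical, and this is not necessarily how the `RestClient` builds its request; it only shows the boundary/part layout of `multipart/form-data`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical multipart/form-data body builder; part names are illustrative only. */
final class MultipartSketch {
    private static final String CRLF = "\r\n";

    private MultipartSketch() {}

    static byte[] build(String boundary, String json, Map<String, byte[]> files) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // First part: the JSON request body (the role of the JobSubmitRequestBody attribute).
        write(out, "--" + boundary + CRLF
                + "Content-Disposition: form-data; name=\"request\"" + CRLF
                + "Content-Type: application/json" + CRLF + CRLF);
        write(out, json);
        write(out, CRLF);
        // One part per jar/artifact, carrying the original file name.
        for (Map.Entry<String, byte[]> file : files.entrySet()) {
            write(out, "--" + boundary + CRLF
                    + "Content-Disposition: form-data; name=\"file\"; filename=\"" + file.getKey() + "\"" + CRLF
                    + "Content-Type: application/octet-stream" + CRLF + CRLF);
            out.write(file.getValue(), 0, file.getValue().length);
            write(out, CRLF);
        }
        // Closing delimiter terminates the multipart body.
        write(out, "--" + boundary + "--" + CRLF);
        return out.toByteArray();
    }

    private static void write(ByteArrayOutputStream out, String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        out.write(bytes, 0, bytes.length);
    }
}
```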
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not documented)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/zentol/flink 9280
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6147.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6147
----
commit 45bb07aac047b52675857ab0f18848aa3c20d42d
Author: zentol <chesnay@...>
Date: 2018-06-04T11:50:44Z
[FLINK-9382][dc] Consolidate zipping logic in FileCache
commit 9fdc0d9c47abff314068a8f7c04952ff6775dea9
Author: zentol <chesnay@...>
Date: 2018-06-04T12:13:35Z
[FLINK-9500][rest] Properly handle EmptyLastHttpContent
commit 8cb0de747931c10c60bc756925c697c222201e5b
Author: zentol <chesnay@...>
Date: 2018-06-04T12:39:53Z
[FLINK-9280][rest] Move channel setup into untility method
commit 3418eb6aa65249fed6d1530b6a9699b68da1a7f7
Author: zentol <chesnay@...>
Date: 2018-06-11T09:45:12Z
[FLINK-9280][rest] Remove BlobServer port handler
commit 8917004634f074e48af3024633ab6c5e3a294e4f
Author: zentol <chesnay@...>
Date: 2018-06-04T12:00:21Z
[FLINK-9280][rest] add new constructor for DCEntry
commit 1bea8e6dfee1586c9dcc4e1442e8389359eb075a
Author: zentol <chesnay@...>
Date: 2018-06-04T12:55:19Z
[FLINK-9280][rest] client modifications
commit 42db1bffeb7d8d49f0a0c0280f9c3110b8587dc1
Author: zentol <chesnay@...>
Date: 2018-06-04T13:43:11Z
[FLINK-9280][rest] server modifications
----