Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4767#discussion_r142634775
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
---
@@ -325,6 +406,98 @@ public TestParameters getUnresolvedMessageParameters() {
}
}
+ private static class TestWebSocketOperation {
+
+ private static class WsParameters extends MessageParameters {
+ private final JobIDPathParameter jobIDPathParameter = new JobIDPathParameter();
+
+ @Override
+ public Collection<MessagePathParameter<?>> getPathParameters() {
+ return Collections.singleton(jobIDPathParameter);
+ }
+
+ @Override
+ public Collection<MessageQueryParameter<?>> getQueryParameters() {
+ return Collections.emptyList();
+ }
+ }
+
+ static class WsHeaders implements MessageHeaders<EmptyRequestBody, WebSocketUpgradeResponseBody, WsParameters> {
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return "/test/:jobid/subscribe";
+ }
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public Class<WebSocketUpgradeResponseBody> getResponseClass() {
+ return WebSocketUpgradeResponseBody.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public WsParameters getUnresolvedMessageParameters() {
+ return new WsParameters();
+ }
+ }
+
+ static class WsRestHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, WebSocketUpgradeResponseBody, WsParameters> {
+
+ private final TestEventProvider eventProvider;
+
+ WsRestHandler(
+ CompletableFuture<String> localAddressFuture,
+ GatewayRetriever<RestfulGateway> leaderRetriever,
+ TestEventProvider eventProvider,
+ Time timeout) {
+ super(localAddressFuture, leaderRetriever, timeout, new WsHeaders());
+ this.eventProvider = eventProvider;
+ }
+
+ @Override
+ protected CompletableFuture<WebSocketUpgradeResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, WsParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+ JobID jobID = request.getPathParameter(JobIDPathParameter.class);
+ Assert.assertEquals(PATH_JOB_ID, jobID);
+ ChannelHandler messageHandler = new WsMessageHandler(eventProvider, jobID);
--- End diff --
If this is what an AbstractRestHandler implementation for WebSockets would
actually look like, I'm questioning the benefit of implementing it as an
AbstractRestHandler in the first place.
An explicit AbstractWebSocketRestHandler class could have an abstract
`initializeWebSocket(HandlerRequest ...)` method instead of hacking it into
`AbstractRestHandler#handleRequest`, and a separate method for creating the
event provider that is typed to the actual response we're sending back.
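
Roughly the shape I have in mind, as a self-contained sketch rather than a
proposal for the final API. Everything here is hypothetical: `HandlerRequest`
is a simplified stand-in for the Flink class, `EventProvider`,
`createEventProvider` and `handleUpgrade` are made-up names, and the
`Consumer<E>` only stands in for the WebSocket channel.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Simplified stand-in for Flink's HandlerRequest; it only carries the resolved parameters.
final class HandlerRequest<P> {
    private final P parameters;

    HandlerRequest(P parameters) {
        this.parameters = parameters;
    }

    P getParameters() {
        return parameters;
    }
}

// Hypothetical event source, typed to the messages that are pushed over the socket.
interface EventProvider<E> {
    void subscribe(Consumer<E> listener);
}

// Hypothetical base class: the upgrade flow lives here, subclasses only fill in the two hooks.
abstract class AbstractWebSocketRestHandler<P, E> {

    // Called once per upgrade request; 'channel' stands in for the WebSocket channel.
    final CompletableFuture<Void> handleUpgrade(HandlerRequest<P> request, Consumer<E> channel) {
        return initializeWebSocket(request)
            .thenRun(() -> createEventProvider(request).subscribe(channel));
    }

    // Validate the request and do any per-connection setup.
    protected abstract CompletableFuture<Void> initializeWebSocket(HandlerRequest<P> request);

    // Create the provider for the events this endpoint sends back.
    protected abstract EventProvider<E> createEventProvider(HandlerRequest<P> request);
}
```

With something like this the test handler would only override the two hooks,
and the channel handler wiring would not have to go through `handleRequest`.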
---