This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c5516533a57 [FLINK-32884] [flink-clients] Sending messageHeaders with
decorated customHeaders for PyFlink client (#23452)
c5516533a57 is described below
commit c5516533a5705297fe3cfd33860e59da30750f69
Author: Elkhan Dadash <[email protected]>
AuthorDate: Tue Sep 26 22:30:12 2023 -0700
[FLINK-32884] [flink-clients] Sending messageHeaders with decorated
customHeaders for PyFlink client (#23452)
---
.../org/apache/flink/client/program/rest/RestClusterClient.java | 2 +-
.../org/apache/flink/client/program/rest/UrlPrefixDecorator.java | 9 +++++++++
.../apache/flink/client/program/rest/RestClusterClientTest.java | 6 ++++++
.../flink/runtime/rest/messages/CustomHeadersDecorator.java | 9 +++++++++
4 files changed, 25 insertions(+), 1 deletion(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index b05dc331853..cc5a6d2d833 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -1021,7 +1021,7 @@ public class RestClusterClient<T> implements ClusterClient<T> {
restClient.sendRequest(
webMonitorBaseUrl.getHost(),
webMonitorBaseUrl.getPort(),
- messageHeaders,
+ headers,
messageParameters,
request,
filesToUpload);
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java
index bd6398176c5..52f1b8cf56e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java
@@ -107,4 +107,13 @@ public class UrlPrefixDecorator<
public Collection<? extends RestAPIVersion<?>> getSupportedAPIVersions() {
return decorated.getSupportedAPIVersions();
}
+
+ @Override
+ public Collection<Class<?>> getResponseTypeParameters() {
+ return decorated.getResponseTypeParameters();
+ }
+
+ public MessageHeaders<R, P, M> getDecorated() {
+ return decorated;
+ }
}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 4f7e2f0b08d..e1db5c6d1b0 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -61,6 +61,7 @@ import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.messages.AccumulatorsIncludeSerializedValueQueryParameter;
+import org.apache.flink.runtime.rest.messages.CustomHeadersDecorator;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
@@ -909,6 +910,11 @@ class RestClusterClientTest {
final AtomicBoolean firstSubmitRequestFailed = new AtomicBoolean(false);
failHttpRequest =
(messageHeaders, messageParameters, requestBody) -> {
+ messageHeaders =
+ ((UrlPrefixDecorator)
+ ((CustomHeadersDecorator) messageHeaders)
+ .getDecorated())
+ .getDecorated();
if (messageHeaders instanceof JobExecutionResultHeaders) {
return !firstExecutionResultPollFailed.getAndSet(true);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CustomHeadersDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CustomHeadersDecorator.java
index 979c849166c..05fa8289dec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CustomHeadersDecorator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CustomHeadersDecorator.java
@@ -96,6 +96,11 @@ public class CustomHeadersDecorator<
return customHeaders;
}
+ @Override
+ public Collection<Class<?>> getResponseTypeParameters() {
+ return decorated.getResponseTypeParameters();
+ }
+
/**
* Sets the custom headers for the message.
*
@@ -117,4 +122,8 @@ public class CustomHeadersDecorator<
}
customHeaders.add(httpHeader);
}
+
+ public MessageHeaders<R, P, M> getDecorated() {
+ return decorated;
+ }
}