gyfora commented on code in PR #202:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872744006
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -141,4 +142,15 @@ public class KubernetesOperatorConfigOptions {
.noDefaultValue()
.withDescription(
"Whether to enable recovery of missing/deleted jobmanager deployments. False by default for Flink 1.14, true for newer Flink version.");
+
+ public static final ConfigOption<Map<String, String>> JAR_ARTIFACT_HTTP_HEADER =
+ ConfigOptions.key("kubernetes.operator.user.artifacts.http.header")
+ .mapType()
+ .noDefaultValue()
+ .withDescription(
+ "Custom HTTP header for a Flink job. If configured in cluster level, headers will be applied to all jobs within"
Review Comment:
I think the description should say what the config is used for. Does it
affect artifact downloading?
It doesn't need to explain how configuration overrides work; we have general
docs for that. So I would remove this part completely:
```
If configured in cluster level, headers will be applied to all jobs within
the cluster. This field can also be configured under
spec.job.flinkConfiguration for a specific session job within a session
cluster. If configured at session job level, it will override the cluster level
configuration.
```
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -133,6 +135,19 @@ public Configuration getObserveConfig(FlinkDeployment deployment) {
return getConfig(deployment.getMetadata(), ReconciliationUtils.getDeployedSpec(deployment));
}
+ public Configuration getSessionJobObserveConfig(
Review Comment:
I think it's a bit confusing to call it `getSessionJobObserveConfig`. I
would prefer to call it simply `getSessionJobConfig`.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -133,6 +135,19 @@ public Configuration getObserveConfig(FlinkDeployment deployment) {
return getConfig(deployment.getMetadata(), ReconciliationUtils.getDeployedSpec(deployment));
}
+ public Configuration getSessionJobObserveConfig(
+ FlinkDeployment deployment, FlinkSessionJob flinkSessionJob) {
+ Configuration sessionJobConfig = getObserveConfig(deployment);
+
+ // merge session job specific config
+ Map<String, String> sessionJobFlinkConfiguration =
+ flinkSessionJob.getSpec().getFlinkConfiguration();
+ if (sessionJobFlinkConfiguration != null && !sessionJobFlinkConfiguration.isEmpty()) {
Review Comment:
The `!sessionJobFlinkConfiguration.isEmpty()` check is not necessary; calling
`forEach` on an empty map does nothing.
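A minimal sketch of the simplified guard, using plain `Map<String, String>` values as a stand-in for the Flink `Configuration` object. The class and method names (`SessionJobOverrideSketch`, `applyOverrides`) are hypothetical and not part of the PR; the point is only that the null check alone is enough.

```java
import java.util.HashMap;
import java.util.Map;

public class SessionJobOverrideSketch {

    /**
     * Plain-map stand-in for the merge in the diff: copies the base config and
     * applies the session job overrides on top. Only the null guard is needed.
     */
    public static Map<String, String> applyOverrides(
            Map<String, String> base, Map<String, String> overrides) {
        Map<String, String> result = new HashMap<>(base);
        if (overrides != null) {
            // forEach on an empty map never invokes the callback.
            overrides.forEach(result::put);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> base = Map.of("parallelism.default", "2");
        // Both calls leave the base entries unchanged.
        System.out.println(applyOverrides(base, Map.of()));
        System.out.println(applyOverrides(base, null));
    }
}
```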
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java:
##########
@@ -41,6 +42,7 @@ public class FlinkOperatorConfiguration {
Duration flinkCancelJobTimeout;
Duration flinkShutdownClusterTimeout;
String artifactsBaseDir;
+ Map<String, String> artifactHttpHeader;
Review Comment:
We shouldn't add this extra field here. It's not an operator configuration,
and it isn't used anywhere.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/FileSystemBasedArtifactFetcher.java:
##########
@@ -33,7 +34,8 @@ public class FileSystemBasedArtifactFetcher implements ArtifactFetcher {
new FileSystemBasedArtifactFetcher();
@Override
- public File fetch(String uri, File targetDir) throws Exception {
+ public File fetch(String uri, Configuration flinkConfiguration, File targetDir)
Review Comment:
Why did you add the config field here? Is it used?
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java:
##########
@@ -404,4 +409,21 @@ private Optional<String> validateServiceAccount(String serviceAccount) {
}
return Optional.empty();
}
+
+ private Optional<String> validateFlinkSessionJobConfig(
+ Map<String, String> flinkSessionJobConfig) {
+ if (flinkSessionJobConfig == null) {
+ return Optional.empty();
+ }
+
+ for (String key : flinkSessionJobConfig.keySet()) {
+ if (!ALLOWED_FLINK_SESSION_JOB_CONF_KEYS.contains(key)) {
Review Comment:
Could we please add a simple test for this into the validator test?
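A rough sketch of the cases such a test could cover, written as a self-contained program rather than against the real `DefaultValidator`. Everything here is an assumption for illustration: the class name, the contents of the allow-list (only the HTTP header key from this PR), and the error message text.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class SessionJobConfigValidationSketch {

    // Assumption: the allow-list contains only the HTTP header option key.
    public static final Set<String> ALLOWED_KEYS =
            Set.of("kubernetes.operator.user.artifacts.http.header");

    /** Stand-in for the validator method in the diff; the error text is invented. */
    public static Optional<String> validate(Map<String, String> conf) {
        if (conf == null) {
            return Optional.empty();
        }
        for (String key : conf.keySet()) {
            if (!ALLOWED_KEYS.contains(key)) {
                return Optional.of("Invalid session job flinkConfiguration key: " + key);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // A missing config and an allowed key both pass validation.
        check(!validate(null).isPresent());
        check(!validate(Map.of("kubernetes.operator.user.artifacts.http.header", "a:b"))
                .isPresent());
        // Any key outside the allow-list is rejected.
        check(validate(Map.of("taskmanager.numberOfTaskSlots", "2")).isPresent());
    }

    private static void check(boolean condition) {
        if (!condition) {
            throw new AssertionError("validation sketch check failed");
        }
    }
}
```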
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/HttpArtifactFetcher.java:
##########
@@ -32,12 +37,26 @@ public class HttpArtifactFetcher implements ArtifactFetcher {
public static final HttpArtifactFetcher INSTANCE = new HttpArtifactFetcher();
@Override
- public File fetch(String uri, File targetDir) throws Exception {
+ public File fetch(String uri, Configuration flinkConfiguration, File targetDir)
+ throws Exception {
var start = System.currentTimeMillis();
URL url = new URL(uri);
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+
+ // merged session job level header and cluster level header, session job level header take
+ // precedence.
+ Map<String, String> headers =
+ flinkConfiguration.get(KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER);
+
+ if (headers != null && headers.size() > 0) {
Review Comment:
The size check is not necessary.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -133,6 +135,19 @@ public Configuration getObserveConfig(FlinkDeployment deployment) {
return getConfig(deployment.getMetadata(), ReconciliationUtils.getDeployedSpec(deployment));
}
+ public Configuration getSessionJobObserveConfig(
+ FlinkDeployment deployment, FlinkSessionJob flinkSessionJob) {
+ Configuration sessionJobConfig = getObserveConfig(deployment);
+
+ // merge session job specific config
+ Map<String, String> sessionJobFlinkConfiguration =
+ flinkSessionJob.getSpec().getFlinkConfiguration();
+ if (sessionJobFlinkConfiguration != null && !sessionJobFlinkConfiguration.isEmpty()) {
+ sessionJobFlinkConfiguration.forEach(sessionJobConfig::setString);
Review Comment:
For this particular case, would it make sense to merge the HTTP headers
instead of overwriting them when the base config also defines them?
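A minimal sketch of what merging could look like, assuming the header option is available as a plain `Map<String, String>` at both the cluster and session job level. The names `HeaderMergeSketch` and `mergeHeaders` are hypothetical and not part of the PR. Session job entries win on key conflicts, while cluster-level headers with other keys are kept.

```java
import java.util.HashMap;
import java.util.Map;

public class HeaderMergeSketch {

    /** Hypothetical helper: combines both levels; session job entries win on conflicts. */
    public static Map<String, String> mergeHeaders(
            Map<String, String> clusterHeaders, Map<String, String> sessionJobHeaders) {
        Map<String, String> merged = new HashMap<>();
        if (clusterHeaders != null) {
            merged.putAll(clusterHeaders);
        }
        if (sessionJobHeaders != null) {
            // Applied second, so these values replace cluster values with the same key.
            merged.putAll(sessionJobHeaders);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> cluster = Map.of("Authorization", "cluster-token", "X-Team", "infra");
        Map<String, String> job = Map.of("Authorization", "job-token");
        // Authorization comes from the job, X-Team from the cluster.
        System.out.println(mergeHeaders(cluster, job));
    }
}
```

With plain overwriting, the session job map would replace the whole cluster map, and `X-Team` in the example above would be dropped.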