This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d0ce5349fdf [FLINK-34580][rest] Do not erase "pipeline.classpaths"
config during REST job deploy
d0ce5349fdf is described below
commit d0ce5349fdf1a611518eba20a169c475ee0b46c5
Author: Ferenc Csaky <[email protected]>
AuthorDate: Tue Mar 5 13:59:57 2024 +0100
[FLINK-34580][rest] Do not erase "pipeline.classpaths" config during REST
job deploy
---
.../flink/client/program/PackagedProgram.java | 6 +
.../webmonitor/handlers/utils/JarHandlerUtils.java | 33 +++++-
.../handlers/utils/JarHandlerUtilsTest.java | 131 +++++++++++++++------
3 files changed, 128 insertions(+), 42 deletions(-)
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index f7f5df28d1b..f2bdb9a2e3e 100644
---
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -18,6 +18,7 @@
package org.apache.flink.client.program;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.configuration.Configuration;
@@ -683,6 +684,11 @@ public class PackagedProgram implements AutoCloseable {
return this;
}
+ @VisibleForTesting
+ public List<URL> getUserClassPaths() {
+ return userClassPaths;
+ }
+
public PackagedProgram build() throws ProgramInvocationException {
if (jarFile == null && entryPointClassName == null) {
throw new IllegalArgumentException(
diff --git
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
index c5e0f492c96..9b3ff11752a 100644
---
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
+++
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
@@ -46,6 +46,7 @@ import org.slf4j.Logger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -182,17 +183,22 @@ public class JarHandlerUtils {
}
try {
- return PackagedProgram.newBuilder()
- .setJarFile(jarFile.toFile())
- .setEntryPointClassName(entryClass)
- .setConfiguration(configuration)
- .setArguments(programArgs.toArray(new String[0]))
- .build();
+ return initPackagedProgramBuilder(configuration).build();
} catch (final ProgramInvocationException e) {
throw new CompletionException(e);
}
}
+ @VisibleForTesting
+ PackagedProgram.Builder initPackagedProgramBuilder(Configuration
configuration) {
+ return PackagedProgram.newBuilder()
+ .setJarFile(jarFile.toFile())
+ .setEntryPointClassName(entryClass)
+ .setConfiguration(configuration)
+ .setUserClassPaths(getClasspaths(configuration))
+ .setArguments(programArgs.toArray(new String[0]));
+ }
+
@VisibleForTesting
String getEntryClass() {
return entryClass;
@@ -214,6 +220,21 @@ public class JarHandlerUtils {
}
}
+ private static List<URL> getClasspaths(Configuration configuration) {
+ try {
+ return ConfigUtils.decodeListFromConfig(
+ configuration, PipelineOptions.CLASSPATHS, URL::new);
+ } catch (MalformedURLException e) {
+ throw new CompletionException(
+ new RestHandlerException(
+ String.format(
+ "Failed to extract '%s' as URLs. Provided
value: %s",
+ PipelineOptions.CLASSPATHS.key(),
+
configuration.get(PipelineOptions.CLASSPATHS)),
+ HttpResponseStatus.BAD_REQUEST));
+ }
+ }
+
/** Parse program arguments in jar run or plan request. */
private static <R extends JarRequestBody, M extends MessageParameters>
List<String> getProgramArgs(HandlerRequest<R> request, Logger log)
diff --git
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtilsTest.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtilsTest.java
index 53643cabe95..d90ae902b05 100644
---
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtilsTest.java
+++
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtilsTest.java
@@ -19,7 +19,10 @@
package org.apache.flink.runtime.webmonitor.handlers.utils;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.webmonitor.handlers.JarPlanRequestBody;
@@ -31,18 +34,27 @@ import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
+import java.net.URL;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link JarHandlerUtils}. */
class JarHandlerUtilsTest {
private static final Logger LOG =
LoggerFactory.getLogger(JarHandlerUtilsTest.class);
+ @TempDir private Path tempDir;
+
@Test
void testTokenizeNonQuoted() {
final List<String> arguments =
JarHandlerUtils.tokenizeArguments("--foo bar");
@@ -65,17 +77,14 @@ class JarHandlerUtilsTest {
}
@Test
- void testFromRequestDefaults(@TempDir Path tmp) throws
RestHandlerException {
- final JarRunMessageParameters parameters =
- JarRunHeaders.getInstance().getUnresolvedMessageParameters();
-
- parameters.jarIdPathParameter.resolve("someJar");
+ void testFromRequestDefaults() throws Exception {
+ final JarRunMessageParameters parameters = getDummyMessageParameters();
final HandlerRequest<JarPlanRequestBody> request =
HandlerRequest.create(new JarPlanRequestBody(), parameters);
final JarHandlerUtils.JarHandlerContext jarHandlerContext =
- JarHandlerUtils.JarHandlerContext.fromRequest(request, tmp,
LOG);
+ JarHandlerUtils.JarHandlerContext.fromRequest(request,
tempDir, LOG);
assertThat(jarHandlerContext.getEntryClass()).isNull();
assertThat(jarHandlerContext.getProgramArgs()).isEmpty();
assertThat(jarHandlerContext.getParallelism())
@@ -84,25 +93,12 @@ class JarHandlerUtilsTest {
}
@Test
- void testFromRequestRequestBody(@TempDir Path tmp) throws
RestHandlerException {
- final JarRunMessageParameters parameters =
- JarRunHeaders.getInstance().getUnresolvedMessageParameters();
-
- parameters.jarIdPathParameter.resolve("someJar");
-
- final JarPlanRequestBody requestBody =
- new JarPlanRequestBody(
- "entry-class",
- null,
- Arrays.asList("arg1", "arg2"),
- 37,
- JobID.generate(),
- null);
- final HandlerRequest<JarPlanRequestBody> request =
- HandlerRequest.create(requestBody, parameters);
+ void testFromRequestRequestBody() throws Exception {
+ final JarPlanRequestBody requestBody =
getDummyJarPlanRequestBody("entry-class", 37, null);
+ final HandlerRequest<JarPlanRequestBody> request =
getDummyRequest(requestBody);
final JarHandlerUtils.JarHandlerContext jarHandlerContext =
- JarHandlerUtils.JarHandlerContext.fromRequest(request, tmp,
LOG);
+ JarHandlerUtils.JarHandlerContext.fromRequest(request,
tempDir, LOG);
assertThat(jarHandlerContext.getEntryClass()).isEqualTo(requestBody.getEntryClassName());
assertThat(jarHandlerContext.getProgramArgs())
.containsExactlyElementsOf(requestBody.getProgramArgumentsList());
@@ -111,28 +107,91 @@ class JarHandlerUtilsTest {
}
@Test
- void testFromRequestWithParallelismConfig(@TempDir Path tmp) throws
RestHandlerException {
+ void testFromRequestWithParallelismConfig() throws Exception {
final int parallelism = 37;
- final JarRunMessageParameters parameters =
- JarRunHeaders.getInstance().getUnresolvedMessageParameters();
-
- parameters.jarIdPathParameter.resolve("someJar");
-
final JarPlanRequestBody requestBody =
- new JarPlanRequestBody(
+ getDummyJarPlanRequestBody(
"entry-class",
null,
- Arrays.asList("arg1", "arg2"),
- null,
- JobID.generate(),
Collections.singletonMap(
CoreOptions.DEFAULT_PARALLELISM.key(),
String.valueOf(parallelism)));
- final HandlerRequest<JarPlanRequestBody> request =
- HandlerRequest.create(requestBody, parameters);
+ final HandlerRequest<JarPlanRequestBody> request =
getDummyRequest(requestBody);
final JarHandlerUtils.JarHandlerContext jarHandlerContext =
- JarHandlerUtils.JarHandlerContext.fromRequest(request, tmp,
LOG);
+ JarHandlerUtils.JarHandlerContext.fromRequest(request,
tempDir, LOG);
assertThat(jarHandlerContext.getParallelism()).isEqualTo(parallelism);
}
+
+ @Test
+ void testClasspathsConfigNotErased() throws Exception {
+ final JarPlanRequestBody requestBody =
+ getDummyJarPlanRequestBody(
+ null,
+ null,
+ Collections.singletonMap(
+ PipelineOptions.CLASSPATHS.key(),
+ "file:/tmp/some.jar;file:/tmp/another.jar"));
+
+ final HandlerRequest<JarPlanRequestBody> request =
getDummyRequest(requestBody);
+
+ final JarHandlerUtils.JarHandlerContext jarHandlerContext =
+ JarHandlerUtils.JarHandlerContext.fromRequest(request,
tempDir, LOG);
+
+ final Configuration originalConfig =
request.getRequestBody().getFlinkConfiguration();
+ final PackagedProgram.Builder builder =
+ jarHandlerContext.initPackagedProgramBuilder(originalConfig);
+ final List<String> retrievedClasspaths =
+ builder.getUserClassPaths().stream()
+ .map(URL::toString)
+ .collect(Collectors.toList());
+
+
assertThat(retrievedClasspaths).isEqualTo(originalConfig.get(PipelineOptions.CLASSPATHS));
+ }
+
+ @Test
+ void testMalformedClasspathsConfig() throws Exception {
+ final JarPlanRequestBody requestBody =
+ getDummyJarPlanRequestBody(
+ null,
+ null,
+ Collections.singletonMap(
+ PipelineOptions.CLASSPATHS.key(),
"invalid|:/jar"));
+ final HandlerRequest<JarPlanRequestBody> request =
getDummyRequest(requestBody);
+
+ final JarHandlerUtils.JarHandlerContext jarHandlerContext =
+ JarHandlerUtils.JarHandlerContext.fromRequest(request,
tempDir, LOG);
+
+ final Configuration originalConfig =
request.getRequestBody().getFlinkConfiguration();
+
+ assertThatThrownBy(() ->
jarHandlerContext.initPackagedProgramBuilder(originalConfig))
+ .satisfies(anyCauseMatches(RestHandlerException.class,
"invalid|:/jar"));
+ }
+
+ private HandlerRequest<JarPlanRequestBody> getDummyRequest(
+ @Nullable JarPlanRequestBody requestBody) {
+ return HandlerRequest.create(
+ requestBody == null ? new JarPlanRequestBody() : requestBody,
+ getDummyMessageParameters());
+ }
+
+ private JarRunMessageParameters getDummyMessageParameters() {
+ final JarRunMessageParameters parameters =
+ JarRunHeaders.getInstance().getUnresolvedMessageParameters();
+
+ parameters.jarIdPathParameter.resolve("someJar");
+
+ return parameters;
+ }
+
+ private JarPlanRequestBody getDummyJarPlanRequestBody(
+ String entryClass, Integer parallelism, Map<String, String>
flinkConfiguration) {
+ return new JarPlanRequestBody(
+ entryClass,
+ null,
+ Arrays.asList("arg1", "arg2"),
+ parallelism,
+ JobID.generate(),
+ flinkConfiguration);
+ }
}