schevalley2 commented on code in PR #24065: URL: https://github.com/apache/flink/pull/24065#discussion_r1453433788
########## flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetchManager.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.program.artifact; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.client.cli.ArtifactFetchOptions; +import org.apache.flink.client.program.DefaultPackagedProgramRetriever; +import org.apache.flink.client.program.PackagedProgramUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.function.FunctionUtils; + +import org.apache.commons.io.FilenameUtils; + +import javax.annotation.Nullable; + +import java.io.File; +import java.net.URI; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Class that manages the artifact loading process. 
*/ +public class ArtifactFetchManager { + + private final ArtifactFetcher localFetcher; + private final ArtifactFetcher fsFetcher; + private final ArtifactFetcher httpFetcher; + + private final Configuration conf; + private final File baseDir; + + public ArtifactFetchManager(Configuration conf) { + this(conf, null); + } + + public ArtifactFetchManager(Configuration conf, @Nullable String baseDir) { + this( + new LocalArtifactFetcher(), + new FsArtifactFetcher(), + new HttpArtifactFetcher(), + conf, + baseDir); + } + + @VisibleForTesting + ArtifactFetchManager( + ArtifactFetcher localFetcher, + ArtifactFetcher fsFetcher, + ArtifactFetcher httpFetcher, + Configuration conf, + @Nullable String baseDir) { + this.localFetcher = checkNotNull(localFetcher); + this.fsFetcher = checkNotNull(fsFetcher); + this.httpFetcher = checkNotNull(httpFetcher); + this.conf = checkNotNull(conf); + this.baseDir = + baseDir == null + ? new File(conf.get(ArtifactFetchOptions.ARTIFACT_BASE_DIR)) + : new File(baseDir); + } + + /** + * Fetches artifacts from a given URI string array. The job jar and any additional artifacts are + * mixed, in case of multiple artifacts the {@link DefaultPackagedProgramRetriever} logic will + * be used to find the job jar. + * + * @param uris URIs to fetch + * @return result with the fetched artifacts + */ + public Result fetchArtifacts(String[] uris) { + checkArgument(uris != null && uris.length > 0, "At least one URI is required."); + + ArtifactUtils.createMissingParents(baseDir); + List<File> artifacts = + Arrays.stream(uris) + .map(FunctionUtils.uncheckedFunction(this::fetchArtifact)) + .collect(Collectors.toList()); + + if (artifacts.size() > 1) { + return new Result(null, artifacts); + } + + if (artifacts.size() == 1) { + return new Result(artifacts.get(0), null); + } + + // Should not happen. 
+ throw new IllegalStateException("Corrupt artifact fetching state."); + } + + /** + * Fetches the job jar and any additional artifact if the given list is not null or empty. + * + * @param jobUri URI of the job jar + * @param additionalUris URI(s) of any additional artifact to fetch + * @return result with the fetched artifacts + * @throws Exception + */ + public Result fetchArtifacts(String jobUri, @Nullable List<String> additionalUris) + throws Exception { + checkArgument(jobUri != null && !jobUri.trim().isEmpty(), "The jobUri is required."); + + ArtifactUtils.createMissingParents(baseDir); + File jobJar = fetchArtifact(jobUri); + + List<File> additionalArtifacts = + additionalUris == null + ? Collections.emptyList() + : additionalUris.stream() + .map(FunctionUtils.uncheckedFunction(this::fetchArtifact)) + .collect(Collectors.toList()); + + return new Result(jobJar, additionalArtifacts); + } + + private File fetchArtifact(String uri) throws Exception { + URI resolvedUri = PackagedProgramUtils.resolveURI(uri); + File targetFile = new File(baseDir, FilenameUtils.getName(resolvedUri.getPath())); + if (targetFile.exists()) { + // Already fetched user artifacts are kept. + return targetFile; + } + + return getFetcher(resolvedUri).fetch(uri, conf, baseDir); + } + + @VisibleForTesting + ArtifactFetcher getFetcher(URI uri) { + if ("local".equals(uri.getScheme())) { + return localFetcher; + } + + if ("http".equals(uri.getScheme()) || "https".equals(uri.getScheme())) { Review Comment: Should there be more control over whether downloading over plain HTTP is allowed? I can see how it can be useful for local testing, but in more constrained environments, maybe it should be possible to disable it? On the other hand, I am not sure that having a flag like `user.artifacts.enable-fetch-over-raw-http` is going to bring much benefit. 
########## flink-clients/src/main/java/org/apache/flink/client/cli/ArtifactFetchOptions.java: ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.cli; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +import java.util.List; +import java.util.Map; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** Artifact Fetch options. */ +public class ArtifactFetchOptions { + + public static final ConfigOption<String> ARTIFACT_BASE_DIR = + ConfigOptions.key("user.artifacts.base-dir") + .stringType() + .defaultValue("/opt/flink/artifacts") + .withDescription("The base dir to put the application job artifacts."); + + public static final ConfigOption<List<String>> ARTIFACT_LIST = + key("user.artifacts.artifact-list") + .stringType() + .asList() + .noDefaultValue() + .withDescription( + "A semicolon-separated list of the additional artifacts to fetch for the job before setting up the application cluster." + + "All of these have to be valid URIs. 
Example: s3://sandbox-bucket/format.jar,http://sandbox-server:1234/udf.jar"); + + public static final ConfigOption<Map<String, String>> ARTIFACT_HTTP_HEADERS = Review Comment: Is it intended to contain sensitive information? Could it be useful to be actually hide it from output? ########## flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetchManager.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.program.artifact; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.client.cli.ArtifactFetchOptions; +import org.apache.flink.client.program.DefaultPackagedProgramRetriever; +import org.apache.flink.client.program.PackagedProgramUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.function.FunctionUtils; + +import org.apache.commons.io.FilenameUtils; + +import javax.annotation.Nullable; + +import java.io.File; +import java.net.URI; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Class that manages the artifact loading process. */ +public class ArtifactFetchManager { + + private final ArtifactFetcher localFetcher; + private final ArtifactFetcher fsFetcher; + private final ArtifactFetcher httpFetcher; + + private final Configuration conf; + private final File baseDir; + + public ArtifactFetchManager(Configuration conf) { + this(conf, null); + } + + public ArtifactFetchManager(Configuration conf, @Nullable String baseDir) { + this( + new LocalArtifactFetcher(), + new FsArtifactFetcher(), + new HttpArtifactFetcher(), + conf, + baseDir); + } + + @VisibleForTesting + ArtifactFetchManager( + ArtifactFetcher localFetcher, + ArtifactFetcher fsFetcher, + ArtifactFetcher httpFetcher, + Configuration conf, + @Nullable String baseDir) { + this.localFetcher = checkNotNull(localFetcher); + this.fsFetcher = checkNotNull(fsFetcher); + this.httpFetcher = checkNotNull(httpFetcher); + this.conf = checkNotNull(conf); + this.baseDir = + baseDir == null + ? new File(conf.get(ArtifactFetchOptions.ARTIFACT_BASE_DIR)) + : new File(baseDir); + } + + /** + * Fetches artifacts from a given URI string array. 
The job jar and any additional artifacts are + * mixed, in case of multiple artifacts the {@link DefaultPackagedProgramRetriever} logic will + * be used to find the job jar. + * + * @param uris URIs to fetch + * @return result with the fetched artifacts + */ + public Result fetchArtifacts(String[] uris) { + checkArgument(uris != null && uris.length > 0, "At least one URI is required."); + + ArtifactUtils.createMissingParents(baseDir); + List<File> artifacts = + Arrays.stream(uris) + .map(FunctionUtils.uncheckedFunction(this::fetchArtifact)) + .collect(Collectors.toList()); + + if (artifacts.size() > 1) { + return new Result(null, artifacts); + } + + if (artifacts.size() == 1) { + return new Result(artifacts.get(0), null); + } + + // Should not happen. + throw new IllegalStateException("Corrupt artifact fetching state."); + } + + /** + * Fetches the job jar and any additional artifact if the given list is not null or empty. + * + * @param jobUri URI of the job jar + * @param additionalUris URI(s) of any additional artifact to fetch + * @return result with the fetched artifacts + * @throws Exception + */ + public Result fetchArtifacts(String jobUri, @Nullable List<String> additionalUris) + throws Exception { + checkArgument(jobUri != null && !jobUri.trim().isEmpty(), "The jobUri is required."); + + ArtifactUtils.createMissingParents(baseDir); + File jobJar = fetchArtifact(jobUri); + + List<File> additionalArtifacts = + additionalUris == null + ? Collections.emptyList() + : additionalUris.stream() + .map(FunctionUtils.uncheckedFunction(this::fetchArtifact)) + .collect(Collectors.toList()); + + return new Result(jobJar, additionalArtifacts); + } + + private File fetchArtifact(String uri) throws Exception { + URI resolvedUri = PackagedProgramUtils.resolveURI(uri); + File targetFile = new File(baseDir, FilenameUtils.getName(resolvedUri.getPath())); + if (targetFile.exists()) { + // Already fetched user artifacts are kept. 
+ return targetFile; + } + + return getFetcher(resolvedUri).fetch(uri, conf, baseDir); + } + + @VisibleForTesting + ArtifactFetcher getFetcher(URI uri) { + if ("local".equals(uri.getScheme())) { + return localFetcher; + } + + if ("http".equals(uri.getScheme()) || "https".equals(uri.getScheme())) { + return httpFetcher; + } + + return fsFetcher; + } + + /** Artifact fetch result with all fetched artifact(s). */ + public static class Result { + + private final File jobJar; + private final List<File> additionalJars; + + private Result(@Nullable File jobJar, @Nullable List<File> additionalJars) { + this.jobJar = jobJar; + this.additionalJars = additionalJars == null ? Collections.emptyList() : additionalJars; + } + + @Nullable + public File getJobJar() { + return jobJar; + } + + public List<File> getAdditionalJars() { + return Collections.unmodifiableList(additionalJars); + } + + @Nullable + public File getUserArtifactDir() { + return additionalJars.isEmpty() ? null : additionalJars.get(0).getParentFile(); Review Comment: if I understand correctly, at the end it should be either `baseDir` from the `ArtifactFetchManager` instance or the value of `ARTIFACT_BASE_DIR`, is that correct? ########## docs/content.zh/docs/deployment/resource-providers/standalone/docker.md: ########## @@ -175,17 +174,39 @@ To make the **job artifacts available** locally in the container, you can $ docker run flink_with_job_artifacts taskmanager ``` +* **or pass jar path by `jars` argument** when you start the JobManager: + + ```sh + $ FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" + $ docker network create flink-network + + $ docker run \ + --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \ + --env ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17-SNAPSHOT.jar \ Review Comment: Should it be using `flink-s3-fs-hadoop-{{< version >}}.jar` ? 
########## flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactory.java: ########## @@ -63,10 +63,22 @@ public class StandaloneApplicationClusterConfigurationParserFactory .desc("Job ID of the job to run.") .build(); + private static final Option JARS_OPTION = + Option.builder("jars") + .longOpt("jars") + .required(false) + .hasArg(true) + .hasArgs() Review Comment: it seems that `.hasArg(true)` is not required if we call `.hasArgs()` ########## flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java: ########## @@ -111,14 +126,45 @@ private static PackagedProgramRetriever getPackagedProgramRetriever( // No need to do pipelineJars validation if it is a PyFlink job. if (!(PackagedProgramUtils.isPython(jobClassName) || PackagedProgramUtils.isPython(programArguments))) { - final List<File> pipelineJars = - KubernetesUtils.checkJarFileForApplicationMode(configuration); - Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar"); + final ArtifactFetchManager.Result fetchRes = fetchArtifacts(configuration); + return DefaultPackagedProgramRetriever.create( - userLibDir, pipelineJars.get(0), jobClassName, programArguments, configuration); + userLibDir, + fetchRes.getUserArtifactDir(), + fetchRes.getJobJar(), + jobClassName, + programArguments, + configuration); } return DefaultPackagedProgramRetriever.create( userLibDir, jobClassName, programArguments, configuration); } + + private static ArtifactFetchManager.Result fetchArtifacts(Configuration configuration) { + try { + String targetDir = generateJarDir(configuration); + ArtifactFetchManager fetchMgr = new ArtifactFetchManager(configuration, targetDir); + + List<String> uris = configuration.get(PipelineOptions.JARS); + checkArgument(uris.size() == 1, "Should only have one jar"); + List<String> additionalUris = + configuration + .getOptional(ArtifactFetchOptions.ARTIFACT_LIST) 
+ .orElse(Collections.emptyList()); + + return fetchMgr.fetchArtifacts(uris.get(0), additionalUris); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + static String generateJarDir(Configuration configuration) { + return String.join( + File.separator, + new File(configuration.get(ArtifactFetchOptions.ARTIFACT_BASE_DIR)) + .getAbsolutePath(), + configuration.get(KubernetesConfigOptions.NAMESPACE), + configuration.get(KubernetesConfigOptions.CLUSTER_ID)); + } Review Comment: I am not sure I follow — could you explain why we put it in a subdirectory with the namespace and cluster name? (It looks OK, just asking to understand.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
