[ https://issues.apache.org/jira/browse/GOBBLIN-2135?focusedWorklogId=933302&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-933302 ]
ASF GitHub Bot logged work on GOBBLIN-2135: ------------------------------------------- Author: ASF GitHub Bot Created on: 05/Sep/24 06:24 Start Date: 05/Sep/24 06:24 Worklog Time Spent: 10m Work Description: phet commented on code in PR #4030: URL: https://github.com/apache/gobblin/pull/4030#discussion_r1735415804 ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java: ########## @@ -572,56 +573,22 @@ private void addJars(Path jarFileDir, String jarFileList, Configuration conf) th for (String jarFile : SPLITTER.split(jarFileList)) { Path srcJarFile = new Path(jarFile); FileStatus[] fileStatusList = lfs.globStatus(srcJarFile); - for (FileStatus status : fileStatusList) { + Path destJarFile = HdfsJarUploadUtils.calculateDestJarFile(fs, status, this.unsharedJarsDir, jarFileDir); // For each FileStatus there are chances it could fail in copying at the first attempt, due to file-existence // or file-copy is ongoing by other job instance since all Gobblin jobs share the same jar file directory. // the retryCount is to avoid cases (if any) where retry is going too far and causes job hanging. - int retryCount = 0; - boolean shouldFileBeAddedIntoDC = true; - Path destJarFile = calculateDestJarFile(status, jarFileDir); - // Adding destJarFile into HDFS until it exists and the size of file on targetPath matches the one on local path. - while (!this.fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() != status.getLen()) { - try { - if (this.fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() != status.getLen()) { - Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD); - throw new IOException("Waiting for file to complete on uploading ... "); - } - // Set the first parameter as false for not deleting sourceFile - // Set the second parameter as false for not overwriting existing file on the target, by default it is true. - // If the file is preExisted but overwrite flag set to false, then an IOException if thrown. 
- this.fs.copyFromLocalFile(false, false, status.getPath(), destJarFile); - } catch (IOException | InterruptedException e) { - LOG.warn("Path:" + destJarFile + " is not copied successfully. Will require retry."); - retryCount += 1; - if (retryCount >= this.jarFileMaximumRetry) { - LOG.error("The jar file:" + destJarFile + "failed in being copied into hdfs", e); - // If retry reaches upper limit, skip copying this file. - shouldFileBeAddedIntoDC = false; - break; - } - } - } - if (shouldFileBeAddedIntoDC) { + if (HdfsJarUploadUtils.uploadJarToHdfs(this.fs, status, this.jarFileMaximumRetry, destJarFile)) { // Then add the jar file on HDFS to the classpath LOG.info(String.format("Adding %s to classpath", destJarFile)); DistributedCache.addFileToClassPath(destJarFile, conf, this.fs); + } else { + LOG.error("Failed to upload jar file: " + status.getPath()); Review Comment: I don't find the prior code throwing an error... nonetheless, should everything continue on w/ just some error logs? shouldn't we instead fail the overall job because presumably necessary jars won't be there? 
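A minimal sketch of the fail-fast alternative raised in the comment above, not the PR's actual code. It assumes the job should attempt every jar upload, then abort with a single exception naming the jars that could not be uploaded. `FailFastJarUpload`, `uploadAllOrFail`, and the `Predicate<String>` uploader are hypothetical names used only for illustration; in the real code the predicate's role would presumably be played by `HdfsJarUploadUtils.uploadJarToHdfs` operating on Hadoop `FileStatus`/`Path` values.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration: stands in for the launcher's jar-upload loop.
class FailFastJarUpload {

  /**
   * Tries to upload every jar, then throws if any upload reported failure.
   *
   * @param jarNames names of the jars to upload (stand-in for FileStatus entries)
   * @param uploader returns true when the jar was uploaded; assumed to do its own retries
   * @return the jars that were uploaded, in input order
   * @throws IOException listing every jar whose upload failed
   */
  static List<String> uploadAllOrFail(List<String> jarNames, Predicate<String> uploader)
      throws IOException {
    List<String> uploaded = new ArrayList<>();
    List<String> failed = new ArrayList<>();
    for (String jar : jarNames) {
      if (uploader.test(jar)) {
        uploaded.add(jar);   // safe to add to the classpath / local resources
      } else {
        failed.add(jar);     // remember it so the error names every missing jar
      }
    }
    if (!failed.isEmpty()) {
      // Fail the overall job instead of continuing with an incomplete classpath.
      throw new IOException("Failed to upload jar file(s): " + failed);
    }
    return uploaded;
  }

  public static void main(String[] args) {
    try {
      // Simulated uploader: pretend the upload of b.jar exhausts its retries.
      uploadAllOrFail(Arrays.asList("a.jar", "b.jar"), jar -> !jar.equals("b.jar"));
    } catch (IOException e) {
      System.out.println("Job aborted: " + e.getMessage());
    }
  }
}
```

Collecting all failures before throwing is a design choice of this sketch: it keeps the single error message complete, at the cost of still attempting the remaining uploads after the first failure.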
########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java: ########## @@ -484,12 +487,29 @@ private void requestContainer(Optional<String> preferredNode, Resource resource) protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo containerInfo) throws IOException { Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, this.applicationId); + Path containerJarsUnsharedDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME); Review Comment: though you said in the PR desc, suggest a comment here about "unshared" dir being for "-SNAPSHOT" versions ########## gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.util.filesystem; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import lombok.extern.slf4j.Slf4j; + + +/** + * Utility class for uploading jar files to HDFS with retries to handle concurrency + */ +@Slf4j +public class HdfsJarUploadUtils { + + private static final long WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000; + + /** + * Calculate the target filePath of the jar file to be copied on HDFS, + * given the {@link FileStatus} of a jarFile and the path of directory that contains jar. + * Snapshot dirs should not be shared, as different jobs may be using different versions of it. + * @param fs + * @param localJar + * @param unsharedJarsDir + * @param jarCacheDir + * @return + * @throws IOException + */ + public static Path calculateDestJarFile(FileSystem fs, FileStatus localJar, Path unsharedJarsDir, Path jarCacheDir) throws IOException { Review Comment: suggest to name `calculateDestJarFilePath`. 
and, since the only use is `localJar.getPath().getName()` suggest to make the param `String jarName` ########## gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java: ########## @@ -300,10 +304,10 @@ public GobblinYarnAppLauncher(Config config, YarnConfiguration yarnConfiguration GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE); this.detachOnExitEnabled = ConfigUtils - .getBoolean(config, GobblinYarnConfigurationKeys.GOBBLIN_YARN_DETACH_ON_EXIT_ENABLED, + .getBoolean(this.config, GobblinYarnConfigurationKeys.GOBBLIN_YARN_DETACH_ON_EXIT_ENABLED, GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_DETACH_ON_EXIT); - this.appLauncherMode = ConfigUtils.getString(config, GOBBLIN_YARN_APP_LAUNCHER_MODE, DEFAULT_GOBBLIN_YARN_APP_LAUNCHER_MODE); - + this.appLauncherMode = ConfigUtils.getString(this.config, GOBBLIN_YARN_APP_LAUNCHER_MODE, DEFAULT_GOBBLIN_YARN_APP_LAUNCHER_MODE); + this.jarCacheEnabled = ConfigUtils.getBoolean(config, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT); Review Comment: NBD, but you just updated the two above to be `this.config`, but only use `config` here :) ########## gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.util.filesystem; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import lombok.extern.slf4j.Slf4j; + + +/** + * Utility class for uploading jar files to HDFS with retries to handle concurrency + */ +@Slf4j +public class HdfsJarUploadUtils { + + private static final long WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000; Review Comment: are these seconds? suggest a suffix to clarify ########## gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java: ########## @@ -199,6 +203,18 @@ public static void setYarnClassPath(Config config, Configuration yarnConfigurati } } + public static Path getJarPathCacheAndCleanIfNeeded(Config config, FileSystem fs) throws IOException { Review Comment: this feels like two separate operations: ``` Path calcJarCacheCurrentPath(Config, FileSystem); boolean retainKLatestCachePaths(Path parentCachePath, int k); /// true iff any deletion ``` ########## gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.util.filesystem; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import lombok.extern.slf4j.Slf4j; + + +/** + * Utility class for uploading jar files to HDFS with retries to handle concurrency + */ +@Slf4j +public class HdfsJarUploadUtils { + + private static final long WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000; + + /** + * Calculate the target filePath of the jar file to be copied on HDFS, + * given the {@link FileStatus} of a jarFile and the path of directory that contains jar. + * Snapshot dirs should not be shared, as different jobs may be using different versions of it. + * @param fs + * @param localJar + * @param unsharedJarsDir + * @param jarCacheDir + * @return + * @throws IOException + */ + public static Path calculateDestJarFile(FileSystem fs, FileStatus localJar, Path unsharedJarsDir, Path jarCacheDir) throws IOException { + Path uploadDir = localJar.getPath().getName().contains("SNAPSHOT") ? 
unsharedJarsDir : jarCacheDir; + Path destJarFile = new Path(fs.makeQualified(uploadDir), localJar.getPath().getName()); + return destJarFile; + } + /** + * Upload a jar file to HDFS with retries to handle already existing jars + * @param fs + * @param localJar + * @param destJarFile + * @param jarFileMaximumRetry + * @return + * @throws IOException + */ + public static boolean uploadJarToHdfs(FileSystem fs, FileStatus localJar, int jarFileMaximumRetry, Path destJarFile) throws IOException { Review Comment: `jarFileMaximumRetry` => simply `maxAttempts`? ########## gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.util.filesystem; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import lombok.extern.slf4j.Slf4j; + + +/** + * Utility class for uploading jar files to HDFS with retries to handle concurrency + */ +@Slf4j +public class HdfsJarUploadUtils { + + private static final long WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000; + + /** + * Calculate the target filePath of the jar file to be copied on HDFS, + * given the {@link FileStatus} of a jarFile and the path of directory that contains jar. + * Snapshot dirs should not be shared, as different jobs may be using different versions of it. + * @param fs + * @param localJar + * @param unsharedJarsDir + * @param jarCacheDir + * @return + * @throws IOException + */ + public static Path calculateDestJarFile(FileSystem fs, FileStatus localJar, Path unsharedJarsDir, Path jarCacheDir) throws IOException { + Path uploadDir = localJar.getPath().getName().contains("SNAPSHOT") ? unsharedJarsDir : jarCacheDir; + Path destJarFile = new Path(fs.makeQualified(uploadDir), localJar.getPath().getName()); + return destJarFile; + } + /** + * Upload a jar file to HDFS with retries to handle already existing jars + * @param fs + * @param localJar + * @param destJarFile + * @param jarFileMaximumRetry + * @return + * @throws IOException + */ + public static boolean uploadJarToHdfs(FileSystem fs, FileStatus localJar, int jarFileMaximumRetry, Path destJarFile) throws IOException { + int retryCount = 0; + while (!fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() != localJar.getLen()) { + try { + if (fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() != localJar.getLen()) { + Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD); Review Comment: what are we waiting on here? is `copyFromLocalFile` asynchronous? or are we thinking a different process may be doing upload. 
with `overwriteAnyExistingDestFile == false`, we don't seem to undertake any "repair", do we? instead we just wait `(waitTime * jarFileMaximumRetry)` ########## gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java: ########## @@ -640,31 +643,36 @@ private Resource prepareContainerResource(GetNewApplicationResponse newApplicati private Map<String, LocalResource> addAppMasterLocalResources(ApplicationId applicationId) throws IOException { Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, applicationId.toString()); + Path jarsRootDir = this.jarCacheEnabled ? YarnHelixUtils.getJarPathCacheAndCleanIfNeeded(this.config, this.fs) : appWorkDir; Path appMasterWorkDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME); - LOGGER.info("Configured GobblinApplicationMaster work directory to: {}", appMasterWorkDir.toString()); + Path appMasterJarsCacheDir = new Path(jarsRootDir, GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME); + LOGGER.info("Configured GobblinApplicationMaster work directory to: {}", appMasterWorkDir); + LOGGER.info("Configured GobblinApplicationMaster jars directory to: {}", appMasterJarsCacheDir); Map<String, LocalResource> appMasterResources = Maps.newHashMap(); FileSystem localFs = FileSystem.getLocal(new Configuration()); - // NOTE: log after each step below for insight into what takes bulk of time if (this.config.hasPath(GobblinYarnConfigurationKeys.LIB_JARS_DIR_KEY)) { - Path libJarsDestDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME); + // Lib jars are shared between all containers, store at the root level + Path libJarsDestDir = new Path(jarsRootDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME); + Path unsharedJarsDestDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME); addLibJars(new Path(this.config.getString(GobblinYarnConfigurationKeys.LIB_JARS_DIR_KEY)), - 
Optional.of(appMasterResources), libJarsDestDir, localFs); - LOGGER.info("Added lib jars to directory: {}", libJarsDestDir.toString()); + Optional.of(appMasterResources), libJarsDestDir, unsharedJarsDestDir, localFs); + LOGGER.info("Added lib jars to directory: {}", libJarsDestDir); Review Comment: it seems `addLibJars` could also add some to `unsharedJarsDestDir`, so good to mention that while logging ########## gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.util.filesystem; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import lombok.extern.slf4j.Slf4j; + + +/** + * Utility class for uploading jar files to HDFS with retries to handle concurrency + */ +@Slf4j +public class HdfsJarUploadUtils { + + private static final long WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000; + + /** + * Calculate the target filePath of the jar file to be copied on HDFS, + * given the {@link FileStatus} of a jarFile and the path of directory that contains jar. 
+ * Snapshot dirs should not be shared, as different jobs may be using different versions of it. + * @param fs + * @param localJar + * @param unsharedJarsDir + * @param jarCacheDir + * @return + * @throws IOException + */ + public static Path calculateDestJarFile(FileSystem fs, FileStatus localJar, Path unsharedJarsDir, Path jarCacheDir) throws IOException { + Path uploadDir = localJar.getPath().getName().contains("SNAPSHOT") ? unsharedJarsDir : jarCacheDir; + Path destJarFile = new Path(fs.makeQualified(uploadDir), localJar.getPath().getName()); + return destJarFile; + } + /** + * Upload a jar file to HDFS with retries to handle already existing jars + * @param fs + * @param localJar + * @param destJarFile + * @param jarFileMaximumRetry + * @return + * @throws IOException + */ + public static boolean uploadJarToHdfs(FileSystem fs, FileStatus localJar, int jarFileMaximumRetry, Path destJarFile) throws IOException { + int retryCount = 0; + while (!fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() != localJar.getLen()) { + try { + if (fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() != localJar.getLen()) { + Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD); + throw new IOException("Waiting for file to complete on uploading ... "); + } + // Set the first parameter as false for not deleting sourceFile + // Set the second parameter as false for not overwriting existing file on the target, by default it is true. + // If the file is preExisted but overwrite flag set to false, then an IOException if thrown. 
+ fs.copyFromLocalFile(false, false, localJar.getPath(), destJarFile); Review Comment: ``` boolean deleteSourceFile = false; boolean overwriteAnyExistingDestFile = false; // IOException will be thrown if does already exist fs.copyFromLocalFile(deleteSourceFile, overwriteAnyExistingDestFile, localJar.getPath(), destJarFile) ``` ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java: ########## @@ -484,12 +487,29 @@ private void requestContainer(Optional<String> preferredNode, Resource resource) protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo containerInfo) throws IOException { Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, this.applicationId); + Path containerJarsUnsharedDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME); + Path jarCacheDir = this.jarCacheEnabled ? YarnHelixUtils.getJarPathCacheAndCleanIfNeeded(this.config, this.fs) : appWorkDir; + Path containerJarsCachedDir = new Path(jarCacheDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME); + LOGGER.info("Container cached jars root dir: " + containerJarsCachedDir); + LOGGER.info("Container uncached jars root dir: " + containerJarsUnsharedDir); Review Comment: I'd find something like "execution-private" or "unshared" more explicit than uncached. ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java: ########## @@ -484,12 +487,29 @@ private void requestContainer(Optional<String> preferredNode, Resource resource) protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo containerInfo) throws IOException { Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, this.applicationId); + Path containerJarsUnsharedDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME); + Path jarCacheDir = this.jarCacheEnabled ? 
YarnHelixUtils.getJarPathCacheAndCleanIfNeeded(this.config, this.fs) : appWorkDir; + Path containerJarsCachedDir = new Path(jarCacheDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME); + LOGGER.info("Container cached jars root dir: " + containerJarsCachedDir); + LOGGER.info("Container uncached jars root dir: " + containerJarsUnsharedDir); Path containerWorkDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME); - Map<String, LocalResource> resourceMap = Maps.newHashMap(); + Map<String, LocalResource> resourceMap = Maps.newHashMap(); + // Always fetch any jars from the appWorkDir for any potential snapshot jars addContainerLocalResources(new Path(appWorkDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME), resourceMap); - addContainerLocalResources(new Path(containerWorkDir, GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME), resourceMap); + if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_FILES_LOCAL_KEY)) { + addContainerLocalResources(new Path(containerJarsUnsharedDir, GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME), + resourceMap); + } + if (this.jarCacheEnabled) { + addContainerLocalResources(new Path(jarCacheDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME), resourceMap); + if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_FILES_LOCAL_KEY)) { Review Comment: I don't quite understand this key. 
you're checking it here in two different conditionals, but in neither one do you actually use (or even check to see) what value it holds ########## gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java: ########## @@ -720,26 +731,26 @@ private void addLibJars(Path srcLibJarDir, Optional<Map<String, LocalResource>> } for (FileStatus libJarFile : libJarFiles) { - Path destFilePath = new Path(destDir, libJarFile.getPath().getName()); - this.fs.copyFromLocalFile(libJarFile.getPath(), destFilePath); - if (resourceMap.isPresent()) { + Path destFilePath = HdfsJarUploadUtils.calculateDestJarFile(this.fs, libJarFile, unsharedDir, destCacheDir); + if (HdfsJarUploadUtils.uploadJarToHdfs(fs, libJarFile, MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath) && resourceMap.isPresent()) { YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get()); + } else { + LOGGER.warn("Failed to upload jar file {} to HDFS", libJarFile.getPath()); } } } - - private void addAppJars(String jarFilePathList, Optional<Map<String, LocalResource>> resourceMap, - Path destDir, FileSystem localFs) throws IOException { + private void addAppJars(String jarFilePathList, Optional<Map<String, LocalResource>> resourceMap, Path destCacheDir, Path unsharedDir, + FileSystem localFs) throws IOException { for (String jarFilePath : SPLITTER.split(jarFilePathList)) { Path srcFilePath = new Path(jarFilePath); - Path destFilePath = new Path(destDir, srcFilePath.getName()); - if (localFs.exists(srcFilePath)) { - this.fs.copyFromLocalFile(srcFilePath, destFilePath); + FileStatus localJar = localFs.getFileStatus(srcFilePath); + Path destFilePath = HdfsJarUploadUtils.calculateDestJarFile(this.fs, localJar, unsharedDir, destCacheDir); + if (HdfsJarUploadUtils.uploadJarToHdfs(fs, localJar, MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath)) { + if (resourceMap.isPresent()) { + YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, 
LocalResourceType.FILE, resourceMap.get()); + } } else { - LOGGER.warn("The src destination " + srcFilePath + " doesn't exists"); - } - if (resourceMap.isPresent()) { - YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get()); + LOGGER.warn("Failed to upload jar file {} to HDFS", srcFilePath);

Review Comment: again, should this be an actual failure, not merely logging? ...or do we believe there are times when it's actually OK to continue?

Issue Time Tracking
-------------------

Worklog Id: (was: 933302)
Time Spent: 20m (was: 10m)

> Cache Yarn jars in GobblinYarnAppLauncher
> -----------------------------------------
>
> Key: GOBBLIN-2135
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2135
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: William Lo
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Gobblin YARN Application Launcher lacks some functionality used in
> MRJobLauncher. One of the biggest gaps in feature parity is the absence of
> jar caching, where MRJobLauncher creates a monthly cache that is
> automatically cleaned up by subsequent executions performed 2 months in
> advance.
> YARN/MR requires uploading jars to HDFS. This step can be quite slow (~15
> mins for a sizeable job to get all the jars), and given that many jobs
> share the same jars, it makes sense to cache them together and only provide
> YARN the shared path.
> We also want to ensure that SNAPSHOT jars and other files are not uploaded to
> a cache, since, unlike jar versions on Artifactory, they are not immutable.

--
This message was sent by Atlassian Jira (v8.20.10#820010)