This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d56454a  [FLINK-18087][yarn] Fix uploading user artifact for Yarn job cluster
d56454a is described below

commit d56454a3a0abe9ba34d1fec27864863bbd8a7c5f
Author: wangyang0918 <[email protected]>
AuthorDate: Wed Jun 3 19:28:58 2020 +0800

    [FLINK-18087][yarn] Fix uploading user artifact for Yarn job cluster
    
    This closes #12463.
---
 .../src/test/java/org/apache/flink/yarn/YARNITCase.java            | 7 +++++++
 .../src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java | 2 +-
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index 1e5a64e..88d91ca 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
@@ -126,6 +127,12 @@ public class YARNITCase extends YarnTestBase {
                                                        false)
                                        .getClusterClient()) {
 
+                               for (DistributedCache.DistributedCacheEntry 
entry : jobGraph.getUserArtifacts().values()) {
+                                       assertTrue(
+                                               String.format("The user 
artifacts(%s) should be remote or uploaded to remote filesystem.", 
entry.filePath),
+                                               
Utils.isRemotePath(entry.filePath));
+                               }
+
                                ApplicationId applicationId = 
clusterClient.getClusterId();
 
                                final CompletableFuture<JobResult> 
jobResultCompletableFuture = 
clusterClient.requestJobResult(jobGraph.getJobID());
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 2d2103c..8e15e56 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -744,7 +744,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
                if (jobGraph != null) {
                        for (Map.Entry<String, 
DistributedCache.DistributedCacheEntry> entry : 
jobGraph.getUserArtifacts().entrySet()) {
                                // only upload local files
-                               if 
(Utils.isRemotePath(entry.getValue().filePath)) {
+                               if 
(!Utils.isRemotePath(entry.getValue().filePath)) {
                                        Path localPath = new 
Path(entry.getValue().filePath);
                                        Tuple2<Path, Long> remoteFileInfo =
                                                        
fileUploader.uploadLocalFileToRemote(localPath, entry.getKey());

Reply via email to