Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1626#discussion_r75471141
  
    --- Diff: storm-core/src/jvm/org/apache/storm/StormSubmitter.java ---
    @@ -258,6 +276,65 @@ public static void submitTopologyAs(String name, Map 
stormConf, StormTopology to
     
         }
     
    +    private static List<String> 
uploadDependencyJarsToBlobStore(DependencyUploader uploader) {
    +        LOG.info("Uploading dependencies - jars...");
    +
    +        DependencyPropertiesParser propertiesParser = new 
DependencyPropertiesParser();
    +
    +        String depJarsProp = System.getProperty("storm.dependency.jars", 
"");
    +        List<File> depJars = 
propertiesParser.parseJarsProperties(depJarsProp);
    +
    +        try {
    +            return uploader.uploadFiles(depJars, true);
    +        } catch (Throwable e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    private static List<String> 
uploadDependencyArtifactsToBlobStore(DependencyUploader uploader) {
    +        LOG.info("Uploading dependencies - artifacts...");
    +
    +        DependencyPropertiesParser propertiesParser = new 
DependencyPropertiesParser();
    +
    +        String depArtifactsProp = 
System.getProperty("storm.dependency.artifacts", "{}");
    +        Map<String, File> depArtifacts = 
propertiesParser.parseArtifactsProperties(depArtifactsProp);
    +
    +        try {
    +            return uploader.uploadArtifacts(depArtifacts);
    +        } catch (Throwable e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    private static void setDependencyBlobsToTopology(StormTopology 
topology, List<String> jarsBlobKeys, List<String> artifactsBlobKeys) {
    +        LOG.info("Dependency Blob keys - jars : {} / artifacts : {}", 
jarsBlobKeys, artifactsBlobKeys);
    +        topology.set_dependency_jars(jarsBlobKeys);
    +        topology.set_dependency_artifacts(artifactsBlobKeys);
    +    }
    +
    +    private static void submitTopologyInDistributeMode(String name, 
StormTopology topology, SubmitOptions opts,
    +                                                       ProgressListener 
progressListener, String asUser, Map conf,
    +                                                       String serConf) 
throws TException {
    +        String jar = submitJarAs(conf, System.getProperty("storm.jar"), 
progressListener, asUser);
    +        try {
    +            LOG.info("Submitting topology " + name + " in distributed mode 
with conf " + serConf);
    +            NimbusClient client = NimbusClient.getConfiguredClientAs(conf, 
asUser);
    --- End diff --
    
    Nice catch. Will address.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA
ticket with INFRA.
---

Reply via email to