[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15012596#comment-15012596
 ] 

ASF GitHub Bot commented on STORM-876:
--------------------------------------

Github user hustfxj commented on a diff in the pull request:

    https://github.com/apache/storm/pull/845#discussion_r45290503
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
    @@ -732,25 +854,90 @@
      (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
       (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) 
["code-dir" dir] :log-prefix (str "setup conf for " dir))))
     
    +(defn setup-blob-permission [conf storm-conf path]
    +  (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
    +    (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) 
["blob" path] :log-prefix (str "setup blob permissions for " path))))
    +
    +(defn setup-storm-code-dir [conf storm-conf dir]
    +  (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
    +    (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) 
["code-dir" dir] :log-prefix (str "setup conf for " dir))))
    +
    +(defn download-blobs-for-topology!
    +  "Download all blobs listed in the topology configuration for a given 
topology."
    +  [conf stormconf-path localizer tmproot]
    +  (let [storm-conf (read-supervisor-storm-conf-given-path conf 
stormconf-path)
    +        blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
    +        user (storm-conf TOPOLOGY-SUBMITTER-USER)
    +        topo-name (storm-conf TOPOLOGY-NAME)
    +        user-dir (.getLocalUserFileCacheDir localizer user)
    +        localresources (blobstore-map-to-localresources blobstore-map)]
    +    (when localresources
    +      (when-not (.exists user-dir)
    +        (FileUtils/forceMkdir user-dir)
    +        (setup-blob-permission conf storm-conf (.toString user-dir)))
    +      (try
    +        (let [localized-resources (.getBlobs localizer localresources user 
topo-name user-dir)]
    +          (setup-blob-permission conf storm-conf (.toString user-dir))
    +          (doseq [local-rsrc localized-resources]
    +            (let [rsrc-file-path (File. (.getFilePath local-rsrc))
    +                  key-name (.getName rsrc-file-path)
    +                  blob-symlink-target-name (.getName (File. 
(.getCurrentSymlinkPath local-rsrc)))
    +                  symlink-name (get-blob-localname (get blobstore-map 
key-name) key-name)]
    +              (create-symlink! tmproot (.getParent rsrc-file-path) 
symlink-name
    +                blob-symlink-target-name))))
    +        (catch AuthorizationException authExp
    +          (log-error authExp))
    +        (catch KeyNotFoundException knf
    +          (log-error knf))))))
    +
    +(defn get-blob-file-names
    +  [blobstore-map]
    +  (if blobstore-map
    +    (for [[k, data] blobstore-map]
    +      (get-blob-localname data k))))
    +
    +(defn download-blobs-for-topology-succeed?
    +  "Assert if all blobs are downloaded for the given topology"
    +  [stormconf-path target-dir]
    +  (let [storm-conf (clojurify-structure (Utils/fromCompressedJsonConf 
(FileUtils/readFileToByteArray (File. stormconf-path))))
    +        blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
    +        file-names (get-blob-file-names blobstore-map)]
    +    (if (and file-names (> (count file-names) 0))
    +      (every? #(Utils/checkFileExists target-dir %) file-names)
    +      true)))
    +
     ;; distributed implementation
     (defmethod download-storm-code
    -    :distributed [conf storm-id master-code-dir supervisor download-lock]
    -    ;; Downloading to permanent location is atomic
    -    (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator 
(uuid))
    -          stormroot (supervisor-stormdist-root conf storm-id)
    -          master-meta-file-path (master-storm-metafile-path 
master-code-dir)
    -          supervisor-meta-file-path (supervisor-storm-metafile-path 
tmproot)]
    -      (locking download-lock
    -        (log-message "Downloading code for storm id " storm-id " from " 
master-code-dir)
    -        (FileUtils/forceMkdir (File. tmproot))
    -        (Utils/downloadFromMaster conf master-meta-file-path 
supervisor-meta-file-path)
    -        (if (:code-distributor supervisor)
    -          (.download (:code-distributor supervisor) storm-id (File. 
supervisor-meta-file-path)))
    -        (extract-dir-from-jar (supervisor-stormjar-path tmproot) 
RESOURCES-SUBDIR tmproot)
    -        (if (.exists (File. stormroot)) (FileUtils/forceDelete (File. 
stormroot)))
    -        (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
    -        (setup-storm-code-dir conf (read-supervisor-storm-conf conf 
storm-id) stormroot)
    -        (log-message "Finished downloading code for storm id " storm-id " 
from " master-code-dir))))
    +  :distributed [conf storm-id master-code-dir localizer]
    +  ;; Downloading to permanent location is atomic
    +  (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
    +        stormroot (supervisor-stormdist-root conf storm-id)
    +        blobstore (Utils/getSupervisorBlobStore conf)]
    --- End diff --
    
    I hava a small suggestions.  for example, this blobstore which is 
supervisor's client about blobstore maybe make me confused with real 
"Blobstore". can you change other name?


> Dist Cache: Basic Functionality
> -------------------------------
>
>                 Key: STORM-876
>                 URL: https://issues.apache.org/jira/browse/STORM-876
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Robert Joseph Evans
>         Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and 
> downloading dist cache items.  storm-core.ser, storm-conf.ser and storm.jar 
> should be written into the blob store instead of residing locally. We need a 
> default implementation of the blob store that does essentially what nimbus 
> currently does and does not need anything extra.  But having an HDFS backend 
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and 
> provide a working directory for the worker process with symlinks to the 
> blobs.  It should also allow the blobs to be updated and switch the symlink 
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo! we are in the process 
> of getting it ready to push back to open source shortly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to