[ https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15012596#comment-15012596 ]
ASF GitHub Bot commented on STORM-876:
--------------------------------------

Github user hustfxj commented on a diff in the pull request:

    https://github.com/apache/storm/pull/845#discussion_r45290503

    --- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
    @@ -732,25 +854,90 @@
         (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
           (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) ["code-dir" dir] :log-prefix (str "setup conf for " dir))))

    +(defn setup-blob-permission [conf storm-conf path]
    +  (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
    +    (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) ["blob" path] :log-prefix (str "setup blob permissions for " path))))
    +
    +(defn setup-storm-code-dir [conf storm-conf dir]
    +  (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
    +    (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) ["code-dir" dir] :log-prefix (str "setup conf for " dir))))
    +
    +(defn download-blobs-for-topology!
    +  "Download all blobs listed in the topology configuration for a given topology."
    +  [conf stormconf-path localizer tmproot]
    +  (let [storm-conf (read-supervisor-storm-conf-given-path conf stormconf-path)
    +        blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
    +        user (storm-conf TOPOLOGY-SUBMITTER-USER)
    +        topo-name (storm-conf TOPOLOGY-NAME)
    +        user-dir (.getLocalUserFileCacheDir localizer user)
    +        localresources (blobstore-map-to-localresources blobstore-map)]
    +    (when localresources
    +      (when-not (.exists user-dir)
    +        (FileUtils/forceMkdir user-dir)
    +        (setup-blob-permission conf storm-conf (.toString user-dir)))
    +      (try
    +        (let [localized-resources (.getBlobs localizer localresources user topo-name user-dir)]
    +          (setup-blob-permission conf storm-conf (.toString user-dir))
    +          (doseq [local-rsrc localized-resources]
    +            (let [rsrc-file-path (File. (.getFilePath local-rsrc))
    +                  key-name (.getName rsrc-file-path)
    +                  blob-symlink-target-name (.getName (File. (.getCurrentSymlinkPath local-rsrc)))
    +                  symlink-name (get-blob-localname (get blobstore-map key-name) key-name)]
    +              (create-symlink! tmproot (.getParent rsrc-file-path) symlink-name
    +                blob-symlink-target-name))))
    +        (catch AuthorizationException authExp
    +          (log-error authExp))
    +        (catch KeyNotFoundException knf
    +          (log-error knf))))))
    +
    +(defn get-blob-file-names
    +  [blobstore-map]
    +  (if blobstore-map
    +    (for [[k, data] blobstore-map]
    +      (get-blob-localname data k))))
    +
    +(defn download-blobs-for-topology-succeed?
    +  "Assert if all blobs are downloaded for the given topology"
    +  [stormconf-path target-dir]
    +  (let [storm-conf (clojurify-structure (Utils/fromCompressedJsonConf (FileUtils/readFileToByteArray (File. stormconf-path))))
    +        blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
    +        file-names (get-blob-file-names blobstore-map)]
    +    (if (and file-names (> (count file-names) 0))
    +      (every? #(Utils/checkFileExists target-dir %) file-names)
    +      true)))
    +
     ;; distributed implementation
     (defmethod download-storm-code
    -  :distributed [conf storm-id master-code-dir supervisor download-lock]
    -    ;; Downloading to permanent location is atomic
    -    (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
    -          stormroot (supervisor-stormdist-root conf storm-id)
    -          master-meta-file-path (master-storm-metafile-path master-code-dir)
    -          supervisor-meta-file-path (supervisor-storm-metafile-path tmproot)]
    -      (locking download-lock
    -            (log-message "Downloading code for storm id " storm-id " from " master-code-dir)
    -            (FileUtils/forceMkdir (File. tmproot))
    -            (Utils/downloadFromMaster conf master-meta-file-path supervisor-meta-file-path)
    -            (if (:code-distributor supervisor)
    -              (.download (:code-distributor supervisor) storm-id (File. supervisor-meta-file-path)))
    -            (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
    -            (if (.exists (File. stormroot)) (FileUtils/forceDelete (File. stormroot)))
    -            (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
    -            (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot)
    -            (log-message "Finished downloading code for storm id " storm-id " from " master-code-dir))))
    +  :distributed [conf storm-id master-code-dir localizer]
    +  ;; Downloading to permanent location is atomic
    +  (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
    +        stormroot (supervisor-stormdist-root conf storm-id)
    +        blobstore (Utils/getSupervisorBlobStore conf)]
    --- End diff --

I have a small suggestion: this "blobstore", which is the supervisor's client for the blob store, is easy to confuse with the real "Blobstore". Could you give it a different name?

> Dist Cache: Basic Functionality
> -------------------------------
>
>                 Key: STORM-876
>                 URL: https://issues.apache.org/jira/browse/STORM-876
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Robert Joseph Evans
>         Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this, a new API should be added to support uploading and
> downloading dist cache items. storm-core.ser, storm-conf.ser and storm.jar
> should be written into the blob store instead of residing locally. We need a
> default implementation of the blob store that does essentially what nimbus
> currently does and does not need anything extra. An HDFS backend would also
> be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and
> provide a working directory for the worker process with symlinks to the
> blobs. It should also allow the blobs to be updated, switching the symlink
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo!; we are in the process
> of getting it ready to push back to open source shortly.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)