[
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15012596#comment-15012596
]
ASF GitHub Bot commented on STORM-876:
--------------------------------------
Github user hustfxj commented on a diff in the pull request:
https://github.com/apache/storm/pull/845#discussion_r45290503
--- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
@@ -732,25 +854,90 @@
(if (conf SUPERVISOR-RUN-WORKER-AS-USER)
(worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER)
["code-dir" dir] :log-prefix (str "setup conf for " dir))))
+(defn setup-blob-permission [conf storm-conf path]
+ (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+ (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER)
["blob" path] :log-prefix (str "setup blob permissions for " path))))
+
+(defn setup-storm-code-dir [conf storm-conf dir]
+ (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+ (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER)
["code-dir" dir] :log-prefix (str "setup conf for " dir))))
+
+(defn download-blobs-for-topology!
+ "Download all blobs listed in the topology configuration for a given
topology."
+ [conf stormconf-path localizer tmproot]
+ (let [storm-conf (read-supervisor-storm-conf-given-path conf
stormconf-path)
+ blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+ user (storm-conf TOPOLOGY-SUBMITTER-USER)
+ topo-name (storm-conf TOPOLOGY-NAME)
+ user-dir (.getLocalUserFileCacheDir localizer user)
+ localresources (blobstore-map-to-localresources blobstore-map)]
+ (when localresources
+ (when-not (.exists user-dir)
+ (FileUtils/forceMkdir user-dir)
+ (setup-blob-permission conf storm-conf (.toString user-dir)))
+ (try
+ (let [localized-resources (.getBlobs localizer localresources user
topo-name user-dir)]
+ (setup-blob-permission conf storm-conf (.toString user-dir))
+ (doseq [local-rsrc localized-resources]
+ (let [rsrc-file-path (File. (.getFilePath local-rsrc))
+ key-name (.getName rsrc-file-path)
+ blob-symlink-target-name (.getName (File.
(.getCurrentSymlinkPath local-rsrc)))
+ symlink-name (get-blob-localname (get blobstore-map
key-name) key-name)]
+ (create-symlink! tmproot (.getParent rsrc-file-path)
symlink-name
+ blob-symlink-target-name))))
+ (catch AuthorizationException authExp
+ (log-error authExp))
+ (catch KeyNotFoundException knf
+ (log-error knf))))))
+
+(defn get-blob-file-names
+ [blobstore-map]
+ (if blobstore-map
+ (for [[k, data] blobstore-map]
+ (get-blob-localname data k))))
+
+(defn download-blobs-for-topology-succeed?
+ "Assert if all blobs are downloaded for the given topology"
+ [stormconf-path target-dir]
+ (let [storm-conf (clojurify-structure (Utils/fromCompressedJsonConf
(FileUtils/readFileToByteArray (File. stormconf-path))))
+ blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+ file-names (get-blob-file-names blobstore-map)]
+ (if (and file-names (> (count file-names) 0))
+ (every? #(Utils/checkFileExists target-dir %) file-names)
+ true)))
+
;; distributed implementation
(defmethod download-storm-code
- :distributed [conf storm-id master-code-dir supervisor download-lock]
- ;; Downloading to permanent location is atomic
- (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator
(uuid))
- stormroot (supervisor-stormdist-root conf storm-id)
- master-meta-file-path (master-storm-metafile-path
master-code-dir)
- supervisor-meta-file-path (supervisor-storm-metafile-path
tmproot)]
- (locking download-lock
- (log-message "Downloading code for storm id " storm-id " from "
master-code-dir)
- (FileUtils/forceMkdir (File. tmproot))
- (Utils/downloadFromMaster conf master-meta-file-path
supervisor-meta-file-path)
- (if (:code-distributor supervisor)
- (.download (:code-distributor supervisor) storm-id (File.
supervisor-meta-file-path)))
- (extract-dir-from-jar (supervisor-stormjar-path tmproot)
RESOURCES-SUBDIR tmproot)
- (if (.exists (File. stormroot)) (FileUtils/forceDelete (File.
stormroot)))
- (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
- (setup-storm-code-dir conf (read-supervisor-storm-conf conf
storm-id) stormroot)
- (log-message "Finished downloading code for storm id " storm-id "
from " master-code-dir))))
+ :distributed [conf storm-id master-code-dir localizer]
+ ;; Downloading to permanent location is atomic
+ (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
+ stormroot (supervisor-stormdist-root conf storm-id)
+ blobstore (Utils/getSupervisorBlobStore conf)]
--- End diff --
I hava a small suggestions. for example, this blobstore which is
supervisor's client about blobstore maybe make me confused with real
"Blobstore". can you change other name?
> Dist Cache: Basic Functionality
> -------------------------------
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Robert Joseph Evans
> Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and
> downloading dist cache items. storm-core.ser, storm-conf.ser and storm.jar
> should be written into the blob store instead of residing locally. We need a
> default implementation of the blob store that does essentially what nimbus
> currently does and does not need anything extra. But having an HDFS backend
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and
> provide a working directory for the worker process with symlinks to the
> blobs. It should also allow the blobs to be updated and switch the symlink
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo! we are in the process
> of getting it ready to push back to open source shortly.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)