[ https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14987779#comment-14987779 ]
ASF GitHub Bot commented on STORM-876:
--------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/845#discussion_r43785630
--- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
@@ -346,53 +394,124 @@
[(.getNodeId slot) (.getPort slot)]
)))
+(defn- get-nimbus-subject []
+ (let [nimbus-subject (Subject.)
+ nimbus-principal (NimbusPrincipal.)
+ principals (.getPrincipals nimbus-subject)]
+ (.add principals nimbus-principal)
+ nimbus-subject))
+
+(defn- get-metadata-version [blob-store key subject]
+ (let [blob-meta (.getBlobMeta blob-store key subject)]
+ (.get_version blob-meta)))
+
+(defn get-key-list-from-blob-store [blob-store]
+ (let [key-iter (.listKeys blob-store (get-nimbus-subject))
+ keys (iterator-seq key-iter)]
+ (if (not-nil? keys)
+ (java.util.ArrayList. keys)
+ [])))
+
(defn- setup-storm-code [nimbus conf storm-id tmp-jar-location storm-conf
topology]
- (let [stormroot (master-stormdist-root conf storm-id)]
- (log-message "nimbus file location:" stormroot)
- (FileUtils/forceMkdir (File. stormroot))
- (FileUtils/cleanDirectory (File. stormroot))
- (setup-jar conf tmp-jar-location stormroot)
- (FileUtils/writeByteArrayToFile (File. (master-stormcode-path
stormroot)) (Utils/serialize topology))
- (FileUtils/writeByteArrayToFile (File. (master-stormconf-path
stormroot)) (Utils/toCompressedJsonConf storm-conf))
- (if (:code-distributor nimbus) (.upload (:code-distributor nimbus)
stormroot storm-id))
- ))
+ (let [subject (get-subject)
+ storm-cluster-state (:storm-cluster-state nimbus)
+ blob-store (:blob-store nimbus)
+ jar-key (master-stormjar-key storm-id)
+ code-key (master-stormcode-key storm-id)
+ conf-key (master-stormconf-key storm-id)
+ nimbus-host-port-info (:nimbus-host-port-info nimbus)]
+ (log-message "subject-changed" subject)
+ (if tmp-jar-location ;;in local mode there is no jar
+ (do
+ (log-message "tmp-jar-location" tmp-jar-location)
+ (.createBlob blob-store jar-key (FileInputStream.
tmp-jar-location) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+ (if (instance? LocalFsBlobStore blob-store)
+ (.setup-blobstore! storm-cluster-state jar-key
nimbus-host-port-info (get-metadata-version blob-store jar-key subject)))
+ ))
+ (.createBlob blob-store conf-key (Utils/toCompressedJsonConf
storm-conf) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+ (if (instance? LocalFsBlobStore blob-store)
+ (.setup-blobstore! storm-cluster-state conf-key
nimbus-host-port-info (get-metadata-version blob-store conf-key subject)))
+ (.createBlob blob-store code-key (Utils/serialize topology)
(SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+ (if (instance? LocalFsBlobStore blob-store)
+ (.setup-blobstore! storm-cluster-state code-key
nimbus-host-port-info (get-metadata-version blob-store code-key subject)))))
+
+(defn- read-storm-topology [storm-id blob-store]
+ (Utils/deserialize
+ (.readBlob blob-store (master-stormcode-key storm-id) (get-subject))
StormTopology))
+
+(defn- get-blob-replication-count [blob-key nimbus]
+ (if (:blob-store nimbus)
+ (-> (:blob-store nimbus)
+ (.getBlobReplication blob-key (get-nimbus-subject))
+ (.get_replication))))
(defn- wait-for-desired-code-replication [nimbus conf storm-id]
(let [min-replication-count (conf TOPOLOGY-MIN-REPLICATION-COUNT)
max-replication-wait-time (conf
TOPOLOGY-MAX-REPLICATION-WAIT-TIME-SEC)
- total-wait-time (atom 0)
- current-replication-count (atom (if (:code-distributor nimbus)
(.getReplicationCount (:code-distributor nimbus) storm-id) 0))]
- (if (:code-distributor nimbus)
- (while (and (> min-replication-count @current-replication-count)
+ current-replication-count-jar (if (not (local-mode? conf)) (atom
+
(get-blob-replication-count (master-stormjar-key storm-id) nimbus))
+ (atom
min-replication-count))
+ current-replication-count-code (atom (get-blob-replication-count
(master-stormcode-key storm-id) nimbus))
+ current-replication-count-conf (atom (get-blob-replication-count
(master-stormconf-key storm-id) nimbus))
+ total-wait-time (atom 0)]
+ (log-message "wait for desired replication" "count"
+ min-replication-count "wait-time" max-replication-wait-time
+ "code" @current-replication-count-code
+ "conf" @current-replication-count-conf
+ "jar" @current-replication-count-jar
+ "replication count" (get-blob-replication-count
(master-stormconf-key storm-id) nimbus))
+ (if (:blob-store nimbus)
+ (while (and (> min-replication-count @current-replication-count-jar)
+ (> min-replication-count @current-replication-count-code)
+ (> min-replication-count @current-replication-count-conf)
(or (= -1 max-replication-wait-time)
(< @total-wait-time max-replication-wait-time)))
(sleep-secs 1)
(log-debug "waiting for desired replication to be achieved.
min-replication-count = " min-replication-count "
max-replication-wait-time = " max-replication-wait-time
- "current-replication-count = " @current-replication-count "
total-wait-time " @total-wait-time)
+ (if (not (local-mode? conf))"current-replication-count for jar
key = " @current-replication-count-jar)
+ "current-replication-count for code key = "
@current-replication-count-code
+ "current-replication-count for conf key = "
@current-replication-count-conf
+ " total-wait-time " @total-wait-time)
(swap! total-wait-time inc)
- (reset! current-replication-count (.getReplicationCount
(:code-distributor nimbus) storm-id))))
- (if (< min-replication-count @current-replication-count)
- (log-message "desired replication count " min-replication-count "
achieved,
- current-replication-count" @current-replication-count)
- (log-message "desired replication count of " min-replication-count "
not achieved but we have hit the max wait time "
- max-replication-wait-time " so moving on with replication count = "
@current-replication-count)
- )))
-
-(defn- read-storm-topology [conf storm-id]
- (let [stormroot (master-stormdist-root conf storm-id)]
- (Utils/deserialize
- (FileUtils/readFileToByteArray
- (File. (master-stormcode-path stormroot))
- ) StormTopology)))
+ (if (not (local-mode? conf))(reset! current-replication-count-conf
(get-blob-replication-count (master-stormconf-key storm-id))))
+ (reset! current-replication-count-code
(get-blob-replication-count (master-stormcode-key storm-id)))
+ (reset! current-replication-count-jar
(get-blob-replication-count (master-stormjar-key storm-id)))))
+ (if (and (< min-replication-count @current-replication-count-conf)
+ (< min-replication-count @current-replication-count-code)
+ (< min-replication-count @current-replication-count-jar))
+ (log-message "desired replication count of " min-replication-count
" not achieved but we have hit the max wait time "
+ max-replication-wait-time " so moving on with replication count for
conf key = " @current-replication-count-conf
+ " for code key = " @current-replication-count-code "for jar key = "
@current-replication-count-jar)
+ (log-message "desired replication count " min-replication-count "
achieved,
+ current-replication-count for conf key "
@current-replication-count-conf ",
+ current-replication-count for code key = "
@current-replication-count-code ",
+ current-replication-count for jar key = "
@current-replication-count-jar))))
+
+(defn- read-storm-topology-as-nimbus [storm-id blob-store]
+ (Utils/deserialize
+ (.readBlob blob-store (master-stormcode-key storm-id)
(get-nimbus-subject)) StormTopology))
(declare compute-executor->component)
+(defn- get-nimbus-subject []
--- End diff --
I think this was already defined somewhere else. Could we please combine
the two implementations?
> Dist Cache: Basic Functionality
> -------------------------------
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Robert Joseph Evans
> Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this, a new API should be added to support uploading and
> downloading dist cache items. storm-core.ser, storm-conf.ser and storm.jar
> should be written into the blob store instead of residing locally. We need a
> default implementation of the blob store that does essentially what nimbus
> currently does and does not need anything extra. But having an HDFS backend
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and
> provide a working directory for the worker process with symlinks to the
> blobs. It should also allow the blobs to be updated and switch the symlink
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo!, and we are in the
> process of getting it ready to push back to open source shortly.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)