[
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14989952#comment-14989952
]
ASF GitHub Bot commented on STORM-876:
--------------------------------------
Github user d2r commented on a diff in the pull request:
https://github.com/apache/storm/pull/845#discussion_r43909653
--- Diff: storm-core/src/clj/backtype/storm/command/blobstore.clj ---
@@ -0,0 +1,163 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns backtype.storm.command.blobstore
+ (:import [java.io InputStream OutputStream])
+ (:use [backtype.storm config])
+ (:import [backtype.storm.generated SettableBlobMeta AccessControl AuthorizationException
+ KeyNotFoundException])
+ (:import [backtype.storm.blobstore BlobStoreAclHandler])
+ (:use [clojure.string :only [split]])
+ (:use [clojure.tools.cli :only [cli]])
+ (:use [clojure.java.io :only [copy input-stream output-stream]])
+ (:use [backtype.storm blobstore log util])
+ (:gen-class))
+
+(defn update-blob-from-stream
+ "Update a blob in the blob store from an InputStream"
+ [key ^InputStream in]
+ (with-configured-blob-client blobstore
+ (let [out (.updateBlob blobstore key)]
+ (try
+ (copy in out)
+ (.close out)
+ (catch Exception e
+ (log-message e)
+ (.cancel out)
+ (throw e))))))
+
+(defn create-blob-from-stream
+ "Create a blob in the blob store from an InputStream"
+ [key ^InputStream in ^SettableBlobMeta meta]
+ (with-configured-blob-client blobstore
+ (let [out (.createBlob blobstore key meta)]
+ (try
+ (copy in out)
+ (.close out)
+ (catch Exception e
+ (.cancel out)
+ (throw e))))))
+
+(defn read-blob
+ "Read a blob in the blob store and write to an OutputStream"
+ [key ^OutputStream out]
+ (with-configured-blob-client blobstore
+ (with-open [in (.getBlob blobstore key)]
+ (copy in out))))
+
+(defn as-access-control
+ "Convert a parameter to an AccessControl object"
+ [param]
+ (BlobStoreAclHandler/parseAccessControl (str param)))
+
+(defn as-acl
+ [param]
+ (map as-access-control (split param #",")))
+
+(defn access-control-str
+ [^AccessControl acl]
+ (BlobStoreAclHandler/accessControlToString acl))
+
+(defn read-cli [args]
+ (let [[{file :file} [key] _] (cli args ["-f" "--file" :default nil])]
+ (if file
+ (with-open [f (output-stream file)]
+ (read-blob key f))
+ (read-blob key System/out))))
+
+(defn update-cli [args]
+ (let [[{file :file} [key] _] (cli args ["-f" "--file" :default nil])]
+ (if file
+ (with-open [f (input-stream file)]
+ (update-blob-from-stream key f))
+ (update-blob-from-stream key System/in))
+ (log-message "Successfully updated " key)))
+
+(defn create-cli [args]
+ (let [[{file :file acl :acl replication-factor :replication-factor} [key] _] (cli args ["-f" "--file" :default nil]
+ ["-a" "--acl" :default [] :parse-fn as-acl]
+ ["-r" "--replication-factor" :default -1 :parse-fn parse-int])
+ meta (doto (SettableBlobMeta. acl)
+ (.set_replication_factor replication-factor))]
+ (log-message "Creating " key " with ACL " (pr-str (map access-control-str acl)))
+ (if file
+ (with-open [f (input-stream file)]
+ (create-blob-from-stream key f meta))
+ (create-blob-from-stream key System/in meta))
+ (log-message "Successfully created " key)))
+
+(defn delete-cli [args]
+ (with-configured-blob-client blobstore
+ (doseq [key args]
+ (.deleteBlob blobstore key)
+ (log-message "deleted " key))))
+
+(defn list-cli [args]
+ (with-configured-blob-client blobstore
+ (let [keys (if (empty? args) (iterator-seq (.listKeys blobstore)) args)]
+ (doseq [key keys]
+ (try
+ (let [meta (.getBlobMeta blobstore key)
+ version (.get_version meta)
+ acl (.get_acl (.get_settable meta))]
+ (log-message key " " version " " (pr-str (map access-control-str acl))))
+ (catch AuthorizationException ae
+ (if-not (empty? args) (log-message "ACCESS DENIED to key: " key)))
+ (catch KeyNotFoundException knf
+ (if-not (empty? args) (log-message key " NOT FOUND"))))))))
--- End diff --
It would be nice to log the two messages inside the `catch` forms to STDERR
instead of STDOUT.
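
A minimal sketch of that suggestion, assuming `log-message` ends up on STDOUT for these CLI commands, as the comment implies. `log-error` and `report-missing` are hypothetical names and are not part of `backtype.storm.log`; the sketch only rebinds `*out*` to `*err*` around `println`.

```clojure
(ns example.stderr-logging
  (:require [clojure.string :as str]))

(defn log-error
  "Print the concatenated message parts to STDERR instead of STDOUT.
  Hypothetical helper, not part of backtype.storm.log."
  [& parts]
  ;; println writes to *out*, so rebinding *out* to *err* redirects it.
  (binding [*out* *err*]
    (println (str/join parts))))

(defn report-missing
  "Report a missing key only when the user asked for it explicitly,
  mirroring the (if-not (empty? args) ...) guard in list-cli."
  [key explicit?]
  (when explicit?
    (log-error key " NOT FOUND")))
```

With a helper along these lines, the two `catch` branches in `list-cli` could call `log-error` in place of `log-message`, which would keep the key listing on STDOUT clean for piping.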
> Dist Cache: Basic Functionality
> -------------------------------
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Robert Joseph Evans
> Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and
> downloading dist cache items. storm-core.ser, storm-conf.ser and storm.jar
> should be written into the blob store instead of residing locally. We need a
> default implementation of the blob store that does essentially what nimbus
> currently does and does not need anything extra. But having an HDFS backend
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and
> provide a working directory for the worker process with symlinks to the
> blobs. It should also allow the blobs to be updated and switch the symlink
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo!, and we are in the process
> of getting it ready to push back to open source shortly.
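
The atomic symlink switch described above can be sketched as follows. This is only an illustration under stated assumptions, not the supervisor code from the pull request: `switch-symlink!` is a hypothetical name, and it assumes a POSIX filesystem where `Files/move` with `ATOMIC_MOVE` renames the temporary link over the existing one (the Java documentation leaves replacement of an existing target implementation-specific).

```clojure
(ns example.atomic-symlink
  (:import [java.nio.file Files Path CopyOption StandardCopyOption]
           [java.nio.file.attribute FileAttribute]))

(defn switch-symlink!
  "Point `link` at `new-target` by creating a temporary symlink next to it
  and renaming that over `link`. Hypothetical helper for illustration."
  [^Path link ^Path new-target]
  (let [tmp (.resolveSibling link (str (.getFileName link) ".tmp"))]
    ;; Clear any stale temporary link left by an earlier failed attempt.
    (Files/deleteIfExists tmp)
    (Files/createSymbolicLink tmp new-target (make-array FileAttribute 0))
    ;; The move acts on the link itself, not on the file it points to.
    (Files/move tmp link
                (into-array CopyOption [StandardCopyOption/ATOMIC_MOVE]))))
```

A worker reading through `link` would then see either the old blob or the new one, never a missing path, provided the rename is atomic on the underlying filesystem.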