[
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15014182#comment-15014182
]
ASF GitHub Bot commented on STORM-876:
--------------------------------------
Github user d2r commented on a diff in the pull request:
https://github.com/apache/storm/pull/845#discussion_r45385698
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/BlobStore.java ---
@@ -0,0 +1,446 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import javax.security.auth.Subject;
+
+import backtype.storm.nimbus.NimbusInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.daemon.Shutdownable;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.ReadableBlobMeta;
+import backtype.storm.generated.SettableBlobMeta;
+
+/**
+ * Provides a way to store blobs that can be downloaded.
+ * Blobs must be able to be uploaded and listed from Nimbus,
+ * and downloaded from the Supervisors. It is a key value based
+ * store. Key being a string and value being the blob data.
+ *
+ * ACL checking must take place against the provided subject.
+ * If the blob store does not support Security it must validate
+ * that all ACLs set are always WORLD, everything.
+ *
+ * The users can upload their blobs through the blob store command
+ * line. The command line utility also allows us to update,
+ * delete.
+ *
+ * Modifying the replication factor only works for HdfsBlobStore
+ * as for the LocalFsBlobStore the replication is dependent on
+ * the number of Nimbodes available.
+ */
+public abstract class BlobStore implements Shutdownable {
+ public static final Logger LOG =
LoggerFactory.getLogger(BlobStore.class);
+ private static final Pattern KEY_PATTERN = Pattern.compile("^[\\w
\\t\\.:_-]+$");
+ protected static final String BASE_BLOBS_DIR_NAME = "blobs";
+
+ /**
+ * Allows us to initialize the blob store
+ * @param conf The storm configuration
+ * @param baseDir The directory path to store the blobs
+ * @param nimbusInfo Contains the nimbus host, port and leadership
information.
+ */
+ public abstract void prepare(Map conf, String baseDir, NimbusInfo
nimbusInfo);
+
+ /**
+ * Creates the blob.
+ * @param key Key for the blob.
+ * @param meta Metadata which contains the acls information
+ * @param who Is the subject creating the blob.
+ * @return AtomicOutputStream into which the blob data
+ * can be written.
+ * @throws AuthorizationException
+ * @throws KeyAlreadyExistsException
+ */
+ public abstract AtomicOutputStream createBlob(String key,
SettableBlobMeta meta, Subject who) throws AuthorizationException,
KeyAlreadyExistsException;
+
+ /**
+ * Updates the blob data.
+ * @param key Key for the blob.
+ * @param who Is the subject having the write privilege for the blob.
+ * @return AtomicOutputStream into which the blob data
+ * can be written.
+ * @throws AuthorizationException
+ * @throws KeyNotFoundException
+ */
+ public abstract AtomicOutputStream updateBlob(String key, Subject who)
throws AuthorizationException, KeyNotFoundException;
+
+ /**
+ * Gets the current version of metadata for a blob
+ * to be viewed by the user or downloaded by the supervisor.
+ * @param key Key for the blob.
+ * @param who Is the subject having the read privilege for the blob.
+ * @return ReadableBlobMeta containing the current metadata
+ * of the blob.
+ * @throws AuthorizationException
+ * @throws KeyNotFoundException
+ */
+ public abstract ReadableBlobMeta getBlobMeta(String key, Subject who)
throws AuthorizationException, KeyNotFoundException;
+
+ /**
+ * Sets the metadata with renewed acls for the blob.
+ * @param key Key for the blob.
+ * @param meta Metadata which contains the updated
+ * acls information.
+ * @param who Is the subject having the write privilege for the blob.
+ * @throws AuthorizationException
+ * @throws KeyNotFoundException
+ */
+ public abstract void setBlobMeta(String key, SettableBlobMeta meta,
Subject who) throws AuthorizationException, KeyNotFoundException;
+
+ /**
+ * Deletes the blob data and metadata.
+ * @param key Key for the blob.
+ * @param who Is the subject having write privilege for the blob.
+ * @throws AuthorizationException
+ * @throws KeyNotFoundException
+ */
+ public abstract void deleteBlob(String key, Subject who) throws
AuthorizationException, KeyNotFoundException;
+
+ /**
+ * Gets the InputStream to read the blob details
+ * @param key Key for the blob.
+ * @param who Is the subject having the read privilege for the blob.
+ * @return InputStreamWithMeta has the additional
+ * file length and version information.
+ * @throws AuthorizationException
+ * @throws KeyNotFoundException
+ */
+ public abstract InputStreamWithMeta getBlob(String key, Subject who)
throws AuthorizationException, KeyNotFoundException;
+
+ /**
+ * Returns an iterator with all the list of
+ * keys currently available on the blob store.
+ * @param who Is the subject creating the blob.
+ * @return Iterator<String>
+ */
+ public abstract Iterator<String> listKeys(Subject who);
+
+ /**
+ * Gets the replication factor of the blob.
+ * @param key Key for the blob.
+ * @param who Is the subject having the read privilege for the blob.
+ * @return The replication factor of the blob,
+ * as an int.
+ * @throws Exception
+ */
+ public abstract int getBlobReplication(String key, Subject who) throws
Exception;
+
+ /**
+ * Modifies the replication factor of the blob.
+ * @param key Key for the blob.
+ * @param replication The replication factor the
+ * blob has to be set to.
+ * @param who Is the subject having the update privilege for the blob
+ * @return The updated replication factor of the blob,
+ * as an int.
+ * @throws AuthorizationException
+ * @throws KeyNotFoundException
+ * @throws IOException
+ */
+ public abstract int updateBlobReplication(String key, int replication,
Subject who) throws AuthorizationException, KeyNotFoundException, IOException;
+
+ /**
+ * Applies the given KeyFilter to every key returned by
+ * listKeys and collects the non-null results.
+ * @param filter Filter applied to each key; a key is dropped
+ * when the filter returns null for it.
+ * @param who Subject that is passed through to listKeys.
+ * @param <R> Type produced by the filter.
+ * @return Set of the filtered results.
+ */
+ public <R> Set<R> filterAndListKeys(KeyFilter<R> filter, Subject who) {
+   Set<R> matches = new HashSet<R>();
+   for (Iterator<String> it = listKeys(who); it.hasNext();) {
+     R mapped = filter.filter(it.next());
+     if (mapped == null) {
+       // The filter rejected this key.
+       continue;
+     }
+     matches.add(mapped);
+   }
+   return matches;
+ }
+
+ /**
+ * Validates key checking for potentially harmful patterns
+ * @param key Key for the blob.
+ * @throws AuthorizationException
+ */
+ public static final void validateKey(String key) throws
AuthorizationException {
+ if (key == null || key.isEmpty() || "..".equals(key) ||
".".equals(key) || !KEY_PATTERN.matcher(key).matches()) {
+ LOG.error("'{}' does not appear to be valid {}", key, KEY_PATTERN);
+ throw new AuthorizationException(key+" does not appear to be a valid
blob key");
+ }
+ }
+
+ /**
+ * Wrapper called to create the blob which contains
+ * the byte data
+ * @param key Key for the blob.
+ * @param data Byte data that needs to be uploaded.
+ * @param meta Metadata which contains the acls information
+ * @param who Is the subject creating the blob.
+ * @throws AuthorizationException
+ * @throws KeyAlreadyExistsException
+ * @throws IOException
+ */
+ public void createBlob(String key, byte [] data, SettableBlobMeta meta,
Subject who) throws AuthorizationException, KeyAlreadyExistsException,
IOException {
+ AtomicOutputStream out = null;
+ try {
+ out = createBlob(key, meta, who);
+ out.write(data);
+ out.close();
+ out = null;
+ } finally {
+ if (out != null) {
+ out.cancel();
+ }
+ }
+ }
+
+ /**
+ * Convenience wrapper that creates a blob by copying the
+ * contents of an InputStream into it in 2048-byte chunks.
+ * The input stream is always closed before this method returns.
+ * If the blob stream was opened but the copy or the close fails,
+ * the blob stream is cancelled and the failure is propagated to
+ * the caller instead of being swallowed.
+ * @param key Key for the blob.
+ * @param in InputStream from which the data is read to be
+ * written as a part of the blob.
+ * @param meta Metadata which contains the acls information
+ * @param who Is the subject creating the blob.
+ * @throws AuthorizationException
+ * @throws KeyAlreadyExistsException
+ * @throws IOException
+ */
+ public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subject who)
+     throws AuthorizationException, KeyAlreadyExistsException, IOException {
+   AtomicOutputStream out = null;
+   try {
+     out = createBlob(key, meta, who);
+     byte[] buffer = new byte[2048];
+     int len = 0;
+     while ((len = in.read(buffer)) > 0) {
+       out.write(buffer, 0, len);
+     }
+     out.close();
+     // Mark success so the finally block does not cancel a committed blob.
+     out = null;
+   } finally {
+     try {
+       // out is still null when the inner createBlob itself threw,
+       // so guard against a NullPointerException here.
+       if (out != null) {
+         out.cancel();
+       }
+     } finally {
+       in.close();
+     }
+   }
+ }
+
+ /**
+ * Copies the blob stored under the given key into the supplied
+ * output stream in 2048-byte chunks. After the copy, whether it
+ * succeeded or not, the blob input stream is closed and then the
+ * output stream is flushed. The output stream is not closed.
+ * @param key Key for the blob.
+ * @param out Output stream the blob data is written to.
+ * @param who Subject passed to getBlob.
+ * @throws IOException if getBlob returns null or the copy fails.
+ * @throws KeyNotFoundException
+ * @throws AuthorizationException
+ */
+ public void readBlobTo(String key, OutputStream out, Subject who)
+     throws IOException, KeyNotFoundException, AuthorizationException {
+   InputStreamWithMeta blob = getBlob(key, who);
+   if (blob == null) {
+     throw new IOException("Could not find " + key);
+   }
+   byte[] chunk = new byte[2048];
+   try {
+     for (int n = blob.read(chunk); n > 0; n = blob.read(chunk)) {
+       out.write(chunk, 0, n);
+     }
+   } finally {
+     blob.close();
+     out.flush();
+   }
+ }
+
+ /**
+ * Wrapper around readBlobTo which
+ * returns a ByteArray output stream.
+ * @param key Key for the blob.
+ * @param who Is the subject creating
+ * the blob.
+ * @return ByteArrayOutputStream
+ * @throws IOException
+ * @throws KeyNotFoundException
+ * @throws AuthorizationException
+ */
+ public byte[] readBlob(String key, Subject who) throws IOException,
KeyNotFoundException, AuthorizationException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ readBlobTo(key, out, who);
+ return out.toByteArray();
+ }
+
+ /**
+ * Output stream implemetation used for reading the
--- End diff --
`implementation`
> Dist Cache: Basic Functionality
> -------------------------------
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Robert Joseph Evans
> Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and
> downloading dist cache items. storm-core.ser, storm-conf.ser and storm.jar
> should be written into the blob store instead of residing locally. We need a
> default implementation of the blob store that does essentially what nimbus
> currently does and does not need anything extra. But having an HDFS backend
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and
> provide a working directory for the worker process with symlinks to the
> blobs. It should also allow the blobs to be updated and switch the symlink
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo! we are in the process
> of getting it ready to push back to open source shortly.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)