Github user d2r commented on a diff in the pull request: https://github.com/apache/storm/pull/845#discussion_r45385698 --- Diff: storm-core/src/jvm/backtype/storm/blobstore/BlobStore.java --- @@ -0,0 +1,446 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.blobstore; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.regex.Pattern; + +import javax.security.auth.Subject; + +import backtype.storm.nimbus.NimbusInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.daemon.Shutdownable; +import backtype.storm.generated.AuthorizationException; +import backtype.storm.generated.KeyNotFoundException; +import backtype.storm.generated.KeyAlreadyExistsException; +import backtype.storm.generated.ReadableBlobMeta; +import backtype.storm.generated.SettableBlobMeta; + +/** + * Provides a way to store blobs that can be downloaded. + * Blobs must be able to be uploaded and listed from Nimbus, + * and downloaded from the Supervisors. 
It is a key value based + * store. Key being a string and value being the blob data. + * + * ACL checking must take place against the provided subject. + * If the blob store does not support Security it must validate + * that all ACLs set are always WORLD, everything. + * + * The users can upload their blobs through the blob store command + * line. The command line utilty also allows us to update, + * delete. + * + * Modifying the replication factor only works for HdfsBlobStore + * as for the LocalFsBlobStore the replication is dependent on + * the number of Nimbodes available. + */ +public abstract class BlobStore implements Shutdownable { + public static final Logger LOG = LoggerFactory.getLogger(BlobStore.class); + private static final Pattern KEY_PATTERN = Pattern.compile("^[\\w \\t\\.:_-]+$"); + protected static final String BASE_BLOBS_DIR_NAME = "blobs"; + + /** + * Allows us to initialize the blob store + * @param conf The storm configuration + * @param baseDir The directory path to store the blobs + * @param nimbusInfo Contains the nimbus host, port and leadership information. + */ + public abstract void prepare(Map conf, String baseDir, NimbusInfo nimbusInfo); + + /** + * Creates the blob. + * @param key Key for the blob. + * @param meta Metadata which contains the acls information + * @param who Is the subject creating the blob. + * @return AtomicOutputStream returns a stream into which the data + * can be written into. + * @throws AuthorizationException + * @throws KeyAlreadyExistsException + */ + public abstract AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException; + + /** + * Updates the blob data. + * @param key Key for the blob. + * @param who Is the subject having the write privilege for the blob. + * @return AtomicOutputStream returns a stream into which the data + * can be written into. 
+ * @throws AuthorizationException + * @throws KeyNotFoundException + */ + public abstract AtomicOutputStream updateBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException; + + /** + * Gets the current version of metadata for a blob + * to be viewed by the user or downloaded by the supervisor. + * @param key Key for the blob. + * @param who Is the subject having the read privilege for the blob. + * @return AtomicOutputStream returns a stream into which the data + * can be written into. + * @throws AuthorizationException + * @throws KeyNotFoundException + */ + public abstract ReadableBlobMeta getBlobMeta(String key, Subject who) throws AuthorizationException, KeyNotFoundException; + + /** + * Sets the metadata with renewed acls for the blob. + * @param key Key for the blob. + * @param meta Metadata which contains the updated + * acls information. + * @param who Is the subject having the write privilege for the blob. + * @throws AuthorizationException + * @throws KeyNotFoundException + */ + public abstract void setBlobMeta(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyNotFoundException; + + /** + * Deletes the blob data and metadata. + * @param key Key for the blob. + * @param who Is the subject having write privilege for the blob. + * @throws AuthorizationException + * @throws KeyNotFoundException + */ + public abstract void deleteBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException; + + /** + * Gets the InputStream to read the blob details + * @param key Key for the blob. + * @param who Is the subject having the read privilege for the blob. + * @return InputStreamWithMeta has the additional + * file length and version information. 
+ * @throws AuthorizationException + * @throws KeyNotFoundException + */ + public abstract InputStreamWithMeta getBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException; + + /** + * Returns an iterator with all the list of + * keys currently available on the blob store. + * @param who Is the subject creating the blob. + * @return Iterator<String> + */ + public abstract Iterator<String> listKeys(Subject who); + + /** + * Gets the replication factor of the blob. + * @param key Key for the blob. + * @param who Is the subject having the read privilege for the blob. + * @return BlobReplication object containing the + * replication factor for the blob. + * @throws Exception + */ + public abstract int getBlobReplication(String key, Subject who) throws Exception; + + /** + * Modifies the replication factor of the blob. + * @param key Key for the blob. + * @param replication The replication factor the + * blob has to be set to. + * @param who Is the subject having the update privilege for the blob + * @return BlobReplication object containing the + * updated replication factor for the blob. + * @throws AuthorizationException + * @throws KeyNotFoundException + * @throws IOException + */ + public abstract int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException, IOException; + + /** + * Filters keys based on the KeyFilter + * passed as the argument. 
+ * @param filter Filter passed as the + * @param who Might not want to have the subject as it is not doing anything + * @param <R> Type + * @return Set of filtered keys + */ + public <R> Set<R> filterAndListKeys(KeyFilter<R> filter, Subject who) { + Set<R> ret = new HashSet<R>(); + Iterator<String> keys = listKeys(who); + while (keys.hasNext()) { + String key = keys.next(); + R filtered = filter.filter(key); + if (filtered != null) { + ret.add(filtered); + } + } + return ret; + } + + /** + * Validates key checking for potentially harmful patterns + * @param key Key for the blob. + * @throws AuthorizationException + */ + public static final void validateKey(String key) throws AuthorizationException { + if (key == null || key.isEmpty() || "..".equals(key) || ".".equals(key) || !KEY_PATTERN.matcher(key).matches()) { + LOG.error("'{}' does not appear to be valid {}", key, KEY_PATTERN); + throw new AuthorizationException(key+" does not appear to be a valid blob key"); + } + } + + /** + * Wrapper called to create the blob which contains + * the byte data + * @param key Key for the blob. + * @param data Byte data that needs to be uploaded. + * @param meta Metadata which contains the acls information + * @param who Is the subject creating the blob. + * @throws AuthorizationException + * @throws KeyAlreadyExistsException + * @throws IOException + */ + public void createBlob(String key, byte [] data, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException, IOException { + AtomicOutputStream out = null; + try { + out = createBlob(key, meta, who); + out.write(data); + out.close(); + out = null; + } finally { + if (out != null) { + out.cancel(); + } + } + } + + /** + * Wrapper called to create the blob which contains + * the byte data + * @param key Key for the blob. + * @param in InputStream from which the data is read to be + * written as a part of the blob. 
+ * @param meta Metadata which contains the acls information + * @param who Is the subject creating the blob. + * @throws AuthorizationException + * @throws KeyAlreadyExistsException + * @throws IOException + */ + public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException, IOException { + AtomicOutputStream out = null; + try { + out = createBlob(key, meta, who); + byte[] buffer = new byte[2048]; + int len = 0; + while ((len = in.read(buffer)) > 0) { + out.write(buffer, 0, len); + } + out.close(); + } catch (AuthorizationException | IOException | RuntimeException e) { + out.cancel(); + } finally { + in.close(); + } + } + + /** + * Reads the blob from the blob store + * and writes it into the output stream. + * @param key Key for the blob. + * @param out Output stream + * @param who Is the subject creating + * the blob. + * @throws IOException + * @throws KeyNotFoundException + * @throws AuthorizationException + */ + public void readBlobTo(String key, OutputStream out, Subject who) throws IOException, KeyNotFoundException, AuthorizationException { + InputStreamWithMeta in = getBlob(key, who); + if (in == null) { + throw new IOException("Could not find " + key); + } + byte[] buffer = new byte[2048]; + int len = 0; + try{ + while ((len = in.read(buffer)) > 0) { + out.write(buffer, 0, len); + } + } finally { + in.close(); + out.flush(); + } + } + + /** + * Wrapper around readBlobTo which + * returns a ByteArray output stream. + * @param key Key for the blob. + * @param who Is the subject creating + * the blob. 
+ * @return ByteArrayOutputStream + * @throws IOException + * @throws KeyNotFoundException + * @throws AuthorizationException + */ + public byte[] readBlob(String key, Subject who) throws IOException, KeyNotFoundException, AuthorizationException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + readBlobTo(key, out, who); + return out.toByteArray(); + } + + /** + * Output stream implemetation used for reading the --- End diff -- Typo: `implemetation` should be `implementation`.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it enabled, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---