[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15014155#comment-15014155
 ] 

ASF GitHub Bot commented on STORM-876:
--------------------------------------

Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/845#discussion_r45384344
  
    --- Diff: storm-core/src/jvm/backtype/storm/blobstore/BlobStore.java ---
    @@ -0,0 +1,446 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.blobstore;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.NoSuchElementException;
    +import java.util.Set;
    +import java.util.regex.Pattern;
    +
    +import javax.security.auth.Subject;
    +
    +import backtype.storm.nimbus.NimbusInfo;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import backtype.storm.daemon.Shutdownable;
    +import backtype.storm.generated.AuthorizationException;
    +import backtype.storm.generated.KeyNotFoundException;
    +import backtype.storm.generated.KeyAlreadyExistsException;
    +import backtype.storm.generated.ReadableBlobMeta;
    +import backtype.storm.generated.SettableBlobMeta;
    +
    +/**
    + * Provides a way to store blobs that can be downloaded.
    + * Blobs must be able to be uploaded and listed from Nimbus,
    + * and downloaded from the Supervisors. It is a key-value based
    + * store, where the key is a string and the value is the blob data.
    + *
    + * ACL checking must take place against the provided subject.
    + * If the blob store does not support security, it must validate
    + * that all ACLs set are always WORLD with full access.
    + *
    + * Users can upload their blobs through the blob store command
    + * line. The command line utility also allows blobs to be
    + * updated and deleted.
    + *
    + * Modifying the replication factor only works for the HdfsBlobStore,
    + * as for the LocalFsBlobStore the replication depends on
    + * the number of Nimbodes available.
    + */
    +public abstract class BlobStore implements Shutdownable {
    +  public static final Logger LOG = LoggerFactory.getLogger(BlobStore.class);
    +  private static final Pattern KEY_PATTERN = Pattern.compile("^[\\w \\t\\.:_-]+$");
    +  protected static final String BASE_BLOBS_DIR_NAME = "blobs";
    +
    +  /**
    +   * Allows us to initialize the blob store
    +   * @param conf The storm configuration
    +   * @param baseDir The directory path to store the blobs
    +   * @param nimbusInfo Contains the nimbus host, port and leadership information.
    +   */
    +  public abstract void prepare(Map conf, String baseDir, NimbusInfo nimbusInfo);
    +
    +  /**
    +   * Creates the blob.
    +   * @param key Key for the blob.
    +   * @param meta Metadata which contains the acls information
    +   * @param who Is the subject creating the blob.
    +   * @return AtomicOutputStream returns a stream into which the data
    +   * can be written.
    +   * @throws AuthorizationException
    +   * @throws KeyAlreadyExistsException
    +   */
    +  public abstract AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException;
    +
    +  /**
    +   * Updates the blob data.
    +   * @param key Key for the blob.
    +   * @param who Is the subject having the write privilege for the blob.
    +   * @return AtomicOutputStream returns a stream into which the data
    +   * can be written.
    +   * @throws AuthorizationException
    +   * @throws KeyNotFoundException
    +   */
    +  public abstract AtomicOutputStream updateBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
    +
    +  /**
    +   * Gets the current version of metadata for a blob
    +   * to be viewed by the user or downloaded by the supervisor.
    +   * @param key Key for the blob.
    +   * @param who Is the subject having the read privilege for the blob.
    +   * @return ReadableBlobMeta returns the current
    +   * metadata of the blob.
    +   * @throws AuthorizationException
    +   * @throws KeyNotFoundException
    +   */
    +  public abstract ReadableBlobMeta getBlobMeta(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
    +
    +  /**
    +   * Sets the metadata with renewed acls for the blob.
    +   * @param key Key for the blob.
    +   * @param meta Metadata which contains the updated
    +   * acls information.
    +   * @param who Is the subject having the write privilege for the blob.
    +   * @throws AuthorizationException
    +   * @throws KeyNotFoundException
    +   */
    +  public abstract void setBlobMeta(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyNotFoundException;
    +
    +  /**
    +   * Deletes the blob data and metadata.
    +   * @param key Key for the blob.
    +   * @param who Is the subject having write privilege for the blob.
    +   * @throws AuthorizationException
    +   * @throws KeyNotFoundException
    +   */
    +  public abstract void deleteBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
    +
    +  /**
    +   * Gets the InputStream to read the blob details
    +   * @param key Key for the blob.
    +   * @param who Is the subject having the read privilege for the blob.
    +   * @return InputStreamWithMeta has the additional
    +   * file length and version information.
    +   * @throws AuthorizationException
    +   * @throws KeyNotFoundException
    +   */
    +  public abstract InputStreamWithMeta getBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
    +
    +  /**
    +   * Returns an iterator over all the keys
    +   * currently available in the blob store.
    +   * @param who Is the subject listing the keys.
    +   * @return Iterator<String>
    +   */
    +  public abstract Iterator<String> listKeys(Subject who);
    +
    +  /**
    +   * Gets the replication factor of the blob.
    +   * @param key Key for the blob.
    +   * @param who Is the subject having the read privilege for the blob.
    +   * @return the replication factor
    +   * for the blob.
    +   * @throws Exception
    +   */
    +  public abstract int getBlobReplication(String key, Subject who) throws Exception;
    +
    +  /**
    +   * Modifies the replication factor of the blob.
    +   * @param key Key for the blob.
    +   * @param replication The replication factor the
    +   * blob has to be set to.
    +   * @param who Is the subject having the update privilege for the blob
    +   * @return the updated replication
    +   * factor for the blob.
    +   * @throws AuthorizationException
    +   * @throws KeyNotFoundException
    +   * @throws IOException
    +   */
    +  public abstract int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException, IOException;
    +
    +  /**
    +   * Filters keys based on the KeyFilter
    +   * passed as the argument.
    +   * @param filter KeyFilter passed as the argument.
    +   * @param who Is the subject; we may not need the subject here,
    +   * as it is not doing anything.
    +   * @param <R> Type
    +   * @return Set of filtered keys
    +   */
    +  public <R> Set<R> filterAndListKeys(KeyFilter<R> filter, Subject who) {
    +    Set<R> ret = new HashSet<R>();
    +    Iterator<String> keys = listKeys(who);
    +    while (keys.hasNext()) {
    +      String key = keys.next();
    +      R filtered = filter.filter(key);
    +      if (filtered != null) {
    +        ret.add(filtered);
    +      }
    +    }
    +    return ret;
    +  }
    +
    +  /**
    +   * Validates the key, checking for potentially harmful patterns.
    +   * @param key Key for the blob.
    +   * @throws AuthorizationException
    +   */
    +  public static final void validateKey(String key) throws AuthorizationException {
    +    if (key == null || key.isEmpty() || "..".equals(key) || ".".equals(key) || !KEY_PATTERN.matcher(key).matches()) {
    +      LOG.error("'{}' does not appear to be valid {}", key, KEY_PATTERN);
    +      throw new AuthorizationException(key+" does not appear to be a valid blob key");
    +    }
    +  }
    +
    +  /**
    +   * Wrapper called to create the blob which contains
    +   * the byte data
    +   * @param key Key for the blob.
    +   * @param data Byte data that needs to be uploaded.
    +   * @param meta Metadata which contains the acls information
    +   * @param who Is the subject creating the blob.
    +   * @throws AuthorizationException
    +   * @throws KeyAlreadyExistsException
    +   * @throws IOException
    +   */
    +  public void createBlob(String key, byte [] data, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException, IOException {
    +    AtomicOutputStream out = null;
    +    try {
    +      out = createBlob(key, meta, who);
    +      out.write(data);
    +      out.close();
    +      out = null;
    +    } finally {
    +      if (out != null) {
    +        out.cancel();
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Wrapper called to create the blob which contains
    +   * the data read from the InputStream.
    +   * @param key Key for the blob.
    +   * @param in InputStream from which the data is read to be
    +   * written as a part of the blob.
    +   * @param meta Metadata which contains the acls information
    +   * @param who Is the subject creating the blob.
    +   * @throws AuthorizationException
    +   * @throws KeyAlreadyExistsException
    +   * @throws IOException
    +   */
    +  public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException, IOException {
    +    AtomicOutputStream out = null;
    +    try {
    +      out = createBlob(key, meta, who);
    +      byte[] buffer = new byte[2048];
    +      int len = 0;
    +      while ((len = in.read(buffer)) > 0) {
    +        out.write(buffer, 0, len);
    +      }
    +      out.close();
    +    } catch (AuthorizationException | IOException | RuntimeException e) {
    +      out.cancel();
    --- End diff --
    
    If we catch `AuthorizationException` from the above call to `createBlob`,
    then `out` will be null here. So we need to check that `out` is not null
    here before calling `cancel`.
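    
    Something along these lines would mirror the `byte[]` overload above; this is
    only a rough sketch (not tested against the patch), with the success path
    nulling `out` so the `finally` block cancels only on failure:
    
        AtomicOutputStream out = null;
        try {
          out = createBlob(key, meta, who);
          byte[] buffer = new byte[2048];
          int len;
          while ((len = in.read(buffer)) > 0) {
            out.write(buffer, 0, len);
          }
          out.close();
          out = null;          // success: skip the cancel in finally
        } finally {
          if (out != null) {   // stays null if createBlob itself threw
            out.cancel();
          }
        }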


> Dist Cache: Basic Functionality
> -------------------------------
>
>                 Key: STORM-876
>                 URL: https://issues.apache.org/jira/browse/STORM-876
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Robert Joseph Evans
>         Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and 
> downloading dist cache items.  storm-core.ser, storm-conf.ser and storm.jar 
> should be written into the blob store instead of residing locally. We need a 
> default implementation of the blob store that does essentially what nimbus 
> currently does and does not need anything extra.  But having an HDFS backend 
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and 
> provide a working directory for the worker process with symlinks to the 
> blobs.  It should also allow the blobs to be updated and switch the symlink 
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo!; we are in the process 
> of getting it ready to push back to open source shortly.
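
As a rough illustration of the intended upload/download flow, here is a hypothetical
sketch (the helper class, key name, and ACL setup are made up for illustration; the
`createBlob` and `getBlob` signatures come from the BlobStore class in the diff above):

    import java.io.FileInputStream;
    import javax.security.auth.Subject;

    import backtype.storm.blobstore.BlobStore;
    import backtype.storm.blobstore.InputStreamWithMeta;
    import backtype.storm.generated.SettableBlobMeta;

    public class DistCacheUsageSketch {
      // Uploads a local storm.jar into the blob store and streams it back,
      // roughly as Nimbus and a Supervisor would. The key name is illustrative.
      public static void roundTrip(BlobStore store, Subject who) throws Exception {
        SettableBlobMeta meta = new SettableBlobMeta(); // ACLs would need to be set here
        try (FileInputStream jar = new FileInputStream("storm.jar")) {
          store.createBlob("topo-1-storm.jar", jar, meta, who);
        }
        try (InputStreamWithMeta blob = store.getBlob("topo-1-storm.jar", who)) {
          byte[] buf = new byte[4096];
          while (blob.read(buf) > 0) {
            // copy into the worker's working directory, then switch the symlink
            // atomically once the updated blob has been fully downloaded
          }
        }
      }
    }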



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
