[ 
https://issues.apache.org/jira/browse/STORM-1372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15056116#comment-15056116
 ] 

ASF GitHub Bot commented on STORM-1372:
---------------------------------------

Github user unsleepy22 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/945#discussion_r47510918
  
    --- Diff: docs/documentation/distcache-blobstore.md ---
    @@ -0,0 +1,733 @@
    +# Storm Distributed Cache API
    +
    +The distributed cache feature in Storm is used to efficiently distribute files
    +(or blobs, which is the equivalent terminology for a file in the distributed
    +cache and is used interchangeably in this document) that are large and can
    +change during the lifetime of a topology, such as geo-location data,
    +dictionaries, etc. Typical use cases include phrase recognition, entity
    +extraction, document classification, URL re-writing, location/address detection
    +and so forth. Such files may be several KB to several GB in size. For small
    +datasets that don't need dynamic updates, including them in the topology jar
    +could be fine. But for large files, topology startup times can become very long.
    +In these cases, the distributed cache feature can provide fast topology startup,
    +especially if the files were previously downloaded for the same submitter and
    +are still in the cache. This is useful with frequent deployments, sometimes a few
    +times a day with updated jars, because the large cached blobs that do not change
    +frequently remain available in the distributed cache without being re-downloaded.
    +
    +At topology start time, the user specifies the set of files the topology needs.
    +Once a topology is running, the user can at any time request that any file in the
    +distributed cache be updated with a newer version. The updating of blobs follows
    +an eventual consistency model. If the topology needs to know which version of a
    +file it has access to, it is the user's responsibility to find this information
    +out. The files are stored in a cache with a Least-Recently-Used (LRU) eviction
    +policy, where the supervisor decides which cached files are no longer needed and
    +deletes them to free disk space. The blobs can be compressed, and the user can
    +request that the blobs be uncompressed before the topology accesses them.
    +
    +## Motivation for Distributed Cache
    +* Allows sharing blobs among topologies.
    +* Allows updating the blobs from the command line.
    +
    +## Distributed Cache Implementations
    +The current BlobStore interface has the following two implementations:
    +* LocalFsBlobStore
    +* HdfsBlobStore
    +
    +Appendix A contains the interface for blob store implementation.
    +
    +## LocalFsBlobStore
    +![LocalFsBlobStore](images/local_blobstore.png)
    +
    +The local file system implementation of the blob store is depicted in the timeline diagram above.
    +
    +There are several stages from blob creation to blob download and the corresponding execution of a topology.
    +The main stages are as follows.
    +
    +### Blob Creation Command
    +Blobs in the blob store can be created through the command line using the following command:
    +
    +storm blobstore create --file README.txt --acl o::rwa --repl-fctr 4 key1
    +
    +The above command creates a blob with the key name “key1” corresponding to the file README.txt.
    +All users are given read, write and admin access, and the replication factor is set to 4.
    +
    +### Topology Submission and Blob Mapping
    +Users can submit their topology with the following command. The command includes the
    +topology.blobstore.map configuration. The configuration holds two keys, “key1” and “key2”;
    +the key “key1” is mapped to the local file name “blob_file” and is not to be uncompressed.
    +
    +```
    +storm jar /home/y/lib/storm-starter/current/storm-starter-jar-with-dependencies.jar 
    +storm.starter.clj.word_count test_topo -c topology.blobstore.map='{"key1":{"localname":"blob_file", "uncompress":"false"},"key2":{}}'
    +```
    +
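    +The same mapping can also be set programmatically when building the topology configuration. Below is
    +a minimal sketch, assuming the configuration value mirrors the JSON structure shown above and that
    +package names follow current Storm releases (org.apache.storm); the map itself is built with plain JDK
    +collections.
    +
    +```java
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import org.apache.storm.Config;
    +
    +public class BlobstoreMapConfig {
    +    public static Config buildConf() {
    +        // Per-blob settings for "key1", mirroring the JSON in the command above.
    +        Map<String, Object> key1Settings = new HashMap<>();
    +        key1Settings.put("localname", "blob_file"); // local file name seen by the workers
    +        key1Settings.put("uncompress", false);      // do not uncompress this blob
    +
    +        Map<String, Map<String, Object>> blobstoreMap = new HashMap<>();
    +        blobstoreMap.put("key1", key1Settings);
    +        blobstoreMap.put("key2", new HashMap<>());  // no per-blob settings for "key2"
    +
    +        Config conf = new Config();
    +        conf.put("topology.blobstore.map", blobstoreMap);
    +        return conf;
    +    }
    +}
    +```
    +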
    +### Blob Creation Process
    +The creation of the blob takes place through the “ClientBlobStore” interface. Appendix B contains the “ClientBlobStore” interface.
    +The concrete implementation of this interface is the “NimbusBlobStore”. In the case of the local file system, the client makes a
    +call to the nimbus to create the blobs within the local file system. The nimbus uses the local file system implementation to create these blobs.
    +When a user submits a topology, the jar, configuration and code files are uploaded as blobs with the help of the blob store.
    +Also, all the other blobs specified by the topology are mapped to it with the help of the topology.blobstore.map configuration.
    +
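    +As a sketch of what blob creation looks like from a client program, the snippet below creates the same
    +blob as the earlier command-line example through the “ClientBlobStore” API. Package names are assumed to
    +be those of current Storm releases (org.apache.storm.*; older releases used backtype.storm.*), and the
    +exact helper signatures (e.g. Utils.getClientBlobStore, BlobStoreAclHandler.parseAccessControl) should be
    +checked against Appendix B.
    +
    +```java
    +import java.nio.file.Files;
    +import java.nio.file.Paths;
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.blobstore.AtomicOutputStream;
    +import org.apache.storm.blobstore.BlobStoreAclHandler;
    +import org.apache.storm.blobstore.ClientBlobStore;
    +import org.apache.storm.generated.AccessControl;
    +import org.apache.storm.generated.SettableBlobMeta;
    +import org.apache.storm.utils.Utils;
    +
    +public class BlobCreationExample {
    +    public static void main(String[] args) throws Exception {
    +        Config conf = new Config();
    +        conf.putAll(Utils.readStormConfig());
    +
    +        // Returns the configured client implementation, e.g. NimbusBlobStore.
    +        ClientBlobStore clientBlobStore = Utils.getClientBlobStore(conf);
    +
    +        // "o::rwa" from the command-line example: read, write and admin for all users.
    +        AccessControl acl = BlobStoreAclHandler.parseAccessControl("o::rwa");
    +        List<AccessControl> acls = new LinkedList<>();
    +        acls.add(acl);
    +        SettableBlobMeta meta = new SettableBlobMeta(acls);
    +        meta.set_replication_factor(4);
    +
    +        // Create the blob under "key1" and write the file contents into it.
    +        AtomicOutputStream out = clientBlobStore.createBlob("key1", meta);
    +        out.write(Files.readAllBytes(Paths.get("README.txt")));
    +        out.close();
    +    }
    +}
    +```
    +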
    +### Blob Download by the Supervisor
    +Finally, the blobs corresponding to a topology are downloaded by the supervisor once it receives the assignments from the nimbus, through
    +the same “NimbusBlobStore” thrift client that uploaded the blobs. The supervisor downloads the code, jar and conf blobs by calling the
    +“NimbusBlobStore” client directly, while the blobs specified in topology.blobstore.map are downloaded and mapped locally with the help
    +of the Localizer. The Localizer talks to the “NimbusBlobStore” thrift client to download the blobs and adds the blob decompression and local
    +blob name mapping logic to suit the implementation of a topology. Once all the blobs have been downloaded, the workers are launched to run
    +the topologies.
    +
    +## HdfsBlobStore
    +![HdfsBlobStore](images/hdfs_blobstore.png)
    +
    +The HdfsBlobStore functionality has a similar implementation and blob creation and download procedure, barring how replication
    +is handled in the two blob store implementations. Replication in the HDFS blob store is straightforward, as HDFS is equipped to handle
    +replication and requires no state to be stored inside zookeeper. On the other hand, the local file system blob store requires its state to be
    +stored in zookeeper in order for it to work with nimbus HA. Nimbus HA allows the local file system blob store to implement the replication feature
    +seamlessly by storing state about the running topologies in zookeeper and syncing the blobs across the various nimbus nodes. On the supervisor’s
    +end, the supervisor and localizer talk to the HdfsBlobStore through the “HdfsClientBlobStore” implementation.
    +
    +## Additional Features and Documentation
    +
    +The sections below refer to the following example topology submission:
    +
    +```
    +storm jar /home/y/lib/storm-starter/current/storm-starter-jar-with-dependencies.jar storm.starter.clj.word_count test_topo 
    +-c topology.blobstore.map='{"key1":{"localname":"blob_file", "uncompress":"false"},"key2":{}}'
    +```
    + 
    +### Compression
    +The blob store allows the user to set the “uncompress” configuration to true or false. This configuration can be specified
    +in the topology.blobstore.map mentioned in the above command. This allows the user to upload a compressed file such as a tarball or zip.
    +In the local file system blob store, the compressed blobs are stored on the nimbus node. The localizer is responsible for
    +uncompressing the blob and storing it on the supervisor node. Symbolic links to the blobs on the supervisor node are created within the worker
    +before the execution starts.
    +
    +### Local File Name Mapping
    +Apart from compression, the blob store helps to give the blob a name that can be used by the workers. The localizer takes
    +the responsibility of mapping the blob to a local name on the supervisor node.
    +
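    +For example, once the blob from the submission command above is localized, worker code can open it
    +simply by its local name, since a symbolic link with that local name is created within the worker
    +before execution starts (see the Compression section above). A minimal sketch, using plain JDK file
    +I/O and typically called from a bolt's prepare method:
    +
    +```java
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.nio.file.Files;
    +import java.nio.file.Paths;
    +import java.util.List;
    +
    +public class BlobFileReader {
    +    /**
    +     * Reads the blob mapped as "blob_file" via the symbolic link that the
    +     * localizer places for the worker before execution starts.
    +     */
    +    public static List<String> readBlobFile() throws IOException {
    +        return Files.readAllLines(Paths.get("blob_file"), StandardCharsets.UTF_8);
    +    }
    +}
    +```
    +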
    +## Additional Blob Store Implementation Details
    +The blob store uses a hashing function on the key to determine where blobs are created. The blobs are generally stored inside the directory specified by
    +the blobstore.dir configuration. By default, blobs are stored under “storm.local.dir/nimbus/blobs” for the local file system and under a
    +similar path on the HDFS file system.
    +
    +Once a file is submitted, the blob store reads the configs and creates metadata for the blob with all the access control details. The metadata
    +is generally used for authorization while accessing the blobs. The blob key and version contribute to the hash code and thereby the directory
    +under “storm.local.dir/nimbus/blobs/data” where the data is placed. The blobs are generally placed in a directory named with a positive number, e.g. 193, 822, etc.
    +
    +Once the topology is launched and the relevant blobs have been created, the supervisor first downloads the blobs related to storm.conf, storm.ser
    +and storm.code, and then downloads all the blobs uploaded from the command line separately, using the localizer to uncompress and map them to the local name
    +specified in the topology.blobstore.map configuration. The supervisor periodically updates blobs by checking for a change of version,
    +which allows blobs to be updated on the fly.
    +
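    +As a sketch, updating a blob from a client program looks much like creating one, using the updateBlob
    +call of the “ClientBlobStore” interface. The exact signature is an assumption to be checked against
    +Appendix B, and package names are those of current Storm releases.
    +
    +```java
    +import java.nio.file.Files;
    +import java.nio.file.Paths;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.blobstore.AtomicOutputStream;
    +import org.apache.storm.blobstore.ClientBlobStore;
    +import org.apache.storm.utils.Utils;
    +
    +public class BlobUpdateExample {
    +    public static void main(String[] args) throws Exception {
    +        Config conf = new Config();
    +        conf.putAll(Utils.readStormConfig());
    +        ClientBlobStore clientBlobStore = Utils.getClientBlobStore(conf);
    +
    +        // Write a new version of the blob under the existing key; supervisors pick up
    +        // the new version on their next periodic version check.
    +        AtomicOutputStream out = clientBlobStore.updateBlob("key1");
    +        out.write(Files.readAllBytes(Paths.get("README.txt")));
    +        out.close();
    +    }
    +}
    +```
    +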
    +For the local file system, the distributed cache on the supervisor node is set to 10240 MB as a soft limit, and the cleanup code attempts
    +to clean anything over the soft limit every 600 seconds, based on the LRU policy.
    +
    +The HDFS blob store implementation handles load better by removing the burden of storing the blobs from the nimbus, which avoids it becoming a
    +bottleneck, and it provides seamless replication of blobs. On the other hand, the local file system blob store is not very efficient at
    +replicating the blobs and is limited by the number of nimbus nodes. Furthermore, the supervisor talks to the HDFS blob store directly, without the
    +involvement of the nimbus, thereby reducing the load on and dependency on the nimbus.
    +
    +## Highly Available Nimbus
    +### Problem Statement:
    +Currently the Storm master, aka nimbus, is a process that runs on a single machine under supervision. In most cases a
    +nimbus failure is transient and it is restarted by the supervisor. However, sometimes when disks fail and network
    +partitions occur, nimbus goes down. Under these circumstances the topologies run normally, but no new topologies can be
    +submitted, no existing topologies can be killed/deactivated/activated, and if a supervisor node fails then the
    +reassignments are not performed, resulting in performance degradation or topology failures. With this project we intend
    +to resolve this problem by running nimbus in a primary-backup mode to guarantee that even if a nimbus server fails, one
    +of the backups will take over. 
    +
    +### Requirements for Highly Available Nimbus:
    +* Increase overall availability of nimbus.
    +* Allow nimbus hosts to leave and join the cluster at will, at any time. A newly joined host should automatically catch up and join
    +the list of potential leaders. 
    +* No topology resubmissions should be required in case of nimbus failovers.
    +* No active topology should ever be lost. 
    +
    +#### Leader Election:
    +The nimbus server will use the following interface:
    +
    +```java
    +public interface ILeaderElector {
    +    /**
    +     * Queue up for the leadership lock. The call returns immediately and the caller
    +     * must check isLeader() to perform any leadership action.
    +     */
    +    void addToLeaderLockQueue();
    +
    +    /**
    +     * Removes the caller from the leader lock queue. If the caller is the
    +     * leader, it also releases the lock.
    +     */
    +    void removeFromLeaderLockQueue();
    +
    +    /**
    +     *
    +     * @return true if the caller currently has the leader lock.
    +     */
    +    boolean isLeader();
    +
    +    /**
    +     *
    +     * @return the current leader's address; throws an exception if no one has the lock.
    +     */
    +    InetSocketAddress getLeaderAddress();
    +
    +    /**
    +     * 
    +     * @return list of current nimbus addresses, includes leader.
    +     */
    +    List<InetSocketAddress> getAllNimbusAddresses();
    +}
    +```
    +Once a nimbus comes up, it calls the addToLeaderLockQueue() function. The leader election code selects a leader from the queue.
    +If the topology code, jar or config blobs are missing, the nimbus downloads the blobs from any other nimbus that is up and running.
    +
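    +A minimal sketch of the call pattern just described, using the ILeaderElector interface above (the
    +elector instance, polling loop and leadership action are placeholders, not the actual nimbus code):
    +
    +```java
    +public class LeaderElectionSketch {
    +    /**
    +     * Queue up for the leadership lock, then periodically check isLeader()
    +     * before performing any leadership action.
    +     */
    +    public static void runAsNimbus(ILeaderElector elector) throws InterruptedException {
    +        elector.addToLeaderLockQueue(); // returns immediately
    +        while (true) {
    +            if (elector.isLeader()) {
    +                // leader: perform leadership actions, e.g. accept topology submissions
    +            } else {
    +                // non-leader: stay in the queue and keep local blobs in sync
    +            }
    +            Thread.sleep(1000); // illustrative polling interval
    +        }
    +    }
    +}
    +```
    +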
    +The first implementation will be Zookeeper based. If the zookeeper 
connection is lost/resetted resulting in loss of lock
    --- End diff --
    
    lost/resetted -> lost/reset


> Update BlobStore Documentation - Follow up STORM-876
> ----------------------------------------------------
>
>                 Key: STORM-1372
>                 URL: https://issues.apache.org/jira/browse/STORM-1372
>             Project: Apache Storm
>          Issue Type: Story
>            Reporter: Sanket Reddy
>            Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
