Github user unsleepy22 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/945#discussion_r47510918
  
    --- Diff: docs/documentation/distcache-blobstore.md ---
    @@ -0,0 +1,733 @@
    +# Storm Distributed Cache API
    +
    +The distributed cache feature in Storm is used to efficiently distribute files
    +(or blobs, which is the equivalent terminology for a file in the distributed
    +cache and is used interchangeably in this document) that are large and can
    +change during the lifetime of a topology, such as geo-location data,
    +dictionaries, etc. Typical use cases include phrase recognition, entity
    +extraction, document classification, URL re-writing, location/address detection
    +and so forth. Such files may be several KB to several GB in size. For small
    +datasets that don't need dynamic updates, including them in the topology jar
    +is fine. But for large files, topology startup times can become very long.
    +In these cases, the distributed cache feature provides fast topology startup,
    +especially if the files were previously downloaded for the same submitter and
    +are still in the cache. This is useful with frequent deployments, sometimes a
    +few times a day with updated jars, because the large cached blobs that do not
    +change frequently remain available in the distributed cache and do not need to
    +be uploaded or downloaded again.
    +
    +At topology submission time, the user specifies the set of files the topology
    +needs. Once a topology is running, the user can at any time request that any
    +file in the distributed cache be updated with a newer version. The updating of
    +blobs follows an eventual consistency model. If the topology needs to know which
    +version of a file it has access to, it is the responsibility of the user to find
    +this information out. The files are stored in a cache with a Least-Recently Used
    +(LRU) eviction policy, where the supervisor decides which cached files are no
    +longer needed and deletes them to free disk space. The blobs can be compressed,
    +and the user can request that the blobs be uncompressed before the topology
    +accesses them.
    +
    +## Motivation for Distributed Cache
    +* Allows sharing blobs among topologies.
    +* Allows updating the blobs from the command line.
    +
    +## Distributed Cache Implementations
    +The current BlobStore interface has the following two implementations:
    +* LocalFsBlobStore
    +* HdfsBlobStore
    +
    +Appendix A contains the interface for blob store implementation.
    +
    +## LocalFsBlobStore
    +![LocalFsBlobStore](images/local_blobstore.png)
    +
    +The local file system implementation of the blob store is depicted in the
    +timeline diagram above.
    +
    +There are several stages from blob creation to blob download and the
    +corresponding execution of a topology. The main stages are as follows.
    +
    +### Blob Creation Command
    +Blobs in the blobstore can be created through the command line using the
    +following command:
    +
    +```
    +storm blobstore create --file README.txt --acl o::rwa --repl-fctr 4 key1
    +```
    +
    +The above command creates a blob with the key `key1` corresponding to the file
    +README.txt, gives all users read, write and admin access, and sets a replication
    +factor of 4.
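    +
    +Other subcommands of the same `storm blobstore` CLI follow the same pattern, for
    +example updating an existing blob with a newer version of the file or listing
    +blob metadata. Exact flag names may vary slightly between Storm versions, so
    +treat the following as a sketch:
    +
    +```
    +storm blobstore update --file README.txt key1
    +storm blobstore list key1
    +```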
    +
    +### Topology Submission and Blob Mapping
    +Users can submit their topology with the following command. The command includes
    +the topology.blobstore.map configuration, which here holds two keys, `key1` and
    +`key2`; `key1` is mapped to the local file name `blob_file` and is not
    +uncompressed on download.
    +
    +```
    +storm jar /home/y/lib/storm-starter/current/storm-starter-jar-with-dependencies.jar storm.starter.clj.word_count test_topo -c topology.blobstore.map='{"key1":{"localname":"blob_file", "uncompress":"false"},"key2":{}}'
    +```
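    +
    +Inside the worker, the mapped blob is then available under its local name
    +(`blob_file` in this example), because a symbolic link with that name is created
    +in the worker's working directory before execution starts (see the sections on
    +compression and local file name mapping below). The helper below is a minimal
    +illustrative sketch, not part of Storm; the class and method names are
    +hypothetical, and it would typically be called from a bolt's prepare() or a
    +spout's open() method.
    +
    +```java
    +import java.io.File;
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.nio.file.Files;
    +import java.util.List;
    +
    +public class BlobFileReader {
    +    /**
    +     * Reads the locally mapped blob. "blob_file" is the localname chosen in
    +     * topology.blobstore.map above; the symlink is created in the worker's
    +     * current working directory before the worker starts executing.
    +     */
    +    public static List<String> readBlobLines() throws IOException {
    +        File blob = new File("blob_file");
    +        return Files.readAllLines(blob.toPath(), StandardCharsets.UTF_8);
    +    }
    +}
    +```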
    +
    +### Blob Creation Process
    +The creation of the blob takes place through the `ClientBlobStore` interface;
    +Appendix B contains the `ClientBlobStore` interface. The concrete implementation
    +of this interface is the `NimbusBlobStore`. In the case of the local file system,
    +the client makes a call to nimbus to create the blobs, and nimbus uses its local
    +file system implementation to create them. When a user submits a topology, the
    +jar, configuration and code files are uploaded as blobs with the help of the blob
    +store. In addition, all the other blobs specified by the topology are mapped to
    +it with the help of the topology.blobstore.map configuration.
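    +
    +A blob can also be created programmatically through the same client interface.
    +The snippet below is only a minimal sketch: it assumes the `Utils.getClientBlobStore`,
    +`BlobStoreAclHandler.parseAccessControl` and `SettableBlobMeta` helpers of this
    +code base (the `backtype.storm` package names of this era may differ in later
    +releases), and the user name and key are placeholders.
    +
    +```java
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import backtype.storm.blobstore.AtomicOutputStream;
    +import backtype.storm.blobstore.BlobStoreAclHandler;
    +import backtype.storm.blobstore.ClientBlobStore;
    +import backtype.storm.generated.AccessControl;
    +import backtype.storm.generated.SettableBlobMeta;
    +import backtype.storm.utils.Utils;
    +
    +public class BlobCreationSketch {
    +    public static void main(String[] args) throws Exception {
    +        // Read the client-side storm configuration (nimbus host, thrift port, etc.).
    +        Map conf = Utils.readStormConfig();
    +
    +        // Obtain the client-side blob store; with the default configuration this
    +        // resolves to the NimbusBlobStore thrift client described above.
    +        ClientBlobStore clientBlobStore = Utils.getClientBlobStore(conf);
    +
    +        // Grant read/write/admin to the submitting user, mirroring --acl on the CLI.
    +        AccessControl acl = BlobStoreAclHandler.parseAccessControl("u:alice:rwa");
    +        List<AccessControl> acls = new LinkedList<AccessControl>();
    +        acls.add(acl);
    +
    +        // Create the blob under "key1" and stream its contents to the blob store.
    +        AtomicOutputStream out = clientBlobStore.createBlob("key1", new SettableBlobMeta(acls));
    +        out.write("contents of the blob".getBytes());
    +        out.close();
    +    }
    +}
    +```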
    +
    +### Blob Download by the Supervisor
    +Finally, the blobs corresponding to a topology are downloaded by the supervisor
    +once it receives the assignments from nimbus, through the same `NimbusBlobStore`
    +thrift client that uploaded the blobs. The supervisor downloads the code, jar and
    +conf blobs by calling the `NimbusBlobStore` client directly, while the blobs
    +specified in the topology.blobstore.map are downloaded and mapped locally with the
    +help of the localizer. The localizer talks to the `NimbusBlobStore` thrift client
    +to download the blobs and adds the blob decompression and local blob name mapping
    +logic required by the topology. Once all the blobs have been downloaded, the
    +workers are launched to run the topologies.
    +
    +## HdfsBlobStore
    +![HdfsBlobStore](images/hdfs_blobstore.png)
    +
    +The HdfsBlobStore has a similar implementation and blob creation and download
    +procedure; the main difference is how replication is handled in the two blob
    +store implementations. Replication in the HDFS blob store is straightforward, as
    +HDFS is equipped to handle replication, and it requires no state to be stored in
    +Zookeeper. The local file system blob store, on the other hand, requires state to
    +be stored in Zookeeper in order for it to work with nimbus HA. Nimbus HA allows
    +the local file system blob store to implement the replication feature by storing
    +state about the running topologies in Zookeeper and syncing the blobs across the
    +various nimbus nodes. On the supervisor's end, the supervisor and the localizer
    +talk to the HdfsBlobStore through the `HdfsClientBlobStore` implementation.
    +
    +## Additional Features and Documentation
    +```
    +storm jar /home/y/lib/storm-starter/current/storm-starter-jar-with-dependencies.jar storm.starter.clj.word_count test_topo -c topology.blobstore.map='{"key1":{"localname":"blob_file", "uncompress":"false"},"key2":{}}'
    +```
    + 
    +### Compression
    +The blob store allows the user to set the `uncompress` configuration to true or
    +false. This configuration can be specified in the topology.blobstore.map shown in
    +the command above, and it allows the user to upload a compressed file such as a
    +tarball or zip. In the local file system blob store, the compressed blobs are
    +stored on the nimbus node. The localizer takes the responsibility of uncompressing
    +the blob and storing it on the supervisor node. Symbolic links to the blobs on the
    +supervisor node are created within the worker before execution starts.
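    +
    +For example, a zipped dictionary could be uploaded once and uncompressed on the
    +supervisors by setting `uncompress` to true. The file name, key name, jar path
    +and topology class below are placeholders, not part of the Storm distribution:
    +
    +```
    +storm blobstore create --file dictionaries.zip --acl o::rwa dict_key
    +storm jar /path/to/mytopology.jar com.example.MyTopology my_topo -c topology.blobstore.map='{"dict_key":{"localname":"dictionaries","uncompress":"true"}}'
    +```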
    +
    +### Local File Name Mapping
    +Apart from compression, the blob store allows a blob to be given a local name
    +that can be used by the workers. The localizer takes the responsibility of
    +mapping the blob to that local name on the supervisor node.
    +
    +## Additional Blob Store Implementation Details
    +The blob store uses a hashing function to place the blobs based on the key. The
    +blobs are generally stored inside the directory specified by the blobstore.dir
    +configuration. By default, they are stored under `storm.local.dir/nimbus/blobs`
    +for the local file system and under a similar path on the HDFS file system.
    +
    +Once a file is submitted, the blob store reads the configs and creates metadata
    +for the blob with all the access control details. The metadata is generally used
    +for authorization while accessing the blobs. The blob key and version contribute
    +to the hash code and thereby to the directory under
    +`storm.local.dir/nimbus/blobs/data` where the data is placed. The blobs are
    +generally placed in a directory whose name is a positive number (e.g. 193, 822).
    +
    +Once the topology is launched and the relevant blobs have been created, the
    +supervisor first downloads the blobs related to storm.conf, storm.ser and
    +storm.code, and then separately downloads all the blobs uploaded from the command
    +line, using the localizer to uncompress them and map them to the local names
    +specified in the topology.blobstore.map configuration. The supervisor periodically
    +updates blobs by checking for version changes, which allows the blobs to be
    +updated on the fly.
    +
    +For the local file system, the distributed cache on the supervisor node has a soft
    +limit of 10240 MB, and the cleanup code attempts to remove anything over the soft
    +limit every 600 seconds based on an LRU policy.
    +
    +The HDFS blob store implementation handles load better by removing from nimbus
    +the burden of storing the blobs, which prevents nimbus from becoming a bottleneck,
    +and it provides seamless replication of blobs. On the other hand, the local file
    +system blob store is not very efficient at replicating blobs and is limited by the
    +number of nimbus nodes. Furthermore, the supervisor talks to the HDFS blob store
    +directly, without involving nimbus, which further reduces the load and dependency
    +on nimbus.
    +
    +## Highly Available Nimbus
    +### Problem Statement:
    +Currently the Storm master, aka nimbus, is a process that runs on a single
    +machine under supervision. In most cases a nimbus failure is transient and nimbus
    +is restarted by the supervising process. However, sometimes when disks fail or
    +network partitions occur, nimbus goes down. Under these circumstances the
    +topologies keep running normally, but no new topologies can be submitted, no
    +existing topologies can be killed, deactivated or activated, and if a supervisor
    +node fails the reassignments are not performed, resulting in performance
    +degradation or topology failures. With this project we intend to resolve this
    +problem by running nimbus in a primary-backup mode, guaranteeing that even if a
    +nimbus server fails, one of the backups will take over.
    +
    +### Requirements for Highly Available Nimbus:
    +* Increase overall availability of nimbus.
    +* Allow nimbus hosts to leave and join the cluster at any time. A newly joined
    +host should automatically catch up and join the list of potential leaders.
    +* No topology resubmissions required in case of nimbus failovers.
    +* No active topology should ever be lost. 
    +
    +#### Leader Election:
    +The nimbus server will use the following interface:
    +
    +```java
    +public interface ILeaderElector {
    +    /**
    +     * Queue up for the leadership lock. The call returns immediately and the
    +     * caller must check isLeader() to perform any leadership action.
    +     */
    +    void addToLeaderLockQueue();
    +
    +    /**
    +     * Removes the caller from the leader lock queue. If the caller is the
    +     * leader, this also releases the lock.
    +     */
    +    void removeFromLeaderLockQueue();
    +
    +    /**
    +     * @return true if the caller currently has the leader lock.
    +     */
    +    boolean isLeader();
    +
    +    /**
    +     * @return the current leader's address; throws an exception if no one
    +     * holds the lock.
    +     */
    +    InetSocketAddress getLeaderAddress();
    +
    +    /**
    +     * @return list of current nimbus addresses, including the leader.
    +     */
    +    List<InetSocketAddress> getAllNimbusAddresses();
    +}
    +```
    +Once a nimbus comes up it calls the addToLeaderLockQueue() function. The leader
    +election code selects a leader from the queue. If the topology code, jar or config
    +blobs are missing, it downloads the missing blobs from any other nimbus.
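    +
    +The sketch below is purely illustrative and not part of Storm; the class, the
    +polling loop and the way the elector instance is obtained are assumptions. It
    +only shows the intended call pattern of the ILeaderElector interface above:
    +queue up for the lock, then check isLeader() before doing any leader-only work.
    +
    +```java
    +public class LeaderElectionSketch {
    +    public static void waitUntilLeader(ILeaderElector elector) throws InterruptedException {
    +        // Queue up for the leadership lock; this call returns immediately.
    +        elector.addToLeaderLockQueue();
    +        // The caller must poll isLeader() before performing any leadership action.
    +        while (!elector.isLeader()) {
    +            System.out.println("Known nimbus hosts: " + elector.getAllNimbusAddresses());
    +            Thread.sleep(1000);
    +        }
    +        System.out.println("This nimbus now holds the leader lock.");
    +    }
    +}
    +```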
    +
    +The first implementation will be Zookeeper based. If the zookeeper 
connection is lost/resetted resulting in loss of lock
    --- End diff --
    
    lost/resetted -> lost/reset

