[ https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15014945#comment-15014945 ]
ASF GitHub Bot commented on STORM-876:
--------------------------------------

Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/845#discussion_r45425302

--- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java ---
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.blobstore.AtomicOutputStream;
+import backtype.storm.blobstore.BlobStore;
+import backtype.storm.blobstore.BlobStoreAclHandler;
+import backtype.storm.blobstore.BlobStoreFile;
+import backtype.storm.blobstore.InputStreamWithMeta;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.ReadableBlobMeta;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift7.TBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Iterator;
+import java.util.Map;
+
+import static backtype.storm.blobstore.BlobStoreAclHandler.ADMIN;
+import static backtype.storm.blobstore.BlobStoreAclHandler.READ;
+import static backtype.storm.blobstore.BlobStoreAclHandler.WRITE;
+
+/**
+ * Provides an HDFS file-system-backed blob store implementation.
+ * Note that this provides an API for having HDFS be the backing store for the
+ * blob store; it is not a service/daemon.
+ */
+public class HdfsBlobStore extends BlobStore {
+    public static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStore.class);
+    private static final String DATA_PREFIX = "data_";
+    private static final String META_PREFIX = "meta_";
+    private BlobStoreAclHandler _aclHandler;
+    private HdfsBlobStoreImpl _hbs;
+    private Subject _localSubject;
+    private Map conf;
+
+    /**
+     * Get the subject from Hadoop so we can use it to validate the ACLs. There is no direct
+     * interface from UserGroupInformation to get the subject, so do a doAs and get the context.
+     * We could probably run everything in the doAs, but for now just grab the subject.
+     */
+    private Subject getHadoopUser() {
+        Subject subj;
+        try {
+            subj = UserGroupInformation.getCurrentUser().doAs(
+                    new PrivilegedAction<Subject>() {
+                        @Override
+                        public Subject run() {
+                            return Subject.getSubject(AccessController.getContext());
+                        }
+                    });
+        } catch (IOException e) {
+            throw new RuntimeException("Error creating subject and logging user in!", e);
+        }
+        return subj;
+    }
+
+    /**
+     * If who is null, then we want to use the user Hadoop says we are.
+     * Required for the supervisor to call these routines, as it is not
+     * logged in as anyone.
+     */
+    private Subject checkAndGetSubject(Subject who) {
+        if (who == null) {
+            return _localSubject;
+        }
+        return who;
+    }
+
+    @Override
+    public void prepare(Map conf, String overrideBase, NimbusInfo nimbusInfo) {
+        this.conf = conf;
+        prepareInternal(conf, overrideBase, null);
+    }
+
+    /**
+     * Allow a Hadoop Configuration to be passed in for testing. If it is null,
+     * the Hadoop configs must be on the classpath.
+     */
+    protected void prepareInternal(Map conf, String overrideBase, Configuration hadoopConf) {
+        this.conf = conf;
+        if (overrideBase == null) {
+            overrideBase = (String) conf.get(Config.BLOBSTORE_DIR);
+        }
+        if (overrideBase == null) {
+            throw new RuntimeException("You must specify a blobstore directory for HDFS to use!");
+        }
+        LOG.debug("directory is: {}", overrideBase);
+        try {
+            // If an HDFS keytab/principal have been supplied, log in; otherwise assume
+            // we are already logged in or are running against insecure HDFS.
+            if (conf.get(Config.BLOBSTORE_HDFS_PRINCIPAL) != null
+                    && conf.get(Config.BLOBSTORE_HDFS_KEYTAB) != null) {
+                UserGroupInformation.loginUserFromKeytab(
+                        (String) conf.get(Config.BLOBSTORE_HDFS_PRINCIPAL),
+                        (String) conf.get(Config.BLOBSTORE_HDFS_KEYTAB));
+            } else if (conf.get(Config.BLOBSTORE_HDFS_PRINCIPAL) == null
+                    && conf.get(Config.BLOBSTORE_HDFS_KEYTAB) != null) {
+                throw new RuntimeException("You must specify an HDFS principal to go with the keytab!");
+            } else if (conf.get(Config.BLOBSTORE_HDFS_PRINCIPAL) != null
+                    && conf.get(Config.BLOBSTORE_HDFS_KEYTAB) == null) {
+                throw new RuntimeException("You must specify an HDFS keytab to go with the principal!");
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Error logging in from keytab!", e);
+        }
+        Path baseDir = new Path(overrideBase, BASE_BLOBS_DIR_NAME);
+        try {
+            if (hadoopConf != null) {
+                _hbs = new HdfsBlobStoreImpl(baseDir, conf, hadoopConf);
+            } else {
+                _hbs = new HdfsBlobStoreImpl(baseDir, conf);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        _localSubject = getHadoopUser();
+        _aclHandler = new BlobStoreAclHandler(conf);
+    }
+
+    @Override
+    public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who)
+            throws AuthorizationException, KeyAlreadyExistsException {
+        if (meta.get_replication_factor() <= 0) {
+            meta.set_replication_factor((int) conf.get(Config.STORM_BLOBSTORE_REPLICATION_FACTOR));
+        }
+        who = checkAndGetSubject(who);
+        validateKey(key);
+        _aclHandler.normalizeSettableBlobMeta(key, meta, who, READ | WRITE | ADMIN);
+        BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
+        _aclHandler.hasPermissions(meta.get_acl(), READ | WRITE | ADMIN, who, key);
+        if (_hbs.exists(DATA_PREFIX + key)) {
+            throw new KeyAlreadyExistsException(key);
+        }
+        BlobStoreFileOutputStream mOut = null;
+        try {
+            BlobStoreFile metaFile = _hbs.write(META_PREFIX + key, true);
+            metaFile.setMetadata(meta);
+            mOut = new BlobStoreFileOutputStream(metaFile);
+            mOut.write(Utils.thriftSerialize((TBase) meta));
+            mOut.close();
+            mOut = null;
+            BlobStoreFile dataFile = _hbs.write(DATA_PREFIX + key, true);
+            dataFile.setMetadata(meta);
+            return new BlobStoreFileOutputStream(dataFile);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            if (mOut != null) {
+                try {
+                    mOut.cancel();
+                } catch (IOException e) {
+                    // Ignored
+                }
+            }
+        }
+    }
+
+    @Override
+    public AtomicOutputStream updateBlob(String key, Subject who)
+            throws AuthorizationException, KeyNotFoundException {
+        who = checkAndGetSubject(who);
+        SettableBlobMeta meta = getStoredBlobMeta(key);
+        validateKey(key);
+        _aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
+        try {
+            BlobStoreFile dataFile = _hbs.write(DATA_PREFIX + key, false);
+            dataFile.setMetadata(meta);
+            return new BlobStoreFileOutputStream(dataFile);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private SettableBlobMeta getStoredBlobMeta(String key) throws KeyNotFoundException {
+        InputStream in = null;
+        try {
+            BlobStoreFile pf = _hbs.read(META_PREFIX + key);
+            try {
+                in = pf.getInputStream();
+            } catch (FileNotFoundException fnf) {
+                throw new KeyNotFoundException(key);
+            }
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            byte[] buffer = new byte[2048];
+            int len;
+            while ((len = in.read(buffer)) > 0) {
+                out.write(buffer, 0, len);
+            }
+            in.close();
+            in = null;
+            SettableBlobMeta sbm = Utils.thriftDeserialize(SettableBlobMeta.class, out.toByteArray());
+            return sbm;
--- End diff --

We probably don't need to define `sbm` if we are just returning it here.
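For reference, a minimal sketch of the change the comment suggests (assuming no other use of the local variable is planned): the tail of `getStoredBlobMeta` returns the deserialized value directly, so the temporary `sbm` disappears.

    in.close();
    in = null;
    // Deserialize the stored metadata and return it directly; no local needed.
    return Utils.thriftDeserialize(SettableBlobMeta.class, out.toByteArray());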
> Dist Cache: Basic Functionality
> -------------------------------
>
>                 Key: STORM-876
>                 URL: https://issues.apache.org/jira/browse/STORM-876
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Robert Joseph Evans
>         Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
> Basic functionality for the Dist Cache feature.
> As part of this, a new API should be added to support uploading and
> downloading dist cache items. storm-core.ser, storm-conf.ser, and storm.jar
> should be written into the blob store instead of residing locally. We need a
> default implementation of the blob store that does essentially what Nimbus
> currently does and does not need anything extra. But having an HDFS backend
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and
> provide a working directory for the worker process with symlinks to the
> blobs. It should also allow the blobs to be updated and switch the symlink
> atomically to point to the new blob once it is downloaded (see the sketch
> after this quote).
> All of this is already done by code internal to Yahoo!; we are in the process
> of getting it ready to push back to open source shortly.
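To illustrate the atomic symlink switch the issue description calls for, here is a rough, self-contained sketch (not code from this pull request; all paths and names are hypothetical). The idea is to create a link to the new blob version under a temporary name and then rename it over the old link:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;

    public class SymlinkSwapSketch {
        public static void main(String[] args) throws IOException {
            // Hypothetical layout: the worker directory holds a symlink "my.jar"
            // that must flip atomically from blob version 1 to version 2.
            Path workerDir = Paths.get("/tmp/worker");
            Path newBlob = Paths.get("/tmp/blobs/my.jar.2");
            Path link = workerDir.resolve("my.jar");

            // Build the new link under a temporary name first...
            Path tmpLink = workerDir.resolve("my.jar.tmp");
            Files.deleteIfExists(tmpLink);
            Files.createSymbolicLink(tmpLink, newBlob);

            // ...then rename it over the old link. Files.move moves the link
            // itself (not its target), and ATOMIC_MOVE maps to rename(2) on
            // POSIX systems, which replaces an existing target in one step.
            Files.move(tmpLink, link, StandardCopyOption.ATOMIC_MOVE);
        }
    }

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)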