http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java deleted file mode 100644 index 1f20d7c..0000000 --- a/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.blobstore; - -import backtype.storm.nimbus.NimbusInfo; -import org.apache.curator.framework.CuratorFramework; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -/** - * Is called periodically and updates the nimbus with blobs based on the state stored inside the zookeeper - * for a non leader nimbus trying to be in sync with the operations performed on the leader nimbus. - */ -public class BlobSynchronizer { - private static final Logger LOG = LoggerFactory.getLogger(BlobSynchronizer.class); - private CuratorFramework zkClient; - private Map conf; - private BlobStore blobStore; - private Set<String> blobStoreKeySet = new HashSet<String>(); - private Set<String> zookeeperKeySet = new HashSet<String>(); - private NimbusInfo nimbusInfo; - - public BlobSynchronizer(BlobStore blobStore, Map conf) { - this.blobStore = blobStore; - this.conf = conf; - } - - public void setNimbusInfo(NimbusInfo nimbusInfo) { - this.nimbusInfo = nimbusInfo; - } - - public void setZookeeperKeySet(Set<String> zookeeperKeySet) { - this.zookeeperKeySet = zookeeperKeySet; - } - - public void setBlobStoreKeySet(Set<String> blobStoreKeySet) { - this.blobStoreKeySet = blobStoreKeySet; - } - - public Set<String> getBlobStoreKeySet() { - Set<String> keySet = new HashSet<String>(); - keySet.addAll(blobStoreKeySet); - return keySet; - } - - public Set<String> getZookeeperKeySet() { - Set<String> keySet = new HashSet<String>(); - keySet.addAll(zookeeperKeySet); - return keySet; - } - - public synchronized void syncBlobs() { - try { - LOG.debug("Sync blobs - blobstore keys {}, zookeeper keys {}",getBlobStoreKeySet(), getZookeeperKeySet()); - zkClient = BlobStoreUtils.createZKClient(conf); - deleteKeySetFromBlobStoreNotOnZookeeper(getBlobStoreKeySet(), getZookeeperKeySet()); - updateKeySetForBlobStore(getBlobStoreKeySet()); - Set<String> keySetToDownload = getKeySetToDownload(getBlobStoreKeySet(), getZookeeperKeySet()); - LOG.debug("Key set Blobstore-> Zookeeper-> DownloadSet {}-> {}-> {}", getBlobStoreKeySet(), getZookeeperKeySet(), keySetToDownload); - - for (String key : keySetToDownload) { - Set<NimbusInfo> nimbusInfoSet = BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key); - if(BlobStoreUtils.downloadMissingBlob(conf, blobStore, key, nimbusInfoSet)) { - BlobStoreUtils.createStateInZookeeper(conf, key, nimbusInfo); - } - } - if (zkClient !=null) { - zkClient.close(); - } - } catch(InterruptedException exp) { - LOG.error("InterruptedException {}", exp); - } catch(Exception exp) { - throw new RuntimeException(exp); - } - } - - public void deleteKeySetFromBlobStoreNotOnZookeeper(Set<String> keySetBlobStore, Set<String> keySetZookeeper) throws Exception { - if (keySetBlobStore.removeAll(keySetZookeeper) - || (keySetZookeeper.isEmpty() && !keySetBlobStore.isEmpty())) { - LOG.debug("Key set to delete in blobstore {}", keySetBlobStore); - for (String key : keySetBlobStore) { - blobStore.deleteBlob(key, BlobStoreUtils.getNimbusSubject()); - } - } - } - - // Update current key list inside the blobstore if the version changes - public void updateKeySetForBlobStore(Set<String> keySetBlobStore) { - try { - for (String key : keySetBlobStore) { - LOG.debug("updating blob"); - BlobStoreUtils.updateKeyForBlobStore(conf, blobStore, zkClient, key, nimbusInfo); - } - } catch (Exception exp) { - throw new RuntimeException(exp); - } - } - - // Make a key list to download - public Set<String> getKeySetToDownload(Set<String> blobStoreKeySet, Set<String> zookeeperKeySet) { - zookeeperKeySet.removeAll(blobStoreKeySet); - LOG.debug("Key list to download {}", zookeeperKeySet); - return zookeeperKeySet; - } -}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/ClientBlobStore.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/blobstore/ClientBlobStore.java b/storm-core/src/jvm/backtype/storm/blobstore/ClientBlobStore.java deleted file mode 100644 index 6408469..0000000 --- a/storm-core/src/jvm/backtype/storm/blobstore/ClientBlobStore.java +++ /dev/null @@ -1,184 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.blobstore; - -import backtype.storm.daemon.Shutdownable; -import backtype.storm.generated.AuthorizationException; -import backtype.storm.generated.ReadableBlobMeta; -import backtype.storm.generated.SettableBlobMeta; -import backtype.storm.generated.KeyAlreadyExistsException; -import backtype.storm.generated.KeyNotFoundException; -import backtype.storm.utils.NimbusClient; - -import java.util.Iterator; -import java.util.Map; - -/** - * The ClientBlobStore has two concrete implementations - * 1. NimbusBlobStore - * 2. HdfsClientBlobStore - * - * Create, update, read and delete are some of the basic operations defined by this interface. - * Each operation is validated for permissions against an user. We currently have NIMBUS_ADMINS and SUPERVISOR_ADMINS - * configuration. NIMBUS_ADMINS are given READ, WRITE and ADMIN access whereas the SUPERVISOR_ADMINS are given READ - * access in order to read and download the blobs form the nimbus. - * - * The ACLs for the blob store are validated against whether the subject is a NIMBUS_ADMIN, SUPERVISOR_ADMIN or USER - * who has read, write or admin privileges in order to perform respective operations on the blob. - * - * For more detailed implementation - * @see backtype.storm.blobstore.NimbusBlobStore - * @see backtype.storm.blobstore.LocalFsBlobStore - * @see org.apache.storm.hdfs.blobstore.HdfsClientBlobStore - * @see org.apache.storm.hdfs.blobstore.HdfsBlobStore - */ -public abstract class ClientBlobStore implements Shutdownable { - protected Map conf; - - /** - * Sets up the client API by parsing the configs. - * @param conf The storm conf containing the config details. - */ - public abstract void prepare(Map conf); - - /** - * Client facing API to create a blob. - * @param key blob key name. - * @param meta contains ACL information. - * @return AtomicOutputStream returns an output stream into which data can be written. - * @throws AuthorizationException - * @throws KeyAlreadyExistsException - */ - protected abstract AtomicOutputStream createBlobToExtend(String key, SettableBlobMeta meta) throws AuthorizationException, KeyAlreadyExistsException; - - /** - * Client facing API to update a blob. - * @param key blob key name. - * @return AtomicOutputStream returns an output stream into which data can be written. - * @throws AuthorizationException - * @throws KeyNotFoundException - */ - public abstract AtomicOutputStream updateBlob(String key) throws AuthorizationException, KeyNotFoundException; - - /** - * Client facing API to read the metadata information. - * @param key blob key name. - * @return AtomicOutputStream returns an output stream into which data can be written. - * @throws AuthorizationException - * @throws KeyNotFoundException - */ - public abstract ReadableBlobMeta getBlobMeta(String key) throws AuthorizationException, KeyNotFoundException; - - /** - * Client facing API to set the metadata for a blob. - * @param key blob key name. - * @param meta contains ACL information. - * @throws AuthorizationException - * @throws KeyNotFoundException - */ - protected abstract void setBlobMetaToExtend(String key, SettableBlobMeta meta) throws AuthorizationException, KeyNotFoundException; - - /** - * Client facing API to delete a blob. - * @param key blob key name. - * @throws AuthorizationException - * @throws KeyNotFoundException - */ - public abstract void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException; - - /** - * Client facing API to read a blob. - * @param key blob key name. - * @return an InputStream to read the metadata for a blob. - * @throws AuthorizationException - * @throws KeyNotFoundException - */ - public abstract InputStreamWithMeta getBlob(String key) throws AuthorizationException, KeyNotFoundException; - - /** - * @return Iterator for a list of keys currently present in the blob store. - */ - public abstract Iterator<String> listKeys(); - - /** - * Client facing API to read the replication of a blob. - * @param key blob key name. - * @return int indicates the replication factor of a blob. - * @throws AuthorizationException - * @throws KeyNotFoundException - */ - public abstract int getBlobReplication(String key) throws AuthorizationException, KeyNotFoundException; - - /** - * Client facing API to update the replication of a blob. - * @param key blob key name. - * @param replication int indicates the replication factor a blob has to be set. - * @return int indicates the replication factor of a blob. - * @throws AuthorizationException - * @throws KeyNotFoundException - */ - public abstract int updateBlobReplication(String key, int replication) throws AuthorizationException, KeyNotFoundException; - - /** - * Client facing API to set a nimbus client. - * @param conf storm conf - * @param client NimbusClient - * @return indicates where the client connection has been setup. - */ - public abstract boolean setClient(Map conf, NimbusClient client); - - /** - * Creates state inside a zookeeper. - * Required for blobstore to write to zookeeper - * when Nimbus HA is turned on in order to maintain - * state consistency - * @param key - */ - public abstract void createStateInZookeeper(String key); - - /** - * Client facing API to create a blob. - * @param key blob key name. - * @param meta contains ACL information. - * @return AtomicOutputStream returns an output stream into which data can be written. - * @throws AuthorizationException - * @throws KeyAlreadyExistsException - */ - public final AtomicOutputStream createBlob(String key, SettableBlobMeta meta) throws AuthorizationException, KeyAlreadyExistsException { - if (meta !=null && meta.is_set_acl()) { - BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl()); - } - return createBlobToExtend(key, meta); - } - - /** - * Client facing API to set the metadata for a blob. - * @param key blob key name. - * @param meta contains ACL information. - * @throws AuthorizationException - * @throws KeyNotFoundException - */ - public final void setBlobMeta(String key, SettableBlobMeta meta) throws AuthorizationException, KeyNotFoundException { - if (meta !=null && meta.is_set_acl()) { - BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl()); - } - setBlobMetaToExtend(key, meta); - } - - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/FileBlobStoreImpl.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/blobstore/FileBlobStoreImpl.java b/storm-core/src/jvm/backtype/storm/blobstore/FileBlobStoreImpl.java deleted file mode 100644 index b789335..0000000 --- a/storm-core/src/jvm/backtype/storm/blobstore/FileBlobStoreImpl.java +++ /dev/null @@ -1,248 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.blobstore; - -import backtype.storm.Config; -import backtype.storm.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.nio.file.Files; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Timer; -import java.util.TimerTask; - -/** - * Very basic blob store impl with no ACL handling. - */ -public class FileBlobStoreImpl { - private static final long FULL_CLEANUP_FREQ = 60 * 60 * 1000l; - private static final int BUCKETS = 1024; - private static final Logger LOG = LoggerFactory.getLogger(FileBlobStoreImpl.class); - private static final Timer timer = new Timer("FileBlobStore cleanup thread", true); - - public class KeyInHashDirIterator implements Iterator<String> { - private int currentBucket = 0; - private Iterator<String> it = null; - private String next = null; - - public KeyInHashDirIterator() throws IOException { - primeNext(); - } - - private void primeNext() throws IOException { - while (it == null && currentBucket < BUCKETS) { - String name = String.valueOf(currentBucket); - File dir = new File(fullPath, name); - try { - it = listKeys(dir); - } catch (FileNotFoundException e) { - it = null; - } - if (it == null || !it.hasNext()) { - it = null; - currentBucket++; - } else { - next = it.next(); - } - } - } - - @Override - public boolean hasNext() { - return next != null; - } - - @Override - public String next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - String current = next; - next = null; - if (it != null) { - if (!it.hasNext()) { - it = null; - currentBucket++; - try { - primeNext(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } else { - next = it.next(); - } - } - return current; - } - - @Override - public void remove() { - throw new UnsupportedOperationException("Delete Not Supported"); - } - } - - private File fullPath; - private TimerTask cleanup = null; - - public FileBlobStoreImpl(File path, Map<String, Object> conf) throws IOException { - LOG.info("Creating new blob store based in {}", path); - fullPath = path; - fullPath.mkdirs(); - Object shouldCleanup = conf.get(Config.BLOBSTORE_CLEANUP_ENABLE); - if (Utils.getBoolean(shouldCleanup, false)) { - LOG.debug("Starting File blobstore cleaner"); - cleanup = new TimerTask() { - @Override - public void run() { - try { - fullCleanup(FULL_CLEANUP_FREQ); - } catch (IOException e) { - LOG.error("Error trying to cleanup", e); - } - } - }; - timer.scheduleAtFixedRate(cleanup, 0, FULL_CLEANUP_FREQ); - } - } - - /** - * @return all keys that are available for reading. - * @throws IOException on any error. - */ - public Iterator<String> listKeys() throws IOException { - return new KeyInHashDirIterator(); - } - - /** - * Get an input stream for reading a part. - * @param key the key of the part to read. - * @return the where to read the data from. - * @throws IOException on any error - */ - public LocalFsBlobStoreFile read(String key) throws IOException { - return new LocalFsBlobStoreFile(getKeyDir(key), BlobStoreFile.BLOBSTORE_DATA_FILE); - } - - /** - * Get an object tied to writing the data. - * @param key the key of the part to write to. - * @return an object that can be used to both write to, but also commit/cancel the operation. - * @throws IOException on any error - */ - public LocalFsBlobStoreFile write(String key, boolean create) throws IOException { - return new LocalFsBlobStoreFile(getKeyDir(key), true, create); - } - - /** - * Check if the key exists in the blob store. - * @param key the key to check for - * @return true if it exists else false. - */ - public boolean exists(String key) { - return getKeyDir(key).exists(); - } - - /** - * Delete a key from the blob store - * @param key the key to delete - * @throws IOException on any error - */ - public void deleteKey(String key) throws IOException { - File keyDir = getKeyDir(key); - LocalFsBlobStoreFile pf = new LocalFsBlobStoreFile(keyDir, BlobStoreFile.BLOBSTORE_DATA_FILE); - pf.delete(); - delete(keyDir); - } - - private File getKeyDir(String key) { - String hash = String.valueOf(Math.abs((long)key.hashCode()) % BUCKETS); - File ret = new File(new File(fullPath, hash), key); - LOG.debug("{} Looking for {} in {}", new Object[]{fullPath, key, hash}); - return ret; - } - - public void fullCleanup(long age) throws IOException { - long cleanUpIfBefore = System.currentTimeMillis() - age; - Iterator<String> keys = new KeyInHashDirIterator(); - while (keys.hasNext()) { - String key = keys.next(); - File keyDir = getKeyDir(key); - Iterator<LocalFsBlobStoreFile> i = listBlobStoreFiles(keyDir); - if (!i.hasNext()) { - //The dir is empty, so try to delete it, may fail, but that is OK - try { - keyDir.delete(); - } catch (Exception e) { - LOG.warn("Could not delete "+keyDir+" will try again later"); - } - } - while (i.hasNext()) { - LocalFsBlobStoreFile f = i.next(); - if (f.isTmp()) { - if (f.getModTime() <= cleanUpIfBefore) { - f.delete(); - } - } - } - } - } - - protected Iterator<LocalFsBlobStoreFile> listBlobStoreFiles(File path) throws IOException { - ArrayList<LocalFsBlobStoreFile> ret = new ArrayList<LocalFsBlobStoreFile>(); - File[] files = path.listFiles(); - if (files != null) { - for (File sub: files) { - try { - ret.add(new LocalFsBlobStoreFile(sub.getParentFile(), sub.getName())); - } catch (IllegalArgumentException e) { - //Ignored the file did not match - LOG.warn("Found an unexpected file in {} {}",path, sub.getName()); - } - } - } - return ret.iterator(); - } - - protected Iterator<String> listKeys(File path) throws IOException { - String[] files = path.list(); - if (files != null) { - return Arrays.asList(files).iterator(); - } - return new LinkedList<String>().iterator(); - } - - protected void delete(File path) throws IOException { - Files.deleteIfExists(path.toPath()); - } - - public void shutdown() { - if (cleanup != null) { - cleanup.cancel(); - cleanup = null; - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/InputStreamWithMeta.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/blobstore/InputStreamWithMeta.java b/storm-core/src/jvm/backtype/storm/blobstore/InputStreamWithMeta.java deleted file mode 100644 index 1d29fda..0000000 --- a/storm-core/src/jvm/backtype/storm/blobstore/InputStreamWithMeta.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.blobstore; - -import java.io.IOException; -import java.io.InputStream; - -public abstract class InputStreamWithMeta extends InputStream { - public abstract long getVersion() throws IOException; - public abstract long getFileLength() throws IOException; -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/KeyFilter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/blobstore/KeyFilter.java b/storm-core/src/jvm/backtype/storm/blobstore/KeyFilter.java deleted file mode 100644 index 32bb9fd..0000000 --- a/storm-core/src/jvm/backtype/storm/blobstore/KeyFilter.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.blobstore; - -public interface KeyFilter<R> { - R filter(String key); -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java b/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java deleted file mode 100644 index 2a53828..0000000 --- a/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java +++ /dev/null @@ -1,229 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package backtype.storm.blobstore; - -import backtype.storm.nimbus.NimbusInfo; -import backtype.storm.utils.Utils; -import org.apache.curator.framework.CuratorFramework; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.ZooDefs; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.nio.ByteBuffer; -import java.util.TreeSet; -import java.util.Map; -import java.util.List; - -/** - * Class hands over the key sequence number which implies the number of updates made to a blob. - * The information regarding the keys and the sequence number which represents the number of updates are - * stored within the zookeeper in the following format. - * /storm/blobstore/key_name/nimbushostport-sequencenumber - * Example: - * If there are two nimbodes with nimbus.seeds:leader,non-leader are set, - * then the state inside the zookeeper is eventually stored as: - * /storm/blobstore/key1/leader:8080-1 - * /storm/blobstore/key1/non-leader:8080-1 - * indicates that a new blob with the name key1 has been created on the leader - * nimbus and the non-leader nimbus syncs after a call back is triggered by attempting - * to download the blob and finally updates its state inside the zookeeper. - * - * A watch is placed on the /storm/blobstore/key1 and the znodes leader:8080-1 and - * non-leader:8080-1 are ephemeral which implies that these nodes exist only until the - * connection between the corresponding nimbus and the zookeeper persist. If in case the - * nimbus crashes the node disappears under /storm/blobstore/key1. - * - * The sequence number for the keys are handed over based on the following scenario: - * Lets assume there are three nimbodes up and running, one being the leader and the other - * being the non-leader. - * - * 1. Create is straight forward. - * Check whether the znode -> /storm/blobstore/key1 has been created or not. It implies - * the blob has not been created yet. If not created, it creates it and updates the zookeeper - * states under /storm/blobstore/key1 and /storm/blobstoremaxkeysequencenumber/key1. - * The znodes it creates on these nodes are /storm/blobstore/key1/leader:8080-1, - * /storm/blobstore/key1/non-leader:8080-1 and /storm/blobstoremaxkeysequencenumber/key1/1. - * The latter holds the global sequence number across all nimbodes more like a static variable - * indicating the true value of number of updates for a blob. This node helps to maintain sanity in case - * leadership changes due to crashing. - * - * 2. Delete does not require to hand over the sequence number. - * - * 3. Finally, the update has few scenarios. - * - * The class implements a TreeSet. The basic idea is if all the nimbodes have the same - * sequence number for the blob, then the number of elements in the set is 1 which holds - * the latest value of sequence number. If the number of elements are greater than 1 then it - * implies that there is sequence mismatch and there is need for syncing the blobs across - * nimbodes. - * - * The logic for handing over sequence numbers based on the state are described as follows - * Here consider Nimbus-1 alias as N1 and Nimbus-2 alias as N2. - * Scenario 1: - * Example: Normal create/update scenario - * Operation Nimbus-1:state Nimbus-2:state Seq-Num-Nimbus-1 Seq-Num-Nimbus-2 Max-Seq-Num - * Create-Key1 alive - Leader alive 1 1 - * Sync alive - Leader alive 1 1 (callback -> download) 1 - * Update-Key1 alive - Leader alive 2 1 2 - * Sync alive - Leader alive 2 2 (callback -> download) 2 - * - * Scenario 2: - * Example: Leader nimbus crash followed by leader election, update and ex-leader restored again - * Operation Nimbus-1:state Nimbus-2:state Seq-Num-Nimbus-1 Seq-Num-Nimbus-2 Max-Seq-Num - * Create alive - Leader alive 1 1 - * Sync alive - Leader alive 1 1 (callback -> download) 1 - * Update alive - Leader alive 2 1 2 - * Sync alive - Leader alive 2 2 (callback -> download) 2 - * Update alive - Leader alive 3 2 3 - * Crash crash - Leader alive 3 2 3 - * New - Leader crash alive - Leader 3 (Invalid) 2 3 - * Update crash alive - Leader 3 (Invalid) 4 (max-seq-num + 1) 4 - * N1-Restored alive alive - Leader 0 4 4 - * Sync alive alive - Leader 4 4 4 - * - * Scenario 3: - * Example: Leader nimbus crash followed by leader election, update and ex-leader restored again - * Operation Nimbus-1:state Nimbus-2:state Seq-Num-Nimbus-1 Seq-Num-Nimbus-2 Max-Seq-Num - * Create alive - Leader alive 1 1 - * Sync alive - Leader alive 1 1 (callback -> download) 1 - * Update alive - Leader alive 2 1 2 - * Sync alive - Leader alive 2 2 (callback -> download) 2 - * Update alive - Leader alive 3 2 3 - * Crash crash - Leader alive 3 2 3 - * Elect Leader crash alive - Leader 3 (Invalid) 2 3 - * N1-Restored alive alive - Leader 3 2 3 - * Read/Update alive alive - Leader 3 4 (Downloads from N1) 4 - * Sync alive alive - Leader 4 (callback) 4 4 - * Here the download is triggered whenever an operation corresponding to the blob is triggered on the - * nimbus like a read or update operation. Here, in the read/update call it is hard to know which call - * is read or update. Hence, by incrementing the sequence number to max-seq-num + 1 we ensure that the - * synchronization happens appropriately and all nimbodes have the same blob. - */ -public class KeySequenceNumber { - private static final Logger LOG = LoggerFactory.getLogger(KeySequenceNumber.class); - private final String BLOBSTORE_SUBTREE="/blobstore"; - private final String BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE="/blobstoremaxkeysequencenumber"; - private final String key; - private final NimbusInfo nimbusInfo; - private final int INT_CAPACITY = 4; - private final int INITIAL_SEQUENCE_NUMBER = 1; - - public KeySequenceNumber(String key, NimbusInfo nimbusInfo) { - this.key = key; - this.nimbusInfo = nimbusInfo; - } - - public synchronized int getKeySequenceNumber(Map conf) { - TreeSet<Integer> sequenceNumbers = new TreeSet<Integer>(); - CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf); - try { - // Key has not been created yet and it is the first time it is being created - if(zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) { - zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) - .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key); - zkClient.setData().forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key, - ByteBuffer.allocate(INT_CAPACITY).putInt(INITIAL_SEQUENCE_NUMBER).array()); - return INITIAL_SEQUENCE_NUMBER; - } - - // When all nimbodes go down and one or few of them come up - // Unfortunately there might not be an exact way to know which one contains the most updated blob, - // if all go down which is unlikely. Hence there might be a need to update the blob if all go down. - List<String> stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key); - LOG.debug("stateInfoList-size {} stateInfoList-data {}", stateInfoList.size(), stateInfoList); - if(stateInfoList.isEmpty()) { - return getMaxSequenceNumber(zkClient); - } - - LOG.debug("stateInfoSize {}", stateInfoList.size()); - // In all other cases check for the latest update sequence of the blob on the nimbus - // and assign the appropriate number. Check if all are have same sequence number, - // if not assign the highest sequence number. - for (String stateInfo:stateInfoList) { - sequenceNumbers.add(Integer.parseInt(BlobStoreUtils.normalizeNimbusHostPortSequenceNumberInfo(stateInfo) - .getSequenceNumber())); - } - - // Update scenario 2 and 3 explain the code logic written here - // especially when nimbus crashes and comes up after and before update - // respectively. - int currentSeqNumber = getMaxSequenceNumber(zkClient); - if (!checkIfStateContainsCurrentNimbusHost(stateInfoList, nimbusInfo) && !nimbusInfo.isLeader()) { - if (sequenceNumbers.last() < currentSeqNumber) { - return currentSeqNumber; - } else { - return INITIAL_SEQUENCE_NUMBER - 1; - } - } - - // It covers scenarios expalined in scenario 3 when nimbus-1 holding the latest - // update goes down before it is downloaded by nimbus-2. Nimbus-2 gets elected as a leader - // after which nimbus-1 comes back up and a read or update is performed. - if (!checkIfStateContainsCurrentNimbusHost(stateInfoList, nimbusInfo) && nimbusInfo.isLeader()) { - incrementMaxSequenceNumber(zkClient, currentSeqNumber); - return currentSeqNumber + 1; - } - - // This code logic covers the update scenarios in 2 when the nimbus-1 goes down - // before syncing the blob to nimbus-2 and an update happens. - // If seq-num for nimbus-2 is 2 and max-seq-number is 3 then next sequence number is 4 - // (max-seq-number + 1). - // Other scenario it covers is when max-seq-number and nimbus seq number are equal. - if (sequenceNumbers.size() == 1) { - if (sequenceNumbers.first() < currentSeqNumber) { - incrementMaxSequenceNumber(zkClient, currentSeqNumber); - return currentSeqNumber + 1; - } else { - incrementMaxSequenceNumber(zkClient, currentSeqNumber); - return sequenceNumbers.first() + 1; - } - } - } catch(Exception e) { - LOG.error("Exception {}", e); - } finally { - if (zkClient != null) { - zkClient.close(); - } - } - // Normal create update sync scenario returns the greatest sequence number in the set - return sequenceNumbers.last(); - } - - private boolean checkIfStateContainsCurrentNimbusHost(List<String> stateInfoList, NimbusInfo nimbusInfo) { - boolean containsNimbusHost = false; - for(String stateInfo:stateInfoList) { - if(stateInfo.contains(nimbusInfo.getHost())) { - containsNimbusHost = true; - break; - } - } - return containsNimbusHost; - } - - private void incrementMaxSequenceNumber(CuratorFramework zkClient, int count) throws Exception { - zkClient.setData().forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key, - ByteBuffer.allocate(INT_CAPACITY).putInt(count + 1).array()); - } - - private int getMaxSequenceNumber(CuratorFramework zkClient) throws Exception { - return ByteBuffer.wrap(zkClient.getData() - .forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key)).getInt(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java deleted file mode 100644 index b8daad2..0000000 --- a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java +++ /dev/null @@ -1,311 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.blobstore; - -import backtype.storm.Config; -import backtype.storm.generated.SettableBlobMeta; -import backtype.storm.generated.AuthorizationException; -import backtype.storm.generated.KeyAlreadyExistsException; -import backtype.storm.generated.KeyNotFoundException; -import backtype.storm.generated.ReadableBlobMeta; - -import backtype.storm.nimbus.NimbusInfo; -import backtype.storm.utils.Utils; -import org.apache.curator.framework.CuratorFramework; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.security.auth.Subject; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.FileNotFoundException; -import java.io.InputStream; - -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set;; - -import static backtype.storm.blobstore.BlobStoreAclHandler.ADMIN; -import static backtype.storm.blobstore.BlobStoreAclHandler.READ; -import static backtype.storm.blobstore.BlobStoreAclHandler.WRITE; - -/** - * Provides a local file system backed blob store implementation for Nimbus. - * - * For a local blob store the user and the supervisor use NimbusBlobStore Client API in order to talk to nimbus through thrift. - * The authentication and authorization here is based on the subject. - * We currently have NIMBUS_ADMINS and SUPERVISOR_ADMINS configuration. NIMBUS_ADMINS are given READ, WRITE and ADMIN - * access whereas the SUPERVISOR_ADMINS are given READ access in order to read and download the blobs form the nimbus. - * - * The ACLs for the blob store are validated against whether the subject is a NIMBUS_ADMIN, SUPERVISOR_ADMIN or USER - * who has read, write or admin privileges in order to perform respective operations on the blob. - * - * For local blob store - * 1. The USER interacts with nimbus to upload and access blobs through NimbusBlobStore Client API. - * 2. The USER sets the ACLs, and the blob access is validated against these ACLs. - * 3. The SUPERVISOR interacts with nimbus through the NimbusBlobStore Client API to download the blobs. - * The supervisors principal should match the set of users configured into SUPERVISOR_ADMINS. - * Here, the PrincipalToLocalPlugin takes care of mapping the principal to user name before the ACL validation. - */ -public class LocalFsBlobStore extends BlobStore { - public static final Logger LOG = LoggerFactory.getLogger(LocalFsBlobStore.class); - private static final String DATA_PREFIX = "data_"; - private static final String META_PREFIX = "meta_"; - protected BlobStoreAclHandler _aclHandler; - private final String BLOBSTORE_SUBTREE = "/blobstore/"; - private NimbusInfo nimbusInfo; - private FileBlobStoreImpl fbs; - private final int allPermissions = READ | WRITE | ADMIN; - private Map conf; - private CuratorFramework zkClient; - - @Override - public void prepare(Map conf, String overrideBase, NimbusInfo nimbusInfo) { - this.conf = conf; - this.nimbusInfo = nimbusInfo; - zkClient = BlobStoreUtils.createZKClient(conf); - if (overrideBase == null) { - overrideBase = (String)conf.get(Config.BLOBSTORE_DIR); - if (overrideBase == null) { - overrideBase = (String) conf.get(Config.STORM_LOCAL_DIR); - } - } - File baseDir = new File(overrideBase, BASE_BLOBS_DIR_NAME); - try { - fbs = new FileBlobStoreImpl(baseDir, conf); - } catch (IOException e) { - throw new RuntimeException(e); - } - _aclHandler = new BlobStoreAclHandler(conf); - } - - @Override - public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException { - LOG.debug("Creating Blob for key {}", key); - validateKey(key); - _aclHandler.normalizeSettableBlobMeta(key, meta, who, allPermissions); - BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl()); - _aclHandler.hasPermissions(meta.get_acl(), allPermissions, who, key); - if (fbs.exists(DATA_PREFIX+key)) { - throw new KeyAlreadyExistsException(key); - } - BlobStoreFileOutputStream mOut = null; - try { - mOut = new BlobStoreFileOutputStream(fbs.write(META_PREFIX+key, true)); - mOut.write(Utils.thriftSerialize(meta)); - mOut.close(); - mOut = null; - return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX+key, true)); - } catch (IOException e) { - throw new RuntimeException(e); - } finally { - if (mOut != null) { - try { - mOut.cancel(); - } catch (IOException e) { - //Ignored - } - } - } - } - - @Override - public AtomicOutputStream updateBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException { - validateKey(key); - checkForBlobOrDownload(key); - SettableBlobMeta meta = getStoredBlobMeta(key); - _aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key); - try { - return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX+key, false)); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private SettableBlobMeta getStoredBlobMeta(String key) throws KeyNotFoundException { - InputStream in = null; - try { - LocalFsBlobStoreFile pf = fbs.read(META_PREFIX+key); - try { - in = pf.getInputStream(); - } catch (FileNotFoundException fnf) { - throw new KeyNotFoundException(key); - } - ByteArrayOutputStream out = new ByteArrayOutputStream(); - byte [] buffer = new byte[2048]; - int len; - while ((len = in.read(buffer)) > 0) { - out.write(buffer, 0, len); - } - in.close(); - in = null; - return Utils.thriftDeserialize(SettableBlobMeta.class, out.toByteArray()); - } catch (IOException e) { - throw new RuntimeException(e); - } finally { - if (in != null) { - try { - in.close(); - } catch (IOException e) { - //Ignored - } - } - } - } - - @Override - public ReadableBlobMeta getBlobMeta(String key, Subject who) throws AuthorizationException, KeyNotFoundException { - validateKey(key); - if(!checkForBlobOrDownload(key)) { - checkForBlobUpdate(key); - } - SettableBlobMeta meta = getStoredBlobMeta(key); - _aclHandler.validateUserCanReadMeta(meta.get_acl(), who, key); - ReadableBlobMeta rbm = new ReadableBlobMeta(); - rbm.set_settable(meta); - try { - LocalFsBlobStoreFile pf = fbs.read(DATA_PREFIX+key); - rbm.set_version(pf.getModTime()); - } catch (IOException e) { - throw new RuntimeException(e); - } - return rbm; - } - - @Override - public void setBlobMeta(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyNotFoundException { - validateKey(key); - checkForBlobOrDownload(key); - _aclHandler.normalizeSettableBlobMeta(key, meta, who, ADMIN); - BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl()); - SettableBlobMeta orig = getStoredBlobMeta(key); - _aclHandler.hasPermissions(orig.get_acl(), ADMIN, who, key); - BlobStoreFileOutputStream mOut = null; - try { - mOut = new BlobStoreFileOutputStream(fbs.write(META_PREFIX+key, false)); - mOut.write(Utils.thriftSerialize(meta)); - mOut.close(); - mOut = null; - } catch (IOException e) { - throw new RuntimeException(e); - } finally { - if (mOut != null) { - try { - mOut.cancel(); - } catch (IOException e) { - //Ignored - } - } - } - } - - @Override - public void deleteBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException { - validateKey(key); - checkForBlobOrDownload(key); - SettableBlobMeta meta = getStoredBlobMeta(key); - _aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key); - try { - fbs.deleteKey(DATA_PREFIX+key); - fbs.deleteKey(META_PREFIX+key); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public InputStreamWithMeta getBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException { - validateKey(key); - if(!checkForBlobOrDownload(key)) { - checkForBlobUpdate(key); - } - SettableBlobMeta meta = getStoredBlobMeta(key); - _aclHandler.hasPermissions(meta.get_acl(), READ, who, key); - try { - return new BlobStoreFileInputStream(fbs.read(DATA_PREFIX+key)); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public Iterator<String> listKeys() { - try { - return new KeyTranslationIterator(fbs.listKeys(), DATA_PREFIX); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void shutdown() { - if (zkClient != null) { - zkClient.close(); - } - } - - @Override - public int getBlobReplication(String key, Subject who) throws Exception { - int replicationCount = 0; - validateKey(key); - SettableBlobMeta meta = getStoredBlobMeta(key); - _aclHandler.hasPermissions(meta.get_acl(), READ, who, key); - if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + key) == null) { - return 0; - } - replicationCount = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + key).size(); - return replicationCount; - } - - @Override - public int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException { - throw new UnsupportedOperationException("For local file system blob store the update blobs function does not work. " + - "Please use HDFS blob store to make this feature available."); - } - - //This additional check and download is for nimbus high availability in case you have more than one nimbus - public synchronized boolean checkForBlobOrDownload(String key) { - boolean checkBlobDownload = false; - try { - List<String> keyList = BlobStoreUtils.getKeyListFromBlobStore(this); - if (!keyList.contains(key)) { - if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + key) != null) { - Set<NimbusInfo> nimbusSet = BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key); - if (BlobStoreUtils.downloadMissingBlob(conf, this, key, nimbusSet)) { - LOG.debug("Updating blobs state"); - BlobStoreUtils.createStateInZookeeper(conf, key, nimbusInfo); - checkBlobDownload = true; - } - } - } - } catch (Exception e) { - throw new RuntimeException(e); - } - return checkBlobDownload; - } - - public synchronized void checkForBlobUpdate(String key) { - BlobStoreUtils.updateKeyForBlobStore(conf, this, zkClient, key, nimbusInfo); - } - - public void fullCleanup(long age) throws IOException { - fbs.fullCleanup(age); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStoreFile.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStoreFile.java b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStoreFile.java deleted file mode 100644 index fb11fa6..0000000 --- a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStoreFile.java +++ /dev/null @@ -1,159 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.blobstore; - -import backtype.storm.generated.SettableBlobMeta; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.file.Files; -import java.nio.file.StandardCopyOption; -import java.util.regex.Matcher; - -public class LocalFsBlobStoreFile extends BlobStoreFile { - - private final String _key; - private final boolean _isTmp; - private final File _path; - private Long _modTime = null; - private final boolean _mustBeNew; - private SettableBlobMeta meta; - - public LocalFsBlobStoreFile(File base, String name) { - if (BlobStoreFile.BLOBSTORE_DATA_FILE.equals(name)) { - _isTmp = false; - } else { - Matcher m = TMP_NAME_PATTERN.matcher(name); - if (!m.matches()) { - throw new IllegalArgumentException("File name does not match '"+name+"' !~ "+TMP_NAME_PATTERN); - } - _isTmp = true; - } - _key = base.getName(); - _path = new File(base, name); - _mustBeNew = false; - } - - public LocalFsBlobStoreFile(File base, boolean isTmp, boolean mustBeNew) { - _key = base.getName(); - _isTmp = isTmp; - _mustBeNew = mustBeNew; - if (_isTmp) { - _path = new File(base, System.currentTimeMillis()+TMP_EXT); - } else { - _path = new File(base, BlobStoreFile.BLOBSTORE_DATA_FILE); - } - } - - @Override - public void delete() throws IOException { - _path.delete(); - } - - @Override - public boolean isTmp() { - return _isTmp; - } - - @Override - public String getKey() { - return _key; - } - - @Override - public long getModTime() throws IOException { - if (_modTime == null) { - _modTime = _path.lastModified(); - } - return _modTime; - } - - @Override - public InputStream getInputStream() throws IOException { - if (isTmp()) { - throw new IllegalStateException("Cannot read from a temporary part file."); - } - return new FileInputStream(_path); - } - - @Override - public OutputStream getOutputStream() throws IOException { - if (!isTmp()) { - throw new IllegalStateException("Can only write to a temporary part file."); - } - boolean success = false; - try { - success = _path.createNewFile(); - } catch (IOException e) { - //Try to create the parent directory, may not work - _path.getParentFile().mkdirs(); - success = _path.createNewFile(); - } - if (!success) { - throw new IOException(_path+" already exists"); - } - return new FileOutputStream(_path); - } - - @Override - public void commit() throws IOException { - if (!isTmp()) { - throw new IllegalStateException("Can only write to a temporary part file."); - } - - File dest = new File(_path.getParentFile(), BlobStoreFile.BLOBSTORE_DATA_FILE); - if (_mustBeNew) { - Files.move(_path.toPath(), dest.toPath(), StandardCopyOption.ATOMIC_MOVE); - } else { - Files.move(_path.toPath(), dest.toPath(), StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); - } - } - - @Override - public void cancel() throws IOException { - if (!isTmp()) { - throw new IllegalStateException("Can only write to a temporary part file."); - } - delete(); - } - - @Override - public SettableBlobMeta getMetadata () { - return meta; - } - - @Override - public void setMetadata (SettableBlobMeta meta) { - this.meta = meta; - } - - @Override - public String toString() { - return _path+":"+(_isTmp ? "tmp": BlobStoreFile.BLOBSTORE_DATA_FILE)+":"+_key; - } - - @Override - public long getFileLength() { - return _path.length(); - } -} - http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/blobstore/NimbusBlobStore.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/blobstore/NimbusBlobStore.java b/storm-core/src/jvm/backtype/storm/blobstore/NimbusBlobStore.java deleted file mode 100644 index 334e6bb..0000000 --- a/storm-core/src/jvm/backtype/storm/blobstore/NimbusBlobStore.java +++ /dev/null @@ -1,420 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.blobstore; - -import backtype.storm.Config; -import backtype.storm.generated.AuthorizationException; -import backtype.storm.generated.BeginDownloadResult; -import backtype.storm.generated.ListBlobsResult; -import backtype.storm.generated.ReadableBlobMeta; -import backtype.storm.generated.SettableBlobMeta; -import backtype.storm.generated.KeyAlreadyExistsException; -import backtype.storm.generated.KeyNotFoundException; -import backtype.storm.utils.NimbusClient; -import backtype.storm.utils.Utils; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Iterator; -import java.util.Map; -import java.util.NoSuchElementException; - -/** - * NimbusBlobStore is a USER facing client API to perform - * basic operations such as create, update, delete and read - * for local and hdfs blob store. - * - * For local blob store it is also the client facing API for - * supervisor in order to download blobs from nimbus. - */ -public class NimbusBlobStore extends ClientBlobStore { - private static final Logger LOG = LoggerFactory.getLogger(NimbusBlobStore.class); - - public class NimbusKeyIterator implements Iterator<String> { - private ListBlobsResult listBlobs = null; - private int offset = 0; - private boolean eof = false; - - public NimbusKeyIterator(ListBlobsResult listBlobs) { - this.listBlobs = listBlobs; - this.eof = (listBlobs.get_keys_size() == 0); - } - - private boolean isCacheEmpty() { - return listBlobs.get_keys_size() <= offset; - } - - private void readMore() throws TException { - if (!eof) { - offset = 0; - synchronized(client) { - listBlobs = client.getClient().listBlobs(listBlobs.get_session()); - } - if (listBlobs.get_keys_size() == 0) { - eof = true; - } - } - } - - @Override - public synchronized boolean hasNext() { - try { - if (isCacheEmpty()) { - readMore(); - } - } catch (TException e) { - throw new RuntimeException(e); - } - return !eof; - } - - @Override - public synchronized String next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - String ret = listBlobs.get_keys().get(offset); - offset++; - return ret; - } - - @Override - public void remove() { - throw new UnsupportedOperationException("Delete Not Supported"); - } - } - - public class NimbusDownloadInputStream extends InputStreamWithMeta { - private BeginDownloadResult beginBlobDownload; - private byte[] buffer = null; - private int offset = 0; - private int end = 0; - private boolean eof = false; - - public NimbusDownloadInputStream(BeginDownloadResult beginBlobDownload) { - this.beginBlobDownload = beginBlobDownload; - } - - @Override - public long getVersion() throws IOException { - return beginBlobDownload.get_version(); - } - - @Override - public synchronized int read() throws IOException { - try { - if (isEmpty()) { - readMore(); - if (eof) { - return -1; - } - } - int length = Math.min(1, available()); - if (length == 0) { - return -1; - } - int ret = buffer[offset]; - offset += length; - return ret; - } catch(TException exp) { - throw new IOException(exp); - } - } - - @Override - public synchronized int read(byte[] b, int off, int len) throws IOException { - try { - if (isEmpty()) { - readMore(); - if (eof) { - return -1; - } - } - int length = Math.min(len, available()); - System.arraycopy(buffer, offset, b, off, length); - offset += length; - return length; - } catch(TException exp) { - throw new IOException(exp); - } - } - - private boolean isEmpty() { - return buffer == null || offset >= end; - } - - private void readMore() throws TException { - if (!eof) { - ByteBuffer buff; - synchronized(client) { - buff = client.getClient().downloadBlobChunk(beginBlobDownload.get_session()); - } - buffer = buff.array(); - offset = buff.arrayOffset() + buff.position(); - int length = buff.remaining(); - end = offset + length; - if (length == 0) { - eof = true; - } - } - } - - @Override - public synchronized int read(byte[] b) throws IOException { - return read(b, 0, b.length); - } - - @Override - public synchronized int available() { - return buffer == null ? 0 : (end - offset); - } - - @Override - public long getFileLength() { - return beginBlobDownload.get_data_size(); - } - } - - public class NimbusUploadAtomicOutputStream extends AtomicOutputStream { - private String session; - private int maxChunkSize = 4096; - private String key; - - public NimbusUploadAtomicOutputStream(String session, int bufferSize, String key) { - this.session = session; - this.maxChunkSize = bufferSize; - this.key = key; - } - - @Override - public void cancel() throws IOException { - try { - synchronized(client) { - client.getClient().cancelBlobUpload(session); - } - } catch (TException e) { - throw new RuntimeException(e); - } - } - - @Override - public void write(int b) throws IOException { - try { - synchronized(client) { - client.getClient().uploadBlobChunk(session, ByteBuffer.wrap(new byte[] {(byte)b})); - } - } catch (TException e) { - throw new RuntimeException(e); - } - } - - @Override - public void write(byte []b) throws IOException { - write(b, 0, b.length); - } - - @Override - public void write(byte []b, int offset, int len) throws IOException { - try { - int end = offset + len; - for (int realOffset = offset; realOffset < end; realOffset += maxChunkSize) { - int realLen = Math.min(end - realOffset, maxChunkSize); - LOG.debug("Writing {} bytes of {} remaining",realLen,(end-realOffset)); - synchronized(client) { - client.getClient().uploadBlobChunk(session, ByteBuffer.wrap(b, realOffset, realLen)); - } - } - } catch (TException e) { - throw new RuntimeException(e); - } - } - - @Override - public void close() throws IOException { - try { - synchronized(client) { - client.getClient().finishBlobUpload(session); - client.getClient().createStateInZookeeper(key); - } - } catch (TException e) { - throw new RuntimeException(e); - } - } - } - - private NimbusClient client; - private int bufferSize = 4096; - - @Override - public void prepare(Map conf) { - this.client = NimbusClient.getConfiguredClient(conf); - if (conf != null) { - this.bufferSize = Utils.getInt(conf.get(Config.STORM_BLOBSTORE_INPUTSTREAM_BUFFER_SIZE_BYTES), bufferSize); - } - } - - @Override - protected AtomicOutputStream createBlobToExtend(String key, SettableBlobMeta meta) - throws AuthorizationException, KeyAlreadyExistsException { - try { - synchronized(client) { - return new NimbusUploadAtomicOutputStream(client.getClient().beginCreateBlob(key, meta), this.bufferSize, key); - } - } catch (AuthorizationException | KeyAlreadyExistsException exp) { - throw exp; - } catch (TException e) { - throw new RuntimeException(e); - } - } - - @Override - public AtomicOutputStream updateBlob(String key) - throws AuthorizationException, KeyNotFoundException { - try { - synchronized(client) { - return new NimbusUploadAtomicOutputStream(client.getClient().beginUpdateBlob(key), this.bufferSize, key); - } - } catch (AuthorizationException | KeyNotFoundException exp) { - throw exp; - } catch (TException e) { - throw new RuntimeException(e); - } - } - - @Override - public ReadableBlobMeta getBlobMeta(String key) throws AuthorizationException, KeyNotFoundException { - try { - synchronized(client) { - return client.getClient().getBlobMeta(key); - } - } catch (AuthorizationException | KeyNotFoundException exp) { - throw exp; - } catch (TException e) { - throw new RuntimeException(e); - } - } - - @Override - protected void setBlobMetaToExtend(String key, SettableBlobMeta meta) - throws AuthorizationException, KeyNotFoundException { - try { - synchronized(client) { - client.getClient().setBlobMeta(key, meta); - } - } catch (AuthorizationException | KeyNotFoundException exp) { - throw exp; - } catch (TException e) { - throw new RuntimeException(e); - } - } - - @Override - public void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException { - try { - synchronized(client) { - client.getClient().deleteBlob(key); - } - } catch (AuthorizationException | KeyNotFoundException exp) { - throw exp; - } catch (TException e) { - throw new RuntimeException(e); - } - } - - @Override - public void createStateInZookeeper(String key) { - try { - synchronized(client) { - client.getClient().createStateInZookeeper(key); - } - } catch (TException e) { - throw new RuntimeException(e); - } - } - - @Override - public InputStreamWithMeta getBlob(String key) throws AuthorizationException, KeyNotFoundException { - try { - synchronized(client) { - return new NimbusDownloadInputStream(client.getClient().beginBlobDownload(key)); - } - } catch (AuthorizationException | KeyNotFoundException exp) { - throw exp; - } catch (TException e) { - throw new RuntimeException(e); - } - } - - @Override - public Iterator<String> listKeys() { - try { - synchronized(client) { - return new NimbusKeyIterator(client.getClient().listBlobs("")); - } - } catch (TException e) { - throw new RuntimeException(e); - } - } - - @Override - public int getBlobReplication(String key) throws AuthorizationException, KeyNotFoundException { - try { - return client.getClient().getBlobReplication(key); - } catch (AuthorizationException | KeyNotFoundException exp) { - throw exp; - } catch (TException e) { - throw new RuntimeException(e); - } - } - - @Override - public int updateBlobReplication(String key, int replication) throws AuthorizationException, KeyNotFoundException { - try { - return client.getClient().updateBlobReplication(key, replication); - } catch (AuthorizationException | KeyNotFoundException exp) { - throw exp; - } catch (TException e) { - throw new RuntimeException(e); - } - } - - @Override - public boolean setClient(Map conf, NimbusClient client) { - this.client = client; - if (conf != null) { - this.bufferSize = Utils.getInt(conf.get(Config.STORM_BLOBSTORE_INPUTSTREAM_BUFFER_SIZE_BYTES), bufferSize); - } - return true; - } - - @Override - protected void finalize() { - shutdown(); - } - - @Override - public void shutdown() { - if (client != null) { - client.close(); - client = null; - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/clojure/ClojureBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/clojure/ClojureBolt.java b/storm-core/src/jvm/backtype/storm/clojure/ClojureBolt.java deleted file mode 100644 index 5de9bde..0000000 --- a/storm-core/src/jvm/backtype/storm/clojure/ClojureBolt.java +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.clojure; - -import backtype.storm.coordination.CoordinatedBolt.FinishedCallback; -import backtype.storm.generated.StreamInfo; -import backtype.storm.task.IBolt; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.utils.Utils; -import clojure.lang.IFn; -import clojure.lang.PersistentArrayMap; -import clojure.lang.Keyword; -import clojure.lang.Symbol; -import clojure.lang.RT; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - - -public class ClojureBolt implements IRichBolt, FinishedCallback { - Map<String, StreamInfo> _fields; - List<String> _fnSpec; - List<String> _confSpec; - List<Object> _params; - - IBolt _bolt; - - public ClojureBolt(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) { - _fnSpec = fnSpec; - _confSpec = confSpec; - _params = params; - _fields = fields; - } - - @Override - public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) { - IFn hof = Utils.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1)); - try { - IFn preparer = (IFn) hof.applyTo(RT.seq(_params)); - final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] { - Keyword.intern(Symbol.create("output-collector")), collector, - Keyword.intern(Symbol.create("context")), context}); - List<Object> args = new ArrayList<Object>() {{ - add(stormConf); - add(context); - add(collectorMap); - }}; - - _bolt = (IBolt) preparer.applyTo(RT.seq(args)); - //this is kind of unnecessary for clojure - try { - _bolt.prepare(stormConf, context, collector); - } catch(AbstractMethodError ame) { - - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public void execute(Tuple input) { - _bolt.execute(input); - } - - @Override - public void cleanup() { - try { - _bolt.cleanup(); - } catch(AbstractMethodError ame) { - - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - for(String stream: _fields.keySet()) { - StreamInfo info = _fields.get(stream); - declarer.declareStream(stream, info.is_direct(), new Fields(info.get_output_fields())); - } - } - - @Override - public void finishedId(Object id) { - if(_bolt instanceof FinishedCallback) { - ((FinishedCallback) _bolt).finishedId(id); - } - } - - @Override - public Map<String, Object> getComponentConfiguration() { - IFn hof = Utils.loadClojureFn(_confSpec.get(0), _confSpec.get(1)); - try { - return (Map) hof.applyTo(RT.seq(_params)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/clojure/ClojureSpout.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/clojure/ClojureSpout.java b/storm-core/src/jvm/backtype/storm/clojure/ClojureSpout.java deleted file mode 100644 index f6422e3..0000000 --- a/storm-core/src/jvm/backtype/storm/clojure/ClojureSpout.java +++ /dev/null @@ -1,153 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.clojure; - -import backtype.storm.generated.StreamInfo; -import backtype.storm.spout.ISpout; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichSpout; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.utils.Utils; -import clojure.lang.IFn; -import clojure.lang.PersistentArrayMap; -import clojure.lang.Keyword; -import clojure.lang.Symbol; -import clojure.lang.RT; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -public class ClojureSpout implements IRichSpout { - Map<String, StreamInfo> _fields; - List<String> _fnSpec; - List<String> _confSpec; - List<Object> _params; - - ISpout _spout; - - public ClojureSpout(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) { - _fnSpec = fnSpec; - _confSpec = confSpec; - _params = params; - _fields = fields; - } - - - @Override - public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) { - IFn hof = Utils.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1)); - try { - IFn preparer = (IFn) hof.applyTo(RT.seq(_params)); - final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] { - Keyword.intern(Symbol.create("output-collector")), collector, - Keyword.intern(Symbol.create("context")), context}); - List<Object> args = new ArrayList<Object>() {{ - add(conf); - add(context); - add(collectorMap); - }}; - - _spout = (ISpout) preparer.applyTo(RT.seq(args)); - //this is kind of unnecessary for clojure - try { - _spout.open(conf, context, collector); - } catch(AbstractMethodError ame) { - - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public void close() { - try { - _spout.close(); - } catch(AbstractMethodError ame) { - - } - } - - @Override - public void nextTuple() { - try { - _spout.nextTuple(); - } catch(AbstractMethodError ame) { - - } - - } - - @Override - public void ack(Object msgId) { - try { - _spout.ack(msgId); - } catch(AbstractMethodError ame) { - - } - - } - - @Override - public void fail(Object msgId) { - try { - _spout.fail(msgId); - } catch(AbstractMethodError ame) { - - } - - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - for(String stream: _fields.keySet()) { - StreamInfo info = _fields.get(stream); - declarer.declareStream(stream, info.is_direct(), new Fields(info.get_output_fields())); - } - } - - @Override - public Map<String, Object> getComponentConfiguration() { - IFn hof = Utils.loadClojureFn(_confSpec.get(0), _confSpec.get(1)); - try { - return (Map) hof.applyTo(RT.seq(_params)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public void activate() { - try { - _spout.activate(); - } catch(AbstractMethodError ame) { - - } - } - - @Override - public void deactivate() { - try { - _spout.deactivate(); - } catch(AbstractMethodError ame) { - - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/clojure/RichShellBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/clojure/RichShellBolt.java b/storm-core/src/jvm/backtype/storm/clojure/RichShellBolt.java deleted file mode 100644 index a155008..0000000 --- a/storm-core/src/jvm/backtype/storm/clojure/RichShellBolt.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.clojure; - -import backtype.storm.generated.StreamInfo; -import backtype.storm.task.ShellBolt; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import java.util.Map; - -public class RichShellBolt extends ShellBolt implements IRichBolt { - private Map<String, StreamInfo> _outputs; - - public RichShellBolt(String[] command, Map<String, StreamInfo> outputs) { - super(command); - _outputs = outputs; - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - for(String stream: _outputs.keySet()) { - StreamInfo def = _outputs.get(stream); - if(def.is_direct()) { - declarer.declareStream(stream, true, new Fields(def.get_output_fields())); - } else { - declarer.declareStream(stream, new Fields(def.get_output_fields())); - } - } - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/clojure/RichShellSpout.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/clojure/RichShellSpout.java b/storm-core/src/jvm/backtype/storm/clojure/RichShellSpout.java deleted file mode 100644 index b49fbef..0000000 --- a/storm-core/src/jvm/backtype/storm/clojure/RichShellSpout.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.clojure; - -import backtype.storm.generated.StreamInfo; -import backtype.storm.spout.ShellSpout; -import backtype.storm.topology.IRichSpout; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import java.util.Map; - -public class RichShellSpout extends ShellSpout implements IRichSpout { - private Map<String, StreamInfo> _outputs; - - public RichShellSpout(String[] command, Map<String, StreamInfo> outputs) { - super(command); - _outputs = outputs; - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - for(String stream: _outputs.keySet()) { - StreamInfo def = _outputs.get(stream); - if(def.is_direct()) { - declarer.declareStream(stream, true, new Fields(def.get_output_fields())); - } else { - declarer.declareStream(stream, new Fields(def.get_output_fields())); - } - } - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } -}
